Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
121 changes: 61 additions & 60 deletions proxylist/tasks.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,10 @@ def _current_task_name() -> str:
frame = inspect.currentframe()
outermost_name = "unknown"
while frame is not None:
if frame.f_globals.get("__name__") == __name__ and frame.f_code.co_name != "_current_task_name":
if (
frame.f_globals.get("__name__") == __name__
and frame.f_code.co_name != "_current_task_name"
):
outermost_name = frame.f_code.co_name
frame = frame.f_back
return outermost_name
Expand Down Expand Up @@ -72,75 +75,75 @@ def remove_low_quality_proxies() -> None:
)


def update_status():
log.info(
"Updating proxies status", extra={"task": _current_task_name()}
)
start_time = now()
# Model fields written back by Proxy.objects.bulk_update() in
# _persist_proxy_updates(); presumably these are the fields mutated by
# update_proxy_status during a check cycle — keep the two in sync (verify).
PROXY_UPDATE_FIELDS = [
    "is_active",
    "ip_address",
    "last_active",
    "location",
    "location_country_code",
    "location_country",
    "times_checked",
    "times_check_succeeded",
    "last_checked",
]


def _check_connectivity() -> bool:
    """Return True when this host has working internet connectivity.

    Probes Google's ``generate_204`` endpoint. Any transport error
    (SSL, connection, timeout) or a non-204 response is logged as an
    error and reported as no connectivity so the caller can skip the
    test cycle.
    """
    try:
        # A finite timeout is required: without one this call can hang
        # forever, and the ReadTimeout handler below would never fire.
        req = requests.get(
            "https://clients3.google.com/generate_204", timeout=10
        )
    except (SSLError, ConnectionError, ReadTimeout):
        log.error(
            "The Shadowmere host is having connection issues. Skipping test cycle.",
            extra={"task": _current_task_name()},
        )
        return False
    if req.status_code != 204:
        log.error(
            "The Shadowmere host is having connection issues. Skipping test cycle.",
            extra={"task": _current_task_name()},
        )
        return False
    return True

log.info("Using ShadowTest URLs", extra={"url": settings.SHADOWTEST_SERVERS})

if req.status_code == 204:
proxies = list(Proxy.objects.all())
with ThreadPoolExecutor(max_workers=CONCURRENT_CHECKS) as executor:
executor.map(update_proxy_status, proxies)
executor.shutdown(wait=True)
def _run_proxy_checks(proxies: list[Proxy]) -> None:
    """Run update_proxy_status for every proxy on a bounded thread pool.

    Blocks until all submitted checks have finished (the context manager
    drains the pool), then logs completion.
    """
    with ThreadPoolExecutor(max_workers=CONCURRENT_CHECKS) as pool:
        for proxy in proxies:
            pool.submit(update_proxy_status, proxy)
    log.info(
        "Proxies statuses checked. Saving new status now.",
        extra={"task": _current_task_name()},
    )

log.info(
"Proxies statuses checked. Saving new status now.",
extra={"task": _current_task_name()},
)

update_fields = [
"is_active",
"ip_address",
"last_active",
"location",
"location_country_code",
"location_country",
"times_checked",
"times_check_succeeded",
"last_checked",
]
saved_proxies = 0
deleted_proxies = 0
proxies_to_update = []

for proxy in proxies:
try:
# Validate uniqueness before batching
proxies_to_update.append(proxy)
saved_proxies += 1
except Exception:
deleted_proxies += 1
def _persist_proxy_updates(proxies: list[Proxy]) -> int:
    """Bulk-save checked proxies and invalidate the cache.

    Returns the number of proxies written. An empty input is a no-op:
    the database is not touched and the cache is left intact.
    """
    total = len(proxies)
    if total == 0:
        return 0
    Proxy.objects.bulk_update(proxies, PROXY_UPDATE_FIELDS, batch_size=500)
    cache.clear()
    return total

if proxies_to_update:
Proxy.objects.bulk_update(proxies_to_update, update_fields, batch_size=500)
cache.clear()

log.info(
"Update completed",
extra={
"task": _current_task_name(),
"saved": saved_proxies,
"deleted": deleted_proxies,
"start_time": start_time,
"finish_time": now(),
},
)
else:
log.error(
"The Shadowmere host is having connection issues. Skipping test cycle.",
extra={"task": _current_task_name()},
)
def update_status():
    """Periodic task: re-check every proxy and persist the results."""
    task_name = _current_task_name()
    log.info("Updating proxies status", extra={"task": task_name})
    started_at = now()

    # Connectivity failures are already logged inside the helper;
    # just skip this cycle.
    if not _check_connectivity():
        return
    log.info("Using ShadowTest URLs", extra={"url": settings.SHADOWTEST_SERVERS})

    candidates = list(Proxy.objects.all())
    _run_proxy_checks(candidates)
    persisted = _persist_proxy_updates(candidates)

    summary = {
        "task": task_name,
        "saved": persisted,
        "start_time": started_at,
        "finish_time": now(),
    }
    log.info("Update completed", extra=summary)


def decode_line(line: str | bytes) -> list[str] | None:
Expand Down Expand Up @@ -314,5 +317,3 @@ def test_and_create_proxy(url: str) -> Proxy | None:
if location is None or location == "unknown":
return None
return Proxy(url=url)


154 changes: 153 additions & 1 deletion proxylist/tests/test_tasks.py
Original file line number Diff line number Diff line change
@@ -1,16 +1,23 @@
import base64
from unittest.mock import Mock, patch
from unittest.mock import Mock, call, patch

from django.core.cache import cache
from django.test import TestCase
from requests.exceptions import ConnectionError, ReadTimeout, SSLError

from proxylist.models import Proxy
from proxylist.tasks import (
PROXY_UPDATE_FIELDS,
_check_connectivity,
_current_task_name,
_persist_proxy_updates,
_run_proxy_checks,
decode_line,
extract_sip002_url,
poll_subscriptions,
remove_low_quality_proxies,
save_proxies,
update_status,
)


Expand All @@ -32,6 +39,151 @@ def test_outermost_frame_wins_when_nested(self):
assert record.task == "poll_subscriptions"


class CheckConnectivityTest(TestCase):
    """Unit tests for the _check_connectivity probe helper."""

    @staticmethod
    def _make_response(status_code):
        # Minimal stand-in for a requests.Response object.
        return Mock(status_code=status_code)

    @patch("proxylist.tasks.requests.get", side_effect=SSLError())
    def test_returns_false_on_ssl_error(self, _):
        result = _check_connectivity()
        assert result is False

    @patch("proxylist.tasks.requests.get", side_effect=ConnectionError())
    def test_returns_false_on_connection_error(self, _):
        result = _check_connectivity()
        assert result is False

    @patch("proxylist.tasks.requests.get", side_effect=ReadTimeout())
    def test_returns_false_on_read_timeout(self, _):
        result = _check_connectivity()
        assert result is False

    @patch("proxylist.tasks.requests.get")
    def test_returns_false_on_non_204_status(self, mock_get):
        mock_get.return_value = self._make_response(200)
        with self.assertLogs("django", level="ERROR") as captured:
            outcome = _check_connectivity()
        assert outcome is False
        messages = [record.getMessage() for record in captured.records]
        assert any("connection issues" in message for message in messages)

    @patch("proxylist.tasks.requests.get")
    def test_returns_true_on_204_status(self, mock_get):
        mock_get.return_value = self._make_response(204)
        assert _check_connectivity() is True

    @patch("proxylist.tasks.requests.get", side_effect=SSLError())
    def test_logs_error_on_exception(self, _):
        with self.assertLogs("django", level="ERROR") as captured:
            _check_connectivity()
        messages = [record.getMessage() for record in captured.records]
        assert any("connection issues" in message for message in messages)


class RunProxyChecksTest(TestCase):
    """Unit tests for the _run_proxy_checks thread-pool helper."""

    @patch("proxylist.tasks.update_proxy_status")
    def test_calls_update_for_each_proxy(self, mock_update):
        fake_proxies = [Mock(spec=Proxy) for _ in range(3)]
        with self.assertLogs("django", level="INFO"):
            _run_proxy_checks(fake_proxies)
        assert mock_update.call_count == 3
        expected_calls = [call(proxy) for proxy in fake_proxies]
        mock_update.assert_has_calls(expected_calls, any_order=True)

    @patch("proxylist.tasks.update_proxy_status")
    def test_handles_empty_list(self, mock_update):
        with self.assertLogs("django", level="INFO") as captured:
            _run_proxy_checks([])
        mock_update.assert_not_called()
        messages = [record.getMessage() for record in captured.records]
        assert any("Proxies statuses checked" in message for message in messages)

    @patch("proxylist.tasks.update_proxy_status")
    def test_logs_completion(self, _):
        with self.assertLogs("django", level="INFO") as captured:
            _run_proxy_checks([])
        messages = [record.getMessage() for record in captured.records]
        assert any("Proxies statuses checked" in message for message in messages)


class PersistProxyUpdatesTest(TestCase):
    """Unit tests for the _persist_proxy_updates bulk-save helper."""

    fixtures = ["proxies.json"]

    @staticmethod
    def test_returns_zero_for_empty_list():
        result = _persist_proxy_updates([])
        assert result == 0

    @staticmethod
    def test_does_not_touch_db_for_empty_list():
        count_before = Proxy.objects.count()
        _persist_proxy_updates([])
        count_after = Proxy.objects.count()
        assert count_after == count_before

    @staticmethod
    def test_does_not_clear_cache_for_empty_list():
        cache.set("sentinel", "value")
        _persist_proxy_updates([])
        assert cache.get("sentinel") == "value"

    @staticmethod
    def test_returns_count_of_saved_proxies():
        sample = list(Proxy.objects.all()[:2])
        assert _persist_proxy_updates(sample) == 2

    @staticmethod
    def test_bulk_updates_with_correct_fields():
        sample = list(Proxy.objects.all()[:1])
        with patch.object(Proxy.objects.__class__, "bulk_update") as mock_bulk:
            _persist_proxy_updates(sample)
        mock_bulk.assert_called_once_with(sample, PROXY_UPDATE_FIELDS, batch_size=500)

    @staticmethod
    def test_clears_cache_after_update():
        cache.set("sentinel", "value")
        sample = list(Proxy.objects.all()[:1])
        _persist_proxy_updates(sample)
        assert cache.get("sentinel") is None


class UpdateStatusTest(TestCase):
    """Tests for the update_status task with its helpers mocked out."""

    fixtures = ["proxies.json"]

    @patch("proxylist.tasks._check_connectivity", return_value=False)
    def test_returns_early_when_not_connected(self, _):
        with patch("proxylist.tasks.Proxy") as proxy_model:
            update_status()
            proxy_model.objects.all.assert_not_called()

    @patch("proxylist.tasks._check_connectivity", return_value=False)
    def test_logs_start_even_when_not_connected(self, _):
        with self.assertLogs("django", level="INFO") as captured:
            update_status()
        messages = [record.getMessage() for record in captured.records]
        assert any("Updating proxies status" in message for message in messages)

    @patch("proxylist.tasks._persist_proxy_updates", return_value=3)
    @patch("proxylist.tasks._run_proxy_checks")
    @patch("proxylist.tasks._check_connectivity", return_value=True)
    def test_runs_checks_and_persists_when_connected(self, _, mock_run, mock_persist):
        with self.assertLogs("django", level="INFO"):
            update_status()
        mock_run.assert_called_once()
        mock_persist.assert_called_once()

    @patch("proxylist.tasks._persist_proxy_updates", return_value=5)
    @patch("proxylist.tasks._run_proxy_checks")
    @patch("proxylist.tasks._check_connectivity", return_value=True)
    def test_logs_completion_with_saved_count(self, _, _run, _persist):
        with self.assertLogs("django", level="INFO") as captured:
            update_status()
        completed = [
            record
            for record in captured.records
            if "Update completed" in record.getMessage()
        ]
        assert completed[0].saved == 5

    @patch("proxylist.tasks._persist_proxy_updates", return_value=0)
    @patch("proxylist.tasks._run_proxy_checks")
    @patch("proxylist.tasks._check_connectivity", return_value=True)
    def test_passes_all_proxies_to_run_checks(self, _, mock_run, _persist):
        with self.assertLogs("django", level="INFO"):
            update_status()
        (checked_proxies,) = mock_run.call_args[0]
        assert len(checked_proxies) == Proxy.objects.count()


class RemovalTest(TestCase):
fixtures = ["proxies.json"]

Expand Down
Loading