diff --git a/.github/workflows/test.yml b/.github/workflows/test.yml index 6bde8f6..2de8204 100644 --- a/.github/workflows/test.yml +++ b/.github/workflows/test.yml @@ -132,9 +132,11 @@ jobs: # for each integration test file for test in scrapyd_k8s/tests/integration/test_*.py; do echo; echo "# $test" - # run scrapyd-k8s with test-specific configuration file + # run scrapyd-k8s with test-specific configuration file, and k8s setup script if available cfg=`echo "$test" | sed 's/\.py$/.conf/'` kubectl create cm scrapyd-k8s-testcfg --from-file=scrapyd_k8s.test.conf="$cfg" + k8sconfig=`echo "$test" | sed 's/\.py$/\.k8s.sh/'` + [ -x "$k8sconfig" ] && "$k8sconfig" up kubectl scale --replicas=1 deploy/scrapyd-k8s # wait for scrapyd-k8s to become ready kubectl wait --for=condition=Available deploy/scrapyd-k8s --timeout=60s @@ -151,6 +153,7 @@ jobs: kubectl scale --replicas=0 deploy/scrapyd-k8s kubectl wait --for=delete pod -l app.kubernetes.io/name=scrapyd-k8s --timeout=90s kubectl delete cm scrapyd-k8s-testcfg --wait + [ -x "$k8sconfig" ] && "$k8sconfig" down done test-k8s: diff --git a/CONFIG.md b/CONFIG.md index c1edc02..f285328 100644 --- a/CONFIG.md +++ b/CONFIG.md @@ -59,18 +59,28 @@ choose to use them. The event watcher establishes a connection to the Kubernetes API and receives a stream of events from it. However, the nature of this long-lived connection is unstable; it can be interrupted by network issues, proxies configured to terminate long-lived connections, and other factors. For this reason, a mechanism was implemented to re-establish the long-lived -connection to the Kubernetes API. To achieve this, three parameters were introduced: `reconnection_attempts`, +connection to the Kubernetes API. To achieve this, two parameters were introduced: `backoff_time` and `backoff_coefficient`. #### What are these parameters about? -* `reconnection_attempts` - defines how many consecutive attempts will be made to reconnect if the connection fails; * `backoff_time`, `backoff_coefficient` - are used to gradually slow down each subsequent attempt to establish a connection with the Kubernetes API, preventing the API from becoming overloaded with requests. The `backoff_time` increases exponentially and is calculated as `backoff_time *= self.backoff_coefficient`. +### max_proc + +By default `max_proc` is not present in the config file, but you can add it under the scrapyd section to limit the number +of jobs that run in parallel, while other scheduled jobs wait for their turn to run whenever currently running jobs +complete, are cancelled, or fail. This feature is available for both Kubernetes and Docker. + +For example, say you have a cluster with 0 running jobs, you schedule 20 jobs, and you set `max_proc = 5` in the scrapyd section. +Then 5 jobs start running immediately and the 15 others are suspended. Whenever one of the running jobs finishes, the next +suspended job is started. The order in which jobs were scheduled is preserved. + +`max_proc` - a parameter you can set to limit the number of jobs running at any given moment + #### When do I need to change it in the config file? Default values for these parameters are provided in the code and are tuned to an "average" cluster setting. If your network -requirements or other conditions are unusual, you may need to adjust these values to better suit your specific setup. - +requirements or other conditions are unusual, you may need to adjust these values to better suit your specific setup. 
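To make `max_proc` concrete, here is a minimal sketch of the relevant part of a scrapyd-k8s config file; `max_proc = 5` matches the example above, while the other keys are illustrative values taken from the sample configs in this diff:

```ini
# sketch of a scrapyd-k8s config section; values are illustrative
[scrapyd]
launcher  = scrapyd_k8s.launcher.K8s
namespace = default
# at most 5 jobs run at once; further jobs are created suspended
# and unsuspended in FIFO order as capacity frees up
max_proc  = 5
```

With the Kubernetes launcher this also requires the "patch" verb on batch jobs in the Role, as the kubernetes.yaml comment below notes.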
\ No newline at end of file diff --git a/kubernetes.yaml b/kubernetes.yaml index e430b91..6a58a97 100644 --- a/kubernetes.yaml +++ b/kubernetes.yaml @@ -89,8 +89,6 @@ data: launcher = scrapyd_k8s.launcher.K8s namespace = default - - max_proc = 2 # This is an example spider that should work out of the box. # Adapt the spider config to your use-case. @@ -164,6 +162,7 @@ apiVersion: rbac.authorization.k8s.io/v1 kind: Role metadata: name: scrapyd-k8s +# review scrapyd_k8s/tests/integration/*.k8s.sh when modifying this in the repository rules: - apiGroups: [""] resources: ["pods"] @@ -176,6 +175,7 @@ rules: verbs: ["get"] - apiGroups: ["batch"] resources: ["jobs"] + # add "patch" if you use scheduling, i.e. if you use max_proc verbs: ["get", "list", "create", "delete"] --- apiVersion: rbac.authorization.k8s.io/v1 diff --git a/scrapyd_k8s.sample-k8s.conf b/scrapyd_k8s.sample-k8s.conf index 9ab3536..44aa99e 100644 --- a/scrapyd_k8s.sample-k8s.conf +++ b/scrapyd_k8s.sample-k8s.conf @@ -19,9 +19,6 @@ namespace = default # Optional pull secret, in case you have private spiders. #pull_secret = ghcr-registry -# Maximum number of jobs running in parallel -max_proc = 10 - # For each project, define a project section. # This contains a repository that points to the remote container repository. # An optional env_secret is the name of a secret with additional environment diff --git a/scrapyd_k8s/api.py b/scrapyd_k8s/api.py index 98207ed..c763b6e 100644 --- a/scrapyd_k8s/api.py +++ b/scrapyd_k8s/api.py @@ -1,5 +1,4 @@ #!/usr/bin/env python3 -import logging import uuid from flask import Flask, request, Response, jsonify from flask_basicauth import BasicAuth diff --git a/scrapyd_k8s/joblogs/log_handler_k8s.py b/scrapyd_k8s/joblogs/log_handler_k8s.py index 61c4592..b9d8dca 100644 --- a/scrapyd_k8s/joblogs/log_handler_k8s.py +++ b/scrapyd_k8s/joblogs/log_handler_k8s.py @@ -8,10 +8,21 @@ logger = logging.getLogger(__name__) +# Custom Exceptions +class KubernetesJobLogHandlerError(Exception): + """Base exception class for KubernetesJobLogHandler.""" + +class LogUploadError(KubernetesJobLogHandlerError): + """Raised when uploading logs to object storage fails.""" + +class PodStreamingError(KubernetesJobLogHandlerError): + """Raised when streaming logs from a pod fails.""" + class KubernetesJobLogHandler: """ A class to handle Kubernetes job logs by watching pods, streaming logs, and uploading them to object storage. + This class: - Observes Kubernetes pods for job-related events. - Streams logs from running pods, storing them locally. @@ -48,10 +59,9 @@ class KubernetesJobLogHandler: Ensures a log file exists for a given job and returns its path. stream_logs(job_name): - Streams logs from a Kubernetes pod corresponding to the given job name and writes them to a file. - + Streams logs from a Kubernetes pod and writes them to a file. handle_events(event): - Processes Kubernetes pod events to start log streaming or upload logs when pods complete. + Watches Kubernetes pod events and handles actions such as starting log streaming or uploading logs. 
""" # The value was chosen to provide a balance between memory usage and the number of I/O operations DEFAULT_BLOCK_SIZE = 6144 @@ -168,6 +178,7 @@ def concatenate_and_delete_files(self, main_file_path, temp_file_path, block_siz logger.debug(f"Concatenated '{temp_file_path}' into '{main_file_path}' and deleted temporary file.") except (IOError, OSError) as e: logger.error(f"Failed to concatenate and delete files for job: {e}") + raise KubernetesJobLogHandlerError(f"Failed to concatenate and delete files: {e}") from e def make_log_filename_for_job(self, job_id): """ @@ -213,6 +224,11 @@ def stream_logs(self, job_id, pod_name): Returns ------- None + + Raises + ------ + PodStreamingError + If an I/O error occurs while streaming logs or processing them. """ log_lines_counter = 0 v1 = client.CoreV1Api() @@ -251,16 +267,30 @@ def stream_logs(self, job_id, pod_name): else: os.remove(temp_file_path) logger.info(f"Removed temporary file '{temp_file_path}' after streaming logs for job '{job_id}'.") - except Exception as e: - logger.exception(f"Error streaming logs for job '{job_id}': {e}") + except (IOError, OSError) as e: + logger.error(f"I/O error while streaming logs for job '{job_id}': {e}") + raise PodStreamingError(f"I/O error while streaming logs for job '{job_id}': {e}") from e + except KubernetesJobLogHandlerError as e: + logger.error(f"Error processing logs for job '{job_id}': {e}") + raise PodStreamingError(f"Error processing logs for job '{job_id}': {e}") from e def handle_events(self, event): """ - Watches Kubernetes pods and handles events such as starting log streaming or uploading logs. + Watches Kubernetes pod events and handles actions such as starting log streaming or uploading logs. + + Parameters + ---------- + event : dict + The event dictionary containing information about the pod event. Returns ------- None + + Raises + ------ + KubernetesJobLogHandlerError + If an error occurs while handling pod events. """ try: @@ -297,5 +327,6 @@ def handle_events(self, event): logger.info(f"Logfile not found for job '{job_id}'") else: logger.debug(f"Other pod event type '{event['type']}' for pod '{pod.metadata.name}' - Phase: '{pod.status.phase}'") - except Exception as e: - logger.exception(f"Error watching pods in namespace '{self.namespace}': {e}") + except KubernetesJobLogHandlerError as e: + logger.error(f"Handled error in handle_events: {e}") + raise e diff --git a/scrapyd_k8s/k8s_resource_watcher.py b/scrapyd_k8s/k8s_resource_watcher.py index 7e9991e..ed44fd1 100644 --- a/scrapyd_k8s/k8s_resource_watcher.py +++ b/scrapyd_k8s/k8s_resource_watcher.py @@ -18,7 +18,6 @@ class ResourceWatcher: subscribers : List[Callable] List of subscriber callback functions to notify on events. """ - def __init__(self, namespace, config): """ Initializes the ResourceWatcher. @@ -51,7 +50,6 @@ def subscribe(self, callback: Callable): if callback not in self.subscribers: self.subscribers.append(callback) logger.debug(f"Subscriber {callback.__name__} added.") - def unsubscribe(self, callback: Callable): """ Removes a subscriber callback. @@ -65,7 +63,6 @@ def unsubscribe(self, callback: Callable): if callback in self.subscribers: self.subscribers.remove(callback) logger.debug(f"Subscriber {callback.__name__} removed.") - def notify_subscribers(self, event: dict): """ Notifies all subscribers about an event. @@ -141,7 +138,6 @@ def watch_pods(self): time.sleep(backoff_time) backoff_time = min(backoff_time*self.backoff_coefficient, 900) - def stop(self): """ Stops the watcher thread gracefully. 
diff --git a/scrapyd_k8s/launcher/docker.py b/scrapyd_k8s/launcher/docker.py index e6f919e..39e4f8c 100644 --- a/scrapyd_k8s/launcher/docker.py +++ b/scrapyd_k8s/launcher/docker.py @@ -2,6 +2,9 @@ import re import socket +import threading +import time + import docker from ..utils import format_iso_date_string, native_stringify_dict @@ -22,6 +25,37 @@ class Docker: def __init__(self, config): self._docker = docker.from_env() + self.max_proc = config.scrapyd().get('max_proc') + if self.max_proc is not None: + self.max_proc = int(self.max_proc) + self._stop_event = threading.Event() + self._lock = threading.Lock() + + self._thread = threading.Thread(target=self._background_task, daemon=True) + self._thread.start() + logger.info("Background thread for managing Docker containers started.") + else: + self._thread = None + logger.info("Job limit not set; Docker launcher will not limit running jobs.") + + def _background_task(self): + """ + Background thread that periodically checks and starts pending containers. + """ + check_interval = 5 # seconds + while not self._stop_event.is_set(): + with self._lock: + self.start_pending_containers() + time.sleep(check_interval) + + def shutdown(self): + """ + Cleanly shut down the background thread. + """ + if self._thread is not None: + self._stop_event.set() + self._thread.join() + logger.info("Background thread for managing Docker containers stopped.") def get_node_name(self): return socket.gethostname() @@ -41,7 +75,7 @@ def schedule(self, project, version, spider, job_id, settings, args): 'SCRAPYD_JOB': job_id, } # TODO env_source handling resources = project.resources(spider) - c = self._docker.containers.run( + c = self._docker.containers.create( image=project.repository() + ':' + version, command=['scrapy', 'crawl', spider, *_args, *_settings], environment=env, @@ -55,6 +89,47 @@ mem_limit=resources.get('limits', {}).get('memory'), cpu_quota=_str_to_micro(resources.get('limits', {}).get('cpu')) ) + if self.max_proc is not None: + running_jobs_count = self.get_running_jobs_count() + if running_jobs_count < self.max_proc: + self.start_pending_containers() + else: + logger.info(f"Job {job_id} is pending due to max_proc limit.") + else: + c.start() + logger.info(f"Job {job_id} started without suspension.") + + def start_pending_containers(self): + """ + Checks if there is capacity to start pending containers and starts them if possible. + """ + running_jobs_count = self.get_running_jobs_count() + logger.debug(f"Current running jobs: {running_jobs_count}, max_proc: {self.max_proc}") + + while running_jobs_count < self.max_proc: + pending_container = self.get_next_pending_container() + if not pending_container: + logger.info("No pending containers to start.") + break + try: + pending_container.start() + running_jobs_count += 1 + logger.info( + f"Started pending container {pending_container.name}. 
Total running jobs now: {running_jobs_count}") + except Exception as e: + logger.error(f"Failed to start container {pending_container.name}: {e}") + break + + def get_next_pending_container(self): + pending_containers = self._docker.containers.list(all=True, filters={ + 'label': self.LABEL_PROJECT, + 'status': 'created', + }) + if not pending_containers: + return None + # Sort by creation time to ensure FIFO order + pending_containers.sort(key=lambda c: c.attrs['Created']) + return pending_containers[0] def cancel(self, project_id, job_id, signal): c = self._get_container(project_id, job_id) @@ -64,13 +139,28 @@ def cancel(self, project_id, job_id, signal): prevstate = self._docker_to_scrapyd_status(c.status) if c.status == 'created' or c.status == 'scheduled': c.remove() + logger.info(f"Removed pending container {c.name}.") elif c.status == 'running': c.kill(signal='SIG' + signal) + logger.info(f"Killed running container {c.name}.") + # After cancelling, try to start pending containers since we might have capacity + if self.max_proc is not None: + self.start_pending_containers() return prevstate - def enable_joblogs(self, config): + def enable_joblogs(self, config, resource_watcher): logger.warning("Job logs are not supported when using the Docker launcher.") + def get_running_jobs_count(self): + if self.max_proc is not None: + # Return the number of running Docker containers matching the job labels + label = self.LABEL_PROJECT + running_jobs = self._docker.containers.list(filters={'label': label, 'status': 'running'}) + return len(running_jobs) + else: + # If job limiting is not enabled, return 0 to avoid unnecessary processing + return 0 + def _parse_job(self, c): state = self._docker_to_scrapyd_status(c.status) return { diff --git a/scrapyd_k8s/launcher/k8s.py b/scrapyd_k8s/launcher/k8s.py index 00ed30a..b1cd9d9 100644 --- a/scrapyd_k8s/launcher/k8s.py +++ b/scrapyd_k8s/launcher/k8s.py @@ -1,15 +1,17 @@ import os import logging import kubernetes import kubernetes.stream from signal import Signals +from kubernetes.client import ApiException +from scrapyd_k8s.launcher.k8s_scheduler import KubernetesScheduler from ..k8s_resource_watcher import ResourceWatcher from ..utils import format_datetime_object, native_stringify_dict from scrapyd_k8s.joblogs import KubernetesJobLogHandler logger = logging.getLogger(__name__) class K8s: @@ -19,6 +26,7 @@ class K8s: def __init__(self, config): self._namespace = config.scrapyd().get('namespace', 'default') + self.max_proc = config.scrapyd().get('max_proc') self._pull_secret = config.scrapyd().get('pull_secret') # TODO figure out where to put Kubernetes initialisation try: @@ -29,15 +37,19 @@ def __init__(self, config): self._k8s = kubernetes.client.CoreV1Api() self._k8s_batch = kubernetes.client.BatchV1Api() + self.k8s_scheduler = None self._init_resource_watcher(config) def _init_resource_watcher(self, config): self.resource_watcher = ResourceWatcher(self._namespace, config) - if config.joblogs() is not None: self.enable_joblogs(config) else: logger.debug("Job logs handling not enabled; 'joblogs' configuration section is missing.") + if self.max_proc is not None: + self.enable_k8s_scheduler(config) + else: + logger.debug("k8s scheduler not enabled; jobs run directly after scheduling.") def get_node_name(self): deployment = os.getenv('MY_DEPLOYMENT_NAME', 
'default') @@ -45,12 +57,22 @@ def get_node_name(self): return ".".join([n for n in [namespace, deployment] if n]) def listjobs(self, project=None): - label = self.LABEL_PROJECT + ('=%s'%(project) if project else '') - jobs = self._k8s_batch.list_namespaced_job(namespace=self._namespace, label_selector=label) - jobs = [self._parse_job(j) for j in jobs.items] - return jobs + label_selector = self.LABEL_PROJECT + ('=%s' % project if project else '') + return self._filter_jobs( + label_selector=label_selector, + filter_func=None, # No additional filtering + parse_func=self._parse_job + ) def schedule(self, project, version, spider, job_id, settings, args): + if self.k8s_scheduler: + running_jobs = self.get_running_jobs_count() + start_suspended = running_jobs >= self.k8s_scheduler.max_proc + logger.debug( + f"Scheduling job {job_id} with start_suspended={start_suspended}. Running jobs: {running_jobs}, Max procs: {self.k8s_scheduler.max_proc}") + else: + start_suspended = False + logger.debug(f"Scheduling job {job_id} without suspension. Scheduler not enabled.") job_name = self._k8s_job_name(project.id(), job_id) _settings = [i for k, v in native_stringify_dict(settings, keys_only=False).items() for i in ['-s', f"{k}={v}"]] _args = [i for k, v in native_stringify_dict(args, keys_only=False).items() for i in ['-a', f"{k}={v}"]] @@ -98,7 +120,7 @@ def schedule(self, project, version, spider, job_id, settings, args): ) job_spec = kubernetes.client.V1JobSpec( template=pod_template, - # suspend=True, # TODO implement scheduler with suspend + suspend=start_suspended, completions=1, backoff_limit=0 # don't retry (TODO reconsider) ) @@ -144,6 +166,135 @@ def enable_joblogs(self, config): else: logger.warning("No storage provider configured; job logs will not be uploaded.") + def enable_k8s_scheduler(self, config): + try: + max_proc = int(self.max_proc) + self.k8s_scheduler = KubernetesScheduler(config, self, max_proc) + logger.debug(f"KubernetesScheduler initialized with max_proc={max_proc}.") + self.resource_watcher.subscribe(self.k8s_scheduler.handle_pod_event) + logger.info("K8s scheduler started.") + except ValueError: + logger.error(f"Invalid max_proc value: {self.max_proc}. Scheduler not enabled.") + self.k8s_scheduler = None + + def unsuspend_job(self, job_id: str): + if not self.k8s_scheduler: + logger.error("Scheduler is not enabled. Cannot unsuspend jobs.") + return False + job_name = self._get_job_name(job_id) + if not job_name: + logger.error(f"Cannot unsuspend job {job_id}: job name not found.") + return False + try: + self._k8s_batch.patch_namespaced_job( + name=job_name, + namespace=self._namespace, + body={'spec': {'suspend': False}} + ) + logger.info(f"Job {job_id} unsuspended.") + return True + except ApiException as e: + logger.exception(f"Error unsuspending job {job_id}: {e}") + return False + + def get_running_jobs_count(self) -> int: + """ + Returns the number of currently active (unsuspended, not completed, not failed) jobs. + """ + label_selector = self.LABEL_JOB_ID + active_jobs = self._filter_jobs( + label_selector=label_selector, + filter_func=self._is_active_job + ) + logger.debug(f"Found {len(active_jobs)} active jobs.") + return len(active_jobs) + + def list_suspended_jobs(self, label_selector: str = ""): + """ + Retrieves a list of suspended jobs. 
+ """ + suspended_jobs = self._filter_jobs( + label_selector=label_selector, + filter_func=self._is_suspended_job + ) + logger.debug(f"Found {len(suspended_jobs)} suspended jobs.") + return suspended_jobs + + def _filter_jobs(self, label_selector, filter_func=None, parse_func=None): + """ + Helper method to fetch jobs and optionally filter and parse them. + + Parameters + ---------- + label_selector : str + Kubernetes label selector to filter jobs. + filter_func : function, optional + A function that takes a job and returns True if the job matches the condition. + If None, all jobs are included. + parse_func : function, optional + A function that takes a job and returns a transformed job. + If None, the raw job object is returned. + + Returns + ------- + list + A list of Kubernetes Job objects that match the filter condition and are parsed if parse_func is provided. + """ + try: + jobs_response = self._k8s_batch.list_namespaced_job( + namespace=self._namespace, + label_selector=label_selector + ) + jobs = jobs_response.items + + if filter_func is not None: + jobs = [job for job in jobs if filter_func(job)] + + if parse_func is not None: + jobs = [parse_func(job) for job in jobs] + + return jobs + except ApiException as e: + logger.exception(f"API call failed while listing jobs: {e}") + return [] + + def _is_active_job(self, job): + """ + Determines if a job is active (running) based on your specific filtering criteria. + + Parameters + ---------- + job : V1Job + A Kubernetes Job object. + + Returns + ------- + bool + True if the job is active, False otherwise. + """ + job_name = job.metadata.name + is_suspended = job.spec.suspend + is_completed = job.status.completion_time is not None + has_failed = job.status.failed is not None and job.status.failed > 0 + logger.debug( + f"Job {job_name}: suspended={is_suspended}, completed={is_completed}, failed={has_failed}") + return not is_suspended and not is_completed and not has_failed + + def _is_suspended_job(self, job): + """ + Determines if a job is suspended. + + Parameters + ---------- + job : V1Job + A Kubernetes Job object. + + Returns + ------- + bool + True if the job is suspended, False otherwise. + """ + return job.spec.suspend def _parse_job(self, job): state = self._k8s_job_to_scrapyd_status(job) @@ -157,18 +308,45 @@ def _parse_job(self, job): } def _get_job(self, project, job_id): - label = self.LABEL_JOB_ID + '=' + job_id - r = self._k8s_batch.list_namespaced_job(namespace=self._namespace, label_selector=label) - if not r or not r.items: + label_selector = f"{self.LABEL_JOB_ID}={job_id}" + jobs = self._filter_jobs( + label_selector=label_selector, + filter_func=None + ) + if not jobs: + logger.error(f"No job found with job_id={job_id}") return None - job = r.items[0] - + job = jobs[0] if job.metadata.labels.get(self.LABEL_PROJECT) != project: - # TODO log error + logger.error(f"Job {job_id} does not belong to project {project}") return None - return job + def _get_job_name(self, job_id: str): + """ + Retrieves the Kubernetes job name for the given job ID. + + Parameters + ---------- + job_id : str + The job ID to look up. + + Returns + ------- + str or None + The name of the Kubernetes job, or None if not found. 
+ """ + label_selector = f"{self.LABEL_JOB_ID}={job_id}" + jobs = self._filter_jobs( + label_selector=label_selector, + filter_func=None + ) + + if not jobs: + logger.error(f"No job found with job_id={job_id}") + return None + return jobs[0].metadata.name + def _get_pod(self, project, job_id): label = self.LABEL_JOB_ID + '=' + job_id r = self._k8s.list_namespaced_pod(namespace=self._namespace, label_selector=label) diff --git a/scrapyd_k8s/launcher/k8s_scheduler/__init__.py b/scrapyd_k8s/launcher/k8s_scheduler/__init__.py new file mode 100644 index 0000000..1444439 --- /dev/null +++ b/scrapyd_k8s/launcher/k8s_scheduler/__init__.py @@ -0,0 +1 @@ +from scrapyd_k8s.launcher.k8s_scheduler.k8s_scheduler import KubernetesScheduler \ No newline at end of file diff --git a/scrapyd_k8s/launcher/k8s_scheduler/k8s_scheduler.py b/scrapyd_k8s/launcher/k8s_scheduler/k8s_scheduler.py new file mode 100644 index 0000000..34e9f2e --- /dev/null +++ b/scrapyd_k8s/launcher/k8s_scheduler/k8s_scheduler.py @@ -0,0 +1,196 @@ +import logging +import datetime + +from kubernetes.client import ApiException + +logger = logging.getLogger(__name__) +logger.setLevel(logging.DEBUG) + +class KubernetesScheduler: + """ + Manages scheduling of Kubernetes jobs to limit the number of concurrently running jobs. + """ + + def __init__(self, config, launcher, max_proc): + """ + Initializes the KubernetesScheduler. + + Parameters + ---------- + config : Config + The configuration object. + launcher : K8s + The launcher instance with initialized Kubernetes clients. + max_proc : int + Maximum number of concurrent jobs. + + Raises + ------ + TypeError + If `max_proc` is not an integer. + """ + try: + self.config = config + self.launcher = launcher + if not isinstance(max_proc, int): + raise TypeError(f"max_proc must be an integer, got {type(max_proc).__name__}") + self.max_proc = max_proc + self.namespace = config.namespace() + logger.info("Scheduler feature is initialized") + except TypeError as e: + logger.exception(f"TypeError during KubernetesScheduler initialization: {e}") + raise + + def handle_pod_event(self, event): + """ + Handles pod events from the ResourceWatcher. + + Processes events related to pod lifecycle changes. If a pod associated with a job + has terminated (either succeeded or failed), it triggers the scheduler to check + and potentially unsuspend other suspended jobs to utilize available capacity. + + Parameters + ---------- + event : dict + A dictionary representing the pod event received from Kubernetes. + + Notes + ----- + This method handles and logs the following exceptions internally: + - KeyError: If the event dictionary lacks the 'object' or 'type' keys. + - AttributeError: If the pod object lacks attributes like `status.phase` or `metadata.name`. + - TypeError: If `event` is not a dictionary or does not have the expected structure. + + Exceptions are not propagated further. 
+ """ + try: + if not isinstance(event, dict): + raise TypeError(f"Event must be a dictionary, got {type(event).__name__}") + pod = event['object'] + pod_phase = pod.status.phase + pod_name = pod.metadata.name + event_type = event['type'] + + if not hasattr(pod, 'status') or not hasattr(pod.status, 'phase'): + raise AttributeError("Pod object missing 'status.phase' attribute") + if not hasattr(pod, 'metadata') or not hasattr(pod.metadata, 'name'): + raise AttributeError("Pod object missing 'metadata.name' attribute") + + logger.debug(f"KubernetesScheduler received event: {event_type}, pod: {pod_name}, phase: {pod_phase}") + + # Check if this pod is related to our jobs + if not pod.metadata.labels.get(self.launcher.LABEL_JOB_ID): + logger.debug(f"Pod {pod_name} does not have our job label; ignoring.") + return + + # If a pod has terminated (Succeeded or Failed), we may have capacity to unsuspend jobs + if pod_phase in ('Succeeded', 'Failed'): + logger.info(f"Pod {pod_name} has completed with phase {pod_phase}. Checking for suspended jobs.") + self.check_and_unsuspend_jobs() + else: + logger.debug(f"Pod {pod_name} event not relevant for unsuspension.") + except KeyError as e: + logger.error(f"KeyError in handle_pod_event: Missing key {e} in event: {event}") + except AttributeError as e: + logger.error(f"AttributeError in handle_pod_event: {e} in event: {event}") + except TypeError as e: + logger.error(f"TypeError in handle_pod_event: {e} in event: {event}") + + def check_and_unsuspend_jobs(self): + """ + Checks if there is capacity to unsuspend jobs and unsuspends them if possible. + + Continuously unsuspends suspended jobs until the number of running jobs reaches + the maximum allowed (`max_proc`) or there are no more suspended jobs available. + + Notes + ----- + This method handles and logs the following exceptions internally: + - ApiException: If there are issues interacting with the Kubernetes API during job count retrieval + or while unsuspending a job. + - AttributeError: If the launcher object lacks required methods. + - TypeError: If `max_proc` is not an integer. + + Exceptions are not propagated further. + """ + try: + running_jobs_count = self.launcher.get_running_jobs_count() + logger.debug(f"Current running jobs: {running_jobs_count}, max_proc: {self.max_proc}") + + while running_jobs_count < self.max_proc: + job_id = self.get_next_suspended_job_id() + if not job_id: + logger.info("No suspended jobs to unsuspend.") + break + try: + success = self.launcher.unsuspend_job(job_id) + if success: + running_jobs_count += 1 + logger.info(f"Unsuspended job {job_id}. Total running jobs now: {running_jobs_count}") + else: + logger.error(f"Failed to unsuspend job {job_id}") + break + except ApiException as e: + logger.error(f"Kubernetes API exception while unsuspending job {job_id}: {e}") + break + except AttributeError as e: + logger.error(f"AttributeError while unsuspending job {job_id}: {e}") + break + except ApiException as e: + logger.error(f"Kubernetes API exception in check_and_unsuspend_jobs: {e}") + except AttributeError as e: + logger.error(f"AttributeError in check_and_unsuspend_jobs: {e}") + except TypeError as e: + logger.error(f"TypeError in check_and_unsuspend_jobs: {e}") + + def get_next_suspended_job_id(self): + """ + Retrieves the ID of the next suspended job from Kubernetes, + sorting by creation timestamp to ensure FIFO order. + + Returns + ------- + str or None + The job ID of the next suspended job, or None if none are found. 
+ + Notes + ----- + This method handles and logs the following exceptions internally: + - ApiException: If there are issues interacting with the Kubernetes API during job retrieval. + - AttributeError: If job objects lack `metadata.creation_timestamp` or `metadata.labels`. + - TypeError: If the returned jobs list is not a list. + + Exceptions are not propagated further. + """ + try: + label_selector = self.launcher.LABEL_JOB_ID + jobs = self.launcher.list_suspended_jobs(label_selector=label_selector) + + if not isinstance(jobs, list): + raise TypeError(f"list_suspended_jobs should return a list, got {type(jobs).__name__}") + + if not jobs: + logger.debug("No suspended jobs found.") + return None + + # Assign default timestamp to jobs missing creation_timestamp + for job in jobs: + if not hasattr(job, 'metadata') or not hasattr(job.metadata, 'creation_timestamp') or not job.metadata.creation_timestamp: + job.metadata.creation_timestamp = datetime.datetime.max.replace(tzinfo=datetime.timezone.utc) + logger.warning( + f"Job {job} missing 'metadata.creation_timestamp'; assigned max timestamp.") + + # Sort jobs by creation timestamp to ensure FIFO order + jobs.sort(key=lambda job: job.metadata.creation_timestamp) + job = jobs[0] + if not hasattr(job.metadata, 'labels'): + raise AttributeError(f"Job object missing 'metadata.labels': {job}") + job_id = job.metadata.labels.get(self.launcher.LABEL_JOB_ID) + logger.debug(f"Next suspended job to unsuspend: {job_id}") + return job_id + except ApiException as api_e: + logger.error(f"Kubernetes API exception in get_next_suspended_job_id: {api_e}") + except AttributeError as attr_e: + logger.error(f"AttributeError in get_next_suspended_job_id: {attr_e}") + except TypeError as type_e: + logger.error(f"TypeError in get_next_suspended_job_id: {type_e}") diff --git a/scrapyd_k8s/tests/integration/test_maxproc_one.conf b/scrapyd_k8s/tests/integration/test_maxproc_one.conf new file mode 100644 index 0000000..ea56c53 --- /dev/null +++ b/scrapyd_k8s/tests/integration/test_maxproc_one.conf @@ -0,0 +1,3 @@ +# additional scrapyd-k8s configuration for test_maxproc_one.py +[scrapyd] +max_proc = 1 diff --git a/scrapyd_k8s/tests/integration/test_maxproc_one.k8s.sh b/scrapyd_k8s/tests/integration/test_maxproc_one.k8s.sh new file mode 100755 index 0000000..16f66b7 --- /dev/null +++ b/scrapyd_k8s/tests/integration/test_maxproc_one.k8s.sh @@ -0,0 +1,26 @@ +#!/bin/sh +# Kubernetes cluster preparation for test_maxproc_one.py +# Called with "up" argument on setup, with "down" argument afterwards. +# +# Adds "patch" to job role permissions. 
+# Tightly bound to kubernetes.yaml + +if [ "$1" = "up" ]; then + kubectl patch role scrapyd-k8s --type=json -p='[ + { + "op": "add", + "path": "/rules/3/verbs/0", + "value": "patch" + } + ]' +elif [ "$1" = "down" ]; then + kubectl patch role scrapyd-k8s --type=json -p='[ + { + "op": "remove", + "path": "/rules/3/verbs/0" + } + ]' +else + echo "Usage: $0 up|down" + exit 1 +fi diff --git a/scrapyd_k8s/tests/integration/test_maxproc_one.py b/scrapyd_k8s/tests/integration/test_maxproc_one.py new file mode 100644 index 0000000..7fa9e1c --- /dev/null +++ b/scrapyd_k8s/tests/integration/test_maxproc_one.py @@ -0,0 +1,75 @@ +import os +import requests +import time + +BASE_URL = os.getenv('TEST_BASE_URL', 'http://localhost:6800') +RUN_PROJECT = os.getenv('TEST_RUN_PROJECT', 'example') +RUN_VERSION = os.getenv('TEST_RUN_VERSION', 'latest') +RUN_SPIDER = os.getenv('TEST_RUN_SPIDER', 'static') +MAX_WAIT = int(os.getenv('TEST_MAX_WAIT', '6')) +STATIC_SLEEP = float(os.getenv('TEST_STATIC_SLEEP', '2')) + +def test_max_proc_one(): + """With max_proc=1, two jobs are scheduled and run one after the other.""" + # Schedule a job + response = requests.post(BASE_URL + '/schedule.json', data={ + 'project': RUN_PROJECT, 'spider': RUN_SPIDER, '_version': RUN_VERSION, + 'setting': 'STATIC_SLEEP=%d' % STATIC_SLEEP + }) + assert_response_ok(response) + jobid1 = response.json()['jobid'] + assert jobid1 is not None + # Schedule another job right away, which remains queued + response = requests.post(BASE_URL + '/schedule.json', data={ + 'project': RUN_PROJECT, 'spider': RUN_SPIDER, '_version': RUN_VERSION, + 'setting': 'STATIC_SLEEP=%d' % STATIC_SLEEP + }) + assert_response_ok(response) + jobid2 = response.json()['jobid'] + assert jobid2 is not None + + # Wait until the first job runs, then check the second remains pending + listjobs_wait(jobid1, 'running') + assert_listjobs(pending=jobid2, running=jobid1) + # Wait until the first is finished and the second starts + listjobs_wait(jobid1, 'finished', max_wait=STATIC_SLEEP+MAX_WAIT) + listjobs_wait(jobid2, 'running') + # Wait until the second is finished too + listjobs_wait(jobid2, 'finished', max_wait=STATIC_SLEEP+MAX_WAIT) + +def assert_response_ok(response): + assert response.status_code == 200 + assert response.headers['Content-Type'] == 'application/json' + assert response.json()['status'] == 'ok' + +def assert_listjobs(pending=None, running=None, finished=None): + response = requests.get(BASE_URL + '/listjobs.json') + assert_response_ok(response) + if pending: + assert len(response.json()['pending']) == 1 + assert response.json()['pending'][0]['id'] == pending + return response.json()['pending'][0] + else: + assert response.json()['pending'] == [] + if running: + assert len(response.json()['running']) == 1 + assert response.json()['running'][0]['id'] == running + return response.json()['running'][0] + else: + assert response.json()['running'] == [] + # finished may contain other jobs + if finished: + matches = [j for j in response.json()['finished'] if j['id'] == finished] + assert len(matches) == 1 + return matches[0] + +def listjobs_wait(jobid, state, max_wait=MAX_WAIT): + started = time.monotonic() + while time.monotonic() - started < max_wait: + response = requests.get(BASE_URL + '/listjobs.json') + assert_response_ok(response) + for j in response.json()[state]: + if j['id'] == jobid: + return True + time.sleep(0.5) + assert False, 'Timeout waiting for job state change' diff --git a/scrapyd_k8s/tests/integration/test_maxproc_zero.conf 
b/scrapyd_k8s/tests/integration/test_maxproc_zero.conf new file mode 100644 index 0000000..4d0fd31 --- /dev/null +++ b/scrapyd_k8s/tests/integration/test_maxproc_zero.conf @@ -0,0 +1,3 @@ +# additional scrapyd-k8s configuration for test_maxproc_zero.py +[scrapyd] +max_proc = 0 diff --git a/scrapyd_k8s/tests/integration/test_maxproc_zero.k8s.sh b/scrapyd_k8s/tests/integration/test_maxproc_zero.k8s.sh new file mode 100755 index 0000000..4cd69ad --- /dev/null +++ b/scrapyd_k8s/tests/integration/test_maxproc_zero.k8s.sh @@ -0,0 +1,26 @@ +#!/bin/sh +# Kubernetes cluster preparation for test_maxproc_zero.py +# Called with "up" argument on setup, with "down" argument afterwards. +# +# Adds "patch" to job role permissions. +# Tightly bound to kubernetes.yaml + +if [ "$1" = "up" ]; then + kubectl patch role scrapyd-k8s --type=json -p='[ + { + "op": "add", + "path": "/rules/3/verbs/0", + "value": "patch" + } + ]' +elif [ "$1" = "down" ]; then + kubectl patch role scrapyd-k8s --type=json -p='[ + { + "op": "remove", + "path": "/rules/3/verbs/0" + } + ]' +else + echo "Usage: $0 up|down" + exit 1 +fi diff --git a/scrapyd_k8s/tests/integration/test_maxproc_zero.py b/scrapyd_k8s/tests/integration/test_maxproc_zero.py new file mode 100644 index 0000000..9976a90 --- /dev/null +++ b/scrapyd_k8s/tests/integration/test_maxproc_zero.py @@ -0,0 +1,50 @@ +import os +import requests +import time + +BASE_URL = os.getenv('TEST_BASE_URL', 'http://localhost:6800') +RUN_PROJECT = os.getenv('TEST_RUN_PROJECT', 'example') +RUN_VERSION = os.getenv('TEST_RUN_VERSION', 'latest') +RUN_SPIDER = os.getenv('TEST_RUN_SPIDER', 'static') +MAX_WAIT = int(os.getenv('TEST_MAX_WAIT', '6')) + +def test_max_proc_zero(): + """With max_proc=0, any scheduled job remains in the 'pending' state""" + # Schedule a job + response = requests.post(BASE_URL + '/schedule.json', data={ + 'project': RUN_PROJECT, 'spider': RUN_SPIDER, '_version': RUN_VERSION + }) + assert_response_ok(response) + jobid = response.json()['jobid'] + assert jobid is not None + + # Wait and make sure the job remains in the pending state + assert_listjobs(pending=jobid) + time.sleep(MAX_WAIT) + assert_listjobs(pending=jobid) + +def assert_response_ok(response): + assert response.status_code == 200 + assert response.headers['Content-Type'] == 'application/json' + assert response.json()['status'] == 'ok' + +def assert_listjobs(pending=None, running=None, finished=None): + response = requests.get(BASE_URL + '/listjobs.json') + assert_response_ok(response) + if pending: + assert len(response.json()['pending']) == 1 + assert response.json()['pending'][0]['id'] == pending + return response.json()['pending'][0] + else: + assert response.json()['pending'] == [] + if running: + assert len(response.json()['running']) == 1 + assert response.json()['running'][0]['id'] == running + return response.json()['running'][0] + else: + assert response.json()['running'] == [] + # finished may contain other jobs + if finished: + matches = [j for j in response.json()['finished'] if j['id'] == finished] + assert len(matches) == 1 + return matches[0] diff --git a/scrapyd_k8s/tests/unit/k8s_scheduler/__init__.py b/scrapyd_k8s/tests/unit/k8s_scheduler/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/scrapyd_k8s/tests/unit/k8s_scheduler/test_k8s_scheduler.py b/scrapyd_k8s/tests/unit/k8s_scheduler/test_k8s_scheduler.py new file mode 100644 index 0000000..c29485c --- /dev/null +++ b/scrapyd_k8s/tests/unit/k8s_scheduler/test_k8s_scheduler.py @@ -0,0 +1,275 @@ +import pytest +from 
unittest.mock import Mock, patch +from kubernetes.client.rest import ApiException +from scrapyd_k8s.launcher.k8s_scheduler import KubernetesScheduler + +@pytest.fixture +def mock_config(): + mock_config = Mock() + mock_config.namespace.return_value = 'default' + return mock_config + +@pytest.fixture +def mock_launcher(): + mock_launcher = Mock() + mock_launcher.LABEL_JOB_ID = 'org.scrapy.job_id' + return mock_launcher + +class TestKubernetesSchedulerInitialization: + def test_k8s_scheduler_init(self, mock_config, mock_launcher): + max_proc = 5 + scheduler = KubernetesScheduler(mock_config, mock_launcher, max_proc) + assert scheduler.config == mock_config + assert scheduler.launcher == mock_launcher + assert scheduler.max_proc == max_proc + assert scheduler.namespace == 'default' + + def test_k8s_scheduler_init_invalid_max_proc(self, mock_config, mock_launcher): + max_proc = 'five' # Not an integer + with pytest.raises(TypeError) as excinfo: + KubernetesScheduler(mock_config, mock_launcher, max_proc) + assert "max_proc must be an integer" in str(excinfo.value) + + +class TestPodEventHandling: + @pytest.mark.parametrize("event, expected_log, log_type", [ + ( + ['not', 'a', 'dict'], + "TypeError in handle_pod_event: Event must be a dictionary, got list in event: ['not', 'a', 'dict']", + 'error' + ), + ( + {'wrong_key': 'value'}, + "KeyError in handle_pod_event: Missing key 'object' in event: {'wrong_key': 'value'}", + 'error' + ), + ]) + def test_handle_pod_event_input_validation(self, mock_config, mock_launcher, event, expected_log, log_type): + scheduler = KubernetesScheduler(mock_config, mock_launcher, 5) + with patch('scrapyd_k8s.launcher.k8s_scheduler.k8s_scheduler.logger') as mock_logger: + scheduler.handle_pod_event(event) + if log_type == 'error': + mock_logger.error.assert_called_with(expected_log) + elif log_type == 'debug': + mock_logger.debug.assert_called_with(expected_log) + elif log_type == 'info': + mock_logger.info.assert_called_with(expected_log) + + @pytest.mark.parametrize("pod_config, expected_log, log_type", [ + # Pod with missing status + ( + {"status": None, "metadata_name": "pod-name", "metadata_labels": {}}, + "AttributeError in handle_pod_event: 'NoneType' object has no attribute 'phase' in event: ", + 'error' + ), + # Pod with missing metadata + ( + {"status_phase": "Running", "metadata": None}, + "AttributeError in handle_pod_event: 'NoneType' object has no attribute 'name' in event: ", + 'error' + ), + # Pod not related to our jobs + ( + {"status_phase": "Running", "metadata_name": "pod-name", "metadata_labels": {}}, + "Pod pod-name does not have our job label; ignoring.", + 'debug' + ), + # Pod terminated successfully + ( + {"status_phase": "Succeeded", "metadata_name": "pod-name", + "metadata_labels": {"org.scrapy.job_id": "job-id"}}, + "Pod pod-name has completed with phase Succeeded. 
Checking for suspended jobs.", + 'info' + ), + # Pod not terminated + ( + {"status_phase": "Running", "metadata_name": "pod-name", + "metadata_labels": {"org.scrapy.job_id": "job-id"}}, + "Pod pod-name event not relevant for unsuspension.", + 'debug' + ), + ]) + def test_handle_pod_event_pod_scenarios(self, mock_config, mock_launcher, pod_config, expected_log, log_type): + scheduler = KubernetesScheduler(mock_config, mock_launcher, 5) + + # Create a mock pod based on the configuration + pod = Mock() + + if "status" in pod_config: + pod.status = pod_config["status"] + else: + pod.status = Mock() + if "status_phase" in pod_config: + pod.status.phase = pod_config["status_phase"] + + if "metadata" in pod_config: + pod.metadata = pod_config["metadata"] + else: + pod.metadata = Mock() + if "metadata_name" in pod_config: + pod.metadata.name = pod_config["metadata_name"] + if "metadata_labels" in pod_config: + pod.metadata.labels = pod_config["metadata_labels"] + + event = {'object': pod, 'type': 'MODIFIED'} + + with patch('scrapyd_k8s.launcher.k8s_scheduler.k8s_scheduler.logger') as mock_logger, \ + patch.object(scheduler, 'check_and_unsuspend_jobs') as mock_check_and_unsuspend_jobs: + scheduler.handle_pod_event(event) + + if log_type == 'error': + # For error logs, we need to check that it contains the expected text since + # the full event will be appended + assert mock_logger.error.call_count == 1 + assert expected_log in mock_logger.error.call_args[0][0] + elif log_type == 'debug': + mock_logger.debug.assert_called_with(expected_log) + elif log_type == 'info': + mock_logger.info.assert_called_with(expected_log) + + # Check if check_and_unsuspend_jobs should be called + if pod_config.get("status_phase") in ["Succeeded", "Failed"] and \ + pod_config.get("metadata_labels", {}).get(mock_launcher.LABEL_JOB_ID): + mock_check_and_unsuspend_jobs.assert_called_once() + else: + mock_check_and_unsuspend_jobs.assert_not_called() + + +class TestJobSuspensionManagement: + @pytest.mark.parametrize("running_count, suspended_jobs, unsuspend_results, expected_logs", [ + # Case with capacity and suspended jobs + ( + 3, # running_count + ['job1', 'job2', None], # suspended_jobs + [True, True], # unsuspend_results + [ + "Unsuspended job job1. Total running jobs now: 4", + "Unsuspended job job2. 
Total running jobs now: 5" + ] # expected_logs + ), + # Case with no suspended jobs + ( + 3, # running_count + [None], # suspended_jobs + [], # unsuspend_results + ["No suspended jobs to unsuspend."] # expected_logs + ), + # Case where unsuspension fails + ( + 3, # running_count + ['job1'], # suspended_jobs + [False], # unsuspend_results + ["Failed to unsuspend job job1"] # expected_logs + ) + ]) + def test_check_and_unsuspend_jobs_scenarios(self, mock_config, mock_launcher, + running_count, suspended_jobs, + unsuspend_results, expected_logs): + scheduler = KubernetesScheduler(mock_config, mock_launcher, 5) + + mock_launcher.get_running_jobs_count.return_value = running_count + scheduler.get_next_suspended_job_id = Mock(side_effect=suspended_jobs) + mock_launcher.unsuspend_job.side_effect = unsuspend_results + + with patch('scrapyd_k8s.launcher.k8s_scheduler.k8s_scheduler.logger') as mock_logger: + scheduler.check_and_unsuspend_jobs() + + assert mock_launcher.unsuspend_job.call_count == len(unsuspend_results) + + # Verify the logs + log_methods = { + "Failed to unsuspend job": mock_logger.error, + "No suspended jobs to unsuspend": mock_logger.info, + "Unsuspended job": mock_logger.info + } + + for expected_log in expected_logs: + for prefix, log_method in log_methods.items(): + if expected_log.startswith(prefix): + assert any( + expected_log in call_args[0][0] + for call_args in log_method.call_args_list + ) + + def test_check_and_unsuspend_jobs_unsuspend_api_exception(self, mock_config, mock_launcher): + scheduler = KubernetesScheduler(mock_config, mock_launcher, 5) + + mock_launcher.get_running_jobs_count.return_value = 3 + scheduler.get_next_suspended_job_id = Mock(return_value='job1') + mock_launcher.unsuspend_job.side_effect = ApiException("API Error") + + with patch('scrapyd_k8s.launcher.k8s_scheduler.k8s_scheduler.logger') as mock_logger: + scheduler.check_and_unsuspend_jobs() + mock_launcher.unsuspend_job.assert_called_once_with('job1') + mock_logger.error.assert_called_with( + f"Kubernetes API exception while unsuspending job job1: (API Error)\nReason: None\n" + ) + + +class TestSuspendedJobSelection: + @pytest.mark.parametrize("suspended_jobs, expected_job_id, expected_log, log_type", [ + # Case with suspended jobs - should return the oldest job (job1) + ( + [ + # job2 is newer + {'id': 'job2', 'creation_timestamp': '2021-01-02T00:00:00Z'}, + # job1 is older + {'id': 'job1', 'creation_timestamp': '2021-01-01T00:00:00Z'} + ], + 'job1', + "Next suspended job to unsuspend: job1", + 'debug' + ), + # Case with no suspended jobs + ( + [], + None, + "No suspended jobs found.", + 'debug' + ), + # Case with non-list return value + ( + 'not a list', + None, + "TypeError in get_next_suspended_job_id: list_suspended_jobs should return a list, got str", + 'error' + ), + # Case with job missing creation timestamp + ( + [{'id': 'job1', 'creation_timestamp': None}], + 'job1', + "Job .* missing 'metadata.creation_timestamp'; assigned max timestamp.", + 'warning' + ), + ]) + def test_get_next_suspended_job_id_scenarios(self, mock_config, mock_launcher, + suspended_jobs, expected_job_id, + expected_log, log_type): + scheduler = KubernetesScheduler(mock_config, mock_launcher, 5) + + # Handle special case for non-list + if suspended_jobs == 'not a list': + mock_launcher.list_suspended_jobs.return_value = suspended_jobs + else: + # Create proper mock jobs based on the test data + mock_jobs = [] + for job_data in suspended_jobs: + job = Mock() + job.metadata = Mock() + job.metadata.creation_timestamp = 
job_data['creation_timestamp'] + job.metadata.labels = {mock_launcher.LABEL_JOB_ID: job_data['id']} + mock_jobs.append(job) + + mock_launcher.list_suspended_jobs.return_value = mock_jobs + + with patch('scrapyd_k8s.launcher.k8s_scheduler.k8s_scheduler.logger') as mock_logger: + job_id = scheduler.get_next_suspended_job_id() + assert job_id == expected_job_id + + if log_type == 'debug': + mock_logger.debug.assert_called_with(expected_log) + elif log_type == 'error': + mock_logger.error.assert_called_with(expected_log) + elif log_type == 'warning': + import re + assert any(re.match(expected_log, args[0]) for args, _ in mock_logger.warning.call_args_list) diff --git a/scrapyd_k8s/tests/unit/launcher/__init__.py b/scrapyd_k8s/tests/unit/launcher/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/scrapyd_k8s/tests/unit/launcher/test_docker.py b/scrapyd_k8s/tests/unit/launcher/test_docker.py new file mode 100644 index 0000000..eeec2ba --- /dev/null +++ b/scrapyd_k8s/tests/unit/launcher/test_docker.py @@ -0,0 +1,182 @@ +import pytest +from unittest.mock import MagicMock, patch +import time +from scrapyd_k8s.launcher.docker import Docker + +@pytest.fixture +def config_with_max_proc(): + config = MagicMock() + config.scrapyd.return_value.get.return_value = '2' # max_proc set to 2 + return config + +@pytest.fixture +def config_without_max_proc(): + config = MagicMock() + config.scrapyd.return_value.get.return_value = None # max_proc not set + return config + +@pytest.fixture +def mock_docker_client(): + with patch('scrapyd_k8s.launcher.docker.docker') as mock_docker_module: + mock_client = MagicMock() + mock_docker_module.from_env.return_value = mock_client + yield mock_client + +@pytest.fixture +def docker_launcher_with_max_proc(config_with_max_proc, mock_docker_client): + return Docker(config_with_max_proc) + +@pytest.fixture +def docker_launcher_without_max_proc(config_without_max_proc, mock_docker_client): + return Docker(config_without_max_proc) + +def test_docker_init_with_max_proc(config_with_max_proc, mock_docker_client): + docker_launcher = Docker(config_with_max_proc) + assert docker_launcher.max_proc == 2 + assert docker_launcher._thread is not None + assert docker_launcher._thread.is_alive() + +def test_docker_init_without_max_proc(config_without_max_proc, mock_docker_client): + docker_launcher = Docker(config_without_max_proc) + assert docker_launcher.max_proc is None + assert docker_launcher._thread is None + +def test_schedule_with_capacity(docker_launcher_with_max_proc, mock_docker_client): + # Mock methods + docker_launcher_with_max_proc.get_running_jobs_count = MagicMock(return_value=1) + docker_launcher_with_max_proc.start_pending_containers = MagicMock() + + # Mock container creation + mock_container = MagicMock() + mock_docker_client.containers.create.return_value = mock_container + + # Prepare parameters for schedule + project = MagicMock() + project.id.return_value = 'test_project' + project.repository.return_value = 'test_repo' + project.resources.return_value = {} + version = 'v1' + spider = 'test_spider' + job_id = 'job_123' + settings = {} + args = {} + + # Call schedule + docker_launcher_with_max_proc.schedule(project, version, spider, job_id, settings, args) + + # Verify that container is created + mock_docker_client.containers.create.assert_called_once() + + # Since running jobs count is less than max_proc, start_pending_containers should be called + docker_launcher_with_max_proc.start_pending_containers.assert_called_once() + +def 
test_schedule_no_capacity(docker_launcher_with_max_proc, mock_docker_client): + # Mock methods + docker_launcher_with_max_proc.get_running_jobs_count = MagicMock(return_value=2) # At max_proc + docker_launcher_with_max_proc.start_pending_containers = MagicMock() + + # Mock container creation + mock_container = MagicMock() + mock_docker_client.containers.create.return_value = mock_container + + # Prepare parameters for schedule + project = MagicMock() + project.id.return_value = 'test_project' + project.repository.return_value = 'test_repo' + project.resources.return_value = {} + version = 'v1' + spider = 'test_spider' + job_id = 'job_456' + settings = {} + args = {} + + # Patch the logger to check log outputs + with patch('scrapyd_k8s.launcher.docker.logger') as mock_logger: + # Call schedule + docker_launcher_with_max_proc.schedule(project, version, spider, job_id, settings, args) + + # Verify that container is created + mock_docker_client.containers.create.assert_called_once() + + # start_pending_containers should not be called since we're at capacity + docker_launcher_with_max_proc.start_pending_containers.assert_not_called() + + # Verify that container.start() is not called immediately + mock_container.start.assert_not_called() + + # Check that the correct log message was output + mock_logger.info.assert_called_with(f"Job {job_id} is pending due to max_proc limit.") + +def test_schedule_no_max_proc(docker_launcher_without_max_proc, mock_docker_client): + # Mock container creation + mock_container = MagicMock() + mock_docker_client.containers.create.return_value = mock_container + + # Prepare parameters for schedule + project = MagicMock() + project.id.return_value = 'test_project' + project.repository.return_value = 'test_repo' + project.resources.return_value = {} + version = 'v1' + spider = 'test_spider' + job_id = 'job_789' + settings = {} + args = {} + + # Call schedule + docker_launcher_without_max_proc.schedule(project, version, spider, job_id, settings, args) + + # Verify that container is created + mock_docker_client.containers.create.assert_called_once() + + # Since max_proc is not set, container.start() should be called immediately + mock_container.start.assert_called_once() + +def test_get_running_jobs_count(docker_launcher_with_max_proc, mock_docker_client): + # Mock the list of running containers + mock_container_list = [MagicMock(), MagicMock()] + mock_docker_client.containers.list.return_value = mock_container_list + + count = docker_launcher_with_max_proc.get_running_jobs_count() + + # Verify that the count matches the number of mock containers + assert count == 2 + mock_docker_client.containers.list.assert_called_with( + filters={'label': docker_launcher_with_max_proc.LABEL_PROJECT, 'status': 'running'}) + +def test_start_pending_containers(docker_launcher_with_max_proc, mock_docker_client): + # Mock the get_running_jobs_count method + docker_launcher_with_max_proc.get_running_jobs_count = MagicMock(return_value=1) + + # Mock pending containers + mock_pending_container = MagicMock() + mock_pending_container.name = 'pending_container' + mock_docker_client.containers.list.return_value = [mock_pending_container] + + # Patch logger to check log outputs + with patch('scrapyd_k8s.launcher.docker.logger') as mock_logger: + # Call start_pending_containers + docker_launcher_with_max_proc.start_pending_containers() + + # Verify that the pending container's start method was called + mock_pending_container.start.assert_called_once() + + # Verify that the correct log message was 
output + mock_logger.info.assert_called_with( + f"Started pending container {mock_pending_container.name}. Total running jobs now: 2" + ) + +def test_background_task_starts_pending_containers(config_with_max_proc, mock_docker_client): + # Mock start_pending_containers before initializing the Docker class + with patch.object(Docker, 'start_pending_containers', autospec=True) as mock_start_pending: + # Initialize Docker instance + docker_launcher = Docker(config_with_max_proc) + + # Wait for slightly more than check_interval to ensure the background task runs + time.sleep(5.1) # Wait for the background thread to execute + + # Verify that start_pending_containers was called by the background thread + assert mock_start_pending.call_count > 0 + + # Clean up by shutting down the background thread + docker_launcher.shutdown()
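The unsuspend path these tests exercise boils down to a single Job spec patch; for manual debugging, the call made by `K8s.unsuspend_job()` is equivalent to the kubectl command below, where the job name and namespace are placeholders:

```sh
# flip a suspended Job to running, as unsuspend_job() does via patch_namespaced_job()
kubectl patch job <job-name> -n default --type=merge -p '{"spec":{"suspend":false}}'
```

This is also why the test setup scripts above add the "patch" verb to the scrapyd-k8s Role while the max_proc tests run.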