Merged
28 commits
6415a61
extracted resource watching logic into a separate class; implemented …
vlerkin Nov 1, 2024
3064c3a
add kubernetes scheduler class that subscribes to the event observer …
vlerkin Nov 6, 2024
7bee3ac
finish the observer logic extraction; merge changes from main; add re…
vlerkin Nov 7, 2024
7c39fc8
change enable_joblogs method signature in docker
vlerkin Nov 7, 2024
ec75600
refactor api.py and launcher/k8s.py to keep api.py launcher agnostic
vlerkin Nov 11, 2024
7641ddb
add implementation for docker to check the number of running containe…
vlerkin Nov 12, 2024
fdce0c0
remove update from RBAC because we perform patching of the resource a…
vlerkin Nov 12, 2024
0b3d19a
add number of retry attempts and exponential backoff time to the reco…
vlerkin Nov 14, 2024
62cf9ed
create a helper finction _filter_jobs that accepts a filter function …
vlerkin Nov 20, 2024
8a499d8
change the signature of the filtering method to also accept a parse f…
vlerkin Nov 20, 2024
7471b07
add number of retry attempts and exponential backoff time to the reco…
vlerkin Nov 14, 2024
aca31a5
add logic to reset number of reconnect attempts and backoff time when…
vlerkin Nov 15, 2024
c4d4242
added a CONFIG.md file with detailed explanations about parameters us…
vlerkin Nov 19, 2024
27a310f
move section about config file from README.md to CONFIG.md; add a lin…
vlerkin Nov 19, 2024
340f242
make number of job limitation optional by separating k8s_scheduler in…
vlerkin Nov 22, 2024
bc9983e
implement the feature to limit running jobs to be optional for Docker…
vlerkin Nov 22, 2024
3032628
add exceptions to the KubernetesScheduler class, for the init method,…
vlerkin Nov 25, 2024
57630ef
add custom exceptions to the class for log handling; add exceptions t…
vlerkin Nov 25, 2024
d2210da
modify method that fetches nez job id to be unsuspended to add creati…
vlerkin Nov 27, 2024
4da7e29
make the proper test structure according to the standards; add tests …
vlerkin Nov 27, 2024
3fa4922
rebased to main
vlerkin Jan 28, 2025
7925d37
change the test test_listversions_ok to check the exact list
vlerkin Jan 29, 2025
0f89754
add integration tests for scheduler with two different configs
vlerkin Feb 25, 2025
eb7173f
Adapt integration testing setup
wvengen Mar 4, 2025
d0501d5
grouped unit tests under classes; added parametrization to avoide dou…
vlerkin Mar 11, 2025
a482e49
remove num of attempts from sample config; add empty line in the watc…
vlerkin Mar 28, 2025
f9427ec
modify path to mock scheduler
vlerkin Mar 28, 2025
371b2b6
edited config to two params, removed those params from the sample con…
vlerkin Apr 1, 2025
5 changes: 4 additions & 1 deletion .github/workflows/test.yml
@@ -132,9 +132,11 @@ jobs:
# for each integration test file
for test in scrapyd_k8s/tests/integration/test_*.py; do
echo; echo "# $test"
# run scrapyd-k8s with test-specific configuration file
# run scrapyd-k8s with test-specific configuration file, run k8s patch if available
cfg=`echo "$test" | sed 's/\.py$/.conf/'`
kubectl create cm scrapyd-k8s-testcfg --from-file=scrapyd_k8s.test.conf="$cfg"
k8sconfig=`echo "$test" | sed 's/\.py$/\.k8s.sh/'`
[ -x "$k8sconfig" ] && "$k8sconfig" up
kubectl scale --replicas=1 deploy/scrapyd-k8s
# wait for scrapyd-k8s to become ready
kubectl wait --for=condition=Available deploy/scrapyd-k8s --timeout=60s
@@ -151,6 +153,7 @@
kubectl scale --replicas=0 deploy/scrapyd-k8s
kubectl wait --for=delete pod -l app.kubernetes.io/name=scrapyd-k8s --timeout=90s
kubectl delete cm scrapyd-k8s-testcfg --wait
[ -x "$k8sconfig" ] && "$k8sconfig" down
done

test-k8s:
18 changes: 14 additions & 4 deletions CONFIG.md
@@ -59,18 +59,28 @@ choose to use them.
The event watcher establishes a connection to the Kubernetes API and receives a stream of events from it. However, the
nature of this long-lived connection is unstable; it can be interrupted by network issues, proxies configured to terminate
long-lived connections, and other factors. For this reason, a mechanism was implemented to re-establish the long-lived
connection to the Kubernetes API. To achieve this, three parameters were introduced: `reconnection_attempts`,
connection to the Kubernetes API. To achieve this, two parameters were introduced:
`backoff_time` and `backoff_coefficient`.

#### What are these parameters about?

* `reconnection_attempts` - defines how many consecutive attempts will be made to reconnect if the connection fails;
* `backoff_time`, `backoff_coefficient` - are used to gradually slow down each subsequent attempt to establish a
connection with the Kubernetes API, preventing the API from becoming overloaded with requests.
The `backoff_time` increases exponentially and is calculated as `backoff_time *= self.backoff_coefficient`.
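The reconnect loop these parameters drive can be sketched as follows. This is an illustrative sketch, not the project's actual code: `connect` is a stand-in for opening the watch stream, and the 900-second cap mirrors the one used in `watch_pods`:

```python
import time

def reconnect_with_backoff(connect, backoff_time=5.0, backoff_coefficient=2.0, max_backoff=900):
    """Retry `connect` until it succeeds, sleeping an exponentially
    growing interval between failed attempts."""
    while True:
        try:
            return connect()
        except ConnectionError:
            time.sleep(backoff_time)
            # Grow the wait exponentially, capped so a retry never
            # waits longer than max_backoff seconds.
            backoff_time = min(backoff_time * backoff_coefficient, max_backoff)
```

With `backoff_time = 5` and `backoff_coefficient = 2`, consecutive waits are 5, 10, 20, 40, ... seconds until the cap is reached.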

### max_proc

By default `max_proc` is not present in the config file, but you can add it under the scrapyd section to limit the number
of jobs that run in parallel; other scheduled jobs wait for their turn and start whenever currently running jobs
complete, are cancelled, or fail. This feature is available for both Kubernetes and Docker.

For example, suppose you have a cluster with 0 running jobs, you schedule 20 jobs, and you set `max_proc = 5` in the scrapyd section.
Then 5 jobs start running immediately and the other 15 are suspended. Whenever one of the running jobs finishes, the next
suspended job is started. The order in which jobs were scheduled is preserved.

`max_proc` - a parameter you can set to limit the number of running jobs at a given moment
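A hypothetical config fragment showing where the parameter goes (key names follow the sample config files in this repository; the surrounding values are placeholders):

```ini
[scrapyd]
launcher = scrapyd_k8s.launcher.K8s

# Limit to 5 concurrently running jobs; additional scheduled
# jobs wait in the order they were submitted.
max_proc = 5
```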

#### When do I need to change it in the config file?

Default values for these parameters are provided in the code and are tuned to an "average" cluster setting. If your network
requirements or other conditions are unusual, you may need to adjust these values to better suit your specific setup.

requirements or other conditions are unusual, you may need to adjust these values to better suit your specific setup.
4 changes: 2 additions & 2 deletions kubernetes.yaml
@@ -89,8 +89,6 @@ data:
launcher = scrapyd_k8s.launcher.K8s

namespace = default

max_proc = 2

# This is an example spider that should work out of the box.
# Adapt the spider config to your use-case.
@@ -164,6 +162,7 @@ apiVersion: rbac.authorization.k8s.io/v1
kind: Role
metadata:
name: scrapyd-k8s
# review scrapyd_k8s/test/integration/*.k8s.sh when modifying this in the repository
rules:
- apiGroups: [""]
resources: ["pods"]
@@ -176,6 +175,7 @@ rules:
verbs: ["get"]
- apiGroups: ["batch"]
resources: ["jobs"]
# add "patch" if you use scheduling, i.e. if you use max_proc
verbs: ["get", "list", "create", "delete"]
---
apiVersion: rbac.authorization.k8s.io/v1
3 changes: 0 additions & 3 deletions scrapyd_k8s.sample-k8s.conf
@@ -19,9 +19,6 @@ namespace = default
# Optional pull secret, in case you have private spiders.
#pull_secret = ghcr-registry

# Maximum number of jobs running in parallel
max_proc = 10

# For each project, define a project section.
# This contains a repository that points to the remote container repository.
# An optional env_secret is the name of a secret with additional environment
1 change: 0 additions & 1 deletion scrapyd_k8s/api.py
@@ -1,5 +1,4 @@
#!/usr/bin/env python3
import logging
import uuid
from flask import Flask, request, Response, jsonify
from flask_basicauth import BasicAuth
47 changes: 39 additions & 8 deletions scrapyd_k8s/joblogs/log_handler_k8s.py
@@ -8,10 +8,21 @@

logger = logging.getLogger(__name__)

# Custom Exceptions
class KubernetesJobLogHandlerError(Exception):
"""Base exception class for KubernetesJobLogHandler."""

class LogUploadError(KubernetesJobLogHandlerError):
"""Raised when uploading logs to object storage fails."""

class PodStreamingError(KubernetesJobLogHandlerError):
"""Raised when streaming logs from a pod fails."""

class KubernetesJobLogHandler:
"""
A class to handle Kubernetes job logs by watching pods, streaming logs, and uploading them to object storage.


This class:
- Observes Kubernetes pods for job-related events.
- Streams logs from running pods, storing them locally.
@@ -48,10 +59,9 @@ class KubernetesJobLogHandler:
Ensures a log file exists for a given job and returns its path.

stream_logs(job_name):
Streams logs from a Kubernetes pod corresponding to the given job name and writes them to a file.

Streams logs from a Kubernetes pod and writes them to a file.
handle_events(event):
Processes Kubernetes pod events to start log streaming or upload logs when pods complete.
Watches Kubernetes pod events and handles actions such as starting log streaming or uploading logs.
"""
# The value was chosen to provide a balance between memory usage and the number of I/O operations
DEFAULT_BLOCK_SIZE = 6144
@@ -168,6 +178,7 @@ def concatenate_and_delete_files(self, main_file_path, temp_file_path, block_siz
logger.debug(f"Concatenated '{temp_file_path}' into '{main_file_path}' and deleted temporary file.")
except (IOError, OSError) as e:
logger.error(f"Failed to concatenate and delete files for job: {e}")
raise KubernetesJobLogHandlerError(f"Failed to concatenate and delete files: {e}") from e

def make_log_filename_for_job(self, job_id):
"""
@@ -213,6 +224,11 @@ def stream_logs(self, job_id, pod_name):
Returns
-------
None

Raises
------
PodStreamingError
If an I/O error occurs while streaming logs or processing them.
"""
log_lines_counter = 0
v1 = client.CoreV1Api()
@@ -251,16 +267,30 @@
else:
os.remove(temp_file_path)
logger.info(f"Removed temporary file '{temp_file_path}' after streaming logs for job '{job_id}'.")
except Exception as e:
logger.exception(f"Error streaming logs for job '{job_id}': {e}")
except (IOError, OSError) as e:
logger.error(f"I/O error while streaming logs for job '{job_id}': {e}")
raise PodStreamingError(f"I/O error while streaming logs for job '{job_id}': {e}") from e
except KubernetesJobLogHandlerError as e:
logger.error(f"Error processing logs for job '{job_id}': {e}")
raise PodStreamingError(f"Error processing logs for job '{job_id}': {e}") from e

def handle_events(self, event):
"""
Watches Kubernetes pods and handles events such as starting log streaming or uploading logs.
Watches Kubernetes pod events and handles actions such as starting log streaming or uploading logs.

Parameters
----------
event : dict
The event dictionary containing information about the pod event.

Returns
-------
None

Raises
------
KubernetesJobLogHandlerError
If an error occurs while handling pod events.
"""
try:

@@ -297,5 +327,6 @@ def handle_events(self, event):
logger.info(f"Logfile not found for job '{job_id}'")
else:
logger.debug(f"Other pod event type '{event['type']}' for pod '{pod.metadata.name}' - Phase: '{pod.status.phase}'")
except Exception as e:
logger.exception(f"Error watching pods in namespace '{self.namespace}': {e}")
except KubernetesJobLogHandlerError as e:
logger.error(f"Handled error in handle_events: {e}")
raise e
4 changes: 0 additions & 4 deletions scrapyd_k8s/k8s_resource_watcher.py
@@ -18,7 +18,6 @@ class ResourceWatcher:
subscribers : List[Callable]
List of subscriber callback functions to notify on events.
"""

def __init__(self, namespace, config):
"""
Initializes the ResourceWatcher.
@@ -51,7 +50,6 @@ def subscribe(self, callback: Callable):
if callback not in self.subscribers:
self.subscribers.append(callback)
logger.debug(f"Subscriber {callback.__name__} added.")

def unsubscribe(self, callback: Callable):
"""
Removes a subscriber callback.
@@ -65,7 +63,6 @@ def unsubscribe(self, callback: Callable):
if callback in self.subscribers:
self.subscribers.remove(callback)
logger.debug(f"Subscriber {callback.__name__} removed.")

def notify_subscribers(self, event: dict):
"""
Notifies all subscribers about an event.
@@ -141,7 +138,6 @@ def watch_pods(self):
time.sleep(backoff_time)
backoff_time = min(backoff_time*self.backoff_coefficient, 900)


def stop(self):
"""
Stops the watcher thread gracefully.
94 changes: 92 additions & 2 deletions scrapyd_k8s/launcher/docker.py
@@ -2,6 +2,9 @@
import re
import socket

import threading
import time

import docker
from ..utils import format_iso_date_string, native_stringify_dict

@@ -22,6 +25,37 @@ class Docker:

def __init__(self, config):
self._docker = docker.from_env()
self.max_proc = config.scrapyd().get('max_proc')
if self.max_proc is not None:
self.max_proc = int(self.max_proc)
self._stop_event = threading.Event()
self._lock = threading.Lock()

self._thread = threading.Thread(target=self._background_task, daemon=True)
self._thread.start()
logger.info("Background thread for managing Docker containers started.")
else:
self._thread = None
logger.info("Job limit not set; Docker launcher will not limit running jobs.")

def _background_task(self):
"""
Background thread that periodically checks and starts pending containers.
"""
check_interval = 5 # seconds
while not self._stop_event.is_set():
with self._lock:
self.start_pending_containers()
time.sleep(check_interval)

def shutdown(self):
"""
Cleanly shutdown the background thread.
"""
if self._thread is not None:
self._stop_event.set()
self._thread.join()
logger.info("Background thread for managing Docker containers stopped.")

def get_node_name(self):
return socket.gethostname()
@@ -41,7 +75,7 @@ def schedule(self, project, version, spider, job_id, settings, args):
'SCRAPYD_JOB': job_id,
} # TODO env_source handling
resources = project.resources(spider)
c = self._docker.containers.run(
c = self._docker.containers.create(
image=project.repository() + ':' + version,
command=['scrapy', 'crawl', spider, *_args, *_settings],
environment=env,
@@ -55,6 +89,47 @@
mem_limit=resources.get('limits', {}).get('memory'),
cpu_quota=_str_to_micro(resources.get('limits', {}).get('cpu'))
)
if self.max_proc is not None:
running_jobs_count = self.get_running_jobs_count()
if running_jobs_count < self.max_proc:
self.start_pending_containers()
else:
logger.info(f"Job {job_id} is pending due to max_proc limit.")
else:
c.start()
logger.info(f"Job {job_id} started without suspension.")

def start_pending_containers(self):
"""
Checks if there is capacity to start pending containers and starts them if possible.
"""
running_jobs_count = self.get_running_jobs_count()
logger.debug(f"Current running jobs: {running_jobs_count}, max_proc: {self.max_proc}")

while running_jobs_count < self.max_proc:
pending_container = self.get_next_pending_container()
if not pending_container:
logger.info("No pending containers to start.")
break
try:
pending_container.start()
running_jobs_count += 1
logger.info(
f"Started pending container {pending_container.name}. Total running jobs now: {running_jobs_count}")
except Exception as e:
logger.error(f"Failed to start container {pending_container.name}: {e}")
break

def get_next_pending_container(self):
pending_containers = self._docker.containers.list(all=True, filters={
'label': self.LABEL_PROJECT,
'status': 'created',
})
if not pending_containers:
return None
# Sort by creation time to ensure FIFO order
pending_containers.sort(key=lambda c: c.attrs['Created'])
return pending_containers[0]

def cancel(self, project_id, job_id, signal):
c = self._get_container(project_id, job_id)
@@ -64,13 +139,28 @@ def cancel(self, project_id, job_id, signal):
prevstate = self._docker_to_scrapyd_status(c.status)
if c.status == 'created' or c.status == 'scheduled':
c.remove()
logger.info(f"Removed pending container {c.name}.")
elif c.status == 'running':
c.kill(signal='SIG' + signal)
logger.info(f"Killed and removed running container {c.name}.")
# After cancelling, try to start pending containers since we might have capacity
if self.max_proc is not None:
self.start_pending_containers()
return prevstate

def enable_joblogs(self, config):
def enable_joblogs(self, config, resource_watcher):
logger.warning("Job logs are not supported when using the Docker launcher.")

def get_running_jobs_count(self):
if self.max_proc is not None:
# Return the number of running Docker containers matching the job labels
label = self.LABEL_PROJECT
running_jobs = self._docker.containers.list(filters={'label': label, 'status': 'running'})
return len(running_jobs)
else:
# If job limiting is not enabled, return 0 to avoid unnecessary processing
return 0

def _parse_job(self, c):
state = self._docker_to_scrapyd_status(c.status)
return {