-
Notifications
You must be signed in to change notification settings - Fork 0
Task workflow
This document describes the complete lifecycle of a Treatment in the Django frontend application, from user interaction to API communication and result processing.
- Document selection: User selects a `DocumentSet`
- Treatment Form: User fills in the `TreatmentForm`, which defines `task_type` (`regions`, `similarity`, ...) and subform parameters (`model`, `algorithm`, ...)
- Treatment Creation: Form submission results in `Treatment` record creation
- Task Launch: `post_save` signal triggers a Celery task
- API Request: `treatment.start_task()` sends a POST request to the API
- API processing: the API sends notifications to the frontend
- Processing Notifications: the `receive_notification` view calls `treatment.receive_notification()`
- Result Processing: the task-specific `process_results()` triggers task-specific processing
Users select documents (Witnesses, Series, Digitizations, Works) which are stored in a DocumentSet.
# webapp/models/document_set.py
class DocumentSet(AbstractSearchableModel):
user = models.ForeignKey(User, on_delete=models.SET_NULL, null=True)
title = models.CharField(max_length=50)
# Document references stored as ID arrays
wit_ids = ArrayField(models.IntegerField(), default=list, blank=True, null=True)
ser_ids = ArrayField(models.IntegerField(), default=list, blank=True, null=True)
digit_ids = ArrayField(models.IntegerField(), default=list, blank=True, null=True)
work_ids = ArrayField(models.IntegerField(), default=list, blank=True, null=True)
def all_witnesses(self):
"""Returns all Witness objects, including those from Series and Works"""
return list(
Witness.objects.filter(id__in=self.all_witness_ids())
.select_related("series")
.prefetch_related("digitizations", "contents__work")
)

The DocumentSet aggregates different document types and provides methods to retrieve all associated witnesses for processing.
The TreatmentForm handles task selection and dynamically loads task-specific subforms.
It handles:
- validation: Ensures `document_set` and `task_type` are provided, validates subforms
- data transformation: Converts subform data into `api_parameters` JSON (via `_populate_instance()`)
- dynamic fields: Adds task-specific subform fields based on `task_type`
- prefilling: Pre-populates fields from GET parameters
- saving: the new Treatment object is saved to the database.
# webapp/forms/treatment.py
class TreatmentForm(forms.ModelForm):
class Meta:
model = Treatment
fields = ["task_type", "document_set", "notify_email"]
def __init__(self, *args, **kwargs):
self._user = kwargs.pop("user", None)
# ...
# Dynamic subform loading based on installed modules
self.subforms = {}
form_mapping = {
"similarity": SimilarityForm,
"regions": RegionsForm,
"vectorization": VectorizationForm,
}
for task_name in ADDITIONAL_MODULES:
if task_name in form_mapping:
self.add_subform(task_name, form_mapping[task_name], ...)

Each task module provides its own form class with specific parameters:
# regions/forms.py
class RegionsForm(forms.Form):
model = forms.ChoiceField(
label="Model",
choices=[("", "-")], # dynamically populated
initial="illustration_extraction",
)
def get_api_parameters(self):
"""Returns parameters to be sent to the API"""
return {"model": self.cleaned_data.get("model")}

When the form is saved, api_parameters from the active subform are stored on the Treatment:
# TreatmentForm._populate_instance()
def _populate_instance(self, instance):
instance.requested_by = self._user
task_type = self.cleaned_data.get("task_type")
if task_type in self.subforms:
subform = self.subforms[task_type]
if subform.is_valid():
instance.api_parameters = subform.get_api_parameters()

The view handles form submission and creates the Treatment record:
# webapp/views/admin.py
class TreatmentCreate(AbstractRecordCreate):
model = Treatment
form_class = TreatmentForm
def form_valid(self, form):
self.object = form.save(commit=False)
self.object.requested_by = self.request.user
self.object = form.save() # triggers post_save signal
return super().form_valid(form)

A Django signal automatically triggers task execution when a Treatment is created:
# webapp/models/treatment.py
@receiver(post_save, sender=Treatment)
def treatment_post_save(sender, instance, created, **kwargs):
if created:
launch_task.delay(instance)

The Celery task retrieves witnesses and starts the task:
# webapp/tasks.py
@celery_app.task
def launch_task(treatment):
try:
witnesses = treatment.get_witnesses()
treatment.start_task(witnesses)
except Exception as e:
treatment.on_task_error({
"error": f"Error when starting the task: {e}",
"notify": treatment.notify_email,
})

Treatment.start_task() prepares and sends the request to the API:
# webapp/models/treatment.py
def start_task(self, witnesses):
# 1. Load task-specific prepare_request function dynamically
parameters = prepare_task_request(
self.task_type, # e.g., "regions"
witnesses,
self.id,
self.api_parameters # from subform: {"model": "illustration_extraction"}
)
# 2. Send request to API
url = f"{API_URL}/{self.task_type}/start"
api_query = requests.post(url, json=parameters)
# 3. Store tracking info
api_response = api_query.json()
self.api_tracking_id = api_response["tracking_id"]
self.status = "STARTED"
self.save()

The prepare_task_request function dynamically imports the task module:
# webapp/utils/tasking.py
def prepare_task_request(task_name, records, treatment_id, api_parameters=None):
module = importlib.import_module(f"{task_name}.utils")
return getattr(module, "prepare_request")(records, treatment_id, api_parameters)

Each task module implements its own prepare_request:
# regions/utils.py
def prepare_request(witnesses, treatment_id, parameters=None):
return tasking.prepare_request(
witnesses,
treatment_id,
prepare_document, # function to convert each doc to API format
"regions",
dict({"model": EXTRACTOR_MODEL}, **(parameters or {})), # aggregate form, default and additional parameters
)

The generic tasking.prepare_request builds the final payload:
# webapp/utils/tasking.py
def prepare_request(records, treatment_id, prepare_document, task_name, parameters={}):
documents = []
for record in records:
documents.extend(prepare_document(record, **parameters))
return {
"experiment_id": str(treatment_id),
"documents": documents,
"notify_url": f"{APP_URL}/{APP_NAME}/{task_name}/notify",
**parameters,
}

Example payload sent to the API:
{
"experiment_id": "1234-abcd-5678-efgh",
"documents": [
{"type": "iiif", "src": "https://app-url/app/iiif/auto/wit3_man186/manifest.json", "uid": "wit3_man186"},
{"type": "url_list", "src": "https://app-url/app/urls_list", "uid": "wit3_man186_anno189"},
],
"notify_url": "https://app-url.com/app/regions/notify",
"model": "illustration_extraction"
}

The API sends status updates to the notify_url. Each task module exposes a notification endpoint:
# regions/views.py
@csrf_exempt
def receive_regions_notification(request):
response, status_code = receive_notification(request)
return JsonResponse(response, status=status_code, safe=False)

The generic receive_notification parses the request and routes it to the Treatment:
# webapp/utils/tasking.py
def receive_notification(request):
data = json.loads(request.body.decode("utf-8"))
# data contains: experiment_id, event, tracking_id, output/error
treatment = Treatment.objects.get(id=data["experiment_id"])
treatment.receive_notification(event=data.get("event"), data=data)
return {"success": True, "message": "Update received"}, 200

The Treatment handles different event types:
# webapp/models/treatment.py
def receive_notification(self, event, data):
if event == "STARTED":
self.status = "IN PROGRESS"
self.save()
elif event == "PROGRESS":
self.process_results(data, completed=False)
elif event == "SUCCESS":
self.process_results(data, completed=True)
elif event == "ERROR":
self.on_task_error({
"notify": self.notify_email,
"error": data.get("message"),
}, completed=data.get("completed", True))

| Event | Trigger | Action |
|---|---|---|
| STARTED | API begins processing | status = "IN PROGRESS" |
| PROGRESS | Intermediate results ready, so results can be processed as they arrive | process_results(completed=False) |
| SUCCESS | Task completed | process_results(completed=True) |
| ERROR | Task failed | on_task_error() |
PENDING ──┬──► STARTED ──► IN PROGRESS ──┬──► SUCCESS
│ │
└────────► ERROR ◄─────────────┘
(any failure point)
Key principle: Any exception path must call on_task_error() to:
- Update `Treatment.status` to `"ERROR"`
- Optionally notify user via email
- Log the error for debugging
process_results dynamically loads the task-specific processing function:
# webapp/models/treatment.py
def process_results(self, data, completed=True):
try:
process_task_results(self.task_type, data, completed)
if completed:
self.on_task_success({"notify": self.notify_email})
except Exception as e:
self.on_task_error({"error": str(e), "notify": self.notify_email})
# webapp/utils/tasking.py
def process_task_results(task_name, data, completed=True):
module = importlib.import_module(f"{task_name}.utils")
return getattr(module, "process_results")(data, completed)

Each task module implements process_results:
# regions/utils.py
def process_results(data, completed=True):
"""
data["output"] = {
"results_url": [{"doc_id": "...", "result_url": "..."}, ...],
"error": [...]
}
"""
output = data.get("output", None)
results_url = output.get("results_url", None)
for doc_results in results_url:
doc_id = doc_results.get("doc_id")
result_url = doc_results.get("result_url")
# Download and process the result file
response = requests.get(result_url, stream=True)
json_content = response.json()
# Delegate heavy processing to Celery
process_regions_file.delay(json_content, digit_id, model_name)

The task_request() function in tasking.py allows relaunching a task without going through the form/Treatment creation flow:
# webapp/utils/tasking.py
def task_request(task_name, records, treatment_id=None, endpoint="start"):
"""Utility to send a task request directly, bypassing Treatment creation"""
if not treatment_id:
# Creates a new Treatment, which triggers post_save → normal flow
create_treatment(records, task_name)
return True
# Direct API call for existing treatment (relaunch scenario)
payload = prepare_task_request(task_name, records, treatment_id)
response = requests.post(url=f"{API_URL}/{task_name}/start", json=payload)
return response.status_code == 200

Usage in task modules (e.g., regions/utils.py):
def regions_request(witnesses, treatment_id):
"""Relaunch extraction request if automatic process failed"""
tasking.task_request("regions", witnesses, treatment_id)

| File | Purpose |
|---|---|
| webapp/models/treatment.py | Treatment model, start_task(), receive_notification() |
| webapp/models/document_set.py | DocumentSet model, document aggregation |
| webapp/forms/treatment.py | TreatmentForm with dynamic subforms |
| webapp/utils/tasking.py | Generic request preparation, notification handling |
| {task}/utils.py | Task-specific prepare_request(), process_results() |
| {task}/forms.py | Task-specific form with get_api_parameters() |
| {task}/views.py | Notification endpoint |
-
Frontend
-
Backend