
Commit 1447888

Merge pull request #2 from bitmakerla/add-metrics-extension
Add metrics extension
2 parents: 121a270 + 10927c9

File tree: 3 files changed (+206, -0 lines)


pyproject.toml

Lines changed: 2 additions & 0 deletions
@@ -14,6 +14,8 @@ requires-python = ">=3.9"
 
 dependencies = [
     "click>=8.0",
+    "scrapy>=2.11",
+    "pydantic>=2.0",
 ]
 
 [project.scripts]

src/ps_helper/extensions/__init__.py

Whitespace-only changes.
Lines changed: 204 additions & 0 deletions
@@ -0,0 +1,204 @@
import os
import time
import json
import math
import datetime
from collections import defaultdict

from scrapy import signals
from pydantic import ValidationError


class MetricsExtension:
    def __init__(self, stats, schema=None, unique_field=None, max_buckets=30):
        """
        Scrapy Metrics Extension.

        Args:
            stats: Scrapy Stats Collector.
            schema (BaseModel, optional): Pydantic model to validate scraped items.
            unique_field (str, optional): Field name used to detect duplicates.
        """
        self.stats = stats
        self.start_time = None
        self.http_status_counter = defaultdict(int)
        self.duplicate_items = set()

        # For schema coverage (valid vs invalid items)
        self.valid_items = 0
        self.invalid_items = 0

        # Field coverage tracking
        self.field_coverage = defaultdict(lambda: {"complete": 0, "empty": 0})

        # Timeline
        self.timeline = defaultdict(int)
        self.max_buckets = max_buckets
        self.schema = schema
        self.unique_field = unique_field

    @classmethod
    def from_crawler(cls, crawler):
        schema = getattr(crawler.spidercls, "schema", None)
        unique_field = getattr(crawler.spidercls, "unique_field", None)

        max_buckets = crawler.settings.getint("METRICS_TIMELINE_BUCKETS", 30)

        ext = cls(crawler.stats, schema=schema, unique_field=unique_field, max_buckets=max_buckets)

        crawler.signals.connect(ext.spider_opened, signal=signals.spider_opened)
        crawler.signals.connect(ext.spider_closed, signal=signals.spider_closed)
        crawler.signals.connect(ext.item_scraped, signal=signals.item_scraped)
        crawler.signals.connect(ext.response_received, signal=signals.response_received)
        crawler.signals.connect(ext.spider_error, signal=signals.spider_error)

        return ext

    def spider_opened(self, spider):
        self.start_time = time.time()
        self.stats.set_value("custom/items_scraped", 0)
        self.stats.set_value("custom/pages_processed", 0)
        self.stats.set_value("custom/items_duplicates", 0)
        self.stats.set_value("custom/errors", 0)

    def response_received(self, response, request, spider):
        self.stats.inc_value("custom/pages_processed")
        self.http_status_counter[response.status] += 1

    def item_scraped(self, item, spider):
        self.stats.inc_value("custom/items_scraped")

        # Validate schema if provided
        if self.schema:
            try:
                self.schema(**item)
                self.valid_items += 1
            except ValidationError:
                self.invalid_items += 1

        # Track field coverage
        for field, value in item.items():
            if value is None or value == "" or value == []:
                self.field_coverage[field]["empty"] += 1
            else:
                self.field_coverage[field]["complete"] += 1

        # Temporal timeline: save timestamp in seconds
        elapsed_seconds = int(time.time() - self.start_time)
        self.timeline[elapsed_seconds] += 1

        # Check duplicates if unique_field is defined
        if self.unique_field and self.unique_field in item:
            value = item[self.unique_field]
            if value in self.duplicate_items:
                self.stats.inc_value("custom/items_duplicates")
            else:
                self.duplicate_items.add(value)

    def spider_error(self, failure, response, spider):
        self.stats.inc_value("custom/errors")

    def spider_closed(self, spider, reason):
        elapsed = time.time() - self.start_time
        total_minutes = elapsed / 60

        # Get size of the interval
        interval_size = max(1, math.ceil(total_minutes / self.max_buckets))

        # Success rate
        total_responses = sum(self.http_status_counter.values())
        status_200 = self.http_status_counter.get(200, 0)
        success_rate = (status_200 / total_responses * 100) if total_responses > 0 else 0

        # Group timeline
        aggregated = defaultdict(int)
        for sec, count in self.timeline.items():
            minute = sec // 60
            bucket_start = (minute // interval_size) * interval_size
            bucket_end = bucket_start + interval_size
            label = f"{bucket_start}-{bucket_end}m"
            aggregated[label] += count

        timeline_sorted = [
            {"interval": k, "items": v}
            for k, v in sorted(
                aggregated.items(),
                key=lambda x: int(x[0].split("-")[0])
            )
        ]

        items = self.stats.get_value("custom/items_scraped", 0)
        pages = self.stats.get_value("custom/pages_processed", 0)

        # Speed
        items_per_min = items / (elapsed / 60) if elapsed > 0 else 0
        pages_per_min = pages / (elapsed / 60) if elapsed > 0 else 0
        time_per_page = elapsed / pages if pages > 0 else 0

        # Schema coverage
        total_checked = self.valid_items + self.invalid_items
        schema_coverage_percentage = (
            (self.valid_items / total_checked) * 100 if total_checked else 0
        )

        # Timeouts and retries
        timeouts = self.stats.get_value(
            "downloader/exception_type_count/twisted.internet.error.TimeoutError", 0
        )
        retries_total = self.stats.get_value("retry/count", 0)

        retry_reasons = {
            k.replace("retry/reason_count/", ""): v
            for k, v in self.stats.get_stats().items()
            if k.startswith("retry/reason_count/")
        }

        # Memory and bytes
        peak_mem = self.stats.get_value("memusage/max", 0)  # bytes
        total_bytes = self.stats.get_value("downloader/response_bytes", 0)

        metrics = {
            "spider_name": spider.name,
            "reason": reason,
            "elapsed_time_seconds": round(elapsed, 2),
            "items_scraped": items,
            "pages_processed": pages,
            "items_per_minute": round(items_per_min, 2),
            "pages_per_minute": round(pages_per_min, 2),
            "time_per_page_seconds": round(time_per_page, 2),
            "success_rate": round(success_rate, 2),
            "schema_coverage": {
                "percentage": round(schema_coverage_percentage, 2),
                "valid": self.valid_items,
                "checked": total_checked,
                "fields": dict(self.field_coverage),
            },
            "http_errors": dict(self.http_status_counter),
            "duplicates": self.stats.get_value("custom/items_duplicates", 0),
            "timeouts": timeouts,
            "retries": {
                "total": retries_total,
                "by_reason": retry_reasons,
            },
            "resources": {
                "peak_memory_bytes": peak_mem,
                "downloaded_bytes": total_bytes,
            },
            "timeline": timeline_sorted,
            "timeline_interval_minutes": interval_size,
        }

        # Save metrics in folder by date
        now = datetime.datetime.now()
        day_folder = now.strftime("%Y-%m-%d")
        filename = now.strftime("metrics-%Y-%m-%d_%H-%M-%S.json")

        output_dir = os.path.join("metrics", day_folder)
        os.makedirs(output_dir, exist_ok=True)

        file_path = os.path.join(output_dir, filename)
        with open(file_path, "w", encoding="utf-8") as f:
            json.dump(metrics, f, indent=2, ensure_ascii=False)

        spider.logger.info(f"Saved metrics: {file_path}")
        print(json.dumps(metrics, indent=2, ensure_ascii=False))
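For reference, below is a minimal sketch of how a spider could opt into this extension. The spider-level schema and unique_field attributes and the METRICS_TIMELINE_BUCKETS setting are the values from_crawler actually reads; everything else (the ps_helper.extensions.metrics import path, the spider name, URL, and selectors) is an assumption for illustration, since the new module's path is not shown in this extract.

# Usage sketch. Assumptions: the extension module is importable as
# ps_helper.extensions.metrics; spider name, URL and selectors are placeholders.
import scrapy
from pydantic import BaseModel


class ProductSchema(BaseModel):
    # Hypothetical item schema; MetricsExtension calls schema(**item) and
    # counts a pydantic ValidationError as an invalid item.
    url: str
    title: str
    price: float


class ProductsSpider(scrapy.Spider):
    name = "products"
    start_urls = ["https://example.com/products"]  # placeholder URL

    # Read by MetricsExtension.from_crawler via crawler.spidercls
    schema = ProductSchema
    unique_field = "url"  # repeated values of this field are counted as duplicates

    custom_settings = {
        "EXTENSIONS": {
            # Assumed import path for the module added in this commit.
            "ps_helper.extensions.metrics.MetricsExtension": 500,
        },
        # Optional; the extension defaults to 30 timeline buckets.
        "METRICS_TIMELINE_BUCKETS": 20,
    }

    def parse(self, response):
        # Placeholder parsing; real selectors depend on the target site.
        for product in response.css("li.product"):
            yield {
                "url": response.urljoin(product.css("a::attr(href)").get() or ""),
                "title": (product.css("a::text").get() or "").strip(),
                "price": 0.0,
            }

On spider close, the extension writes the collected metrics to metrics/<YYYY-MM-DD>/metrics-<timestamp>.json under the working directory, logs the path, and prints the same JSON to stdout.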
