
Commit b54ea32

http collector: Add Chunking parameters
To handle big files in the queue, file splitting is necessary. Chunking was previously only available for the file and mail URL collectors; this adds it to the HTTP collector.
1 parent a89339d commit b54ea32

6 files changed: 43 additions, 6 deletions

CHANGELOG.md

Lines changed: 1 addition & 0 deletions
@@ -23,6 +23,7 @@ Please refer to the [NEWS](NEWS.md) for a list of changes which have an affect o
 
 ### Bots
 #### Collectors
+- `intelmq.bots.collectors.http.collector_http`: Add Chunking parameters to handle big files (PR#2684 by Sebastian Wagner).
 
 #### Parsers

docs/user/bots.md

Lines changed: 8 additions & 0 deletions
@@ -321,6 +321,14 @@ This requires the [python-gnupg](https://pypi.org/project/python-gnupg/) library
 (optional, string) If specified, the string represents path to keyring file. Otherwise the PGP keyring file of the
 current `intelmq` user is used.
 
+**Chunking**
+
+For line-based inputs the bot can split up large reports into smaller chunks. This is particularly important for setups
+that use Redis as a message queue which has a per-message size limitation of 512 MB. To configure chunking,
+set `chunk_size` to a value in bytes. `chunk_replicate_header` determines whether the header line should be repeated for
+each chunk that is passed on to a parser bot. Specifically, to configure a large file input to work around Redis size
+limitation set `chunk_size` to something like 384000000 (~384 MB).
+
 ---
 
 ### Generic URL Stream Fetcher <div id="intelmq.bots.collectors.http.collector_http_stream" />
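
The suggested `chunk_size` of 384000000 can be sanity-checked with a little arithmetic. The sketch below rests on two assumptions that this diff does not state: that the raw payload is base64-encoded before it is queued, and that the Redis per-message limit is 512 MiB (536,870,912 bytes). The names `REDIS_LIMIT_BYTES` and `base64_length` are made up for illustration.

```python
import base64

# Assumption: Redis accepts at most 512 MiB per message.
REDIS_LIMIT_BYTES = 512 * 1024 * 1024
# Value suggested in the documentation change above.
CHUNK_SIZE = 384_000_000


def base64_length(raw_length: int) -> int:
    """Length in bytes of the padded base64 encoding of `raw_length` bytes."""
    return 4 * ((raw_length + 2) // 3)


# Cross-check the formula against the standard library on a small input.
assert base64_length(10) == len(base64.b64encode(b"x" * 10))

encoded = base64_length(CHUNK_SIZE)
headroom = REDIS_LIMIT_BYTES - encoded

print(f"raw chunk:     {CHUNK_SIZE:>12,} bytes")
print(f"base64 chunk:  {encoded:>12,} bytes")
print(f"headroom:      {headroom:>12,} bytes")
```

Under these assumptions a full chunk encodes to 512,000,000 bytes, leaving roughly 24.9 million bytes for the other message fields, a replicated header line, and the tail of a line that runs past the chunk boundary.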

intelmq/bots/collectors/http/collector_http.py

Lines changed: 14 additions & 6 deletions
@@ -27,11 +27,14 @@
 gpg_keyring: none (defaults to user's GPG keyring) or string (path to keyring file)
 """
 from datetime import datetime, timedelta
+from typing import Optional
+from io import BytesIO
 
 from intelmq.lib.bot import CollectorBot
 from intelmq.lib.mixins import HttpMixin
 from intelmq.lib.utils import unzip
 from intelmq.lib.exceptions import MissingDependencyError
+from intelmq.lib.splitreports import generate_reports
 
 try:
     import gnupg
@@ -64,6 +67,9 @@ class HTTPCollectorBot(CollectorBot, HttpMixin):
     signature_url_formatting: bool = False
     ssl_client_certificate: str = None # TODO: pathlib.Path
     verify_pgp_signatures: bool = False
+    # splitreports
+    chunk_replicate_header: bool = True
+    chunk_size: Optional[int] = None
 
     def init(self):
         self.use_gpg = self.verify_pgp_signatures
@@ -119,7 +125,7 @@ def process(self):
                                           try_tar=False, logger=self.logger,
                                           return_names=True))
             except ValueError:
-                raw_reports.append((None, resp.text))
+                raw_reports.append((None, resp.content))
             else:
                 self.logger.info('Extracting files: '
                                  "'%s'.", "', '".join([file_name
@@ -130,12 +136,14 @@ def process(self):
                                 return_names=True, logger=self.logger)
 
         for file_name, raw_report in raw_reports:
-            report = self.new_report()
-            report.add("raw", raw_report)
-            report.add("feed.url", http_url)
+            template = self.new_report()
+            template.add("feed.url", http_url)
             if file_name:
-                report.add("extra.file_name", file_name)
-            self.send_message(report)
+                template.add("extra.file_name", file_name)
+            for report in generate_reports(template, BytesIO(raw_report),
+                                           self.chunk_size,
+                                           self.chunk_replicate_header):
+                self.send_message(report)
 
     def format_url(self, url: str, formatting) -> str:
         try:
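
The switch from `resp.text` to `resp.content` in the fallback branch goes hand in hand with wrapping the payload in `BytesIO`: a `BytesIO` can only be built from bytes, not from a decoded string. A minimal illustration follows, with `body_text` and `body_bytes` as stand-ins for the two response attributes (the sample data is invented).

```python
from io import BytesIO

body_text = "header\nrow 1\nrow 2\n"      # stand-in for resp.text (str)
body_bytes = body_text.encode("utf-8")     # stand-in for resp.content (bytes)

# Passing a str to BytesIO fails, so the old `resp.text` value would not work.
try:
    BytesIO(body_text)
except TypeError as exc:
    text_error = str(exc)
else:
    text_error = None

# Bytes work, and the stream can then be consumed line by line or in blocks.
stream = BytesIO(body_bytes)
first_line = stream.readline()
rest = stream.read()

print(text_error)
print(first_line, rest)
```

Working on bytes also means chunk sizes are counted in bytes rather than characters, which matches the `chunk_size` documentation.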

intelmq/tests/assets/multiline.txt

Lines changed: 4 additions & 0 deletions
@@ -0,0 +1,4 @@
+Lorem ipsum dolor sit amet, consectetur adipiscing elit, sed eiusmod tempor incidunt ut labore et dolore magna aliqua.
+Ut enim ad minim veniam, quis nostrud exercitation ullamco laboris nisi ut aliquid ex ea commodi consequat.
+Quis aute iure reprehenderit in voluptate velit esse cillum dolore eu fugiat nulla pariatur.
+Excepteur sint obcaecat cupiditat non proident, sunt in culpa qui officia deserunt mollit anim id est laborum.
Lines changed: 2 additions & 0 deletions
@@ -0,0 +1,2 @@
+SPDX-FileCopyrightText: 2026 Institute for Common Good Technology
+SPDX-License-Identifier: AGPL-3.0-or-later

intelmq/tests/bots/collectors/http/test_collector.py

Lines changed: 14 additions & 0 deletions
@@ -195,6 +195,20 @@ def test_debug_request_response_log(self, mocker):
         self.assertLogMatches("Response headers: {'some': 'header'}.", 'DEBUG')
         self.assertLogMatches("Response body: 'Should be in logs'.", 'DEBUG')
 
+    def test_chunking(self, mocker):
+        """
+        Test file chunking
+        """
+        prepare_mocker(mocker)
+        self.run_bot(allowed_error_count=1,
+                     parameters={
+                         'http_url': 'http://localhost/multiline.txt',
+                         'chunk_size': 3,
+                         'chunk_replicate_header': False,
+                         'extract_files': None,
+                     })
+        self.assertOutputQueueLen(4)
+
 
 @requests_mock.Mocker()
 class TestHTTPCollectorBotAuthentication(test.BotTestCase, unittest.TestCase):
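
One way to see why a `chunk_size` of 3 on a four-line file can yield four messages is a line-aligned splitter that reads `chunk_size` bytes and then extends to the end of the current line. The helper below, `split_lines`, is a hypothetical sketch of that idea. It is not the implementation of `generate_reports`, and its parameter names are chosen for illustration only.

```python
from io import BytesIO
from typing import BinaryIO, Iterator, Optional


def split_lines(stream: BinaryIO, chunk_size: Optional[int],
                replicate_header: bool) -> Iterator[bytes]:
    """Yield line-aligned chunks of roughly `chunk_size` bytes (hypothetical)."""
    if not chunk_size:
        # No chunking configured: pass the whole input through as one piece.
        yield stream.read()
        return
    # Optionally remember the first line so it can be prepended to every chunk.
    header = stream.readline() if replicate_header else b""
    while True:
        chunk = stream.read(chunk_size)
        if not chunk:
            break
        # Never cut a line in half: extend the chunk to the next newline.
        if not chunk.endswith(b"\n"):
            chunk += stream.readline()
        yield header + chunk


data = b"line one\nline two\nline three\nline four\n"

# Every line is longer than 3 bytes, so each chunk ends up holding one line.
chunks = list(split_lines(BytesIO(data), chunk_size=3, replicate_header=False))
print(len(chunks))  # 4

# With header replication, the first line is repeated in front of each chunk.
with_header = list(split_lines(BytesIO(b"h\na\nb\n"), chunk_size=1,
                               replicate_header=True))
print(with_header)  # [b'h\na\n', b'h\nb\n']
```

Because chunks are extended to a line boundary, a chunk can be somewhat larger than `chunk_size`; in this sketch the limit is a lower bound on where a split may occur, not a hard cap.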
