
Commit 92e71c8 (parent 066577f)

feat: Add ability to configure Outbox processor to use batches or guaranteed ordering (#58)

* use parallelizable outbox using batch api
* add back sequential mode as library level config
* clean up tests, reset class level config in test helper, pull out KinesisFailedEvent struct
* add back accidentally deleted event specs
* update readme, bump version
* update readme to acknowledge possible future optimizations to guaranteed_order mode
* clarify readme
* use 500 as default worker batch size
* shorten default sleep, always sleep between batches to prevent excessive database polling

19 files changed: +790 -183 lines

Gemfile.lock

Lines changed: 1 addition & 1 deletion
@@ -1,7 +1,7 @@
 PATH
   remote: .
   specs:
-    journaled (6.2.1)
+    journaled (6.2.2)
     activejob
     activerecord
     activesupport

README.md

Lines changed: 33 additions & 0 deletions
@@ -164,6 +164,25 @@ Journaling provides a number of different configuation options that can be set i
 Journaled.outbox_base_class_name = 'EventsRecord'
 ```
 
+#### `Journaled.outbox_processing_mode` (default: `:batch`)
+
+**Only relevant when using `Journaled::Outbox::Adapter`.**
+
+Controls how events are sent to Kinesis. Two modes are available:
+
+- **`:batch`** (default) - Uses the Kinesis `put_records` batch API for high throughput. Events are sent in parallel batches, allowing multiple workers to run concurrently. Best for most use cases where strict ordering is not required.
+
+- **`:guaranteed_order`** - Uses the Kinesis `put_record` single-event API to send events sequentially. Events are processed one at a time in order, stopping on the first transient failure to preserve ordering. Use this when you need strict ordering guarantees per partition key. Note: The current implementation requires single-threaded processing, but future optimizations may support batching and multi-threading by partition key.
+
+Example:
+```ruby
+# For high throughput (default)
+Journaled.outbox_processing_mode = :batch
+
+# For guaranteed ordering
+Journaled.outbox_processing_mode = :guaranteed_order
+```
+
 #### ActiveJob `set` options
 
 Both model-level directives accept additional options to be passed into ActiveJob's `set` method:
@@ -182,6 +201,8 @@ journal_attributes :email, enqueue_with: { priority: 20, queue: 'journaled' }
 
 Journaled includes a built-in Outbox-style delivery adapter with horizontally scalable workers.
 
+By default, the Outbox adapter uses the Kinesis `put_records` batch API for high-throughput event processing, allowing multiple workers to process events in parallel. If you require strict ordering guarantees per partition key, you can configure sequential processing mode (see configuration options below).
+
 **Setup:**
 
 This feature requires creating database tables and is completely optional. Existing users are unaffected.
@@ -207,6 +228,16 @@ Journaled.delivery_adapter = Journaled::Outbox::Adapter
 # Optional: Customize worker behavior (these are the defaults)
 Journaled.worker_batch_size = 500 # Max events per Kinesis batch (Kinesis API limit)
 Journaled.worker_poll_interval = 5 # Seconds between polls
+
+# Optional: Configure processing mode (default: :batch)
+# - :batch - Uses Kinesis put_records batch API for high throughput (default)
+#            Events are sent in parallel batches. Multiple workers can run concurrently.
+# - :guaranteed_order - Uses Kinesis put_record single-event API for sequential processing
+#                       Events are sent one at a time in order. Use this if you need
+#                       strict ordering guarantees per partition key. The current
+#                       implementation processes events single-threaded, though future
+#                       optimizations may support batching/multi-threading by partition key.
+Journaled.outbox_processing_mode = :batch
 ```
 
 **Note:** When using the Outbox adapter, you do **not** need to configure an ActiveJob queue adapter (skip step 1 of Installation). The Outbox adapter uses the `journaled_outbox_events` table for event storage and its own worker daemons for processing, making it independent of ActiveJob. Transactional batching still works seamlessly with the Outbox adapter.
@@ -217,6 +248,8 @@ Journaled.worker_poll_interval = 5 # Seconds between polls
 bundle exec rake journaled_worker:work
 ```
 
+**Note:** In `:batch` mode (the default), you can run multiple worker processes concurrently for horizontal scaling. In `:guaranteed_order` mode, the current implementation is optimized for running a single worker to maintain ordering guarantees.
+
 4. **Monitoring:**
 
 The system emits `ActiveSupport::Notifications` events:

app/models/journaled/outbox/event.rb

Lines changed: 12 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -29,12 +29,20 @@ class Event < Journaled.outbox_base_class_name.constantize
2929

3030
# Fetch a batch of events for processing using SELECT FOR UPDATE
3131
#
32+
# In :guaranteed_order mode, uses blocking lock to ensure sequential processing.
33+
# In :batch mode, uses SKIP LOCKED to allow parallel workers.
34+
#
3235
# @return [Array<Journaled::Outbox::Event>] Events locked for processing
3336
def self.fetch_batch_for_update
34-
ready_to_process
35-
.limit(Journaled.worker_batch_size)
36-
.lock
37-
.to_a
37+
query = ready_to_process.limit(Journaled.worker_batch_size)
38+
39+
lock_clause = if Journaled.outbox_processing_mode == :guaranteed_order
40+
'FOR UPDATE'
41+
else
42+
'FOR UPDATE SKIP LOCKED'
43+
end
44+
45+
query.lock(lock_clause).to_a
3846
end
3947

4048
# Requeue a failed event for processing
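The lock-clause branch above can be illustrated as a standalone sketch (`select_lock_clause` is a hypothetical helper for illustration; the real logic lives inline in `fetch_batch_for_update`):

```ruby
# Mirrors the mode check in fetch_batch_for_update: :guaranteed_order takes a
# blocking FOR UPDATE lock so a single worker processes rows strictly in order,
# while any other mode appends SKIP LOCKED so concurrent workers pass over rows
# already locked by a peer instead of blocking on them.
def select_lock_clause(mode)
  mode == :guaranteed_order ? 'FOR UPDATE' : 'FOR UPDATE SKIP LOCKED'
end
```

ActiveRecord's `lock` accepts a raw SQL string, so the chosen clause is appended verbatim to the generated `SELECT`.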

gemfiles/rails_7_2.gemfile.lock

Lines changed: 1 addition & 1 deletion
@@ -1,7 +1,7 @@
 PATH
   remote: ..
   specs:
-    journaled (6.2.1)
+    journaled (6.2.2)
     activejob
     activerecord
     activesupport

gemfiles/rails_8_0.gemfile.lock

Lines changed: 1 addition & 1 deletion
@@ -1,7 +1,7 @@
 PATH
   remote: ..
   specs:
-    journaled (6.2.1)
+    journaled (6.2.2)
     activejob
     activerecord
     activesupport

lib/journaled.rb

Lines changed: 5 additions & 2 deletions
@@ -12,7 +12,9 @@
 require 'journaled/delivery_adapters/active_job_adapter'
 require 'journaled/outbox/adapter'
 require 'journaled/kinesis_client_factory'
+require 'journaled/kinesis_failed_event'
 require 'journaled/kinesis_batch_sender'
+require 'journaled/kinesis_sequential_sender'
 require 'journaled/outbox/batch_processor'
 require 'journaled/outbox/metric_emitter'
 require 'journaled/outbox/worker'
@@ -31,8 +33,9 @@ module Journaled
   mattr_writer(:transactional_batching_enabled) { true }
 
   # Worker configuration (for Outbox-style event processing)
-  mattr_accessor(:worker_batch_size) { 1000 }
-  mattr_accessor(:worker_poll_interval) { 1 } # seconds
+  mattr_accessor(:worker_batch_size) { 500 }
+  mattr_accessor(:worker_poll_interval) { 0.5 } # seconds
+  mattr_accessor(:outbox_processing_mode) { :batch } # :batch or :guaranteed_order
 
   def self.transactional_batching_enabled?
     Thread.current[:journaled_transactional_batching_enabled] || @@transactional_batching_enabled
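The defaults above are declared with ActiveSupport's `mattr_accessor` and a default block. A rough, dependency-free sketch of that pattern (the `ConfigSketch` module is illustrative only, not part of the gem):

```ruby
# Simplified stand-in for mattr_accessor with a default block: the block
# supplies the initial value, and the generated writer lets an initializer
# override it, which is how Journaled.outbox_processing_mode is configured.
module ConfigSketch
  def self.config_accessor(name, &default)
    instance_variable_set("@#{name}", default.call)
    define_singleton_method(name) { instance_variable_get("@#{name}") }
    define_singleton_method("#{name}=") { |value| instance_variable_set("@#{name}", value) }
  end

  # Mirror the defaults introduced in this commit
  config_accessor(:worker_batch_size) { 500 }
  config_accessor(:worker_poll_interval) { 0.5 }
  config_accessor(:outbox_processing_mode) { :batch }
end
```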
lib/journaled/kinesis_batch_sender.rb

Lines changed: 72 additions & 62 deletions
@@ -1,98 +1,108 @@
 # frozen_string_literal: true
 
 module Journaled
-  # Sends batches of events to Kinesis using the PutRecord single-event API
+  # Sends batches of events to Kinesis using the PutRecords batch API
   #
   # This class handles:
-  # - Sending events individually to support guaranteed ordering
+  # - Sending events in batches to improve throughput
   # - Handling failures on a per-event basis
   # - Classifying errors as transient vs permanent
   #
   # Returns structured results for the caller to handle event state management.
   class KinesisBatchSender
-    FailedEvent = Struct.new(:event, :error_code, :error_message, :transient, keyword_init: true) do
-      def transient?
-        transient
-      end
-
-      def permanent?
-        !transient
-      end
-    end
-
-    PERMANENT_ERROR_CLASSES = [
-      Aws::Kinesis::Errors::ValidationException,
+    # Per-record error codes that indicate permanent failures (bad event data)
+    PERMANENT_ERROR_CODES = [
+      'ValidationException',
     ].freeze
 
     # Send a batch of database events to Kinesis
     #
-    # Sends events one at a time to guarantee ordering. Stops on first transient failure.
+    # Uses put_records batch API. Groups events by stream and sends each group as a batch.
     #
     # @param events [Array<Journaled::Outbox::Event>] Events to send
     # @return [Hash] Result with:
     #   - succeeded: Array of successfully sent events
-    #   - failed: Array of FailedEvent structs (only permanent failures)
+    #   - failed: Array of FailedEvent structs (both transient and permanent failures)
     def send_batch(events)
-      result = { succeeded: [], failed: [] }
+      # Group events by stream since put_records requires all records to go to the same stream
+      events.group_by(&:stream_name).each_with_object({ succeeded: [], failed: [] }) do |(stream_name, stream_events), result|
+        batch_result = send_stream_batch(stream_name, stream_events)
+        result[:succeeded].concat(batch_result[:succeeded])
+        result[:failed].concat(batch_result[:failed])
+      end
+    end
 
-      events.each do |event|
-        event_result = send_event(event)
-        if event_result.is_a?(FailedEvent)
-          if event_result.transient?
-            emit_transient_failure_metric
-            break
-          else
-            result[:failed] << event_result
-          end
-        else
-          result[:succeeded] << event_result
-        end
+    private
+
+    def send_stream_batch(stream_name, stream_events)
+      records = build_records(stream_events)
+
+      begin
+        response = kinesis_client.put_records(stream_name:, records:)
+        process_response(response, stream_events)
+      rescue Aws::Kinesis::Errors::ValidationException
+        # Re-raise batch-level validation errors (configuration issues)
+        # These indicate invalid stream name, batch too large, etc.
+        # Not event data problems - requires manual intervention
+        raise
+      rescue StandardError => e
+        # Handle transient errors (throttling, network issues, service unavailable)
+        handle_transient_batch_error(e, stream_events)
       end
+    end
 
-      result
+    def build_records(stream_events)
+      stream_events.map do |event|
+        {
+          data: event.event_data.merge(id: event.id).to_json,
+          partition_key: event.partition_key,
+        }
+      end
     end
 
-    private
+    def process_response(response, stream_events)
+      succeeded = []
+      failed = []
 
-    # Send a single event to Kinesis
-    #
-    # @param event [Journaled::Outbox::Event] Event to send
-    # @return [Journaled::Outbox::Event, FailedEvent] The event on success, or FailedEvent on failure
-    def send_event(event)
-      # Merge the DB-generated ID into the event data before sending to Kinesis
-      event_data_with_id = event.event_data.merge(id: event.id)
+      response.records.each_with_index do |record_result, index|
+        event = stream_events[index]
 
-      kinesis_client.put_record(
-        stream_name: event.stream_name,
-        data: event_data_with_id.to_json,
-        partition_key: event.partition_key,
-      )
+        if record_result.error_code
+          failed << create_failed_event(event, record_result)
+        else
+          succeeded << event
+        end
+      end
 
-      event
-    rescue *PERMANENT_ERROR_CLASSES => e
-      Rails.logger.error("Kinesis event send failed (permanent): #{e.class} - #{e.message}")
-      FailedEvent.new(
-        event:,
-        error_code: e.class.to_s,
-        error_message: e.message,
-        transient: false,
-      )
-    rescue StandardError => e
-      Rails.logger.error("Kinesis event send failed (transient): #{e.class} - #{e.message}")
-      FailedEvent.new(
+      { succeeded:, failed: }
+    end
+
+    def create_failed_event(event, record_result)
+      Journaled::KinesisFailedEvent.new(
         event:,
-        error_code: e.class.to_s,
-        error_message: e.message,
-        transient: true,
+        error_code: record_result.error_code,
+        error_message: record_result.error_message,
+        transient: PERMANENT_ERROR_CODES.exclude?(record_result.error_code),
       )
     end
 
-    def kinesis_client
-      @kinesis_client ||= KinesisClientFactory.build
+    def handle_transient_batch_error(error, stream_events)
+      Rails.logger.error("Kinesis batch send failed (transient): #{error.class} - #{error.message}")
+
+      failed = stream_events.map do |event|
+        Journaled::KinesisFailedEvent.new(
+          event:,
+          error_code: error.class.to_s,
+          error_message: error.message,
+          transient: true,
+        )
+      end
+
+      { succeeded: [], failed: }
     end
 
-    def emit_transient_failure_metric
-      ActiveSupport::Notifications.instrument('journaled.kinesis_batch_sender.transient_failure')
+    def kinesis_client
+      @kinesis_client ||= KinesisClientFactory.build
     end
   end
 end
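Two ideas in the rewritten sender are worth isolating: events are grouped by stream because `put_records` targets exactly one stream per call, and a per-record failure is permanent only when its error code appears in `PERMANENT_ERROR_CODES`. A dependency-free sketch under those assumptions, with hypothetical `Event`/`RecordResult` structs standing in for the gem's real classes (plain Ruby's `!include?` replaces ActiveSupport's `exclude?`):

```ruby
require 'json'

# Hypothetical stand-ins for Journaled::Outbox::Event and the AWS SDK's
# per-record put_records result.
Event = Struct.new(:id, :stream_name, :partition_key, :event_data, keyword_init: true)
RecordResult = Struct.new(:error_code, :error_message, keyword_init: true)

PERMANENT_ERROR_CODES = ['ValidationException'].freeze

# Group events by stream and build put_records-style payloads, merging the
# DB-generated id into each record's JSON body as the sender does.
def group_for_put_records(events)
  events.group_by(&:stream_name).transform_values do |stream_events|
    stream_events.map do |event|
      { data: event.event_data.merge(id: event.id).to_json, partition_key: event.partition_key }
    end
  end
end

# A failure is transient (retriable) unless its error code is known-permanent.
def transient_failure?(record_result)
  !PERMANENT_ERROR_CODES.include?(record_result.error_code)
end
```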
lib/journaled/kinesis_failed_event.rb

Lines changed: 18 additions & 0 deletions
@@ -0,0 +1,18 @@
+# frozen_string_literal: true
+
+module Journaled
+  # Represents a failed event from Kinesis send operations
+  #
+  # Used by both KinesisBatchSender and KinesisSequentialSender to represent
+  # events that failed to send to Kinesis, along with error details and whether
+  # the failure is transient (retriable) or permanent.
+  KinesisFailedEvent = Struct.new(:event, :error_code, :error_message, :transient, keyword_init: true) do
+    def transient?
+      transient
+    end
+
+    def permanent?
+      !transient
+    end
+  end
+end
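The extracted struct is small enough to exercise standalone. Reproduced here outside the `Journaled` namespace, with `ProvisionedThroughputExceededException` used only as an example of a retriable Kinesis error code:

```ruby
# Same shape as Journaled::KinesisFailedEvent: the transient flag drives
# whether the caller retries the event or records a permanent failure.
FailedEvent = Struct.new(:event, :error_code, :error_message, :transient, keyword_init: true) do
  def transient?
    transient
  end

  def permanent?
    !transient
  end
end

throttled = FailedEvent.new(event: nil, error_code: 'ProvisionedThroughputExceededException',
                            error_message: 'Rate exceeded', transient: true)
bad_record = FailedEvent.new(event: nil, error_code: 'ValidationException',
                             error_message: 'Invalid record', transient: false)
```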
