Skip to content

Commit dca11d4

Browse files
authored
[connector/routing] Add ability to route log records based on OTTL log context (#35939)
This PR resolves #35948 by adding a `context` field to routing table items. The default value of `context` is `resource`, so that existing users will not see a difference. `log` context may also be used. Each routing table item may have a difference context. `match_once` must be `true` in order to use `log` context.
1 parent 2547e1b commit dca11d4

File tree

47 files changed

+3289
-50
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

47 files changed

+3289
-50
lines changed

.chloggen/split-log-records.yaml

Lines changed: 27 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,27 @@
1+
# Use this changelog template to create an entry for release notes.
2+
3+
# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix'
4+
change_type: enhancement
5+
6+
# The name of the component, or a single word describing the area of concern, (e.g. filelogreceiver)
7+
component: connector/routing
8+
9+
# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`).
10+
note: Add ability to route log records individually using OTTL log record context.
11+
12+
# Mandatory: One or more tracking issues related to the change. You can use the PR number here if no issue exists.
13+
issues: [19738]
14+
15+
# (Optional) One or more lines of additional information to render under the primary note.
16+
# These lines will be padded with 2 spaces and then inserted directly into the document.
17+
# Use pipe (|) for multiline entries.
18+
subtext:
19+
20+
# If your change doesn't affect end users or the exported elements of any package,
21+
# you should instead start your pull request title with [chore] or use the "Skip Changelog" label.
22+
# Optional: The change log or logs in which this entry should be included.
23+
# e.g. '[user]' or '[user, api]'
24+
# Include 'user' if the change is relevant to end users.
25+
# Include 'api' if there is a change to a library API.
26+
# Default: '[user]'
27+
change_logs: []

connector/routingconnector/README.md

Lines changed: 95 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -33,14 +33,17 @@ If you are not already familiar with connectors, you may find it helpful to firs
3333
The following settings are available:
3434

3535
- `table (required)`: the routing table for this connector.
36+
- `table.context (optional, default: resource)`: the [OTTL Context] in which the statement will be evaluated. Currently, only `resource` and `log` are supported.
3637
- `table.statement`: the routing condition provided as the [OTTL] statement. Required if `table.condition` is not provided.
3738
- `table.condition`: the routing condition provided as the [OTTL] condition. Required if `table.statement` is not provided.
3839
- `table.pipelines (required)`: the list of pipelines to use when the routing condition is met.
3940
- `default_pipelines (optional)`: contains the list of pipelines to use when a record does not meet any of specified conditions.
4041
- `error_mode (optional)`: determines how errors returned from OTTL statements are handled. Valid values are `propagate`, `ignore` and `silent`. If `ignore` or `silent` is used and a statement's condition has an error then the payload will be routed to the default pipelines. When `silent` is used the error is not logged. If not supplied, `propagate` is used.
41-
- `match_once (optional, default: false)`: determines whether the connector matches multiple statements or not. If enabled, the payload will be routed to the first pipeline in the `table` whose routing condition is met.
42+
- `match_once (optional, default: false)`: determines whether the connector matches multiple statements or not. If enabled, the payload will be routed to the first pipeline in the `table` whose routing condition is met. May only be `false` when used with `resource` context.
4243

43-
Example:
44+
### Examples
45+
46+
Route traces based on an attribute:
4447

4548
```yaml
4649
receivers:
@@ -91,6 +94,92 @@ service:
9194
exporters: [jaeger/ecorp]
9295
```
9396
97+
Route logs based on region:
98+
99+
```yaml
100+
receivers:
101+
otlp:
102+
103+
exporters:
104+
file/other:
105+
path: ./other.log
106+
file/east:
107+
path: ./east.log
108+
file/west:
109+
path: ./west.log
110+
111+
connectors:
112+
routing:
113+
match_once: true
114+
default_pipelines: [logs/other]
115+
table:
116+
- context: log
117+
condition: attributes["region"] == "east"
118+
pipelines: [logs/east]
119+
- context: log
120+
condition: attributes["region"] == "west"
121+
pipelines: [logs/west]
122+
123+
service:
124+
pipelines:
125+
logs/in:
126+
receivers: [otlp]
127+
exporters: [routing]
128+
logs/east:
129+
receivers: [routing]
130+
exporters: [file/east]
131+
logs/west:
132+
receivers: [routing]
133+
exporters: [file/west]
134+
logs/other:
135+
receivers: [routing]
136+
exporters: [file/other]
137+
```
138+
139+
Route all low level logs to cheap storage. Route the remainder based on service name:
140+
141+
```yaml
142+
receivers:
143+
otlp:
144+
145+
exporters:
146+
file/cheap:
147+
path: ./cheap.log
148+
file/service1:
149+
path: ./service1-important.log
150+
file/ecorp:
151+
path: ./service2-important.log
152+
153+
connectors:
154+
routing:
155+
match_once: true
156+
table:
157+
- context: log
158+
condition: severity_number < SEVERITY_NUMBER_ERROR
159+
pipelines: [logs/cheap]
160+
- context: resource
161+
condition: attributes["service.name"] == "service1"
162+
pipelines: [logs/service1]
163+
- context: resource
164+
condition: attributes["service.name"] == "service2"
165+
pipelines: [logs/service2]
166+
167+
service:
168+
pipelines:
169+
logs/in:
170+
receivers: [otlp]
171+
exporters: [routing]
172+
logs/cheap:
173+
receivers: [routing]
174+
exporters: [file/cheap]
175+
logs/service1:
176+
receivers: [routing]
177+
exporters: [file/service1]
178+
logs/service2:
179+
receivers: [routing]
180+
exporters: [file/service2]
181+
```
182+
94183
A signal may get matched by routing conditions of more than one routing table entry. In this case, the signal will be routed to all pipelines of matching routes.
95184
Respectively, if none of the routing conditions met, then a signal is routed to default pipelines.
96185
@@ -109,10 +198,11 @@ Respectively, if none of the routing conditions met, then a signal is routed to
109198
110199
The full list of settings exposed for this connector are documented [here](./config.go) with detailed sample configuration files:
111200
112-
- [logs](./testdata/config_logs.yaml)
113-
- [metrics](./testdata/config_metrics.yaml)
114-
- [traces](./testdata/config_traces.yaml)
201+
- [logs](./testdata/config/logs.yaml)
202+
- [metrics](./testdata/config/metrics.yaml)
203+
- [traces](./testdata/config/traces.yaml)
115204
116205
[Connectors README]:https://github.com/open-telemetry/opentelemetry-collector/blob/main/connector/README.md
117206
118207
[OTTL]: https://github.com/open-telemetry/opentelemetry-collector-contrib/blob/main/pkg/ottl/README.md
208+
[OTTL Context]: https://github.com/open-telemetry/opentelemetry-collector-contrib/blob/main/pkg/ottl/LANGUAGE.md#contexts

connector/routingconnector/config.go

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -67,13 +67,27 @@ func (c *Config) Validate() error {
6767
if len(item.Pipelines) == 0 {
6868
return errNoPipelines
6969
}
70+
71+
switch item.Context {
72+
case "", "resource": // ok
73+
case "log":
74+
if !c.MatchOnce {
75+
return errors.New("log context is not supported with match_once: false")
76+
}
77+
default:
78+
return errors.New("invalid context: " + item.Context)
79+
}
7080
}
7181

7282
return nil
7383
}
7484

7585
// RoutingTableItem specifies how data should be routed to the different pipelines
7686
type RoutingTableItem struct {
87+
// One of "resource" or "log" (other OTTL contexts will be added in the future)
88+
// Optional. Default "resource".
89+
Context string `mapstructure:"context"`
90+
7791
// Statement is a OTTL statement used for making a routing decision.
7892
// One of 'Statement' or 'Condition' must be provided.
7993
Statement string `mapstructure:"statement"`

connector/routingconnector/config_test.go

Lines changed: 31 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -203,6 +203,37 @@ func TestValidateConfig(t *testing.T) {
203203
},
204204
error: "invalid route: both condition and statement provided",
205205
},
206+
{
207+
name: "invalid context",
208+
config: &Config{
209+
Table: []RoutingTableItem{
210+
{
211+
Context: "invalid",
212+
Statement: `route() where attributes["attr"] == "acme"`,
213+
Pipelines: []pipeline.ID{
214+
pipeline.NewIDWithName(pipeline.SignalTraces, "otlp"),
215+
},
216+
},
217+
},
218+
},
219+
error: "invalid context: invalid",
220+
},
221+
{
222+
name: "log context with match_once false",
223+
config: &Config{
224+
MatchOnce: false,
225+
Table: []RoutingTableItem{
226+
{
227+
Context: "log",
228+
Statement: `route() where attributes["attr"] == "acme"`,
229+
Pipelines: []pipeline.ID{
230+
pipeline.NewIDWithName(pipeline.SignalTraces, "otlp"),
231+
},
232+
},
233+
},
234+
},
235+
error: "log context is not supported with match_once: false",
236+
},
206237
}
207238

208239
for _, tt := range tests {

connector/routingconnector/logs.go

Lines changed: 22 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@ import (
1515

1616
"github.com/open-telemetry/opentelemetry-collector-contrib/connector/routingconnector/internal/plogutil"
1717
"github.com/open-telemetry/opentelemetry-collector-contrib/pkg/ottl"
18+
"github.com/open-telemetry/opentelemetry-collector-contrib/pkg/ottl/contexts/ottllog"
1819
"github.com/open-telemetry/opentelemetry-collector-contrib/pkg/ottl/contexts/ottlresource"
1920
)
2021

@@ -74,29 +75,36 @@ func (c *logsConnector) switchLogs(ctx context.Context, ld plog.Logs) error {
7475
var errs error
7576
for _, route := range c.router.routeSlice {
7677
matchedLogs := plog.NewLogs()
77-
78-
plogutil.MoveResourcesIf(ld, matchedLogs,
79-
func(rl plog.ResourceLogs) bool {
80-
rtx := ottlresource.NewTransformContext(rl.Resource(), rl)
81-
_, isMatch, err := route.statement.Execute(ctx, rtx)
82-
errs = errors.Join(errs, err)
83-
return isMatch
84-
},
85-
)
86-
78+
switch route.statementContext {
79+
case "", "resource":
80+
plogutil.MoveResourcesIf(ld, matchedLogs,
81+
func(rl plog.ResourceLogs) bool {
82+
rtx := ottlresource.NewTransformContext(rl.Resource(), rl)
83+
_, isMatch, err := route.resourceStatement.Execute(ctx, rtx)
84+
errs = errors.Join(errs, err)
85+
return isMatch
86+
},
87+
)
88+
case "log":
89+
plogutil.MoveRecordsWithContextIf(ld, matchedLogs,
90+
func(rl plog.ResourceLogs, sl plog.ScopeLogs, lr plog.LogRecord) bool {
91+
ltx := ottllog.NewTransformContext(lr, sl.Scope(), rl.Resource(), sl, rl)
92+
_, isMatch, err := route.logStatement.Execute(ctx, ltx)
93+
errs = errors.Join(errs, err)
94+
return isMatch
95+
},
96+
)
97+
}
8798
if errs != nil {
8899
if c.config.ErrorMode == ottl.PropagateError {
89100
return errs
90101
}
91102
groupAll(groups, c.router.defaultConsumer, matchedLogs)
92-
93103
}
94104
groupAll(groups, route.consumer, matchedLogs)
95105
}
96-
97106
// anything left wasn't matched by any route. Send to default consumer
98107
groupAll(groups, c.router.defaultConsumer, ld)
99-
100108
for consumer, group := range groups {
101109
errs = errors.Join(errs, consumer.ConsumeLogs(ctx, group))
102110
}
@@ -110,14 +118,12 @@ func (c *logsConnector) matchAll(ctx context.Context, ld plog.Logs) error {
110118
// higher CPU usage.
111119
groups := make(map[consumer.Logs]plog.Logs)
112120
var errs error
113-
114121
for i := 0; i < ld.ResourceLogs().Len(); i++ {
115122
rlogs := ld.ResourceLogs().At(i)
116123
rtx := ottlresource.NewTransformContext(rlogs.Resource(), rlogs)
117-
118124
noRoutesMatch := true
119125
for _, route := range c.router.routeSlice {
120-
_, isMatch, err := route.statement.Execute(ctx, rtx)
126+
_, isMatch, err := route.resourceStatement.Execute(ctx, rtx)
121127
if err != nil {
122128
if c.config.ErrorMode == ottl.PropagateError {
123129
return err
@@ -129,9 +135,7 @@ func (c *logsConnector) matchAll(ctx context.Context, ld plog.Logs) error {
129135
noRoutesMatch = false
130136
group(groups, route.consumer, rlogs)
131137
}
132-
133138
}
134-
135139
if noRoutesMatch {
136140
// no route conditions are matched, add resource logs to default exporters group
137141
group(groups, c.router.defaultConsumer, rlogs)

connector/routingconnector/logs_test.go

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -481,6 +481,16 @@ func TestLogsConnectorDetailed(t *testing.T) {
481481
filepath.Join("testdata", "logs", "resource_context", "each_matches_one"),
482482
filepath.Join("testdata", "logs", "resource_context", "match_none_with_default"),
483483
filepath.Join("testdata", "logs", "resource_context", "match_none_without_default"),
484+
filepath.Join("testdata", "logs", "log_context", "all_match_first_only"),
485+
filepath.Join("testdata", "logs", "log_context", "all_match_last_only"),
486+
filepath.Join("testdata", "logs", "log_context", "match_none_with_default"),
487+
filepath.Join("testdata", "logs", "log_context", "match_none_without_default"),
488+
filepath.Join("testdata", "logs", "log_context", "some_match_each_route"),
489+
filepath.Join("testdata", "logs", "log_context", "with_resource_condition"),
490+
filepath.Join("testdata", "logs", "log_context", "with_scope_condition"),
491+
filepath.Join("testdata", "logs", "log_context", "with_resource_and_scope_conditions"),
492+
filepath.Join("testdata", "logs", "mixed_context", "match_resource_then_logs"),
493+
filepath.Join("testdata", "logs", "mixed_context", "match_logs_then_resource"),
484494
}
485495

486496
for _, tt := range testCases {

connector/routingconnector/metrics.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -73,7 +73,7 @@ func (c *metricsConnector) ConsumeMetrics(ctx context.Context, md pmetric.Metric
7373

7474
noRoutesMatch := true
7575
for _, route := range c.router.routeSlice {
76-
_, isMatch, err := route.statement.Execute(ctx, rtx)
76+
_, isMatch, err := route.resourceStatement.Execute(ctx, rtx)
7777
if err != nil {
7878
if c.config.ErrorMode == ottl.PropagateError {
7979
return err

0 commit comments

Comments
 (0)