Commit ee29249

feat: upgrade to otel collector v0.102.1 (#132)

* feat: upgrade to otel collector v0.102.1
* make modified kafka exporter consistent with original
* update go-grpc-compression pkg to 1.2.3 to fix vulnerability
* fix even more vulnerabilities

1 parent 4543e39

66 files changed: +2237 additions, -1475 deletions

.github/workflows/ci.yml

Lines changed: 2 additions & 2 deletions

@@ -29,10 +29,10 @@ jobs:
         uses: actions/checkout@v4
       - if: matrix.os == 'ubuntu-latest'
         name: golangci-lint
-        uses: golangci/golangci-lint-action@v4
+        uses: golangci/golangci-lint-action@v6
         with:
           # Required: the version of golangci-lint is required and must be specified without patch version: we always use the latest patch version.
-          version: v1.58.1
+          version: v1.59.0
           skip-pkg-cache: true
           only-new-issues: true
       - name: Run unit tests

.golangci.yml

Lines changed: 1 addition & 1 deletion

@@ -1,5 +1,5 @@
 run:
-  deadline: 5m
+  timeout: 5m

 linters:
   disable-all: true

exporter/kafkaexporter/README.md

Lines changed: 5 additions & 7 deletions

@@ -1,22 +1,18 @@
-**IMPORTANT:** This component is copied from https://github.com/open-telemetry/opentelemetry-collector-contrib/tree/v0.96.0/exporter/kafkaexporter and
+**IMPORTANT:** This component is copied from https://github.com/open-telemetry/opentelemetry-collector-contrib/tree/v0.102.0/exporter/kafkaexporter and
 adapted to accept compression settings and also do span curing on large spans.
 # Kafka Exporter

 <!-- status autogenerated section -->
 | Status | |
 | ------------- |-----------|
 | Stability | [beta]: traces, metrics, logs |
-| Distributions | [core], [contrib], [aws], [observiq], [splunk], [sumo] |
+| Distributions | [core], [contrib] |
 | Issues | [![Open issues](https://img.shields.io/github/issues-search/open-telemetry/opentelemetry-collector-contrib?query=is%3Aissue%20is%3Aopen%20label%3Aexporter%2Fkafka%20&label=open&color=orange&logo=opentelemetry)](https://github.com/open-telemetry/opentelemetry-collector-contrib/issues?q=is%3Aopen+is%3Aissue+label%3Aexporter%2Fkafka) [![Closed issues](https://img.shields.io/github/issues-search/open-telemetry/opentelemetry-collector-contrib?query=is%3Aissue%20is%3Aclosed%20label%3Aexporter%2Fkafka%20&label=closed&color=blue&logo=opentelemetry)](https://github.com/open-telemetry/opentelemetry-collector-contrib/issues?q=is%3Aclosed+is%3Aissue+label%3Aexporter%2Fkafka) |
 | [Code Owners](https://github.com/open-telemetry/opentelemetry-collector-contrib/blob/main/CONTRIBUTING.md#becoming-a-code-owner) | [@pavolloffay](https://www.github.com/pavolloffay), [@MovieStoreGuy](https://www.github.com/MovieStoreGuy) |

 [beta]: https://github.com/open-telemetry/opentelemetry-collector#beta
 [core]: https://github.com/open-telemetry/opentelemetry-collector-releases/tree/main/distributions/otelcol
 [contrib]: https://github.com/open-telemetry/opentelemetry-collector-releases/tree/main/distributions/otelcol-contrib
-[aws]: https://github.com/aws-observability/aws-otel-collector
-[observiq]: https://github.com/observIQ/observiq-otel-collector
-[splunk]: https://github.com/signalfx/splunk-otel-collector
-[sumo]: https://github.com/SumoLogic/sumologic-otel-collector
 <!-- end autogenerated section -->

 Kafka exporter exports logs, metrics, and traces to Kafka. This exporter uses a synchronous producer
@@ -31,6 +27,7 @@ The following settings can be optionally configured:
 - `resolve_canonical_bootstrap_servers_only` (default = false): Whether to resolve then reverse-lookup broker IPs during startup.
 - `client_id` (default = "sarama"): The client ID to configure the Sarama Kafka client with. The client ID will be used for all produce requests.
 - `topic` (default = otlp_spans for traces, otlp_metrics for metrics, otlp_logs for logs): The name of the kafka topic to export to.
+- `topic_from_attribute` (default = ""): Specify the resource attribute whose value should be used as the message's topic. This option, when set, will take precedence over the default topic. If `topic_from_attribute` is not set, the message's topic will be set to the value of the configuration option `topic` instead.
 - `encoding` (default = otlp_proto): The encoding of the traces sent to kafka. All available encodings:
   - `otlp_proto`: payload is Protobuf serialized from `ExportTraceServiceRequest` if set as a traces exporter or `ExportMetricsServiceRequest` for metrics or `ExportLogsServiceRequest` for logs.
   - `otlp_json`: payload is JSON serialized from `ExportTraceServiceRequest` if set as a traces exporter or `ExportMetricsServiceRequest` for metrics or `ExportLogsServiceRequest` for logs.
@@ -42,6 +39,7 @@ The following settings can be optionally configured:
 - The following encodings are valid *only* for **logs**.
   - `raw`: if the log record body is a byte array, it is sent as is. Otherwise, it is serialized to JSON. Resource and record attributes are discarded.
 - `partition_traces_by_id` (default = false): configures the exporter to include the trace ID as the message key in trace messages sent to kafka. *Please note:* this setting does not have any effect on Jaeger encoding exporters since Jaeger exporters include trace ID as the message key by default.
+- `partition_metrics_by_resource_attributes` (default = false) configures the exporter to include the hash of sorted resource attributes as the message partitioning key in metric messages sent to kafka.
 - `auth`
   - `plain_text`
     - `username`: The username to use.
@@ -53,7 +51,7 @@ The following settings can be optionally configured:
   - `version` (default = 0): The SASL protocol version to use (0 or 1)
   - `aws_msk.region`: AWS Region in case of AWS_MSK_IAM mechanism
   - `aws_msk.broker_addr`: MSK Broker address in case of AWS_MSK_IAM mechanism
-  - `tls`
+  - `tls`: see [TLS Configuration Settings](https://github.com/open-telemetry/opentelemetry-collector/blob/main/config/configtls/README.md) for the full set of available options.
     - `ca_file`: path to the CA cert. For a client this verifies the server certificate. Should
       only be used if `insecure` is set to false.
     - `cert_file`: path to the TLS cert to use for TLS required connections. Should

exporter/kafkaexporter/config.go_original

Lines changed: 5 additions & 0 deletions

@@ -40,6 +40,9 @@ type Config struct {
 	// The name of the kafka topic to export to (default otlp_spans for traces, otlp_metrics for metrics)
 	Topic string `mapstructure:"topic"`

+	// TopicFromAttribute is the name of the attribute to use as the topic name.
+	TopicFromAttribute string `mapstructure:"topic_from_attribute"`
+
 	// Encoding of messages (default "otlp_proto")
 	Encoding string `mapstructure:"encoding"`

@@ -48,6 +51,8 @@ type Config struct {
 	// trace ID as the message key by default.
 	PartitionTracesByID bool `mapstructure:"partition_traces_by_id"`

+	PartitionMetricsByResourceAttributes bool `mapstructure:"partition_metrics_by_resource_attributes"`
+
 	// Metadata is the namespace for metadata management properties used by the
 	// Client, and shared by the Producer/Consumer.
 	Metadata Metadata `mapstructure:"metadata"`

exporter/kafkaexporter/config.modified.go

Lines changed: 5 additions & 0 deletions

@@ -40,6 +40,9 @@ type Config struct {
 	// The name of the kafka topic to export to (default otlp_spans for traces, otlp_metrics for metrics)
 	Topic string `mapstructure:"topic"`

+	// TopicFromAttribute is the name of the attribute to use as the topic name.
+	TopicFromAttribute string `mapstructure:"topic_from_attribute"`
+
 	// Encoding of messages (default "otlp_proto")
 	Encoding string `mapstructure:"encoding"`

@@ -48,6 +51,8 @@ type Config struct {
 	// trace ID as the message key by default.
 	PartitionTracesByID bool `mapstructure:"partition_traces_by_id"`

+	PartitionMetricsByResourceAttributes bool `mapstructure:"partition_metrics_by_resource_attributes"`
+
 	// Metadata is the namespace for metadata management properties used by the
 	// Client, and shared by the Producer/Consumer.
 	Metadata Metadata `mapstructure:"metadata"`

exporter/kafkaexporter/config_modified_test.go

Lines changed: 14 additions & 11 deletions

@@ -35,7 +35,7 @@ func TestLoadConfig(t *testing.T) {
 	}{
 		{
 			id: component.NewIDWithName(metadata.Type, ""),
-			option: func(conf *Config) {
+			option: func(_ *Config) {
 				// intentionally left blank so we use default config
 			},
 			expected: &Config{
@@ -55,11 +55,12 @@ func TestLoadConfig(t *testing.T) {
 				NumConsumers: 2,
 				QueueSize:    10,
 			},
-			Topic:               "spans",
-			Encoding:            "otlp_proto",
-			PartitionTracesByID: true,
-			Brokers:             []string{"foo:123", "bar:456"},
-			ClientID:            "test_client_id",
+			Topic:                                "spans",
+			Encoding:                             "otlp_proto",
+			PartitionTracesByID:                  true,
+			PartitionMetricsByResourceAttributes: true,
+			Brokers:                              []string{"foo:123", "bar:456"},
+			ClientID:                             "test_client_id",
 			Authentication: kafka.Authentication{
 				PlainText: &kafka.PlainTextConfig{
 					Username: "jdoe",
@@ -119,11 +120,12 @@ func TestLoadConfig(t *testing.T) {
 				NumConsumers: 2,
 				QueueSize:    10,
 			},
-			Topic:               "spans",
-			Encoding:            "otlp_proto",
-			PartitionTracesByID: true,
-			Brokers:             []string{"foo:123", "bar:456"},
-			ClientID:            "test_client_id",
+			Topic:                                "spans",
+			Encoding:                             "otlp_proto",
+			PartitionTracesByID:                  true,
+			PartitionMetricsByResourceAttributes: true,
+			Brokers:                              []string{"foo:123", "bar:456"},
+			ClientID:                             "test_client_id",
 			Authentication: kafka.Authentication{
 				PlainText: &kafka.PlainTextConfig{
 					Username: "jdoe",
@@ -185,6 +187,7 @@ func TestLoadConfig(t *testing.T) {
 			Topic:               "spans",
 			Encoding:            "otlp_proto",
 			PartitionTracesByID: true,
+			PartitionMetricsByResourceAttributes: true,
 			Brokers:             []string{"foo:123", "bar:456"},
 			ClientID:            "test_client_id",
 			ResolveCanonicalBootstrapServersOnly: true,

exporter/kafkaexporter/config_test.go_original

Lines changed: 14 additions & 11 deletions

@@ -35,7 +35,7 @@ func TestLoadConfig(t *testing.T) {
 	}{
 		{
 			id: component.NewIDWithName(metadata.Type, ""),
-			option: func(conf *Config) {
+			option: func(_ *Config) {
 				// intentionally left blank so we use default config
 			},
 			expected: &Config{
@@ -55,11 +55,12 @@ func TestLoadConfig(t *testing.T) {
 				NumConsumers: 2,
 				QueueSize:    10,
 			},
-			Topic:               "spans",
-			Encoding:            "otlp_proto",
-			PartitionTracesByID: true,
-			Brokers:             []string{"foo:123", "bar:456"},
-			ClientID:            "test_client_id",
+			Topic:                                "spans",
+			Encoding:                             "otlp_proto",
+			PartitionTracesByID:                  true,
+			PartitionMetricsByResourceAttributes: true,
+			Brokers:                              []string{"foo:123", "bar:456"},
+			ClientID:                             "test_client_id",
 			Authentication: kafka.Authentication{
 				PlainText: &kafka.PlainTextConfig{
 					Username: "jdoe",
@@ -109,11 +110,12 @@ func TestLoadConfig(t *testing.T) {
 				NumConsumers: 2,
 				QueueSize:    10,
 			},
-			Topic:               "spans",
-			Encoding:            "otlp_proto",
-			PartitionTracesByID: true,
-			Brokers:             []string{"foo:123", "bar:456"},
-			ClientID:            "test_client_id",
+			Topic:                                "spans",
+			Encoding:                             "otlp_proto",
+			PartitionTracesByID:                  true,
+			PartitionMetricsByResourceAttributes: true,
+			Brokers:                              []string{"foo:123", "bar:456"},
+			ClientID:                             "test_client_id",
 			Authentication: kafka.Authentication{
 				PlainText: &kafka.PlainTextConfig{
 					Username: "jdoe",
@@ -165,6 +167,7 @@ func TestLoadConfig(t *testing.T) {
 			Topic:               "spans",
 			Encoding:            "otlp_proto",
 			PartitionTracesByID: true,
+			PartitionMetricsByResourceAttributes: true,
 			Brokers:             []string{"foo:123", "bar:456"},
 			ClientID:            "test_client_id",
 			ResolveCanonicalBootstrapServersOnly: true,

exporter/kafkaexporter/factory.go

Lines changed: 8 additions & 2 deletions

@@ -38,6 +38,8 @@ const (
 	defaultCompression = "none"
 	// default from sarama.NewConfig()
 	defaultFluxMaxMessages = 0
+	// partitioning metrics by resource attributes is disabled by default
+	defaultPartitionMetricsByResourceAttributesEnabled = false
 )

 // FactoryOption applies changes to kafkaExporterFactory.
@@ -97,8 +99,9 @@ func createDefaultConfig() component.Config {
 		Brokers:  []string{defaultBroker},
 		ClientID: defaultClientID,
 		// using an empty topic to track when it has not been set by user, default is based on traces or metrics.
-		Topic:    "",
-		Encoding: defaultEncoding,
+		Topic:                                "",
+		Encoding:                             defaultEncoding,
+		PartitionMetricsByResourceAttributes: defaultPartitionMetricsByResourceAttributesEnabled,
 		Metadata: Metadata{
 			Full: defaultMetadataFull,
 			Retry: MetadataRetry{
@@ -148,6 +151,7 @@ func (f *kafkaExporterFactory) createTracesExporter(
 		exporterhelper.WithTimeout(exporterhelper.TimeoutSettings{Timeout: 0}),
 		exporterhelper.WithRetry(oCfg.BackOffConfig),
 		exporterhelper.WithQueue(oCfg.QueueSettings),
+		exporterhelper.WithStart(exp.start),
 		exporterhelper.WithShutdown(exp.Close))
 }

@@ -178,6 +182,7 @@ func (f *kafkaExporterFactory) createMetricsExporter(
 		exporterhelper.WithTimeout(exporterhelper.TimeoutSettings{Timeout: 0}),
 		exporterhelper.WithRetry(oCfg.BackOffConfig),
 		exporterhelper.WithQueue(oCfg.QueueSettings),
+		exporterhelper.WithStart(exp.start),
 		exporterhelper.WithShutdown(exp.Close))
 }

@@ -208,5 +213,6 @@ func (f *kafkaExporterFactory) createLogsExporter(
 		exporterhelper.WithTimeout(exporterhelper.TimeoutSettings{Timeout: 0}),
 		exporterhelper.WithRetry(oCfg.BackOffConfig),
 		exporterhelper.WithQueue(oCfg.QueueSettings),
+		exporterhelper.WithStart(exp.start),
 		exporterhelper.WithShutdown(exp.Close))
 }

exporter/kafkaexporter/factory_test.go

Lines changed: 10 additions & 3 deletions

@@ -11,6 +11,7 @@ import (

 	"github.com/IBM/sarama"
 	"github.com/stretchr/testify/assert"
+	"github.com/stretchr/testify/require"
 	"go.opentelemetry.io/collector/component/componenttest"
 	"go.opentelemetry.io/collector/exporter/exportertest"
 	"go.opentelemetry.io/collector/pdata/plog"
@@ -124,9 +125,11 @@ func TestCreateMetricExporter(t *testing.T) {
 				exportertest.NewNopCreateSettings(),
 				tc.conf,
 			)
+			require.NoError(t, err)
+			assert.NotNil(t, exporter, "Must return valid exporter")
+			err = exporter.Start(context.Background(), componenttest.NewNopHost())
 			if tc.err != nil {
 				assert.ErrorAs(t, err, &tc.err, "Must match the expected error")
-				assert.Nil(t, exporter, "Must return nil value for invalid exporter")
 				return
 			}
 			assert.NoError(t, err, "Must not error")
@@ -199,9 +202,11 @@ func TestCreateLogExporter(t *testing.T) {
 				exportertest.NewNopCreateSettings(),
 				tc.conf,
 			)
+			require.NoError(t, err)
+			assert.NotNil(t, exporter, "Must return valid exporter")
+			err = exporter.Start(context.Background(), componenttest.NewNopHost())
 			if tc.err != nil {
 				assert.ErrorAs(t, err, &tc.err, "Must match the expected error")
-				assert.Nil(t, exporter, "Must return nil value for invalid exporter")
 				return
 			}
 			assert.NoError(t, err, "Must not error")
@@ -274,9 +279,11 @@ func TestCreateTraceExporter(t *testing.T) {
 				exportertest.NewNopCreateSettings(),
 				tc.conf,
 			)
+			require.NoError(t, err)
+			assert.NotNil(t, exporter, "Must return valid exporter")
+			err = exporter.Start(context.Background(), componenttest.NewNopHost())
 			if tc.err != nil {
 				assert.ErrorAs(t, err, &tc.err, "Must match the expected error")
-				assert.Nil(t, exporter, "Must return nil value for invalid exporter")
 				return
 			}
 			assert.NoError(t, err, "Must not error")
