Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
28 changes: 28 additions & 0 deletions .chloggen/opensearch_pipeline.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
# Use this changelog template to create an entry for release notes.

# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix'
change_type: enhancement

# The name of the component, or a single word describing the area of concern, (e.g. receiver/filelog)
component: exporter/opensearch

# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`).
note: specify an ingest pipeline to be used before writing documents

# Mandatory: One or more tracking issues related to the change. You can use the PR number here if no issue exists.
issues: [47227]

# (Optional) One or more lines of additional information to render under the primary note.
# These lines will be padded with 2 spaces and then inserted directly into the document.
# Use pipe (|) for multiline entries.
subtext: |
  The `pipeline` option allows specifying an existing ingest pipeline that should process incoming documents.
  If the ingest pipeline does not exist, ingestion will fail.

# If your change doesn't affect end users or the exported elements of any package,
# you should instead start your pull request title with [chore] or use the "Skip Changelog" label.
# Optional: The change log or logs in which this entry should be included.
# e.g. '[user]' or '[user, api]'
# Include 'user' if the change is relevant to end users.
# Include 'api' if there is a change to a library API.
# Default: '[user]'
change_logs: [user]
9 changes: 5 additions & 4 deletions exporter/opensearchexporter/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ The OpenSearch exporter supports dynamic index names for both logs and traces us
- `logs_index_fallback` - Fallback value when placeholder is missing (default: `unknown`)
- `logs_index_time_format` - Time suffix format for logs
- `traces_index` - Custom index name pattern for traces
- `traces_index_fallback` - Fallback value when placeholder is missing (default: `unknown`)
- `traces_index_fallback` - Fallback value when placeholder is missing (default: `unknown`)
- `traces_index_time_format` - Time suffix format for traces

**Placeholder Syntax:**
Expand All @@ -58,7 +58,7 @@ Both logs and traces support time-formatted suffixes using `*_time_format` optio

- **Valid tokens** (case-sensitive):
- `yyyy` (4-digit year), `yy` (2-digit year)
- `MM` (2-digit month), `dd` (2-digit day)
- `MM` (2-digit month), `dd` (2-digit day)
- `HH` (2-digit hour, 24h), `mm` (2-digit minute), `ss` (2-digit second)
- **Allowed separators**: `-`, `.`, `_`, `+`
- **Examples:** `yyyy.MM.dd` → `2024.06.07`, `yyyy-MM` → `2024-06`, `yyMMdd` → `240607`
Expand All @@ -80,7 +80,7 @@ exporters:
logs_index: "otel-logs-%{service.name}-%{env}"
logs_index_fallback: "default"
logs_index_time_format: "yyyy.MM.dd"
# Traces configuration
# Traces configuration
traces_index: "otel-traces-%{service.name}-%{deployment.environment}"
traces_index_fallback: "unknown"
traces_index_time_format: "yyyy.MM.dd"
Expand All @@ -99,7 +99,7 @@ If any placeholder key is missing, the fallback value is used e.g.:
### OpenSearch document mapping


The mapping mode can be controlled via the scope attribute `opensearch.mapping.mode`.
The mapping mode can be controlled via the scope attribute `opensearch.mapping.mode`.

The OpenSearch exporter supports several document schemas and preprocessing behaviors, which may be configured through the following settings:

Expand Down Expand Up @@ -185,6 +185,7 @@ Supports standard TLS settings as part of HTTP settings. See [TLS Configuration/
### Bulk Indexer Options

- `bulk_action` (optional): the [action](https://opensearch.org/docs/2.9/api-reference/document-apis/bulk/) for ingesting data. Only `create` and `index` are allowed here.
- `pipeline` (optional): the ID of an [ingest pipeline](https://opensearch.org/docs/latest/ingest-pipelines/) to apply when indexing documents. When set, all documents sent via the bulk API will be processed by the specified pipeline before being indexed. The ingest pipeline must exist in the cluster and there must be at least one node with the `ingest` node role assigned.

## Example

Expand Down
4 changes: 4 additions & 0 deletions exporter/opensearchexporter/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,10 @@ type Config struct {
// BulkAction configures the action for ingesting data. Only `create` and `index` are allowed here.
// If not specified, the default value `create` will be used.
BulkAction string `mapstructure:"bulk_action"`

// Pipeline is the optional ID of an ingest pipeline to apply when indexing documents.
// https://opensearch.org/docs/latest/ingest-pipelines/
Pipeline string `mapstructure:"pipeline"`
}

var (
Expand Down
3 changes: 3 additions & 0 deletions exporter/opensearchexporter/config.schema.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,9 @@ properties:
type: string
namespace:
type: string
pipeline:
description: Pipeline is the optional ID of an ingest pipeline to apply when indexing documents. https://opensearch.org/docs/latest/ingest-pipelines/
type: string
sending_queue:
x-optional: true
$ref: go.opentelemetry.io/collector/exporter/exporterhelper.queue_batch_config
Expand Down
8 changes: 8 additions & 0 deletions exporter/opensearchexporter/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -227,6 +227,14 @@ func TestLoadConfig(t *testing.T) {
return assert.ErrorContains(t, err, errTracesIndexTimeFormatInvalid.Error())
},
},
{
id: component.NewIDWithName(metadata.Type, "pipeline"),
expected: withDefaultConfig(func(config *Config) {
config.Endpoint = sampleEndpoint
config.Pipeline = "my-pipeline"
}),
configValidateAssert: assert.NoError,
},
}

for _, tt := range tests {
Expand Down
10 changes: 6 additions & 4 deletions exporter/opensearchexporter/log_bulk_indexer.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,18 +18,19 @@ import (

type logBulkIndexer struct {
bulkAction string
pipeline string
model mappingModel
errs []error
bulkIndexer opensearchutil.BulkIndexer
}

func newLogBulkIndexer(bulkAction string, model mappingModel) *logBulkIndexer {
return &logBulkIndexer{bulkAction, model, nil, nil}
func newLogBulkIndexer(bulkAction string, model mappingModel, pipeline string) *logBulkIndexer {
return &logBulkIndexer{bulkAction: bulkAction, pipeline: pipeline, model: model, errs: nil, bulkIndexer: nil}
}

func (lbi *logBulkIndexer) start(client *opensearchapi.Client) error {
var startErr error
lbi.bulkIndexer, startErr = newLogOpenSearchBulkIndexer(client, lbi.onIndexerError)
lbi.bulkIndexer, startErr = newLogOpenSearchBulkIndexer(client, lbi.onIndexerError, lbi.pipeline)
return startErr
}

Expand Down Expand Up @@ -138,10 +139,11 @@ func (lbi *logBulkIndexer) newBulkIndexerItem(document []byte, indexName string)
return item
}

func newLogOpenSearchBulkIndexer(client *opensearchapi.Client, onIndexerError func(context.Context, error)) (opensearchutil.BulkIndexer, error) {
func newLogOpenSearchBulkIndexer(client *opensearchapi.Client, onIndexerError func(context.Context, error), pipeline string) (opensearchutil.BulkIndexer, error) {
return opensearchutil.NewBulkIndexer(opensearchutil.BulkIndexerConfig{
NumWorkers: 1,
Client: client,
OnError: onIndexerError,
Pipeline: pipeline,
})
}
22 changes: 22 additions & 0 deletions exporter/opensearchexporter/log_bulk_indexer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,28 @@ func TestProcessItemFailure(t *testing.T) {
}
}

func TestNewLogBulkIndexerWithPipeline(t *testing.T) {
tests := []struct {
name string
pipeline string
}{
{"empty pipeline", ""},
{"with pipeline", "my-pipeline"},
}

for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
lbi := newLogBulkIndexer("create", nil, tt.pipeline)
if lbi.pipeline != tt.pipeline {
t.Errorf("expected pipeline %q, got %q", tt.pipeline, lbi.pipeline)
}
if lbi.bulkAction != "create" {
t.Errorf("expected bulkAction 'create', got %s", lbi.bulkAction)
}
})
}
}

func TestNewBulkIndexerItem(t *testing.T) {
lbi := &logBulkIndexer{bulkAction: "index"}
payload := []byte(`{"test": "data"}`)
Expand Down
2 changes: 1 addition & 1 deletion exporter/opensearchexporter/sso_log_exporter.go
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,7 @@ func (l *logExporter) Start(ctx context.Context, host component.Host) error {
}

func (l *logExporter) pushLogData(ctx context.Context, ld plog.Logs) error {
indexer := newLogBulkIndexer(l.bulkAction, l.model)
indexer := newLogBulkIndexer(l.bulkAction, l.model, l.config.Pipeline)
startErr := indexer.start(l.client)
if startErr != nil {
return startErr
Expand Down
2 changes: 1 addition & 1 deletion exporter/opensearchexporter/sso_trace_exporter.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,7 @@ func (s *ssoTracesExporter) Start(ctx context.Context, host component.Host) erro
}

func (s *ssoTracesExporter) pushTraceData(ctx context.Context, td ptrace.Traces) error {
indexer := newTraceBulkIndexer(s.bulkAction, s.model)
indexer := newTraceBulkIndexer(s.bulkAction, s.model, s.config.Pipeline)
startErr := indexer.start(s.client)
if startErr != nil {
return startErr
Expand Down
5 changes: 5 additions & 0 deletions exporter/opensearchexporter/testdata/config.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -114,6 +114,11 @@ opensearch/traces_index_time_format_invalid:
traces_index_fallback: "default-service"
traces_index_time_format: "invalid_format!"

opensearch/pipeline:
http:
endpoint: https://opensearch.example.com:9200
pipeline: "my-pipeline"

opensearch/sending_queue_with_batch:
http:
endpoint: https://opensearch.example.com:9200
Expand Down
10 changes: 6 additions & 4 deletions exporter/opensearchexporter/trace_bulk_indexer.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,13 +20,14 @@ import (

type traceBulkIndexer struct {
bulkAction string
pipeline string
model mappingModel
errs []error
bulkIndexer opensearchutil.BulkIndexer
}

func newTraceBulkIndexer(bulkAction string, model mappingModel) *traceBulkIndexer {
return &traceBulkIndexer{bulkAction: bulkAction, model: model, errs: nil, bulkIndexer: nil}
func newTraceBulkIndexer(bulkAction string, model mappingModel, pipeline string) *traceBulkIndexer {
return &traceBulkIndexer{bulkAction: bulkAction, pipeline: pipeline, model: model, errs: nil, bulkIndexer: nil}
}

func (tbi *traceBulkIndexer) joinedError() error {
Expand All @@ -35,7 +36,7 @@ func (tbi *traceBulkIndexer) joinedError() error {

func (tbi *traceBulkIndexer) start(client *opensearchapi.Client) error {
var startErr error
tbi.bulkIndexer, startErr = newOpenSearchBulkIndexer(client, tbi.onIndexerError)
tbi.bulkIndexer, startErr = newOpenSearchBulkIndexer(client, tbi.onIndexerError, tbi.pipeline)
return startErr
}

Expand Down Expand Up @@ -159,10 +160,11 @@ func (tbi *traceBulkIndexer) newBulkIndexerItem(document []byte, indexName strin
return item
}

func newOpenSearchBulkIndexer(client *opensearchapi.Client, onIndexerError func(context.Context, error)) (opensearchutil.BulkIndexer, error) {
func newOpenSearchBulkIndexer(client *opensearchapi.Client, onIndexerError func(context.Context, error), pipeline string) (opensearchutil.BulkIndexer, error) {
return opensearchutil.NewBulkIndexer(opensearchutil.BulkIndexerConfig{
NumWorkers: 1,
Client: client,
OnError: onIndexerError,
Pipeline: pipeline,
})
}
22 changes: 22 additions & 0 deletions exporter/opensearchexporter/trace_bulk_indexer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,28 @@ func TestTraceProcessItemFailure(t *testing.T) {
}
}

func TestNewTraceBulkIndexerWithPipeline(t *testing.T) {
tests := []struct {
name string
pipeline string
}{
{"empty pipeline", ""},
{"with pipeline", "my-pipeline"},
}

for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
tbi := newTraceBulkIndexer("create", nil, tt.pipeline)
if tbi.pipeline != tt.pipeline {
t.Errorf("expected pipeline %q, got %q", tt.pipeline, tbi.pipeline)
}
if tbi.bulkAction != "create" {
t.Errorf("expected bulkAction 'create', got %s", tbi.bulkAction)
}
})
}
}

func TestTraceNewBulkIndexerItem(t *testing.T) {
tbi := &traceBulkIndexer{bulkAction: "create"}
payload := []byte(`{"test": "data"}`)
Expand Down