Skip to content

Commit a055a0a

Browse files
authored
feat: write column order (#198)
1 parent 2bfba51 commit a055a0a

File tree

12 files changed

+362
-40
lines changed

12 files changed

+362
-40
lines changed

CHANGELOG.md

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,11 @@
22

33
## 0.19.0 [unreleased]
44

5+
### Features
6+
7+
1. [#198](https://github.com/InfluxCommunity/influxdb3-python/pull/198): Support custom tag order via `tag_order` write option.
8+
See [Sort tags by priority](https://docs.influxdata.com/influxdb3/enterprise/write-data/best-practices/schema-design/#sort-tags-by-query-priority) for more.
9+
510
## 0.18.0 [2026-02-19]
611

712
### Features

README.md

Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -77,6 +77,30 @@ You can write data using the Point class, or supplying line protocol.
7777
point = Point("measurement").tag("location", "london").field("temperature", 42)
7878
client.write(point)
7979
```
80+
81+
### Control tag order for first-write column order (InfluxDB 3 Enterprise)
82+
```python
83+
from influxdb_client_3 import InfluxDBClient3, Point, WriteOptions, WriteType, write_client_options
84+
85+
point = Point("cpu") \
86+
.tag("host", "server-a") \
87+
.tag("region", "us-east") \
88+
.tag("rack", "r1") \
89+
.field("usage", 0.42)
90+
91+
write_options = WriteOptions(
92+
write_type=WriteType.synchronous,
93+
tag_order=["region", "host"],
94+
)
95+
96+
client = InfluxDBClient3(
97+
token="your-token",
98+
host="your-host",
99+
database="your-database",
100+
write_client_options=write_client_options(write_options=write_options),
101+
)
102+
client.write(point)
103+
```
80104
### Using Line Protocol
81105
```python
82106
point = "measurement fieldname=0"

influxdb_client_3/write_client/client/_base.py

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -246,7 +246,8 @@ def _serialize(self, record, write_precision, payload, **kwargs):
246246
elif isinstance(record, Point):
247247
precision_from_point = kwargs.get('precision_from_point', True)
248248
precision = record.write_precision if precision_from_point else write_precision
249-
self._serialize(record.to_line_protocol(precision=precision), precision, payload, **kwargs)
249+
self._serialize(record.to_line_protocol(precision=precision, tag_order=kwargs.get('tag_order')),
250+
precision, payload, **kwargs)
250251

251252
elif isinstance(record, dict):
252253
self._serialize(Point.from_dict(record, write_precision=write_precision, **kwargs),

influxdb_client_3/write_client/client/write/dataframe_serializer.py

Lines changed: 7 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,7 @@
1010

1111
from influxdb_client_3.write_client.domain import WritePrecision
1212
from influxdb_client_3.write_client.client.write.point import _ESCAPE_KEY, _ESCAPE_STRING, _ESCAPE_MEASUREMENT, \
13-
DEFAULT_WRITE_PRECISION
13+
DEFAULT_WRITE_PRECISION, ordered_tag_keys
1414

1515
logger = logging.getLogger('influxdb_client.client.write.dataframe_serializer')
1616

@@ -130,8 +130,8 @@ def __init__(self, data_frame, point_settings, precision=DEFAULT_WRITE_PRECISION
130130

131131
# keys holds a list of string keys.
132132
keys = []
133-
# tags holds a list of tag f-string segments ordered alphabetically by tag key.
134-
tags = []
133+
# tag_segments holds map of tag key -> tag f-string segment.
134+
tag_segments = {}
135135
# fields holds a list of field f-string segments ordered alphabetically by field key
136136
fields = []
137137
# field_indexes holds the index into each row of all the fields.
@@ -188,7 +188,7 @@ def __init__(self, data_frame, point_settings, precision=DEFAULT_WRITE_PRECISION
188188
}}"""
189189
else:
190190
key_value = f',{key_format}={{str({val_format}).translate(_ESCAPE_KEY)}}'
191-
tags.append(key_value)
191+
tag_segments[key] = key_value
192192
continue
193193
elif timestamp_column is not None and key in timestamp_column:
194194
timestamp_index = field_index
@@ -225,7 +225,8 @@ def __init__(self, data_frame, point_settings, precision=DEFAULT_WRITE_PRECISION
225225

226226
measurement_name = str(data_frame_measurement_name).translate(_ESCAPE_MEASUREMENT)
227227

228-
tags = ''.join(tags)
228+
tag_keys = ordered_tag_keys(list(tag_segments.keys()), kwargs.get('tag_order'))
229+
tag_string = ''.join(tag_segments[tag_key] for tag_key in tag_keys)
229230
fields = ''.join(fields)
230231
timestamp = '{p[%s].value}' % timestamp_index
231232
if precision == WritePrecision.US:
@@ -235,7 +236,7 @@ def __init__(self, data_frame, point_settings, precision=DEFAULT_WRITE_PRECISION
235236
elif precision == WritePrecision.S:
236237
timestamp = '{int(p[%s].value / 1e9)}' % timestamp_index
237238

238-
f = eval(f'lambda p: f"""{{measurement_name}}{tags} {fields} {timestamp}"""', {
239+
f = eval(f'lambda p: f"""{{measurement_name}}{tag_string} {fields} {timestamp}"""', {
239240
'measurement_name': measurement_name,
240241
'_ESCAPE_KEY': _ESCAPE_KEY,
241242
'_ESCAPE_STRING': _ESCAPE_STRING,

influxdb_client_3/write_client/client/write/point.py

Lines changed: 50 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@
33
import math
44
import warnings
55
from builtins import int
6+
from collections.abc import Iterable
67
from datetime import datetime, timedelta, timezone
78
from decimal import Decimal
89
from numbers import Integral
@@ -215,11 +216,12 @@ def field(self, field, value):
215216
self._fields[field] = value
216217
return self
217218

218-
def to_line_protocol(self, precision=None):
219+
def to_line_protocol(self, precision=None, tag_order=None):
219220
"""
220221
Create LineProtocol.
221222
222223
:param precision: required precision of LineProtocol. If it's not set then use the precision from ``Point``.
224+
:param tag_order: optional list of tag names to prioritize in serialized output
223225
"""
224226
_measurement = _escape_key(self._name, _ESCAPE_MEASUREMENT)
225227
if _measurement.startswith("#"):
@@ -229,7 +231,7 @@ def to_line_protocol(self, precision=None):
229231
- https://docs.influxdata.com/influxdb/latest/reference/syntax/line-protocol/#comments
230232
"""
231233
warnings.warn(message, SyntaxWarning)
232-
_tags = _append_tags(self._tags)
234+
_tags = _append_tags(self._tags, tag_order)
233235
_fields = _append_fields(self._fields, self._field_types)
234236
if not _fields:
235237
return ""
@@ -252,9 +254,10 @@ def __str__(self):
252254
return self.to_line_protocol()
253255

254256

255-
def _append_tags(tags):
257+
def _append_tags(tags, tag_order=None):
256258
_return = []
257-
for tag_key, tag_value in sorted(tags.items()):
259+
for tag_key in ordered_tag_keys(sorted(tags.keys()), tag_order):
260+
tag_value = tags.get(tag_key)
258261

259262
if tag_value is None:
260263
continue
@@ -267,6 +270,49 @@ def _append_tags(tags):
267270
return f"{',' if _return else ''}{','.join(_return)} "
268271

269272

273+
def sanitize_tag_order(tag_order):
274+
if tag_order is None:
275+
return []
276+
277+
if isinstance(tag_order, (str, bytes)):
278+
raise TypeError("tag_order must be an iterable of strings, not str/bytes")
279+
280+
if not isinstance(tag_order, Iterable):
281+
raise TypeError("tag_order must be an iterable of strings")
282+
283+
sanitized = []
284+
seen = set()
285+
for tag in tag_order:
286+
if tag is None or tag == "":
287+
continue
288+
if not isinstance(tag, str):
289+
raise TypeError("tag_order entries must be strings")
290+
if tag in seen:
291+
continue
292+
seen.add(tag)
293+
sanitized.append(tag)
294+
return sanitized
295+
296+
297+
def ordered_tag_keys(existing_keys, tag_order=None):
298+
ordered_keys = list(existing_keys)
299+
if not tag_order:
300+
return ordered_keys
301+
302+
remaining = set(ordered_keys)
303+
prioritized = []
304+
for tag_key in tag_order:
305+
if not tag_key:
306+
continue
307+
if tag_key not in remaining:
308+
continue
309+
remaining.remove(tag_key)
310+
prioritized.append(tag_key)
311+
312+
prioritized.extend([tag_key for tag_key in ordered_keys if tag_key in remaining])
313+
return prioritized
314+
315+
270316
def _append_fields(fields, field_types):
271317
_return = []
272318

influxdb_client_3/write_client/client/write/polars_dataframe_serializer.py

Lines changed: 24 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,8 @@
77
import logging
88
import math
99

10-
from influxdb_client_3.write_client.client.write.point import _ESCAPE_KEY, _ESCAPE_STRING, DEFAULT_WRITE_PRECISION
10+
from influxdb_client_3.write_client.client.write.point import _ESCAPE_KEY, _ESCAPE_STRING, DEFAULT_WRITE_PRECISION, \
11+
ordered_tag_keys
1112

1213
logger = logging.getLogger('influxdb_client.client.write.polars_dataframe_serializer')
1314

@@ -36,6 +37,7 @@ def __init__(self, data_frame, point_settings, precision=DEFAULT_WRITE_PRECISION
3637
self.chunk_size = chunk_size
3738
self.measurement_name = kwargs.get("data_frame_measurement_name", "measurement")
3839
self.tag_columns = kwargs.get("data_frame_tag_columns", [])
40+
self.tag_order = kwargs.get("tag_order", None)
3941
self.timestamp_column = kwargs.get("data_frame_timestamp_column", None)
4042
self.timestamp_timezone = kwargs.get("data_frame_timestamp_timezone", None)
4143

@@ -62,25 +64,31 @@ def escape_value(self, value):
6264
return str(value).translate(_ESCAPE_STRING)
6365

6466
def to_line_protocol(self, row):
65-
# Filter out None or empty values for tags
66-
tags = ""
67+
tag_values = {}
68+
tag_keys = []
69+
for col in self.tag_columns:
70+
value = row[self.column_indices[col]]
71+
if value is None or value == "":
72+
continue
73+
if col not in tag_values:
74+
tag_keys.append(col)
75+
tag_values[col] = value
6776

77+
if self.point_settings.defaultTags:
78+
for key, value in self.point_settings.defaultTags.items():
79+
if value is None or value == "":
80+
continue
81+
if key in tag_values:
82+
continue
83+
tag_keys.append(key)
84+
tag_values[key] = value
85+
86+
final_tag_keys = ordered_tag_keys(tag_keys, self.tag_order)
6887
tags = ",".join(
69-
f'{self.escape_key(col)}={self.escape_key(row[self.column_indices[col]])}'
70-
for col in self.tag_columns
71-
if row[self.column_indices[col]] is not None and row[self.column_indices[col]] != ""
88+
f'{self.escape_key(key)}={self.escape_key(tag_values[key])}'
89+
for key in final_tag_keys
7290
)
7391

74-
if self.point_settings.defaultTags:
75-
default_tags = ",".join(
76-
f'{self.escape_key(key)}={self.escape_key(value)}'
77-
for key, value in self.point_settings.defaultTags.items()
78-
)
79-
# Ensure there's a comma between existing tags and default tags if both are present
80-
if tags and default_tags:
81-
tags += ","
82-
tags += default_tags
83-
8492
# add escape symbols for special characters to tags
8593

8694
fields = ",".join(

influxdb_client_3/write_client/client/write_api.py

Lines changed: 14 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,7 @@
2222
from influxdb_client_3.write_client.client._base import _BaseWriteApi, _HAS_DATACLASS
2323
from influxdb_client_3.write_client.client.util.helpers import get_org_query_param
2424
from influxdb_client_3.write_client.client.write.dataframe_serializer import DataframeSerializer
25-
from influxdb_client_3.write_client.client.write.point import Point, DEFAULT_WRITE_PRECISION
25+
from influxdb_client_3.write_client.client.write.point import Point, DEFAULT_WRITE_PRECISION, sanitize_tag_order
2626
from influxdb_client_3.write_client.client.write.retry import WritesRetry
2727
from influxdb_client_3.write_client.domain import WritePrecision
2828
from influxdb_client_3.write_client.rest import _UTF_8_encoding
@@ -43,6 +43,8 @@
4343
'record_time_key',
4444
'record_tag_keys',
4545
'record_field_keys',
46+
# Point serialization-specific kwargs
47+
'tag_order',
4648
}
4749

4850
logger = logging.getLogger('influxdb_client_3.write_client.client.write_api')
@@ -81,6 +83,7 @@ def __init__(self, write_type: WriteType = WriteType.batching,
8183
max_close_wait=300_000,
8284
write_precision=DEFAULT_WRITE_PRECISION,
8385
no_sync=DEFAULT_WRITE_NO_SYNC,
86+
tag_order=None,
8487
timeout=DEFAULT_WRITE_TIMEOUT,
8588
write_scheduler=ThreadPoolScheduler(max_workers=1)) -> None:
8689
"""
@@ -100,6 +103,7 @@ def __init__(self, write_type: WriteType = WriteType.batching,
100103
:param max_close_wait: the maximum time to wait for writes to be flushed if close() is called
101104
:param write_precision: precision to use when writing points to InfluxDB
102105
:param no_sync: skip waiting for WAL persistence on write
106+
:param tag_order: optional list of tag names used to prioritize tag serialization order
103107
:param timeout: timeout to use when writing to the database in milliseconds. Default is 10_000
104108
:param write_scheduler:
105109
"""
@@ -117,6 +121,7 @@ def __init__(self, write_type: WriteType = WriteType.batching,
117121
self.write_precision = write_precision
118122
self.timeout = timeout
119123
self.no_sync = no_sync
124+
self.tag_order = sanitize_tag_order(tag_order)
120125

121126
def to_retry_strategy(self, **kwargs):
122127
"""
@@ -380,6 +385,11 @@ def write(self, bucket: str, org: str = None,
380385
if write_precision is None:
381386
write_precision = self._write_options.write_precision
382387

388+
if 'tag_order' in kwargs:
389+
kwargs['tag_order'] = sanitize_tag_order(kwargs.get('tag_order'))
390+
else:
391+
kwargs['tag_order'] = self._write_options.tag_order
392+
383393
if self._write_options.write_type is WriteType.batching:
384394
return self._write_batching(bucket, org, record,
385395
write_precision, **kwargs)
@@ -520,7 +530,9 @@ def _write_batching(self, bucket, org, data,
520530
precision, **kwargs)
521531

522532
elif isinstance(data, Point):
523-
self._write_batching(bucket, org, data.to_line_protocol(), data.write_precision, **kwargs)
533+
self._write_batching(bucket, org,
534+
data.to_line_protocol(tag_order=kwargs.get('tag_order')),
535+
data.write_precision, **kwargs)
524536

525537
elif isinstance(data, dict):
526538
self._write_batching(bucket, org, Point.from_dict(data, write_precision=precision, **kwargs),

influxdb_client_3/write_client/domain/write_precision.py

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -30,7 +30,7 @@ class WritePrecision(object):
3030
def __init__(self): # noqa: E501,D401,D403
3131
"""WritePrecision - a model defined in OpenAPI.""" # noqa: E501 self.discriminator = None
3232

33-
def to_dict(self):
33+
def to_dict(self): # pragma: no cover
3434
"""Return the model properties as a dict."""
3535
result = {}
3636

@@ -54,21 +54,21 @@ def to_dict(self):
5454

5555
return result
5656

57-
def to_str(self):
57+
def to_str(self): # pragma: no cover
5858
"""Return the string representation of the model."""
5959
return pprint.pformat(self.to_dict())
6060

61-
def __repr__(self):
61+
def __repr__(self): # pragma: no cover
6262
"""For `print` and `pprint`."""
6363
return self.to_str()
6464

65-
def __eq__(self, other):
65+
def __eq__(self, other): # pragma: no cover
6666
"""Return true if both objects are equal."""
6767
if not isinstance(other, WritePrecision):
6868
return False
6969

7070
return self.__dict__ == other.__dict__
7171

72-
def __ne__(self, other):
72+
def __ne__(self, other): # pragma: no cover
7373
"""Return true if both objects are not equal."""
7474
return not self == other

tests/test_dataframe_serializer.py

Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -276,6 +276,30 @@ def test_tags_order(self):
276276
self.assertEqual(1, len(points))
277277
self.assertEqual("h2o,a=a,b=b,c=c level=2i 1586048400000000000", points[0])
278278

279+
points = data_frame_to_list_of_points(data_frame=data_frame,
280+
point_settings=PointSettings(),
281+
data_frame_measurement_name='h2o',
282+
data_frame_tag_columns={"c", "a", "b"},
283+
tag_order=["c", "a"])
284+
285+
self.assertEqual(1, len(points))
286+
self.assertEqual("h2o,c=c,a=a,b=b level=2i 1586048400000000000", points[0])
287+
288+
ps = PointSettings(z="from-default", c="override-ignored")
289+
points_with_defaults = data_frame_to_list_of_points(
290+
data_frame=data_frame,
291+
point_settings=ps,
292+
data_frame_measurement_name='h2o',
293+
data_frame_tag_columns={"c", "a", "b"},
294+
tag_order=["z", "c", "a"],
295+
)
296+
297+
self.assertEqual(1, len(points_with_defaults))
298+
self.assertEqual(
299+
"h2o,z=from-default,c=c,a=a,b=b level=2i 1586048400000000000",
300+
points_with_defaults[0]
301+
)
302+
279303
def test_escape_text_value(self):
280304
now = pd.Timestamp('2020-04-05 00:00+00:00')
281305
an_hour_ago = now - timedelta(hours=1)

0 commit comments

Comments
 (0)