-
Notifications
You must be signed in to change notification settings - Fork 63
Expand file tree
/
Copy pathmetrics_interceptor.py
More file actions
172 lines (153 loc) · 6.05 KB
/
metrics_interceptor.py
File metadata and controls
172 lines (153 loc) · 6.05 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
# Copyright 2025 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from __future__ import annotations
from typing import Sequence
import time
from functools import wraps
from google.cloud.bigtable.data._metrics.data_model import ActiveOperationMetric
from google.cloud.bigtable.data._metrics.data_model import OperationState
from google.cloud.bigtable.data._metrics.data_model import OperationType
from google.cloud.bigtable.data._cross_sync import CrossSync
if CrossSync.is_async:
from grpc.aio import UnaryUnaryClientInterceptor
from grpc.aio import UnaryStreamClientInterceptor
from grpc.aio import AioRpcError
else:
from grpc import UnaryUnaryClientInterceptor
from grpc import UnaryStreamClientInterceptor
__CROSS_SYNC_OUTPUT__ = "google.cloud.bigtable.data._sync_autogen.metrics_interceptor"
def _with_active_operation(func):
"""
Decorator for interceptor methods to extract the active operation associated with the
in-scope contextvars, and pass it to the decorated function.
"""
@wraps(func)
def wrapper(self, continuation, client_call_details, request):
operation: ActiveOperationMetric | None = ActiveOperationMetric.from_context()
if operation:
# start a new attempt if not started
if (
operation.state == OperationState.CREATED
or operation.state == OperationState.BETWEEN_ATTEMPTS
):
operation.start_attempt()
# wrap continuation in logic to process the operation
return func(self, operation, continuation, client_call_details, request)
else:
# if operation not found, return unwrapped continuation
return continuation(client_call_details, request)
return wrapper
@CrossSync.convert
async def _get_metadata(source) -> dict[str, str | bytes] | None:
"""Helper to extract metadata from a call or RpcError"""
try:
metadata: Sequence[tuple[str, str | bytes]]
if CrossSync.is_async:
# grpc.aio returns metadata in Metadata objects
if isinstance(source, AioRpcError):
metadata = list(source.trailing_metadata()) + list(
source.initial_metadata()
)
else:
metadata = list(await source.trailing_metadata()) + list(
await source.initial_metadata()
)
else:
# sync grpc returns metadata as a sequence of tuples
metadata = source.trailing_metadata() + source.initial_metadata()
# convert metadata to dict format
return {k: v for (k, v) in metadata}
except Exception:
# ignore errors while fetching metadata
return None
@CrossSync.convert_class(sync_name="BigtableMetricsInterceptor")
class AsyncBigtableMetricsInterceptor(
UnaryUnaryClientInterceptor, UnaryStreamClientInterceptor
):
"""
An async gRPC interceptor to add client metadata and print server metadata.
"""
@CrossSync.convert
@_with_active_operation
async def intercept_unary_unary(
self, operation, continuation, client_call_details, request
):
"""
Interceptor for unary rpcs:
- MutateRow
- CheckAndMutateRow
- ReadModifyWriteRow
"""
metadata = None
try:
call = await continuation(client_call_details, request)
metadata = await _get_metadata(call)
return call
except Exception as rpc_error:
metadata = await _get_metadata(rpc_error)
raise rpc_error
finally:
if metadata is not None:
operation.add_response_metadata(metadata)
@CrossSync.convert
@_with_active_operation
async def intercept_unary_stream(
self, operation, continuation, client_call_details, request
):
"""
Interceptor for streaming rpcs:
- ReadRows
- MutateRows
- SampleRowKeys
"""
try:
return self._streaming_generator_wrapper(
operation, await continuation(client_call_details, request)
)
except Exception as rpc_error:
# handle errors while intializing stream
metadata = await _get_metadata(rpc_error)
if metadata is not None:
operation.add_response_metadata(metadata)
raise rpc_error
@staticmethod
@CrossSync.convert
async def _streaming_generator_wrapper(operation, call):
"""
Wrapped generator to be returned by intercept_unary_stream.
"""
# only track has_first response for READ_ROWS
has_first_response = (
operation.first_response_latency_ns is not None
or operation.op_type != OperationType.READ_ROWS
)
encountered_exc = None
try:
async for response in call:
# record time to first response. Currently only used for READ_ROWs
if not has_first_response:
operation.first_response_latency_ns = (
time.monotonic_ns() - operation.start_time_ns
)
has_first_response = True
yield response
except Exception as e:
# handle errors while processing stream
encountered_exc = e
raise
finally:
if call is not None:
metadata = await _get_metadata(encountered_exc or call)
if metadata is not None:
operation.add_response_metadata(metadata)