-
Notifications
You must be signed in to change notification settings - Fork 282
Open
Description
/// Sends [inbound] to the peer as one gRPC call and returns the reply bytes.
///
/// The target address is read from [ctx] under `addressKey`. The channel for
/// that address is looked up in `chans` and created on first use, so later
/// calls to the same address share the same channel. The payload is passed
/// through unchanged to the `/mpc/grpc` method with the timeout taken from
/// `exec.schema()`. [urn] is not used by this implementation.
///
/// A [GrpcError] is translated into the matching `Status` error according to
/// its status code. A code with no mapping below, or any other kind of error,
/// propagates unchanged.
@override
Future<Uint8List> consume(
    Context ctx, String urn, Execution exec, Uint8List inbound) async {
  try {
    final uri = ctx.getAttribute(addressKey);
    final chan = chans.putIfAbsent(uri, () => create(uri));
    // Requests and responses are raw bytes, so the serializer is the identity
    // and the deserializer only copies the received bytes into a Uint8List.
    final method = ClientMethod<Uint8List, Uint8List>(
      '/mpc/grpc',
      (v) => v,
      (v) => Uint8List.fromList(v),
    );
    final option = CallOptions(timeout: exec.schema().timeout, metadata: {});
    final call = chan.createCall<Uint8List, Uint8List>(
        method, Stream.value(inbound), option);
    // `first` cancels its subscription as soon as one message arrives, which
    // cancels the call before the trailing status has been read. `single`
    // waits for the response stream to finish, so the call completes normally
    // and the underlying connection stays usable for later requests.
    return await call.response.single;
  } on GrpcError catch (e) {
    switch (e.code) {
      case StatusCode.aborted:
      case StatusCode.cancelled:
      case StatusCode.unknown:
      case StatusCode.internal:
      case StatusCode.alreadyExists:
        throw Status.system.err(r: e.toString());
      case StatusCode.invalidArgument:
      case StatusCode.dataLoss:
        throw Status.validate.err(r: e.toString());
      case StatusCode.deadlineExceeded:
        throw Status.timeout.err(r: e.toString());
      case StatusCode.unavailable:
        throw Status.netUnavailable.err(r: e.toString());
      case StatusCode.notFound:
      case StatusCode.unimplemented:
        throw Status.notfound.err(r: e.toString());
      case StatusCode.outOfRange:
      case StatusCode.resourceExhausted:
      case StatusCode.unauthenticated:
      case StatusCode.permissionDenied:
      case StatusCode.failedPrecondition:
        throw Status.unauthorized.err(r: e.toString());
    }
    // Status codes without a mapping above are surfaced as the original error.
    rethrow;
  }
}
final call = chan.createCall<Uint8List, Uint8List>(
method, Stream.value(inbound), option);
return await call.response.first;
The code above drops the connection after some requests have received their responses, so a channel cannot be reused many times. How can connections be reused gracefully?
Reactions are currently unavailable
Metadata
Metadata
Assignees
Labels
No labels