Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 12 additions & 0 deletions core/src/main/java/io/grpc/internal/ClientTransportFactory.java
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import io.grpc.ChannelCredentials;
import io.grpc.ChannelLogger;
import io.grpc.HttpConnectProxiedSocketAddress;
import io.grpc.MetricRecorder;
import java.io.Closeable;
import java.net.SocketAddress;
import java.util.Collection;
Expand Down Expand Up @@ -91,6 +92,7 @@ final class ClientTransportOptions {
private Attributes eagAttributes = Attributes.EMPTY;
@Nullable private String userAgent;
@Nullable private HttpConnectProxiedSocketAddress connectProxiedSocketAddr;
@Nullable private MetricRecorder metricRecorder;

public ChannelLogger getChannelLogger() {
return channelLogger;
Expand All @@ -101,6 +103,16 @@ public ClientTransportOptions setChannelLogger(ChannelLogger channelLogger) {
return this;
}

@Nullable
public MetricRecorder getMetricRecorder() {
return metricRecorder;
}

public ClientTransportOptions setMetricRecorder(@Nullable MetricRecorder metricRecorder) {
this.metricRecorder = metricRecorder;
return this;
}

public String getAuthority() {
return authority;
}
Expand Down
6 changes: 6 additions & 0 deletions core/src/main/java/io/grpc/internal/InternalServer.java
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@

import io.grpc.InternalChannelz.SocketStats;
import io.grpc.InternalInstrumented;
import io.grpc.MetricRecorder;
import java.io.IOException;
import java.net.SocketAddress;
import java.util.List;
Expand Down Expand Up @@ -71,4 +72,9 @@ public interface InternalServer {
*/
@Nullable List<InternalInstrumented<SocketStats>> getListenSocketStatsList();

/**
* Sets the MetricRecorder for the server. This optional method allows setting
* the MetricRecorder after construction but before start().
*/
default void setMetricRecorder(MetricRecorder metricRecorder) {}
}
3 changes: 3 additions & 0 deletions core/src/main/java/io/grpc/internal/InternalSubchannel.java
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,7 @@ final class InternalSubchannel implements InternalInstrumented<ChannelStats>, Tr
private final InternalChannelz channelz;
private final CallTracer callsTracer;
private final ChannelTracer channelTracer;
private final MetricRecorder metricRecorder;
private final ChannelLogger channelLogger;
private final boolean reconnectDisabled;

Expand Down Expand Up @@ -191,6 +192,7 @@ protected void handleNotInUse() {
this.scheduledExecutor = scheduledExecutor;
this.connectingTimer = stopwatchSupplier.get();
this.syncContext = syncContext;
this.metricRecorder = metricRecorder;
this.callback = callback;
this.channelz = channelz;
this.callsTracer = callsTracer;
Expand Down Expand Up @@ -265,6 +267,7 @@ private void startNewTransport() {
.setAuthority(eagChannelAuthority != null ? eagChannelAuthority : authority)
.setEagAttributes(currentEagAttributes)
.setUserAgent(userAgent)
.setMetricRecorder(metricRecorder)
.setHttpConnectProxiedSocketAddress(proxiedAddr);
TransportLogger transportLogger = new TransportLogger();
// In case the transport logs in the constructor, use the subchannel logId
Expand Down
7 changes: 7 additions & 0 deletions core/src/main/java/io/grpc/internal/ServerImpl.java
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,8 @@
import io.grpc.InternalServerInterceptors;
import io.grpc.InternalStatus;
import io.grpc.Metadata;
import io.grpc.MetricInstrumentRegistry;
import io.grpc.MetricRecorder;
import io.grpc.ServerCall;
import io.grpc.ServerCallExecutorSupplier;
import io.grpc.ServerCallHandler;
Expand Down Expand Up @@ -97,6 +99,7 @@ public final class ServerImpl extends io.grpc.Server implements InternalInstrume

private final InternalLogId logId;
private final ObjectPool<? extends Executor> executorPool;
private final MetricRecorder metricRecorder;
/** Executor for application processing. Safe to read after {@link #start()}. */
private Executor executor;
private final HandlerRegistry registry;
Expand Down Expand Up @@ -143,6 +146,9 @@ public final class ServerImpl extends io.grpc.Server implements InternalInstrume
InternalServer transportServer,
Context rootContext) {
this.executorPool = Preconditions.checkNotNull(builder.executorPool, "executorPool");
this.metricRecorder =
new MetricRecorderImpl(builder.metricSinks, MetricInstrumentRegistry.getDefaultRegistry());

this.registry = Preconditions.checkNotNull(builder.registryBuilder.build(), "registryBuilder");
this.fallbackRegistry =
Preconditions.checkNotNull(builder.fallbackRegistry, "fallbackRegistry");
Expand Down Expand Up @@ -182,6 +188,7 @@ public ServerImpl start() throws IOException {
// Start and wait for any ports to actually be bound.

ServerListenerImpl listener = new ServerListenerImpl();
transportServer.setMetricRecorder(metricRecorder);
transportServer.start(listener);
executor = Preconditions.checkNotNull(executorPool.getObject(), "executor");
started = true;
Expand Down
10 changes: 10 additions & 0 deletions core/src/main/java/io/grpc/internal/ServerImplBuilder.java
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
import io.grpc.HandlerRegistry;
import io.grpc.InternalChannelz;
import io.grpc.InternalConfiguratorRegistry;
import io.grpc.MetricSink;
import io.grpc.Server;
import io.grpc.ServerBuilder;
import io.grpc.ServerCallExecutorSupplier;
Expand Down Expand Up @@ -80,6 +81,7 @@ public static ServerBuilder<?> forPort(int port) {
final List<ServerTransportFilter> transportFilters = new ArrayList<>();
final List<ServerInterceptor> interceptors = new ArrayList<>();
private final List<ServerStreamTracer.Factory> streamTracerFactories = new ArrayList<>();
final List<MetricSink> metricSinks = new ArrayList<>();
private final ClientTransportServersBuilder clientTransportServersBuilder;
HandlerRegistry fallbackRegistry = DEFAULT_FALLBACK_REGISTRY;
ObjectPool<? extends Executor> executorPool = DEFAULT_EXECUTOR_POOL;
Expand Down Expand Up @@ -157,6 +159,14 @@ public ServerImplBuilder intercept(ServerInterceptor interceptor) {
return this;
}

/**
* Adds a MetricSink to the server.
*/
public ServerImplBuilder addMetricSink(MetricSink metricSink) {
metricSinks.add(checkNotNull(metricSink, "metricSink"));
return this;
}

@Override
public ServerImplBuilder addStreamTracerFactory(ServerStreamTracer.Factory factory) {
streamTracerFactories.add(checkNotNull(factory, "factory"));
Expand Down
1 change: 1 addition & 0 deletions core/src/main/java/io/grpc/internal/ServerTransport.java
Original file line number Diff line number Diff line change
Expand Up @@ -44,4 +44,5 @@ public interface ServerTransport extends InternalInstrumented<SocketStats> {
* outstanding tasks are cancelled when the transport terminates.
*/
ScheduledExecutorService getScheduledExecutorService();

}
1 change: 1 addition & 0 deletions netty/src/main/java/io/grpc/netty/NettyChannelBuilder.java
Original file line number Diff line number Diff line change
Expand Up @@ -856,6 +856,7 @@ public void run() {
localSocketPicker,
channelLogger,
useGetForSafeMethods,
options.getMetricRecorder(),
Ticker.systemTicker());
return transport;
}
Expand Down
24 changes: 19 additions & 5 deletions netty/src/main/java/io/grpc/netty/NettyClientHandler.java
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
import io.grpc.InternalChannelz;
import io.grpc.InternalStatus;
import io.grpc.Metadata;
import io.grpc.MetricRecorder;
import io.grpc.Status;
import io.grpc.StatusException;
import io.grpc.internal.ClientStreamListener.RpcProgress;
Expand Down Expand Up @@ -123,6 +124,7 @@ class NettyClientHandler extends AbstractNettyHandler {
private final Supplier<Stopwatch> stopwatchFactory;
private final TransportTracer transportTracer;
private final Attributes eagAttributes;
private final TcpMetrics.Tracker tcpMetrics;
private final String authority;
private final InUseStateAggregator<Http2Stream> inUseState =
new InUseStateAggregator<Http2Stream>() {
Expand Down Expand Up @@ -164,7 +166,8 @@ static NettyClientHandler newHandler(
Attributes eagAttributes,
String authority,
ChannelLogger negotiationLogger,
Ticker ticker) {
Ticker ticker,
MetricRecorder metricRecorder) {
Preconditions.checkArgument(maxHeaderListSize > 0, "maxHeaderListSize must be positive");
Http2HeadersDecoder headersDecoder = new GrpcHttp2ClientHeadersDecoder(maxHeaderListSize);
Http2FrameReader frameReader = new DefaultHttp2FrameReader(headersDecoder);
Expand Down Expand Up @@ -194,7 +197,8 @@ static NettyClientHandler newHandler(
eagAttributes,
authority,
negotiationLogger,
ticker);
ticker,
metricRecorder);
}

@VisibleForTesting
Expand All @@ -214,7 +218,8 @@ static NettyClientHandler newHandler(
Attributes eagAttributes,
String authority,
ChannelLogger negotiationLogger,
Ticker ticker) {
Ticker ticker,
MetricRecorder metricRecorder) {
Preconditions.checkNotNull(connection, "connection");
Preconditions.checkNotNull(frameReader, "frameReader");
Preconditions.checkNotNull(lifecycleManager, "lifecycleManager");
Expand Down Expand Up @@ -269,7 +274,8 @@ static NettyClientHandler newHandler(
pingCounter,
ticker,
maxHeaderListSize,
softLimitHeaderListSize);
softLimitHeaderListSize,
metricRecorder);
}

private NettyClientHandler(
Expand All @@ -288,7 +294,8 @@ private NettyClientHandler(
PingLimiter pingLimiter,
Ticker ticker,
int maxHeaderListSize,
int softLimitHeaderListSize) {
int softLimitHeaderListSize,
MetricRecorder metricRecorder) {
super(
/* channelUnused= */ null,
decoder,
Expand Down Expand Up @@ -350,6 +357,7 @@ public void onStreamClosed(Http2Stream stream) {
}
}
});
this.tcpMetrics = new TcpMetrics.Tracker(metricRecorder, "client");
}

/**
Expand Down Expand Up @@ -490,6 +498,12 @@ public void close(ChannelHandlerContext ctx, ChannelPromise promise) throws Exce
/**
* Handler for the Channel shutting down.
*/
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
tcpMetrics.channelActive(ctx.channel());
super.channelActive(ctx);
}

@Override
public void channelInactive(ChannelHandlerContext ctx) throws Exception {
try {
Expand Down
7 changes: 6 additions & 1 deletion netty/src/main/java/io/grpc/netty/NettyClientTransport.java
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@
import io.grpc.InternalLogId;
import io.grpc.Metadata;
import io.grpc.MethodDescriptor;
import io.grpc.MetricRecorder;
import io.grpc.Status;
import io.grpc.internal.ClientStream;
import io.grpc.internal.ConnectionClientTransport;
Expand Down Expand Up @@ -108,6 +109,7 @@ class NettyClientTransport implements ConnectionClientTransport,
private final ChannelLogger channelLogger;
private final boolean useGetForSafeMethods;
private final Ticker ticker;
private final MetricRecorder metricRecorder;


NettyClientTransport(
Expand All @@ -132,6 +134,7 @@ class NettyClientTransport implements ConnectionClientTransport,
LocalSocketPicker localSocketPicker,
ChannelLogger channelLogger,
boolean useGetForSafeMethods,
MetricRecorder metricRecorder,
Ticker ticker) {

this.negotiator = Preconditions.checkNotNull(negotiator, "negotiator");
Expand Down Expand Up @@ -159,6 +162,7 @@ class NettyClientTransport implements ConnectionClientTransport,
this.logId = InternalLogId.allocate(getClass(), remoteAddress.toString());
this.channelLogger = Preconditions.checkNotNull(channelLogger, "channelLogger");
this.useGetForSafeMethods = useGetForSafeMethods;
this.metricRecorder = metricRecorder;
this.ticker = Preconditions.checkNotNull(ticker, "ticker");
}

Expand Down Expand Up @@ -251,7 +255,8 @@ public Runnable start(Listener transportListener) {
eagAttributes,
authorityString,
channelLogger,
ticker);
ticker,
metricRecorder);

ChannelHandler negotiationHandler = negotiator.newHandler(handler);

Expand Down
5 changes: 4 additions & 1 deletion netty/src/main/java/io/grpc/netty/NettyServer.java
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
import io.grpc.InternalInstrumented;
import io.grpc.InternalLogId;
import io.grpc.InternalWithLogId;
import io.grpc.MetricRecorder;
import io.grpc.ServerStreamTracer;
import io.grpc.internal.InternalServer;
import io.grpc.internal.ObjectPool;
Expand Down Expand Up @@ -93,6 +94,7 @@ class NettyServer implements InternalServer, InternalWithLogId {
private final int maxMessageSize;
private final int maxHeaderListSize;
private final int softLimitHeaderListSize;
private MetricRecorder metricRecorder;
private final long keepAliveTimeInNanos;
private final long keepAliveTimeoutInNanos;
private final long maxConnectionIdleInNanos;
Expand Down Expand Up @@ -272,7 +274,8 @@ public void initChannel(Channel ch) {
permitKeepAliveTimeInNanos,
maxRstCount,
maxRstPeriodNanos,
eagAttributes);
eagAttributes,
metricRecorder);
ServerTransportListener transportListener;
// This is to order callbacks on the listener, not to guard access to channel.
synchronized (NettyServer.this) {
Expand Down
21 changes: 16 additions & 5 deletions netty/src/main/java/io/grpc/netty/NettyServerHandler.java
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@
import io.grpc.InternalMetadata;
import io.grpc.InternalStatus;
import io.grpc.Metadata;
import io.grpc.MetricRecorder;
import io.grpc.ServerStreamTracer;
import io.grpc.Status;
import io.grpc.internal.GrpcUtil;
Expand Down Expand Up @@ -127,6 +128,7 @@ class NettyServerHandler extends AbstractNettyHandler {
private final Http2Connection.PropertyKey streamKey;
private final ServerTransportListener transportListener;
private final int maxMessageSize;
private final TcpMetrics.Tracker tcpMetrics;
private final long keepAliveTimeInNanos;
private final long keepAliveTimeoutInNanos;
private final long maxConnectionAgeInNanos;
Expand Down Expand Up @@ -174,7 +176,8 @@ static NettyServerHandler newHandler(
long permitKeepAliveTimeInNanos,
int maxRstCount,
long maxRstPeriodNanos,
Attributes eagAttributes) {
Attributes eagAttributes,
MetricRecorder metricRecorder) {
Preconditions.checkArgument(maxHeaderListSize > 0, "maxHeaderListSize must be positive: %s",
maxHeaderListSize);
Http2FrameLogger frameLogger = new Http2FrameLogger(LogLevel.DEBUG, NettyServerHandler.class);
Expand Down Expand Up @@ -208,7 +211,8 @@ static NettyServerHandler newHandler(
maxRstCount,
maxRstPeriodNanos,
eagAttributes,
Ticker.systemTicker());
Ticker.systemTicker(),
metricRecorder);
}

static NettyServerHandler newHandler(
Expand All @@ -234,7 +238,8 @@ static NettyServerHandler newHandler(
int maxRstCount,
long maxRstPeriodNanos,
Attributes eagAttributes,
Ticker ticker) {
Ticker ticker,
MetricRecorder metricRecorder) {
Preconditions.checkArgument(maxStreams > 0, "maxStreams must be positive: %s", maxStreams);
Preconditions.checkArgument(flowControlWindow > 0, "flowControlWindow must be positive: %s",
flowControlWindow);
Expand Down Expand Up @@ -294,7 +299,8 @@ static NettyServerHandler newHandler(
keepAliveEnforcer,
autoFlowControl,
rstStreamCounter,
eagAttributes, ticker);
eagAttributes, ticker,
metricRecorder);
}

private NettyServerHandler(
Expand All @@ -318,7 +324,8 @@ private NettyServerHandler(
boolean autoFlowControl,
RstStreamCounter rstStreamCounter,
Attributes eagAttributes,
Ticker ticker) {
Ticker ticker,
MetricRecorder metricRecorder) {
super(
channelUnused,
decoder,
Expand Down Expand Up @@ -362,6 +369,8 @@ public void onStreamClosed(Http2Stream stream) {

checkArgument(maxMessageSize >= 0, "maxMessageSize must be non-negative: %s", maxMessageSize);
this.maxMessageSize = maxMessageSize;
this.tcpMetrics = new TcpMetrics.Tracker(metricRecorder, "server");

this.keepAliveTimeInNanos = keepAliveTimeInNanos;
this.keepAliveTimeoutInNanos = keepAliveTimeoutInNanos;
this.maxConnectionIdleManager = maxConnectionIdleManager;
Expand Down Expand Up @@ -663,6 +672,8 @@ void setKeepAliveManagerForTest(KeepAliveManager keepAliveManager) {
*/
@Override
public void channelInactive(ChannelHandlerContext ctx) throws Exception {
tcpMetrics.channelInactive(ctx.channel());

try {
if (keepAliveManager != null) {
keepAliveManager.onTransportTermination();
Expand Down
Loading
Loading