In this post, I will cover the advanced topics of gRPC.
Interceptors Client-side interceptors A gRPC Client Interceptor allows you to intercept outbound RPC calls on the client side before the call is sent to the server and when receiving response messages.
You can use them to:
Add metadata (authentication tokens, request IDs)
Add logging / tracing
Implement retry policies
Modify call options
Measure latency
Wrap/inspect responses and errors
How to use the gRPC Client Interceptor
1. Create a client interceptor that implements the ClientInterceptor interface
MyClientInterceptor.java
package com.taogen.grpc.interceptor.interceptor.client;import io.grpc.*;public class MyClientInterceptor implements ClientInterceptor { @Override public <ReqT, RespT> ClientCall<ReqT, RespT> interceptCall (MethodDescriptor<ReqT, RespT> method, CallOptions callOptions, Channel next) { System.out.println("Client Interceptor: Calling method " + method.getFullMethodName()); return next.newCall(method, callOptions); } }
2. Add interceptors to the gRPC client
ManagedChannel channel = ManagedChannelBuilder .forAddress("localhost" , 8080 ) .usePlaintext() .intercept(new MyClientInterceptor ()) .build();
More advanced client-side interceptor
An example:
AroundClientInterceptor.java
package com.taogen.grpc.interceptor.interceptor.client;import io.grpc.*;public class AroundClientInterceptor implements ClientInterceptor { @Override public <ReqT, RespT> ClientCall<ReqT, RespT> interceptCall (MethodDescriptor<ReqT, RespT> method, CallOptions callOptions, Channel next) { System.out.println("[Client] Calling method: " + method.getFullMethodName()); ClientCall<ReqT, RespT> clientCall = next.newCall(method, callOptions); return new ForwardingClientCall .SimpleForwardingClientCall<ReqT, RespT>(clientCall) { @Override public void start (Listener<RespT> responseListener, Metadata headers) { System.out.println("[Client] Sending headers: " + headers); Listener<RespT> listener = new ForwardingClientCallListener .SimpleForwardingClientCallListener<RespT>(responseListener) { @Override public void onHeaders (Metadata headers) { System.out.println("[Client] Received response headers: " + headers); super .onHeaders(headers); } @Override public void onMessage (RespT message) { System.out.println("[Client] Received response message: " + message); super .onMessage(message); } @Override public void onClose (Status status, Metadata trailers) { System.out.println("[Client] Call closed with status: " + status); System.out.println("[Client] Trailers: " + trailers); super .onClose(status, trailers); } }; super .start(listener, headers); } @Override public void sendMessage (ReqT message) { System.out.println("[Client] Sending request message: " + message); super .sendMessage(message); } }; } }
Output
[Client] Calling method: com.taogen.grpc.UserService/searchUsers [Client] Sending headers: Metadata() [Client] Sending request message: ... [Client] Received response headers: Metadata(content-type=application/grpc,grpc-encoding=identity,grpc-accept-encoding=gzip) [Client] Received response message: ... [Client] Call closed with status: Status{code=OK, description=null, cause=null} [Client] Trailers: Metadata()
Server-side interceptors How to use the gRPC server-side Interceptor
1. Create a server-side interceptor that implements the ServerInterceptor interface
MyServerInterceptor.java
package com.taogen.grpc.interceptor.interceptor.service;import io.grpc.Metadata;import io.grpc.ServerCall;import io.grpc.ServerCallHandler;import io.grpc.ServerInterceptor;public class MyServerInterceptor implements ServerInterceptor { @Override public <ReqT, RespT> ServerCall.Listener<ReqT> interceptCall (ServerCall<ReqT, RespT> call, Metadata headers, ServerCallHandler<ReqT, RespT> next) { System.out.println("Incoming call: " + call.getMethodDescriptor().getFullMethodName()); System.out.println("Headers: " + headers); return next.startCall(call, headers); } }
2. Add interceptors to the gRPC server
Server server = ServerBuilder .forPort(8080 ) .addService(new HelloServiceImpl ()) .addService(new UserServiceImpl ()) .intercept(new MyServerInterceptor ()) .build();
More advanced server-side interceptor
An example:
AroundServerInterceptor.java
package com.taogen.grpc.interceptor.interceptor.service;import io.grpc.*;public class AroundServerInterceptor implements ServerInterceptor { @Override public <ReqT, RespT> ServerCall.Listener<ReqT> interceptCall (ServerCall<ReqT, RespT> call, Metadata headers, ServerCallHandler<ReqT, RespT> next) { System.out.println("[Server] Method called: " + call.getMethodDescriptor().getFullMethodName()); System.out.println("[Server] Request headers: " + headers); ServerCall<ReqT, RespT> loggingCall = new ForwardingServerCall .SimpleForwardingServerCall<ReqT, RespT>(call) { @Override public void sendHeaders (Metadata responseHeaders) { System.out.println("[Server] Sending response headers: " + responseHeaders); super .sendHeaders(responseHeaders); } @Override public void sendMessage (RespT message) { System.out.println("[Server] Sending response message: " + message); super .sendMessage(message); } @Override public void close (Status status, Metadata trailers) { System.out.println("[Server] Call closed with status: " + status); System.out.println("[Server] Trailers: " + trailers); super .close(status, trailers); } }; ServerCall.Listener<ReqT> listener = next.startCall(loggingCall, headers); return new ForwardingServerCallListener .SimpleForwardingServerCallListener<ReqT>(listener) { @Override public void onMessage (ReqT message) { System.out.println("[Server] Received request message: " + message); super .onMessage(message); } @Override public void onHalfClose () { System.out.println("[Server] Client finished sending messages." ); super .onHalfClose(); } @Override public void onCancel () { System.out.println("[Server] Call cancelled by client." ); super .onCancel(); } @Override public void onComplete () { System.out.println("[Server] Call completed." ); super .onComplete(); } @Override public void onReady () { System.out.println("[Server] Ready to send/receive messages." ); super .onReady(); } }; } }
Output
[Server] Method called: com.taogen.grpc.UserService/searchUsers [Server] Request headers: Metadata(content-type=application/grpc,user-agent=grpc-java-netty/1.76.0,grpc-accept-encoding=gzip) [Server] Ready to send/receive messages. [Server] Received request message: ... [Server] Client finished sending messages. [Server] Sending response headers: Metadata() [Server] Sending response message: ... [Server] Call closed with status: Status{code=OK, description=null, cause=null} [Server] Trailers: Metadata() [Server] Call completed.
Deadlines A deadline is expressed in absolute time from the beginning of a request (even if the API presents it as a duration offset) and applied across multiple service invocations. The application that initiates the request sets the deadline and the entire request chain needs to respond by the deadline. gRPC APIs support using deadlines for your RPC.
A deadline tells the server how long the client is willing to wait. If the server does not respond before the deadline expires, the client cancels the RPC automatically, and the server receives a cancellation signal.
For many reasons, it is always good practice to use deadlines in your gRPC applications. gRPC communication happens over the network, so there can be delays between the RPC calls and responses. Also, in certain cases the gRPC service itself can take more time to respond depending on the service’s business logic. When client applications are developed without using deadlines, they infinitely wait for a response for RPC requests that are initiated and resources will be held for all in-flight requests. This puts the service as well as the client at risk of running out of resources, increasing the latency of the service; this could even crash the entire gRPC service.
How to set Deadlines in the client
HelloRequest request = HelloRequest.newBuilder() .setName("Alice" ) .build(); HelloReply response = blockingStub .withDeadlineAfter(3 , TimeUnit.SECONDS) .sayHello(request);
or
HelloReply response = blockingStub .withDeadline(Deadline.after(3 , TimeUnit.SECONDS)) .sayHello(request);
Handling Deadline Exceeded
On the client:
try { stub.withDeadlineAfter(1 , TimeUnit.SECONDS).sayHello(request); } catch (StatusRuntimeException e) { if (e.getStatus().getCode() == Status.Code.DEADLINE_EXCEEDED) { System.out.println("Deadline exceeded!" ); } }
On the server, you can detect cancellation:
@Override public void sayHello (HelloRequest req, StreamObserver<HelloReply> responseObserver) { Context ctx = Context.current(); ctx.addListener(context -> { if (context.isCancelled()) { System.out.println("Request was cancelled due to deadline exceeded." ); } }, MoreExecutors.directExecutor()); try { Thread.sleep(5000 ); } catch (InterruptedException e) {} if (ctx.isCancelled()) return ; responseObserver.onNext(HelloReply.newBuilder().setMessage("Hi!" ).build()); responseObserver.onCompleted(); }
Configure Default Deadlines
1. Create a client interceptor to set a default deadline
package com.taogen.grpc.advanced.deadlines;import io.grpc.*;import java.util.concurrent.TimeUnit;public class DefaultDeadlineInterceptor implements ClientInterceptor { @Override public <ReqT, RespT> ClientCall<ReqT, RespT> interceptCall (MethodDescriptor<ReqT, RespT> method, CallOptions callOptions, Channel next) { CallOptions updated = callOptions.withDeadlineAfter(3 , TimeUnit.SECONDS); return next.newCall(method, updated); } }
2. Add the DefaultDeadlineInterceptor to the client channel
channel = ManagedChannelBuilder.forAddress("localhost" , 50051 ) .usePlaintext() .intercept(new DefaultDeadlineInterceptor ()) .build();
More articles about gRPC
References [1] gRPC: Up and Running by Kasun Indrasiri and Danesh Kuruppu
[2] gRPC Guide - Docs