gRPC Client Middleware.
2018-08-19 13:47
330 查看
中间件想必大家不陌生,今天给大家介绍如何实现中间件以及实现gRPC的客户端中间件。
什么是中间件?
https://docs.microsoft.com/zh-cn/aspnet/core/fundamentals/middleware/index?view=aspnetcore-2.1&tabs=aspnetcore2x
中间件管道
先定义管道Pipeline
/// <summary>
/// A composed gRPC client middleware chain. Instances are created by
/// <see cref="PipelineBuilder.Build"/> and executed once per client call.
/// </summary>
public class Pipeline
{
    // Delegate handed to the constructor; RunPipeline forwards every call to it.
    private PipelineDelagate processChain;

    /// <summary>
    /// Wraps an already composed delegate chain.
    /// </summary>
    /// <param name="middlewareChain">The delegate to invoke for each call.</param>
    internal Pipeline(PipelineDelagate middlewareChain)
    {
        this.processChain = middlewareChain;
    }

    /// <summary>
    /// Invokes the chain for a single call.
    /// </summary>
    /// <param name="context">The per-call context passed to the chain.</param>
    /// <returns>The task returned by the chain.</returns>
    internal Task RunPipeline(MiddlewareContext context) => this.processChain(context);
}
然后实现PipelineBuilder
/// <summary>
/// Collects middleware registrations and composes them into a <see cref="Pipeline"/>.
/// Middlewares run in registration order; the innermost step of the chain invokes
/// <c>MiddlewareContext.HandlerExecutor</c>.
/// </summary>
public class PipelineBuilder
{
    private readonly List<Func<PipelineDelagate, PipelineDelagate>> middlewares = new List<Func<PipelineDelagate, PipelineDelagate>>();

    /// <summary>
    /// Adds a middleware expressed as a factory that wraps the next delegate.
    /// </summary>
    /// <param name="middleware">Receives the next delegate and returns the wrapping delegate.</param>
    /// <returns>This builder, to allow chaining.</returns>
    /// <exception cref="ArgumentNullException"><paramref name="middleware"/> is null.</exception>
    public PipelineBuilder Use(Func<PipelineDelagate, PipelineDelagate> middleware)
    {
        if (middleware == null)
        {
            throw new ArgumentNullException(nameof(middleware));
        }

        middlewares.Add(middleware);
        return this;
    }

    /// <summary>
    /// Adds a class-based middleware. <typeparamref name="T"/> must expose a constructor taking
    /// the next <see cref="PipelineDelagate"/> followed by <paramref name="args"/>, and a public
    /// <c>Invoke</c> method compatible with <see cref="PipelineDelagate"/>.
    /// </summary>
    /// <param name="args">Extra constructor arguments passed after the next delegate.</param>
    /// <returns>This builder, to allow chaining.</returns>
    public PipelineBuilder Use<T>(params object[] args)
    {
        middlewares.Add(next => WrapClass<T>(next, args));
        return this;
    }

    /// <summary>
    /// Adds a class-based middleware that only runs when <paramref name="condition"/> returns true.
    /// When the condition is false the call is passed straight to the next middleware, so the
    /// rest of the pipeline (and the actual gRPC call) still executes.
    /// </summary>
    /// <param name="condition">Predicate evaluated for every call.</param>
    /// <param name="args">Extra constructor arguments passed after the next delegate.</param>
    /// <returns>This builder, to allow chaining.</returns>
    /// <exception cref="ArgumentNullException"><paramref name="condition"/> is null.</exception>
    public PipelineBuilder UseWhen<T>(Func<MiddlewareContext, bool> condition, params object[] args)
    {
        if (condition == null)
        {
            throw new ArgumentNullException(nameof(condition));
        }

        middlewares.Add(next =>
        {
            // Create the middleware once per built pipeline instead of once per call.
            PipelineDelagate branch = WrapClass<T>(next, args);
            return async ctx =>
            {
                if (condition(ctx))
                {
                    await branch(ctx);
                }
                else
                {
                    // Skip only this middleware; never drop the call itself.
                    await next(ctx);
                }
            };
        });
        return this;
    }

    /// <summary>
    /// Adds an inline middleware that receives the context and the next delegate.
    /// </summary>
    /// <param name="middleware">The middleware body; it decides whether and when to call next.</param>
    /// <returns>This builder, to allow chaining.</returns>
    /// <exception cref="ArgumentNullException"><paramref name="middleware"/> is null.</exception>
    public PipelineBuilder Use(Func<MiddlewareContext, PipelineDelagate, Task> middleware)
    {
        if (middleware == null)
        {
            throw new ArgumentNullException(nameof(middleware));
        }

        middlewares.Add(next => ctx => middleware(ctx, next));
        return this;
    }

    /// <summary>
    /// Instantiates <typeparamref name="T"/> with (next, args...) and binds its <c>Invoke</c> method
    /// to a <see cref="PipelineDelagate"/>.
    /// </summary>
    /// <exception cref="InvalidOperationException"><typeparamref name="T"/> has no public Invoke method.</exception>
    private PipelineDelagate WrapClass<T>(PipelineDelagate next, params object[] args)
    {
        // A caller passing an explicit null for the params array means "no extra arguments".
        args = args ?? new object[0];

        var ctorArgs = new object[args.Length + 1];
        ctorArgs[0] = next;
        Array.Copy(args, 0, ctorArgs, 1, args.Length);

        var type = typeof(T);
        var instance = Activator.CreateInstance(type, ctorArgs);
        MethodInfo method = type.GetMethod("Invoke");
        if (method == null)
        {
            throw new InvalidOperationException(
                "Middleware type '" + type.FullName + "' must declare a public Invoke(MiddlewareContext) method returning Task.");
        }

        return (PipelineDelagate)method.CreateDelegate(typeof(PipelineDelagate), instance);
    }

    /// <summary>
    /// Composes the registered middlewares into a <see cref="Pipeline"/>. The registration list
    /// is not modified, so Build may be called repeatedly with identical results.
    /// </summary>
    /// <returns>A pipeline whose first step is the first registered middleware.</returns>
    public Pipeline Build()
    {
        PipelineDelagate pipeline = ExecuteMainHandler;

        // Wrap from the last registration inwards so the first registered middleware ends up
        // outermost. Iterating backwards avoids the in-place List.Reverse(), which flipped the
        // order on every other Build() call.
        for (int i = middlewares.Count - 1; i >= 0; i--)
        {
            pipeline = middlewares[i](pipeline);
        }

        return new Pipeline(pipeline);
    }

    /// <summary>
    /// Terminal step of every pipeline: runs the handler stored on the context.
    /// </summary>
    internal static Task ExecuteMainHandler(MiddlewareContext context)
    {
        return context.HandlerExecutor();
    }
}
MiddlewareContext 和 委托定义
/// <summary>
/// Mutable per-call state that is handed to every middleware in the pipeline.
/// </summary>
public class MiddlewareContext
{
    /// <summary>The gRPC method being invoked.</summary>
    public IMethod Method { get; set; }

    /// <summary>The request object; typed as <see cref="object"/> because the context is not generic.</summary>
    public object Request { get; set; }

    /// <summary>The response object; typed as <see cref="object"/> because the context is not generic.</summary>
    public object Response { get; set; }

    /// <summary>The host value passed along with the call.</summary>
    public string Host { get; set; }

    /// <summary>The call options passed along with the call.</summary>
    public CallOptions Options { get; set; }

    /// <summary>The terminal handler executed by <c>PipelineBuilder.ExecuteMainHandler</c>.</summary>
    internal Func<Task> HandlerExecutor { get; set; }
}

/// <summary>
/// A single step of the middleware pipeline.
/// </summary>
/// <param name="context">The per-call context.</param>
/// <returns>A task representing the step.</returns>
public delegate Task PipelineDelagate(MiddlewareContext context);
到这里管道建设完成,那么如何在gRPC中使用呢?
首先实现自己的客户端拦截器MiddlewareCallInvoker
/// <summary>
/// A <see cref="DefaultCallInvoker"/> that routes every client call through an optional
/// middleware <see cref="Pipeline"/> before delegating to the base implementation.
/// </summary>
/// <remarks>
/// For the Async* overloads the value produced inside the pipeline is the call object returned
/// by the base invoker (for example <c>AsyncUnaryCall&lt;TResponse&gt;</c>), and that object is
/// what gets stored in <c>MiddlewareContext.Response</c>.
/// </remarks>
public sealed class MiddlewareCallInvoker : DefaultCallInvoker
{
    // Null when no pipeline was supplied; calls then go straight to the base invoker.
    private readonly Pipeline middlewarePipeline;

    /// <summary>Creates an invoker without middleware.</summary>
    /// <param name="channel">Channel used by the base invoker.</param>
    public MiddlewareCallInvoker(Channel channel) : base(channel)
    {
    }

    /// <summary>Creates an invoker that runs <paramref name="pipeline"/> around every call.</summary>
    /// <param name="channel">Channel used by the base invoker.</param>
    /// <param name="pipeline">Middleware pipeline; may be null to disable middleware.</param>
    public MiddlewareCallInvoker(Channel channel, Pipeline pipeline) : this(channel)
    {
        this.middlewarePipeline = pipeline;
    }

    /// <summary>
    /// Executes <paramref name="call"/> either directly or as the terminal handler of the pipeline.
    /// </summary>
    /// <returns>
    /// The value produced by <paramref name="call"/>, or <c>default(TResponse)</c> when a
    /// middleware short-circuits and the handler never runs.
    /// </returns>
    private TResponse Call<TResponse>(Func<MiddlewareContext, TResponse> call, MiddlewareContext context)
    {
        if (middlewarePipeline == null)
        {
            return call(context);
        }

        TResponse response = default(TResponse);
        context.HandlerExecutor = async () =>
        {
            response = await Task.FromResult(call(context));
            context.Response = response;
        };

        // The CallInvoker API is synchronous, so the pipeline has to be completed before
        // returning. The previous fire-and-forget returned default(TResponse) whenever a
        // middleware completed asynchronously and silently dropped pipeline exceptions.
        // GetAwaiter().GetResult() rethrows the original exception rather than an
        // AggregateException.
        // NOTE(review): this blocks the calling thread; confirm callers do not invoke it from a
        // single-threaded synchronization context with middlewares that capture that context.
        middlewarePipeline.RunPipeline(context).GetAwaiter().GetResult();

        return response;
    }

    /// <summary>Builds the context shared by all call kinds; <paramref name="request"/> is null for client-streaming and duplex calls.</summary>
    private static MiddlewareContext CreateContext(IMethod method, string host, CallOptions options, object request)
    {
        return new MiddlewareContext
        {
            Host = host,
            Method = method,
            Options = options,
            Request = request,
            Response = null
        };
    }

    /// <inheritdoc />
    public override TResponse BlockingUnaryCall<TRequest, TResponse>(Method<TRequest, TResponse> method, string host, CallOptions options, TRequest request)
    {
        return Call(
            context => base.BlockingUnaryCall((Method<TRequest, TResponse>)context.Method, context.Host, context.Options, (TRequest)context.Request),
            CreateContext(method, host, options, request));
    }

    /// <inheritdoc />
    public override AsyncUnaryCall<TResponse> AsyncUnaryCall<TRequest, TResponse>(Method<TRequest, TResponse> method, string host, CallOptions options, TRequest request)
    {
        return Call(
            context => base.AsyncUnaryCall((Method<TRequest, TResponse>)context.Method, context.Host, context.Options, (TRequest)context.Request),
            CreateContext(method, host, options, request));
    }

    /// <inheritdoc />
    public override AsyncServerStreamingCall<TResponse> AsyncServerStreamingCall<TRequest, TResponse>(Method<TRequest, TResponse> method, string host, CallOptions options, TRequest request)
    {
        return Call(
            context => base.AsyncServerStreamingCall((Method<TRequest, TResponse>)context.Method, context.Host, context.Options, (TRequest)context.Request),
            CreateContext(method, host, options, request));
    }

    /// <inheritdoc />
    public override AsyncClientStreamingCall<TRequest, TResponse> AsyncClientStreamingCall<TRequest, TResponse>(Method<TRequest, TResponse> method, string host, CallOptions options)
    {
        return Call(
            context => base.AsyncClientStreamingCall((Method<TRequest, TResponse>)context.Method, context.Host, context.Options),
            CreateContext(method, host, options, null));
    }

    /// <inheritdoc />
    public override AsyncDuplexStreamingCall<TRequest, TResponse> AsyncDuplexStreamingCall<TRequest, TResponse>(Method<TRequest, TResponse> method, string host, CallOptions options)
    {
        return Call(
            context => base.AsyncDuplexStreamingCall((Method<TRequest, TResponse>)context.Method, context.Host, context.Options),
            CreateContext(method, host, options, null));
    }
}
到这里基于管道的中间件基本完成。接下来就是在客户端使用了。
// Compose the client pipeline. The commented-out registrations show other middlewares
// that can be enabled in the same way.
var pipeline = new PipelineBuilder()
    //.Use<ExceptionMiddleware>()
    //.Use<TimerMiddleware>()
    //.Use<LoggingMiddleware>()
    //.Use<TimeoutMiddleware>()
    .Use<PolicyMiddleware>(new PolicyMiddlewareOptions { RetryTimes = 2, TimoutMilliseconds = 100 });

//pipeline.Use<LoggingMiddleware>();
//pipeline.UseWhen<LoggingMiddleware>(ctx => ctx.Method.FullName.EndsWith("SayHello"));
//pipeline.Use<TimeoutMiddleware>(new TimeoutMiddlewareOptions { TimoutMilliseconds = 1000 });

// Console logger: prints the request before the call and the response after it.
// Console.WriteLine(object) is used instead of calling ToString() directly because Request is
// null for streaming calls and Response stays null when a middleware short-circuits.
pipeline.Use(async (ctx, next) =>
{
    Console.WriteLine(ctx.Request);
    await next(ctx);
    Console.WriteLine(ctx.Response);
});

Channel channel = new Channel("127.0.0.1:50051", ChannelCredentials.Insecure);
MiddlewareCallInvoker callInvoker = new MiddlewareCallInvoker(channel, pipeline.Build());

var clientApi = new Get.GetClient(callInvoker);
var replyApi = clientApi.SayHello(new ContractsSample1.HelloApiDemo.HelloRequest { Name = " APII" });
Console.WriteLine("Greeting: " + replyApi.Message);

// Release the channel's resources once the demo call has completed.
channel.ShutdownAsync().Wait();
Middleware
/// <summary>
/// Skeleton of a class-based middleware. <c>PipelineBuilder.Use&lt;T&gt;</c> constructs it with the
/// next delegate as the first constructor argument and binds its <see cref="Invoke"/> method.
/// </summary>
public class LoggingMiddleware
{
    // The next step of the pipeline, supplied by the builder.
    private PipelineDelagate _next;

    /// <summary>Stores the delegate that continues the pipeline.</summary>
    /// <param name="next">The next step of the pipeline.</param>
    public LoggingMiddleware(PipelineDelagate next)
    {
        this._next = next;
    }

    /// <summary>
    /// Runs this middleware for one call; the markers show where pre- and post-call logic goes.
    /// </summary>
    /// <param name="context">The per-call context.</param>
    public async Task Invoke(MiddlewareContext context)
    {
        // Before: code placed here runs ahead of the rest of the pipeline.
        Task downstream = this._next(context);
        await downstream;
        // End: code placed here runs after the rest of the pipeline has completed.
    }
}
至此已完成所有工作。希望能够帮助到大家。
如果有机会将给大家介绍gRPC跨平台网关,该网关能够实现自动将下游的gRPC服务转化成http api,只需要你的proto文件即可。如果你使用consul,甚至无需你做任何配置。
相关文章推荐
- rabbitmq 学习-9- RpcClient发送消息和同步接收消息原理
- spark on yarn(ERROR client.TransportClient: Failed to send RPC)
- RPCServiceClient-调用webservice客户端
- [RPC Fault faultString="Send failed" faultCode="Client.Error.MessageSend" faultDetail="Channel.Connect.Failed error NetC
- 带有HA功能的Hadoop Client端RPC实现原理与代码分析
- org.apache.flume.FlumeException: NettyAvroRpcClient { host: xxx.xxx.xxx.xxx, port: 41100 }: RPC
- Using Async RPC with Your Client/Server Applications
- [RPC Fault faultString="Send failed" faultCode="Client.Error.MessageSend"
- 关于axis2的RPCServiceClient客户端无法传参问题的解决方案
- webservice之RPCClient 远程调用服务
- RPC-client异步收发核心细节?
- 分布式文件系统KFS源码阅读与分析(四):RPC实现机制(KfsClient端)
- Hadoop源码解析之 rpc通信 client到server通信
- Flex应用BlazeDS时报错:[RPC Fault faultString="发送失败" faultCode="Client.Error.Messag
- pomelo研究笔记-RPCclient
- printer configure with rpcclient
- Hadoop RPC源码解析——Client类
- 用于发送 RPC 请求的RpcClient
- [FaultEvent fault=[RPC Fault faultString="发送失败" faultCode="Client.Error.MessageSend
- GWT com.google.gwt.user.client.rpc.StatusCodeException: 404 <html>