9.3 客户端接收响应信息(异步转同步的实现)
一 总体流程
客户端接收响应消息 NettyHandler.messageReceived(ChannelHandlerContext ctx, MessageEvent e) -->MultiMessageHandler.received(Channel channel, Object message)-->HeartbeatHandler.received(Channel channel, Object message)-->AllChannelHandler.received(Channel channel, Object message)-->ExecutorService cexecutor = getExecutorService()-->cexecutor.execute(new ChannelEventRunnable(channel, handler, ChannelState.RECEIVED, message))-->ChannelEventRunnable.run()-->DecodeHandler.received(Channel channel, Object message)-->decode(Object message)-->HeaderExchangeHandler.received(Channel channel, Object message)-->handleResponse(Channel channel, Response response)-->DefaultFuture.received(channel, response)-->doReceived(Response res)//异步转同步
二 源码解析
在HeaderExchangeHandler.received(Channel channel, Object message)方法之前,与服务端接收请求消息一样,不再赘述。
HeaderExchangeHandler.received(Channel channel, Object message)
1 public void received(Channel channel, Object message) throws RemotingException { 2 ... 3 try { 4 if (message instanceof Request) { 5 ... 6 } else if (message instanceof Response) { 7 handleResponse(channel, (Response) message); 8 } else if (message instanceof String) { 9 ... 10 } else { 11 ... 12 } 13 } finally { 14 HeaderExchangeChannel.removeChannelIfDisconnected(channel); 15 } 16 } 17 18 static void handleResponse(Channel channel, Response response) throws RemotingException { 19 if (response != null && !response.isHeartbeat()) { 20 DefaultFuture.received(channel, response); 21 } 22 }
DefaultFuture.received(Channel channel, Response response)
1 private final long id; 2 private final Request request; 3 private final int timeout; 4 private volatile Response response; 5 private static final Map<Long, DefaultFuture> FUTURES = new ConcurrentHashMap<Long, DefaultFuture>(); 6 private final Condition done = lock.newCondition(); 7 8 public static void received(Channel channel, Response response) { 9 try { 10 DefaultFuture future = FUTURES.remove(response.getId());//删除元素并返回key=response.getId()的DefaultFuture
11 if (future != null) { 12 future.doReceived(response); 13 } else { 14 logger.warn("The timeout response finally returned at " 15 + (new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS").format(new Date())) 16 + ", response " + response 17 + (channel == null ? "" : ", channel: " + channel.getLocalAddress() 18 + " -> " + channel.getRemoteAddress())); 19 } 20 } finally { 21 CHANNELS.remove(response.getId()); 22 } 23 } 24 25 private void doReceived(Response res) { 26 lock.lock(); 27 try { 28 //设置response 29 response = res; 30 if (done != null) { 31 //唤醒阻塞的线程 32 done.signal(); 33 } 34 } finally { 35 lock.unlock(); 36 } 37 if (callback != null) { 38 invokeCallback(callback); 39 } 40 }
这里比较难懂,笔者再给出客户端发出请求时的一段代码:HeaderExchangeChannel.request(Object request, int timeout)
1 public ResponseFuture request(Object request, int timeout) throws RemotingException { 2 if (closed) { 3 throw new RemotingException(this.getLocalAddress(), null, "Failed to send request " + request + ", cause: The channel " + this + " is closed!"); 4 } 5 // create request. 6 Request req = new Request(); 7 req.setVersion("2.0.0"); 8 req.setTwoWay(true); 9 req.setData(request); 10 DefaultFuture future = new DefaultFuture(channel, req, timeout); 11 try { 12 channel.send(req); 13 } catch (RemotingException e) { 14 future.cancel(); 15 throw e; 16 } 17 return future; 18 }
netty是一个异步非阻塞的框架,所以当执行channel.send(req);的时候,当其内部执行到netty发送消息时,不会等待结果,直接返回。为了实现“异步转为同步”,使用了DefaultFuture这个辅助类,
在HeaderExchangeChannel.request(Object request, int timeout),在还没有等到客户端的响应回来的时候,就直接将future返回了。返回给谁?再来看HeaderExchangeChannel.request(Object request, int timeout)的调用者。
1 -->DubboInvoker.doInvoke(final Invocation invocation) 2 //获取ExchangeClient进行消息的发送 3 -->ReferenceCountExchangeClient.request(Object request, int timeout) 4 -->HeaderExchangeClient.request(Object request, int timeout) 5 -->HeaderExchangeChannel.request(Object request, int timeout)
DubboInvoker.doInvoke(final Invocation invocation)
1 protected Result doInvoke(final Invocation invocation) throws Throwable { 2 RpcInvocation inv = (RpcInvocation) invocation; 3 final String methodName = RpcUtils.getMethodName(invocation); 4 inv.setAttachment(Constants.PATH_KEY, getUrl().getPath()); 5 inv.setAttachment(Constants.VERSION_KEY, version); 6 7 ExchangeClient currentClient; 8 if (clients.length == 1) { 9 currentClient = clients[0]; 10 } else { 11 currentClient = clients[index.getAndIncrement() % clients.length]; 12 } 13 try { 14 boolean isAsync = RpcUtils.isAsync(getUrl(), invocation);//是否异步 15 boolean isOneway = RpcUtils.isOneway(getUrl(), invocation);//是否没有返回值 16 int timeout = getUrl().getMethodParameter(methodName, Constants.TIMEOUT_KEY, Constants.DEFAULT_TIMEOUT); 17 if (isOneway) { 18 boolean isSent = getUrl().getMethodParameter(methodName, Constants.SENT_KEY, false); 19 currentClient.send(inv, isSent); 20 RpcContext.getContext().setFuture(null); 21 return new RpcResult(); 22 } else if (isAsync) { 23 ResponseFuture future = currentClient.request(inv, timeout); 24 RpcContext.getContext().setFuture(new FutureAdapter<Object>(future)); 25 return new RpcResult(); 26 } else { 27 RpcContext.getContext().setFuture(null); 28 return (Result) currentClient.request(inv, timeout).get(); 29 } 30 } catch (TimeoutException e) { 31 throw new RpcException(...); 32 } catch (RemotingException e) { 33 throw new RpcException(...); 34 } 35 }
其中currentClient.request(inv, timeout)返回值是ResponseFuture,DefaultFuture是ResponseFuture的实现类,实际上这里返回的就是DefaultFuture实例,而该实例就是HeaderExchangeChannel.request(Object request, int timeout)返回的那个future实例。之后调用DefaultFuture.get()。
1 public Object get() throws RemotingException { 2 return get(timeout); 3 } 4 5 public Object get(int timeout) throws RemotingException { 6 if (timeout <= 0) { 7 timeout = Constants.DEFAULT_TIMEOUT; 8 } 9 if (!isDone()) { 10 long start = System.currentTimeMillis(); 11 lock.lock(); 12 try { 13 while (!isDone()) { 14 //Causes the current thread to wait until it is signalled or interrupted, or the specified waiting time elapses. 15 done.await(timeout, TimeUnit.MILLISECONDS); 16 if (isDone() || System.currentTimeMillis() - start > timeout) { 17 break; 18 } 19 } 20 } catch (InterruptedException e) { 21 throw new RuntimeException(e); 22 } finally { 23 lock.unlock(); 24 } 25 if (!isDone()) { 26 throw new TimeoutException(sent > 0, channel, getTimeoutMessage(false)); 27 } 28 } 29 return returnFromResponse(); 30 } 31 32 public boolean isDone() { 33 return response != null; 34 }
此处我们看到当响应response没有回来时,condition会执行await进行阻塞当前线程,直到被唤醒或被中断或阻塞时间到时了。当客户端接收到服务端的响应的时候,DefaultFuture.doReceived:
会先为response赋上返回值,之后执行condition的signal唤醒被阻塞的线程,get()方法就会释放锁,执行returnFromResponse(),返回值。
1 private Object returnFromResponse() throws RemotingException { 2 Response res = response; 3 if (res == null) { 4 throw new IllegalStateException("response cannot be null"); 5 } 6 if (res.getStatus() == Response.OK) { 7 return res.getResult(); 8 } 9 if (res.getStatus() == Response.CLIENT_TIMEOUT || res.getStatus() == Response.SERVER_TIMEOUT) { 10 throw new TimeoutException(res.getStatus() == Response.SERVER_TIMEOUT, channel, res.getErrorMessage()); 11 } 12 throw new RemotingException(channel, res.getErrorMessage()); 13 }
到现在其实还有一个问题?就是netty时异步非阻塞的,那么假设现在我发了1w个Request,后来返回来1w个Response,那么怎么对应Request和Response呢?如果对应不上,最起码的唤醒就会有问题。为了解决这个问题提,Request和Response中都有一个属性id。
在HeaderExchangeChannel.request(Object request, int timeout)中:
1 Request req = new Request(); 2 req.setVersion("2.0.0"); 3 req.setTwoWay(true); 4 req.setData(request); 5 DefaultFuture future = new DefaultFuture(channel, req, timeout); 6 try { 7 channel.send(req); 8 } catch (RemotingException e) { 9 future.cancel(); 10 throw e; 11 } 12 return future;
看一下Request的构造器:
1 private static final AtomicLong INVOKE_ID = new AtomicLong(0); 2 private final long mId; 3 4 public Request() { 5 mId = newId(); 6 } 7 8 private static long newId() { 9 // getAndIncrement()增长到MAX_VALUE时,再增长会变为MIN_VALUE,负数也可以做为ID 10 return INVOKE_ID.getAndIncrement(); 11 }
看一下DefaultFuture的构造器:
1 private static final Map<Long, DefaultFuture> FUTURES = new ConcurrentHashMap<Long, DefaultFuture>(); 2 private final long id; 3 private final Request request; 4 private volatile Response response; 5 6 public DefaultFuture(Channel channel, Request request, int timeout) { 7 ... 8 this.request = request; 9 this.id = request.getId(); 10 ... 11 FUTURES.put(id, this); 12 ... 13 }
再来看一下响应。
HeaderExchangeHandler.handleRequest(ExchangeChannel channel, Request req)
1 Response handleRequest(ExchangeChannel channel, Request req) throws RemotingException { 2 Response res = new Response(req.getId(), req.getVersion()); 3 ... 4 Object msg = req.getData(); 5 try { 6 // handle data. 7 Object result = handler.reply(channel, msg); 8 res.setStatus(Response.OK); 9 res.setResult(result); 10 } catch (Throwable e) { 11 res.setStatus(Response.SERVICE_ERROR); 12 res.setErrorMessage(StringUtils.toString(e)); 13 } 14 return res; 15 }
来看一下Response的构造器:
1 private long mId = 0; 2 3 public Response(long id, String version) { 4 mId = id; 5 mVersion = version; 6 }
这里response的id的值时request的id。最后来看一下服务端接收后的处理:
DefaultFuture.received(Channel channel, Response response)
1 public static void received(Channel channel, Response response) { 2 try { 3 DefaultFuture future = FUTURES.remove(response.getId());//删除元素并返回key=response.getId()的DefaultFuture 4 if (future != null) { 5 future.doReceived(response); 6 } else { 7 ... 8 } 9 } finally { 10 CHANNELS.remove(response.getId()); 11 } 12 }
9.3 客户端接收响应信息(异步转同步的实现)相关推荐
- C#创建MQTT客户端接收服务器信息
服务端下载地址:https://download.csdn.net/download/horseroll/11012231 MQTT是什么? MQTT (Message Queue Telemetry ...
- activemq 异步和同步接收
来点实在的代码,用例子来说明: 1.异步接收,主要设置messageListener.,然后编写onmessage方法,很简单 a.客户端发送5条消息 1 package ch02.chat; 2 3 ...
- Qt网络编程——TCP服务器与客户端互发信息
前言 前一个博客,试了TCP的服务器与客户端的连接与断开,接下就是客户端与服务器互发信息. 客户端 1.往服务器发送信息 //发送消息 void Client::on_buttonSendMessag ...
- websocket多客户端接收消息_WebSocket之消息接收发送
WebSocket协议是基于TCP的一种新的网络协议.它实现了浏览器与服务器全双工(full-duplex)通信--允许服务器主动发送信息给客户端. 在 WebSocket API 中,浏览器和服务器 ...
- HTTP发送请求和接收响应的整个流程
HTTP 无状态性 HTTP 协议是无状态的(stateless).也就是说,同一个客户端第二次访问同一个服务器上的页面时,服务器无法知道这个客户端曾经访问过,服务器也无法分辨不同的客户端.HTTP ...
- 关于 客户端发现响应内容类型为“text/html; charset=utf-8”,但应为“text/xml”的解决方法...
关于 客户端发现响应内容类型为"text/html; charset=utf-8",但应为"text/xml"的解决方法 请求web服务时,会有如题的异常出现, ...
- IW会话参数、请求信息、及其响应信息
目录 IW会话参数.请求信息.及其响应信息 一.IW新会话参数 1.MS Edge浏览器: 2.Delphi FMX APP: 二.IW请求信息-App客户端 三.IW请求信息及其响应信息-App客户 ...
- Ajax的五种接收响应头消息(常用)
学习本文你得先了解php与Ajax 我的PHP初探 Ajax的了解与应用 五种响应头消息 1. textt/plain 字符串 服务端的消息响应头:header("Content-Type: ...
- Webservice报错客户端发现响应内容类型为“application/json;charset=UTF-8”,但应为“text/xml”。...
控制台对接Webservice正常,同样的方法在Web项目上报错: 客户端发现响应内容类型为"application/json;charset=UTF-8",但应为"te ...
- 客户端发现响应内容类型为“text/html;charset=utf-8”,但应为“text/xml” 解决办法
characterEncoding=utf8&{"客户端发现响应内容类型为"text/html;charset=utf-8",但应为"text/xml& ...
最新文章
- 点点看 只有想不到没有看不到
- iOS下Html页面中input获取焦点弹出键盘时挡住input解决方案—scrollIntoView()
- ORACLE 表空间SQL
- 【CodeForces - 144B 】Meeting (暴力枚举,水题,计算几何)
- 闲鱼的真正用法,其实是找对象
- 图形界限命令在命令行输入_CAD骚操作,恭喜你又学会了几个重要的命令
- 这么多年被第三方接入坑的那些事。。。关于md5签名和sha1证书的坑
- jmeter-如何进行参数化-循环读取参数
- 计算机同S7-300PLC通讯,西门子S7-300 PLC与Intouch的通讯连接方法
- 二分查找在java中的实现
- 平面设计如何才能自学会?需要掌握什么技能?
- Revit二次开发之创建共享参数及绑定共享参数【比目鱼原创】
- matlab矩阵四分位数,中位数,四分位数
- 那些散落在人间的天使
- can总线程序讲解_详解CAN总线
- phpstorm+xdebug远程调试
- S5PV210 | S5PV210上进行Linux开发
- 69、弱电综合布线网络篇基础知识
- 十一、MYSQL数据库备份还原
- 条码应用与企业ERP无缝集成