在实际应用中,却经常会有客户端建立连接后,等待服务端数据的长连接模式,也可以称为双向连接。
一、双连接,服务端与客户端都开ThriftServer
如果网络环境可控,可以让服务端与客户端互相访问,你可以给服务端与客户端,两者都开一个ThriftServer,也就是两者互为服务端与客户端。这样就可以简单实现互相访问,比如:
客户端: <------------------->  服务端:
ThriftClient --------------> ThriftService
ThriftService <------------- ThriftClient

二、单连接,利用ProcessorFactory中TConnectionInfo的transport定时向客户端发送消息,让thrift保持长连接不立即关闭。
thrift是rpc结构的通信框架,rpc结构默认是 【客户端请求 -> 服务端回应 -> 连接断开】 的这种短连接形式,因此rpc默认是没有服务端回调功能,自然也没有长连接。
如果要保持连接不关闭且被动接收到对方的数据,需要指定双方连接的service必须为oneway,服务端定时向客户端发送信息(利用客户端发送数据到服务端时连接成功时产生的transport,需客户端也创建服务Processor),同时客户端实时检测transport的状态,以便出现与服务端连接断开的情况出现。具体流程:
1、双向连接的service必须为oneway,否则会因为recv函数抛出remote close异常。
2、客户端重用建立client的protocol,开线程使用processor.Process(protocol,protocol)监听服务端回调发送过来的消息。
3、服务端Processor的创建,使用ProcessorFactory创建Processor,通过getProcessor函数中transport作为向客户端发送消息的client的transport而创建一个Processor。

java实例

定义test.thrift

namespace java com.zychen.thrift
service ClientHandshakeService{oneway void HandShake();
}service ServerCallbackService{oneway void Push(1: string msg);
}

生成接口代码

把thrift-0.9.3.exe和test.thrift文件放在同一个目录。

进入DOS命令执行:thrift-0.9.3.exe --gen java test.thrift

生成文件gen-java/ com/zychen/thrift/Test.java

服务端代码

ClientHandshakeServiceHandler.java

package com.zychen.thrift;import org.apache.thrift.TException;
import org.apache.thrift.protocol.TBinaryProtocol;
import org.apache.thrift.transport.TTransport;public class ClientHandshakeServiceHandler implements ClientHandshakeService.Iface {public ClientHandshakeServiceHandler(TTransport trans){client = new ServerCallbackService.Client(new TBinaryProtocol(trans));}@Overridepublic void HandShake() throws TException {System.out.println("HandShake\n");StartThread();}//开始线程public void StartThread(){if(threadCallback == null){stopThread = false;threadCallback = new Thread(new CallbackThread());threadCallback.start();}}//停止线程public void StopThread(){stopThread = true;if(threadCallback != null){try {threadCallback.join();} catch (InterruptedException e) {// TODO Auto-generated catch blocke.printStackTrace();}threadCallback = null;}}@Overrideprotected void finalize() throws Throwable {// TODO Auto-generated method stubStopThread();super.finalize();}protected ServerCallbackService.Client client;protected boolean stopThread = false;protected Thread  threadCallback = null;class CallbackThread implements Runnable {public void run() {while(true){if(stopThread){break;}try {client.Push("aaaaaaa");Thread.sleep(50);} catch (TException | InterruptedException e) {// TODO Auto-generated catch blocke.printStackTrace();return;}    }}};
}

ProcessorFactoryImpl.java

package com.zychen.thrift;import org.apache.thrift.TProcessor;
import org.apache.thrift.TProcessorFactory;
import org.apache.thrift.transport.TTransport;import com.zychen.thrift.ClientHandshakeService.Processor;public class ProcessorFactoryImpl extends TProcessorFactory {public ProcessorFactoryImpl(TProcessor processor) {super(processor);// TODO Auto-generated constructor stub}@Overridepublic TProcessor getProcessor(TTransport trans) {// TODO Auto-generated method stub//return super.getProcessor(trans);return new ClientHandshakeService.Processor(new ClientHandshakeServiceHandler(trans));}
}

ServerTest.java

package com.zychen.thrift;import org.apache.thrift.TProcessorFactory;
import org.apache.thrift.protocol.TBinaryProtocol;
import org.apache.thrift.protocol.TBinaryProtocol.Factory;
import org.apache.thrift.server.TServer;
import org.apache.thrift.server.TThreadPoolServer;
import org.apache.thrift.server.TThreadPoolServer.Args;
import org.apache.thrift.transport.TServerSocket;
import org.apache.thrift.transport.TServerTransport;
import org.apache.thrift.transport.TTransportException;import com.zychen.thrift.ClientHandshakeService.Processor;public class ServerTest {/*** @param args*/public static void main(String[] args) {TServerSocket tServerSocket;try {tServerSocket = new TServerSocket(9999);TThreadPoolServer.Args targs = new TThreadPoolServer.Args(tServerSocket);TBinaryProtocol.Factory factory = new TBinaryProtocol.Factory();//获取processFactoryTProcessorFactory tProcessorFactory = new ProcessorFactoryImpl(null);targs.protocolFactory(factory);targs.processorFactory(tProcessorFactory);TThreadPoolServer tThreadPoolServer = new TThreadPoolServer(targs); System.out.println("start server...");tThreadPoolServer.serve();} catch (TTransportException e) {// TODO Auto-generated catch blocke.printStackTrace();}}}

客户端代码

ServerCallbackServiceImpl.java

package com.zychen.thrift;import java.io.IOException;import org.apache.thrift.TException;
import org.apache.thrift.TProcessor;
import org.apache.thrift.protocol.TBinaryProtocol;
import org.apache.thrift.transport.TSocket;public class ServerCallbackServiceImpl implements ServerCallbackService.Iface{public ServerCallbackServiceImpl(TSocket socket){this.socket = socket;}@Overridepublic void Push(String msg) throws TException {// TODO Auto-generated method stubString str = String.format("receive msg %d: %s", nMsgCount++, msg);System.out.println(str);}public void process(){processor = new ServerCallbackService.Processor<ServerCallbackService.Iface>(this);TBinaryProtocol protocol = new TBinaryProtocol(socket);while (true){try{//TProcessor,负责调用用户定义的服务接口,从一个接口读入数据,写入一个输出接口while (processor.process(protocol, protocol)){//阻塞式方法,不需要内容System.out.println("走阻塞式方法");//关闭socket//socket.close();}//connection lost, returnreturn;}catch (TException e){System.out.println("连接已断开...");e.printStackTrace();return;}}}protected int nMsgCount = 0;protected TSocket socket;protected TProcessor processor;
}

ClientTest.java

package com.zychen.thrift;import java.io.IOException;import org.apache.thrift.TException;
import org.apache.thrift.protocol.TBinaryProtocol;
import org.apache.thrift.transport.TSocket;
import org.apache.thrift.transport.TTransportException;import com.zychen.thrift.ServerCallbackService.Iface;public class ClientTest {/*** @param args*/public static void main(String[] args) {// TODO Auto-generated method stubTSocket tSocket = new TSocket("localhost",9999);ClientHandshakeService.Client client = new ClientHandshakeService.Client(new TBinaryProtocol(tSocket));try {tSocket.open();runMethod(tSocket);//向服务端发送消息for (int i = 0; i < 100; ++i){client.HandShake();Thread.sleep(50);}System.in.read();tSocket.close();} catch (TTransportException e) {// TODO Auto-generated catch blocke.printStackTrace();} catch (TException e) {// TODO Auto-generated catch blocke.printStackTrace();} catch (IOException e) {// TODO Auto-generated catch blocke.printStackTrace();} catch (InterruptedException e) {// TODO Auto-generated catch blocke.printStackTrace();}}public static void runMethod(final TSocket tSocket){Thread thread = new Thread(new Runnable(){ServerCallbackServiceImpl serverCallbackServiceImpl = new ServerCallbackServiceImpl(tSocket);@Overridepublic void run() {// TODO Auto-generated method stubserverCallbackServiceImpl.process();}});thread.start();};
}

完整代码:下载
参考资料:
http://www.cnblogs.com/xiaosuiba/p/4122459.html
http://blog.csdn.net/qq_27989757/article/details/50761051

Thrift之双向通讯相关推荐

  1. 我的WCF之旅 (11): 再谈WCF的双向通讯-基于Http的双向通讯 V.S. 基于TCP的双向通讯...

    在一个基于面向服务的分布式环境中,借助一个标准的.平台无关的Communication Infrastructure,各个Service通过SOAP Message实现相互之间的交互.这个交互的过程实 ...

  2. WCF简单教程(6) 单向与双向通讯

    第六篇:单向与双向通讯 项目开发中我们时常会遇到需要异步调用的问题,有时忽略服务端的返回值,有时希望服务端在需要的时候回调,今天就来看看在WCF中如何实现. 先看不需要服务端返回值的单向调用,老规矩, ...

  3. C++的MFC 与 HTML 双向通讯

    C++中嵌入ie浏览器总结(1) - ie边框 及上下文菜单 最近项目中用html 来做界面,也就折腾了一下在wxwidget中嵌入浏览器的若干细节工作,mfc也基本是类似的,由于wxwidget中已 ...

  4. USART HMI智能串口屏与单片机双向通讯

    目录 ·HMI串口屏介绍 ·HMI串口屏开发实操 ·准备 ·界面认识 ·写命令 ·下载 ·HMI串口屏与单片机(stm32)双向通信 [ 功能实现: STM32控制串口屏对应的数值.文本完成相应变化 ...

  5. RS232实现串口双向通讯

    之前对接了一个pos机设备,设备使用了RS232进行通讯,现在也完成该需求并在线上运行使用当中了,此次对接实现的功能主要是应用程序向POS机设备发送应收金额和收费时间:至此想记录一下开发过程. 一.首 ...

  6. 智能工厂的IGT数据采集网关-PLC与数据库双向通讯的多种SQL语句配置

    IGT-DSER智能网关模块,支持各种PLC.智能仪表.远程IO与数据库之间双向通讯,既可以读取设备的数据上报到SQL数据库,也可以从数据库查询数据后写入到设备:数据库软件支持MySQL.SQLSer ...

  7. 安卓通讯之《蓝牙单片机通讯助手》②扫描设备、连接设备和双向通讯。

    前言 上篇文章我们介绍到了开发经典蓝牙和单片机通讯的过程,安卓通讯之<蓝牙单片机通讯助手>①集成工作 ,我们这里还要兼容最新的安卓6.0及以上的系统,因为从6.0以后的权限机制和以往的不一 ...

  8. 数据库网关-欧姆龙PLC与MySQL/SQLServer/PostgreSQL数据库实时双向通讯

    通过数据库智能网关IGT-DSER实现欧姆龙NJ501的PLC与服务器数据库双向通讯,网关支持MySQL.SQLServer.PostgreSQL,这里选择的是SQLServer. 首先,通过navi ...

  9. 蓝牙双向通讯【可自定义协议】SDK

    一.简介: 此文档主要是介绍蓝牙双向通讯sdk的调用方法以及蓝牙双向传输的核心功能逻辑,用户可以根据需要自定义协议进行蓝牙双向通讯 二.SDK接入步骤: 1: 增加sdk库依赖: a)libs文件夹下 ...

最新文章

  1. 宇宙条一面:十道经典面试题解析
  2. python kafka消费最新数据_python kafka消费数据库
  3. 开机流程与主要开机记录区(MBR)
  4. 【树形区间DP】加分二叉树(ssl 1033/luogu 1040)
  5. 工程勘察设计收费标准2002修订版_全过程工程咨询收费模式超全解析
  6. 居中的文字在小屏幕下后面的换行
  7. 闪光灯slave是什么意思_闪光灯实战讲解 | 室内光线+闪光灯使用配合=完美光效...
  8. 树莓派linux共享目录,树莓派局域网文件共享
  9. 智能云如何加速产业智能化?百度CTO王海峰2020全球智博会擘画蓝图
  10. Android一种常见的布局困扰
  11. yolobile 道路损坏检测实战
  12. css 动画 animate
  13. linux6.5禁用防火墙,Centos6.5,Centos7分别关闭selinux和防火墙
  14. s400x ugee 驱动_联想_ThinkPad|ThinkCentre|ThinkStation服务与驱动下载_常见问题
  15. 一、Java快速入门
  16. CSAPP-Lab03 Attack Lab 详细解析
  17. libcurl返回DNS无法解析的问题
  18. 中邮网院/邮e联下载
  19. android判断字符串是否包含下划线,android 富文本SpannableString去掉下划线
  20. 区块数据存储文件说明

热门文章

  1. CTF竞赛实战 中国菜刀与一句话木马
  2. 设计模式(七)——适配器模式
  3. 超全Redis命令总结(备忘)(建议赶紧收藏)
  4. C#如何在list中添加序号
  5. 按键精灵按键助手x86x64内存插件(安卓内存插件)
  6. 进制数的转换方法大全
  7. struct的构造函数
  8. 优化了的过关键点的光滑曲线拟合算法
  9. 喧喧发布 2.5.3 版本,主要提升系统稳定性,优化交互体验
  10. 这部史诗级大片,燃到爆!