自己写的grpc简单连接池,基于common pool2
17年的时候写的证券的项目,当时交易端是另外一批同事开发的,他们强烈要求用grpc,当时这个东西还不那么成熟,在网上也搜索不到比较完美的第三方的连接池搭配使用,索性就自己写了一个,因为之前thrift也自己写过类似的连接池,所以也不算太麻烦,之前thrift的连接池是纯手工写的,没有用第三方的连接池库,使用的时候排过几次雷,这次不想那么麻烦,所以直接就是用commons-pool2的连接池库。好了,废话不多说,直接贴代码。
第一个类GrpcSocketPool是基础的连接池类,主要是初始化连接池,配置一些常见的连接池参数,提供获取和归还连接的方法。
import org.apache.commons.pool2.impl.GenericObjectPool;
import org.apache.commons.pool2.impl.GenericObjectPoolConfig;import io.grpc.ManagedChannel;public class GrpcSocketPool {private GenericObjectPool<SocketPoolBean<ManagedChannel>> objectPool = null;private String ip;private Integer port;private Integer maxTotal;private Integer minIdle;private Integer maxIdle;private Integer minEvictableIdleTimeMillis;private String channelName;public void init() {// 连接池的配置GenericObjectPoolConfig poolConfig = new GenericObjectPoolConfig();// 池中的最大连接数poolConfig.setMaxTotal(maxTotal);// 最少的空闲连接数poolConfig.setMinIdle(minIdle);// 最多的空闲连接数poolConfig.setMaxIdle(maxIdle);// 当连接池资源耗尽时,调用者最大阻塞的时间,超时时抛出异常 单位:毫秒数poolConfig.setMaxWaitMillis(10000);// 连接池存放池化对象方式,true放在空闲队列最前面,false放在空闲队列最后poolConfig.setLifo(true);// 连接空闲的最小时间,达到此值后空闲连接可能会被移除,默认即为30分钟poolConfig.setMinEvictableIdleTimeMillis(minEvictableIdleTimeMillis);// 连接耗尽时是否阻塞,默认为truepoolConfig.setBlockWhenExhausted(true);poolConfig.setTestOnReturn(true);// 连接池创建objectPool = new GenericObjectPool<SocketPoolBean<ManagedChannel>>(new GrpcSocketFactory(ip, port), poolConfig);}/*** 从连接池获取对象* @throws TradeServerException */public SocketPoolBean<ManagedChannel> borrowObject() throws TradeServerException {try {SocketPoolBean<ManagedChannel> bean = objectPool.borrowObject();return bean;} catch (Exception e) {throw new TradeServerException(ExceptionEnu.CHANNEL_ERROR);}}public void returnObject(SocketPoolBean<ManagedChannel> bean){objectPool.returnObject(bean);}public void destory(){ip = null; port = null; maxTotal = null; minIdle = null; maxIdle = null; minEvictableIdleTimeMillis = null;channelName = null; objectPool.close();}public String getChannelName() {return channelName;}public void setChannelName(String channelName) {this.channelName = channelName;}@Overridepublic String toString() {return new StringBuilder("ip:").append(ip).append(",port:").append(port).toString();}public String getIp() {return ip;}public void setIp(String ip) {this.ip = ip;}public Integer getPort() {return port;}public void setPort(Integer port) {this.port = port;}public Integer getMaxTotal() {return maxTotal;}public void setMaxTotal(Integer maxTotal) {this.maxTotal = maxTotal;}public Integer getMinIdle() {return minIdle;}public void setMinIdle(Integer minIdle) {this.minIdle = minIdle;}public Integer getMaxIdle() {return maxIdle;}public void setMaxIdle(Integer maxIdle) {this.maxIdle = maxIdle;}public Integer getMinEvictableIdleTimeMillis() {return minEvictableIdleTimeMillis;}public void setMinEvictableIdleTimeMillis(Integer minEvictableIdleTimeMillis) {this.minEvictableIdleTimeMillis = minEvictableIdleTimeMillis;}}
第二个类SocketPoolBean是连接池里存的连接对象,这里我进行了封装,因为grpc提供的StatusRuntimeException里封装的有socket的状态码,根据状态码可以知道socket是否断开,断开需要让连接池关闭连接,所以加了个是否需要销毁的状态。
public class SocketPoolBean<T> {private T bean;private Boolean isDestory = false;public T getBean() {return bean;}public void setBean(T bean) {this.bean = bean;}public Boolean getIsDestory() {return isDestory;}public void setIsDestory(Boolean isDestory) {this.isDestory = isDestory;}}
第三个类GrpcSocketFactory,是连接池生成连接的工厂类,里面包含生成、销毁、校验连接等方法,从连接池获取或者归还连接时会调用工厂内的方法。
import org.apache.commons.pool2.BasePooledObjectFactory;
import org.apache.commons.pool2.PooledObject;
import org.apache.commons.pool2.impl.DefaultPooledObject;import io.grpc.ManagedChannel;
import io.grpc.netty.NettyChannelBuilder;public class GrpcSocketFactory extends BasePooledObjectFactory<SocketPoolBean<ManagedChannel>> {private String ip;private int port;public GrpcSocketFactory(String ip, int port) {super();this.ip = ip;this.port = port;}@Overridepublic SocketPoolBean<ManagedChannel> create() throws Exception {ManagedChannel channel = NettyChannelBuilder.forAddress(ip, port).usePlaintext(true).build();SocketPoolBean<ManagedChannel> bean = new SocketPoolBean<ManagedChannel>();bean.setBean(channel);return bean;}@Overridepublic PooledObject<SocketPoolBean<ManagedChannel>> wrap(SocketPoolBean<ManagedChannel> bean) {return new DefaultPooledObject<SocketPoolBean<ManagedChannel>>(bean);}@Overridepublic void destroyObject(PooledObject<SocketPoolBean<ManagedChannel>> bean) throws Exception {
// p.getObject().shutdownNow();super.destroyObject(bean);}@Overridepublic boolean validateObject(PooledObject<SocketPoolBean<ManagedChannel>> p) {if(p.getObject().getIsDestory()){return false;}return super.validateObject(p);}}
第四个类是基于aop的回收连接类,使用的是aspectj,主要是针对使用到连接的地方做一个统一回收,复制代码的时候下面的切入点需要配置正确的包名。
import java.util.Map;
import java.util.Map.Entry;import org.aspectj.lang.JoinPoint;
import org.aspectj.lang.annotation.AfterReturning;
import org.aspectj.lang.annotation.AfterThrowing;
import org.aspectj.lang.annotation.Aspect;
import org.aspectj.lang.annotation.Pointcut;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;import io.grpc.ManagedChannel;
import io.grpc.Status.Code;
import io.grpc.StatusRuntimeException;@Component
@Aspect
public class GrpcReturnObjectAop {@Autowiredprivate GrpcChannelFactory grpcChannelFactory;//配置切入点,无具体方法体、参数和返回值@Pointcut("execution(* com.xx.xx.service.*Service*.*(..))") private void aspectjMethod(){};//正常返回@AfterReturning(value="aspectjMethod()")public void after(JoinPoint point){returnObject(false);} //异常返回@AfterThrowing(value="aspectjMethod()", throwing = "ex")public void after(JoinPoint point, Exception ex){ex.printStackTrace();if (ex instanceof StatusRuntimeException) {if(Code.UNAVAILABLE.equals(((StatusRuntimeException)ex).getStatus())){returnObject(true);} else {returnObject(false);}} else {returnObject(false);}}//回收所有使用到的socketprivate void returnObject(Boolean isDestory){ThreadLocal<Map<String, SocketPoolBean<ManagedChannel>>> tl = grpcChannelFactory.getThreadGetChannel();if (tl.get() != null) {for (Entry<String, SocketPoolBean<ManagedChannel>> en : tl.get().entrySet()) {if (isDestory) {en.getValue().setIsDestory(true);}grpcChannelFactory.returnObject(en.getKey(), en.getValue());}tl.get().clear();tl.remove();}}
}
第五个类GrpcChannelFactory是从连接池里获取socket的类,支持多个socket连接的连接池借用和归还。ORDER_S 、ORDER_C、ORDER_E,这几个参数是区分不同连接的连接池的,和下面spring配置的channelName对应 。
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;import org.slf4j.Logger;import io.grpc.ManagedChannel;public class GrpcChannelFactory {private static final Logger log = Log4jManager.get();public static final String ORDER_S = "orderTrade";public static final String ORDER_C = "orderCancel";public static final String ORDER_E = "orderEnable";private ThreadLocal<Map<String, SocketPoolBean<ManagedChannel>>> threadGetChannel = new ThreadLocal<Map<String,SocketPoolBean<ManagedChannel>>>();private Map<String, GrpcSocketPool> poolMap;private List<GrpcSocketPool> socketPoolList;//初始化public void init() {try {log.info("GrpcChannelFactory初始化...");if (socketPoolList != null) {poolMap = new HashMap<String, GrpcSocketPool>();GrpcSocketPool pool = null;for (int i = 0; i < socketPoolList.size(); i++) {pool = socketPoolList.get(i);poolMap.put(pool.getChannelName(), pool);}}log.info("GrpcChannelFactory初始化成功!,[{}]", poolMap);} catch (Exception e) {log.error("GrpcChannelFactory初始化失败", e);}}//获取socketpublic ManagedChannel getSocket(String channelName) throws TradeServerException{Map<String, SocketPoolBean<ManagedChannel>> channelMap = threadGetChannel.get();SocketPoolBean<ManagedChannel> bean = null;if (channelMap == null) {channelMap = new HashMap<String, SocketPoolBean<ManagedChannel>>();threadGetChannel.set(channelMap);bean = poolMap.get(channelName).borrowObject();channelMap.put(channelName, bean);return bean.getBean();}bean = channelMap.get(channelName);if (bean == null) {bean = poolMap.get(channelName).borrowObject();channelMap.put(channelName, bean);}return bean.getBean();}//回收socketpublic void returnObject(String channelName, SocketPoolBean<ManagedChannel> channel){poolMap.get(channelName).returnObject(channel);}//销毁public void destory(){for (GrpcSocketPool pool : socketPoolList) {pool.destory();}socketPoolList.clear();poolMap.clear();}public ThreadLocal<Map<String, SocketPoolBean<ManagedChannel>>> getThreadGetChannel(){return threadGetChannel;}public void setSocketPoolList(List<GrpcSocketPool> socketPoolList) {this.socketPoolList = socketPoolList;}}
spring配置,配置了三个连接池,每个连接池的连接对应不同的业务。
<bean id="grpcChannelFactory" class="com.xxx.frame.grpc.GrpcChannelFactory" init-method="init" destroy-method="destory"><property name="socketPoolList"><list><bean class="com.lion.frame.grpc.GrpcSocketPool" init-method="init"><property name="ip" value="127.0.0.1"></property><property name="port" value="50053"></property><property name="maxTotal" value="30"></property><property name="minIdle" value="5"></property><property name="maxIdle" value="10"></property><property name="minEvictableIdleTimeMillis" value="1800000"></property><property name="channelName" value="orderTrade"></property></bean><bean class="com.lion.frame.grpc.GrpcSocketPool" init-method="init"><property name="ip" value="127.0.0.1"></property><property name="port" value="50057"></property><property name="maxTotal" value="30"></property><property name="minIdle" value="5"></property><property name="maxIdle" value="10"></property><property name="minEvictableIdleTimeMillis" value="1800000"></property><property name="channelName" value="orderCancel"></property></bean><bean class="com.lion.frame.grpc.GrpcSocketPool" init-method="init"><property name="ip" value="127.0.0.1"></property><property name="port" value="50058"></property><property name="maxTotal" value="10"></property><property name="minIdle" value="5"></property><property name="maxIdle" value="10"></property><property name="minEvictableIdleTimeMillis" value="1800000"></property><property name="channelName" value="orderEnable"></property></bean></list></property></bean>
在想要调用的实例里注入类。
//注入类@Autowiredprivate GrpcChannelFactory grpcChannelFactory;
完整的调用方式,方法执行完后上面的aop类会回收使用到的socket,所以上面的aop切入点务必配置正确
SvrOrderCancelBlockingStub stub = SvrOrderCancelGrpc.newBlockingStub(grpcChannelFactory.getSocket(GrpcChannelFactory.ORDER_C));OrderCancelReq req = OrderCancelReq.newBuilder().setIorderno(orderNo).build();OrderCancelResp resp = stub.orderCancel(req);
以上大概是连接池的完整代码,有不懂得可以私信或者留言,不过我不经常上csdn,写文章只是为了记录。
自己写的grpc简单连接池,基于common pool2相关推荐
- mysql c api简单连接池
连接池为了解决频繁的创建.销毁所带来的系统开销. 简而言之,就是 自己先创建一定量的连接,然后在需要的时候取出一条连接使用. 当然如果你只有一个线程连接数据库,而且不是实时返回结果,那么你完全不必用连 ...
- 360mysql连接池_自己动手写个数据库连接池
说到数据库连接池也是初学者会望而却步,认为是如何高深莫测的东西,其实可以用一句话来解释: 连接池的出现是为了用户频繁访问数据库而造成速度和性能上的迟缓才对访问数据库的方法作了一点修改,这个修改就是把原 ...
- 走进JavaWeb技术世界3:JDBC的进化与连接池技术
网络访问 随着 Oracle, Sybase, SQL Server ,DB2, Mysql 等人陆陆续续住进数据库村, 这里呈现出一片兴旺发达的景象, 无数的程序在村里忙忙碌碌, 读写数据库, ...
- easyswoole数据库连接池_easyswoole redis连接池:集群迁移教程
场景 在业务量小的情况下,我们使用Redis单机连接池就可以满足业务需求.因此,redis单机连接池就可以满足我们的业务.因此我们会这样写: 示例 注册连接池 use EasySwoole\Redis ...
- 如何实现Tomcat连接池数据库密码加密
问题背景: 快逸报表应用在tomcat应用服务器进行部署时,如果需要调用tomcat配置好的数据库连接池,就不得不把报表数据源连接的密码以明文形式暴露,这样数据库连接的用户名密码都非常容易被获取,是非 ...
- 正确地利用Netty建立连接池
为什么80%的码农都做不了架构师?>>> 一.问题描述 Netty是最近非常流行的高性能异步通讯框架,相对于Java原生的NIO接口,Netty封装后的异步通讯机制要简单很多. ...
- swoole mysql 协程_swoole-orm: 基于swoole的mysql协程连接池,简单封装。实现多个协程间共用同一个协程客户端。参考thinkphp-orm...
swoole-orm 基于swoole的mysql协程连接池,简单封装. 实现多个协程间共用同一个协程客户端 感谢完善 [1]:nowbe -> 新增数据返回insert_id 版本 v0.0. ...
- druid连接池初始化慢_从零开始手写 mybatis (三)jdbc pool 从零实现数据库连接池
前景回顾 第一节 从零开始手写 mybatis(一)MVP 版本 中我们实现了一个最基本的可以运行的 mybatis. 第二节 从零开始手写 mybatis(二)mybatis interceptor ...
- DRUID连接池的简单使用
DRUID--为监控而生的DB池 1. DRUID介绍 DRUID是阿里巴巴开源平台上一个数据库连接池实现,它结合了C3P0.DBCP.PROXOOL等DB池的优点,同时加入了日志监控,可以很好的监 ...
最新文章
- linux a文本编辑大全,Linux sed 命令 - Linux文档编辑命令大全
- 计算机丢失qt4core.dll,qtcore4.;dll文件丢失。怎么办?
- yum 安装mariadb
- 将数据传入重定向网页
- (ECC)椭圆曲线加密算法原理和C++实现源码
- 电商优秀设计作品展示
- 数据结构与算法 完整版单链表(附GIF)
- 某IT公司的面试题,难度系数“爆表”。。。
- 带大家一起感受美国两日游
- 2017普及第四题 跳房子 jump DP+二分
- 2023.02.07草图大师SU模型渲染- 效果图之加深颜色的色调
- 阿里云生态峰会实录(中)
- AWS DynamoDB基础使用
- DETR系列大盘点 | 端到端Transformer目标检测算法汇总!
- 解决mysql自动重连
- python如何实现找图_python实现图片筛选程序
- Linux 内核原子操作
- 重点!!!计算虚拟化技术(HCIE云方向)
- 计算机毕业论文java毕业设计论文题目s2sh+mysql实现的校园实习兼职系统|招聘兼职求职[包运行成功]
- 一个项目经理的个人体会:最忌讳的就是完美主义倾向
热门文章
- 2017 Github优秀开源项目整理
- 机器学习的数学基础(2):赋范空间、内积空间、完备空间与希尔伯特空间
- 海盗比酒量--蓝桥杯
- Python u,b,r前缀的作用及应用
- 测绘专硕要学计算机吗,测绘工程专硕专业介绍_测绘工程非全日制研究生(专业硕士)_125在职研究生...
- meta标签(以京东首页为例)
- 用APICloud开发仿微信聊天App制作经验分享
- python文本文件对比_Python-文件差异对比
- Android之FileProvider详解
- EOS.CYBEX社区满分入围EOS超级节点候选人!