本章将分析motan的序列化和底层通信相关部分的代码。

1.在上一章中,有一个getrefers的操作,来获取所有服务器的引用,每个服务器的引用都是由DefaultRpcReferer来创建的

        public DefaultRpcReferer(Class<T> clz, URL url, URL serviceUrl) {super(clz, url, serviceUrl);endpointFactory =ExtensionLoader.getExtensionLoader(EndpointFactory.class).getExtension(url.getParameter(URLParamType.endpointFactory.getName(), URLParamType.endpointFactory.getValue()));//通过spi加载NettyEndpointFactoryclient = endpointFactory.createClient(url);//创建client}

2.NettyClient的创建过程及源码分析

    public Client createClient(URL url) {LoggerUtil.info(this.getClass().getSimpleName() + " create client: url={}", url);return createClient(url, heartbeatClientEndpointManager);//创建client}private Client createClient(URL url, EndpointManager endpointManager) {Client client = innerCreateClient(url);//调用NettyEndpointFactory的创建client的方法endpointManager.addEndpoint(client);//添加心跳管理return client;}protected Client innerCreateClient(URL url) {return new NettyClient(url);//返回NettyClient对象}public NettyClient(URL url) {super(url);maxClientConnection = url.getIntParameter(URLParamType.maxClientConnection.getName(),URLParamType.maxClientConnection.getIntValue());timeMonitorFuture = scheduledExecutor.scheduleWithFixedDelay(new TimeoutMonitor("timeout_monitor_" + url.getHost() + "_" + url.getPort()),MotanConstants.NETTY_TIMEOUT_TIMER_PERIOD, MotanConstants.NETTY_TIMEOUT_TIMER_PERIOD,TimeUnit.MILLISECONDS);LoggerUtil.info("client's:"+url.getUri());}

3.Netty相关的连接建立是通过open()方法进行的

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
public synchronized boolean open() {
    if (isAvailable()) {
        return true;
    }
    // 初始化netty client bootstrap
    initClientBootstrap();
    // 初始化连接池
    initPool();
    LoggerUtil.info("NettyClient finish Open: url={}", url);
    // 注册统计回调
    StatsUtil.registryStatisticCallback(this);
    // 设置可用状态
    state = ChannelState.ALIVE;
    return state.isAliveState();
}
private void initClientBootstrap() {
    bootstrap = new ClientBootstrap(channelFactory);
     
    bootstrap.setOption("keepAlive"true);
    bootstrap.setOption("tcpNoDelay"true);
    // 实际上,极端情况下,connectTimeout会达到500ms,因为netty nio的实现中,是依赖BossThread来控制超时,
    // 如果为了严格意义的timeout,那么需要应用端进行控制。
    int timeout = getUrl().getIntParameter(URLParamType.requestTimeout.getName(), URLParamType.requestTimeout.getIntValue());
    if (timeout <= 0) {
        throw new MotanFrameworkException("NettyClient init Error: timeout(" + timeout + ") <= 0 is forbid.",
                MotanErrorMsgConstant.FRAMEWORK_INIT_ERROR);
    }
    bootstrap.setOption("connectTimeoutMillis", timeout);
    // 最大响应包限制
    final int maxContentLength = url.getIntParameter(URLParamType.maxContentLength.getName(),
            URLParamType.maxContentLength.getIntValue());
    bootstrap.setPipelineFactory(new ChannelPipelineFactory() {
        public ChannelPipeline getPipeline() {
            ChannelPipeline pipeline = Channels.pipeline();
            pipeline.addLast("decoder"new NettyDecoder(codec, NettyClient.this, maxContentLength));//解码器
            pipeline.addLast("encoder"new NettyEncoder(codec, NettyClient.this));//编码器
            pipeline.addLast("handler"new NettyChannelHandler(NettyClient.thisnew MessageHandler() {//业务处理的handler
                @Override
                public Object handle(Channel channel, Object message) {
                    Response response = (Response) message;
                    NettyResponseFuture responseFuture = NettyClient.this.removeCallback(response.getRequestId());//移调异步处理response信息
                    if (responseFuture == null) {
                        LoggerUtil.warn(
                                "NettyClient has response from server, but resonseFuture not exist,  requestId={}",
                                response.getRequestId());
                        return null;
                    }
                    if (response.getException() != null) {
                        responseFuture.onFailure(response);
                    else {
                        responseFuture.onSuccess(response);
                    }
                    return null;
                }
            }));
            return pipeline;
        }
    });
}

4.连接池

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
protected void initPool() {
    poolConfig = new GenericObjectPool.Config();//使用了GenericObjectPool作为连接池
    poolConfig.minIdle =
            url.getIntParameter(URLParamType.minClientConnection.getName(), URLParamType.minClientConnection.getIntValue());//最小连接数,配置中为2个
    poolConfig.maxIdle =
            url.getIntParameter(URLParamType.maxClientConnection.getName(), URLParamType.maxClientConnection.getIntValue());//最大连接数,配置中为10个
    poolConfig.maxActive = poolConfig.maxIdle;
    poolConfig.maxWait = url.getIntParameter(URLParamType.requestTimeout.getName(), URLParamType.requestTimeout.getIntValue());
    poolConfig.lifo = url.getBooleanParameter(URLParamType.poolLifo.getName(), URLParamType.poolLifo.getBooleanValue());
    poolConfig.minEvictableIdleTimeMillis = defaultMinEvictableIdleTimeMillis;
    poolConfig.softMinEvictableIdleTimeMillis = defaultSoftMinEvictableIdleTimeMillis;
    poolConfig.timeBetweenEvictionRunsMillis = defaultTimeBetweenEvictionRunsMillis;
    factory = createChannelFactory();//创建chanalfactory
    pool = new GenericObjectPool(factory, poolConfig);
    boolean lazyInit = url.getBooleanParameter(URLParamType.lazyInit.getName(), URLParamType.lazyInit.getBooleanValue());
    if (!lazyInit) {
        for (int i = 0; i < poolConfig.minIdle; i++) {//初始化2个长连接
            try {
                pool.addObject();
                LoggerUtil.info("init client's connection :"+i);
            catch (Exception e) {
                LoggerUtil.error("NettyClient init pool create connect Error: url=" + url.getUri(), e);
            }
        }
    }
}

5.NettyChannelFactory

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
public class NettyChannelFactory extends BasePoolableObjectFactory {
    private String factoryName = "";
    private NettyClient nettyClient;
    public NettyChannelFactory(NettyClient nettyClient) {
        super();
        this.nettyClient = nettyClient;
        this.factoryName = "NettyChannelFactory_" + nettyClient.getUrl().getHost() + "_"
                + nettyClient.getUrl().getPort();
    }
    @Override
    public Object makeObject() throws Exception {//创建连接时会调用
        NettyChannel nettyChannel = new NettyChannel(nettyClient);//创建channel
        nettyChannel.open();//打开channel
        return nettyChannel;
    }
}

6.NettyChannel

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
public class NettyChannel implements com.weibo.api.motan.transport.Channel {
    private volatile ChannelState state = ChannelState.UNINIT;
    private NettyClient nettyClient;
    private org.jboss.netty.channel.Channel channel = null;
    private InetSocketAddress remoteAddress = null;
    private InetSocketAddress localAddress = null;
    public NettyChannel(NettyClient nettyClient) {
        this.nettyClient = nettyClient;
        this.remoteAddress = new InetSocketAddress(nettyClient.getUrl().getHost(), nettyClient.getUrl().getPort());//服务器host和port
    }
    public synchronized boolean open() {//打开连接
        if (isAvailable()) {
            LoggerUtil.warn("the channel already open, local: " + localAddress + " remote: " + remoteAddress + " url: "
                    + nettyClient.getUrl().getUri());
            return true;
        }
        try {
            ChannelFuture channleFuture = nettyClient.getBootstrap().connect(
                    new InetSocketAddress(nettyClient.getUrl().getHost(), nettyClient.getUrl().getPort()));//打开连接
            long start = System.currentTimeMillis();
            int timeout = nettyClient.getUrl().getIntParameter(URLParamType.connectTimeout.getName(), URLParamType.connectTimeout.getIntValue());
            if (timeout <= 0) {
                throw new MotanFrameworkException("NettyClient init Error: timeout(" + timeout + ") <= 0 is forbid.",
                        MotanErrorMsgConstant.FRAMEWORK_INIT_ERROR);
            }
            // 不去依赖于connectTimeout
            boolean result = channleFuture.awaitUninterruptibly(timeout, TimeUnit.MILLISECONDS);
            boolean success = channleFuture.isSuccess();
            if (result && success) {
                channel = channleFuture.getChannel();
                if (channel.getLocalAddress() != null && channel.getLocalAddress() instanceof InetSocketAddress) {
                    localAddress = (InetSocketAddress) channel.getLocalAddress();
                }
                state = ChannelState.ALIVE;
                return true;
            }
            boolean connected = false;
            if(channleFuture.getChannel() != null){
                connected = channleFuture.getChannel().isConnected();
            }
            if (channleFuture.getCause() != null) {
                channleFuture.cancel();
                throw new MotanServiceException("NettyChannel failed to connect to server, url: "
                        + nettyClient.getUrl().getUri()+ ", result: " + result + ", success: " + success + ", connected: " + connected, channleFuture.getCause());
            else {
                channleFuture.cancel();
                throw new MotanServiceException("NettyChannel connect to server timeout url: "
                        + nettyClient.getUrl().getUri() + ", cost: " + (System.currentTimeMillis() - start) + ", result: " + result + ", success: " + success + ", connected: " + connected);
            }
        catch (MotanServiceException e) {
            throw e;
        catch (Exception e) {
            throw new MotanServiceException("NettyChannel failed to connect to server, url: "
                    + nettyClient.getUrl().getUri(), e);
        finally {
            if (!state.isAliveState()) {
                nettyClient.incrErrorCount();//增加错误次数
            }
        }
    }
}

本章知识点总结:

1.使用netty作为底层通讯框架;

2.每个refer对应一个nettyclient和一个nettychannel,nettychannel是由工厂类创建;

3.每个client在初始化时,最少创建2个长连接,由配置决定;

4.使用了GenericObjectPool来作为连接池;

5.当每个client的连续调用出错数达到阀值时,将自动设置此client为不可用;

6.心跳操作由客户端发起,只针对不可用状态的client。

motan源码分析六:客户端与服务器的通信层分析相关推荐

  1. 轻量级Rpc框架设计--motan源码解析六:client端服务发现

    一, Client端初始化工作 client端通过RefererConfigBean类实现InitializingBean接口的afterPropertiesSet方法, 进行下面三项检查配置工作: ...

  2. BT源代码学习心得(六):跟踪服务器(Tracker)的代码分析(初始化) -- 转贴自 wolfenstein (NeverSayNever)

    BT源代码学习心得(六):跟踪服务器(Tracker)的代码分析(初始化) author:wolfenstein Tracker在BT中是一个很重要的部分.这个名词我注意到以前的文章中都是直接引用,没 ...

  3. BT源代码学习心得(六):跟踪服务器(Tracker)的代码分析(初始化)

    BT源代码学习心得(六):跟踪服务器(Tracker)的代码分析(初始化) 发信人: wolfenstein (NeverSayNever), 个人文集 标  题: BT源代码学习心得(六):跟踪服务 ...

  4. 16w行的nginx源码,如何才能读懂呢?全面分析nginx的机制

    16w行的nginx源码,如何才能读懂呢?全面分析nginx的机制 1. nginx的轮子组成 2. nginx的epoll实现机制 3. nginx的内存机制 视频讲解如下,点击观看: 16w行的n ...

  5. 校园网跑腿小程序源码 服务端+客户端+小程序

    介绍: 校园网跑腿小程序源码 需要准备 1.小程序 2.服务器(推荐配置2h4g3m) 3.域名(需要备案) 搭建教程 使用服务器搭建宝塔 安装pm2管理器 新建项目上传服务器接口 修改/pub/co ...

  6. 【Android 性能优化】应用启动优化 ( 安卓应用启动分析 | Launcher 应用简介 | Launcher 应用源码简介 | Launcher 应用快捷方式图标点击方法分析 )

    文章目录 一. Launcher 应用简介 二. Launcher 应用源码简介 三. Launcher 图标点击方法分析 一. Launcher 应用简介 Launcher 应用 : Android ...

  7. SpringCloud组件 源码剖析:Eureka服务注册方式流程全面分析

    在SpringCloud组件:Eureka服务注册是采用主机名还是IP地址?文章中我们讲到了服务注册的几种注册方式,那么这几种注册方式的源码是怎么实现的呢?我们带着这一个疑问来阅读本章内容能够让你更深 ...

  8. Spring源码-AOP(六)-自动代理与DefaultAdvisorAutoProxyCreator

    2019独角兽企业重金招聘Python工程师标准>>> Spring AOP 源码解析系列,建议大家按顺序阅读,欢迎讨论 Spring源码-AOP(一)-代理模式 Spring源码- ...

  9. mybatis源码阅读(六) ---StatementHandler了解一下

    转载自  mybatis源码阅读(六) ---StatementHandler了解一下 StatementHandler类结构图与接口设计 BaseStatementHandler:一个抽象类,只是实 ...

最新文章

  1. java 媒体播放器_Java多媒体播放器(三)
  2. 混合云存储开启企业上云新路径--阿里云混合云备份容灾方案发布
  3. python常用命令汇总-Python爬虫框架Scrapy常用命令总结
  4. python自学路线-Python最佳学习路线
  5. 【Android 性能优化】布局渲染优化 ( GPU 过度绘制优化总结 | CPU 渲染过程 | Layout Inspector 工具 | View Tree 分析 | 布局组件层级分析 )
  6. 【Paper】2020_含时延约束的多智能体系统二分一致性
  7. 应该是最全的算法学习路线了吧法学习路线了吧
  8. []End of 2017OI
  9. dhcp 授权的原理
  10. ef 在此上下文中只支持基本类型或枚举类型_Java枚举不应该成为你成功路上得绊脚石,源码给你讲解清楚
  11. js中的forEach、for in 、for of之间的区别
  12. QT之Win10安装(五)
  13. 单词拼写检查之cutoff距离
  14. 工业革命的秋之涟漪(三):飞桨,划行在智能经济之海
  15. Python如何根据日期判断周几
  16. 通过jsp向mysql批量导入数据_通过JSP+JavaBean对mysql进行添加数据的操作
  17. java vm art 2.1.0_ART Runtime 创建(二)--启动参数
  18. 记录一下在上海考驾照经历
  19. EXCEL VBA基础:通过创建模块完成简单SUB过程
  20. 天翼物联获2022年移动物联网“先进企业”

热门文章

  1. 情人节福利:京东上线“白条”业务可先购物后付款,最高可预支1.5万
  2. (Java笔记)IO流的六类16种流方式
  3. testlink二次开发php,testlink根据需求定制
  4. get_template_part() 函数
  5. 黄式宪:《江山美人》又见“侠女”传奇
  6. 猎人营教你如何请健身私教
  7. 广西首位女云计算HCIE在南职院诞生
  8. nouveau禁用失败
  9. MySQL入门教程系列-1.1 数据库基础
  10. 企业工程项目管理系统源码+项目模块功能清单