sofa协议服务器,sofa-rpc 服务端源码流程走读
sofa-rpc是阿里开源的一款高性能的rpc框架,这篇文章主要是对sofa-rpc provider启动服务流程的一个代码走读,下面是我简单绘制的一个基本的关系流程图
下面我们根据sofa-rpc代码,对流程进行一个跟踪与走读。我们以BoltServer的为例
public static void main(String[] args) {
ApplicationConfig application = new ApplicationConfig().setAppName("test-server");
ServerConfig serverConfig = new ServerConfig()
.setPort(22000)
.setDaemon(false);
ProviderConfig providerConfig = new ProviderConfig()
.setInterfaceId(HelloService.class.getName())
.setApplication(application)
.setRef(new HelloServiceImpl())
.setServer(serverConfig)
.setRegister(false);
ProviderConfig providerConfig2 = new ProviderConfig()
.setInterfaceId(EchoService.class.getName())
.setApplication(application)
.setRef(new EchoServiceImpl())
.setServer(serverConfig)
.setRegister(false);
providerConfig.export();
providerConfig2.export();
LOGGER.warn("started at pid {}", RpcRuntimeContext.PID);
}
可以看到sofa-rpc通过ProviderConfig类对服务提供方Provider进行了配置信息的初始化,同时也提供了export做为服务启动的入口。
public synchronized void export() { if (providerBootstrap == null) {
providerBootstrap = Bootstraps.from(this);
}
providerBootstrap.export();
}
根据ProviderConfig中setBootstrap()配置的Bootstrap类型,我们通过Bootstaps.from(this)可以获取到不同的Bootstrap引导服务,分别是DefaultProviderBootstrap与 DubboProviderBootstrap
/**
* 发布一个服务
*
* @param providerConfig 服务发布者配置
* @param 接口类型
* @return 发布启动类 */
public static ProviderBootstrap from(ProviderConfig providerConfig) {
String bootstrap = providerConfig.getBootstrap(); if (StringUtils.isEmpty(bootstrap)) { // Use default provider bootstrap 无的话就返回默认DefaultProviderBootstrap
bootstrap = RpcConfigs.getStringValue(RpcOptions.DEFAULT_PROVIDER_BOOTSTRAP);
providerConfig.setBootstrap(bootstrap);
}
ProviderBootstrap providerBootstrap = ExtensionLoaderFactory.getExtensionLoader(ProviderBootstrap.class)
.getExtension(bootstrap, new Class[] { ProviderConfig.class }, new Object[] { providerConfig }); return (ProviderBootstrap) providerBootstrap;
}
DefaultProviderBootstrap与 DubboProviderBootstrap 都继承自ProviderBootstrap。
DefaultProviderBootstrap又被BoltProviderBootstrap、Http2ClearTextProviderBootstrap、RestProviderBootstrap三个类所继承,这其实对应了sofa-rpc中的三种server服务方式。
我们看下DefaultProviderBootstrap服务启动源码
@Override public void export() { if (providerConfig.getDelay() > 0) { // 延迟加载,单位毫秒
Thread thread = factory.newThread(new Runnable() {
@Override public void run() { try {
Thread.sleep(providerConfig.getDelay());
} catch (Throwable ignore) { // NOPMD }
doExport();
}
});
thread.start();
} else {
doExport();
}
} private void doExport() { if (exported) { return;
} // 检查参数 checkParameters();
String appName = providerConfig.getAppName(); //key is the protocol of server,for concurrent safe
Map hasExportedInCurrent = new ConcurrentHashMap(); // 将处理器注册到server
List serverConfigs = providerConfig.getServer(); for (ServerConfig serverConfig : serverConfigs) {
String protocol = serverConfig.getProtocol();
String key = providerConfig.buildKey() + ":" + protocol; if (LOGGER.isInfoEnabled(appName)) {
LOGGER.infoWithApp(appName, "Export provider config : {} with bean id {}", key, providerConfig.getId());
} // 注意同一interface,同一uniqleId,不同server情况
AtomicInteger cnt = EXPORTED_KEYS.get(key); // 计数器
if (cnt == null) { // 没有发布过
cnt = CommonUtils.putToConcurrentMap(EXPORTED_KEYS, key, new AtomicInteger(0));
} int c = cnt.incrementAndGet();
hasExportedInCurrent.put(serverConfig.getProtocol(), true); int maxProxyCount = providerConfig.getRepeatedExportLimit(); if (maxProxyCount > 0) { if (c > maxProxyCount) {
decrementCounter(hasExportedInCurrent); // 超过最大数量,直接抛出异常
throw new SofaRpcRuntimeException("Duplicate provider config with key " + key + " has been exported more than " + maxProxyCount + " times!"
+ " Maybe it's wrong config, please check it."
+ " Ignore this if you did that on purpose!");
} else if (c > 1) { if (LOGGER.isInfoEnabled(appName)) {
LOGGER.infoWithApp(appName, "Duplicate provider config with key {} has been exported!"
+ " Maybe it's wrong config, please check it."
+ " Ignore this if you did that on purpose!", key);
}
}
}
} try { // 构造请求调用器
providerProxyInvoker = new ProviderProxyInvoker(providerConfig); // 初始化注册中心
if (providerConfig.isRegister()) {
List registryConfigs = providerConfig.getRegistry(); if (CommonUtils.isNotEmpty(registryConfigs)) { for (RegistryConfig registryConfig : registryConfigs) {
RegistryFactory.getRegistry(registryConfig); // 提前初始化Registry }
}
} // 将处理器注册到server
for (ServerConfig serverConfig : serverConfigs) { try { //构建Server
Server server = serverConfig.buildIfAbsent(); // 注册序列化接口 server.registerProcessor(providerConfig, providerProxyInvoker); if (serverConfig.isAutoStart()) { //启动服务 server.start();
}
} catch (SofaRpcRuntimeException e) { throw e;
} catch (Exception e) {
LOGGER.errorWithApp(appName, "Catch exception when register processor to server: "
+ serverConfig.getId(), e);
}
} // 注册到注册中心
providerConfig.setConfigListener(new ProviderAttributeListener());
register();
} catch (Exception e) {
decrementCounter(hasExportedInCurrent); if (e instanceof SofaRpcRuntimeException) { throw (SofaRpcRuntimeException) e;
} else { throw new SofaRpcRuntimeException("Build provider proxy error!", e);
}
} // 记录一些缓存数据
RpcRuntimeContext.cacheProviderConfig(this);
exported = true;
}
代码中通过serverConfig.buildIfAbsent()构建Server服务对象,而在buildIfAbsent()函数中我们可以看到,sever是通过SeverFactory工厂获取到的,在SeverFactory的getSever()中根据SeverConfig的配置获取Sever的具体实例,并执行Init()进行初始化。
/**
* 启动服务
*
* @return the server */
public synchronized Server buildIfAbsent() { if (server != null) { return server;
} // 提前检查协议+序列化方式 // ConfigValueHelper.check(ProtocolType.valueOf(getProtocol()), // SerializationType.valueOf(getSerialization()));
//在sever工厂中拿到sever实例
server = ServerFactory.getServer(this); return server;
}
/**
* 初始化Server实例
*
* @param serverConfig 服务端配置
* @return Server */
public synchronized static Server getServer(ServerConfig serverConfig) { try {
Server server = SERVER_MAP.get(Integer.toString(serverConfig.getPort())); if (server == null) { // 算下网卡和端口 resolveServerConfig(serverConfig);
ExtensionClass ext = ExtensionLoaderFactory.getExtensionLoader(Server.class)
.getExtensionClass(serverConfig.getProtocol()); if (ext == null) { throw ExceptionUtils.buildRuntime("server.protocol", serverConfig.getProtocol(), "Unsupported protocol of server!");
}
server = ext.getExtInstance(); //服务初始化 server.init(serverConfig);
SERVER_MAP.put(serverConfig.getPort() + "", server);
} return server;
} catch (SofaRpcRuntimeException e) { throw e;
} catch (Throwable e) { throw new SofaRpcRuntimeException(e.getMessage(), e);
}
}
sofa-rpc提供了三种server类型 BoltServer,RestServer与AbstractHttpServer
BoltServer中通讯底层通过RemotingServer实现的,RemotingServer是基于阿里sofa-bolt通信框架开发的。
/**
* Bolt服务端 */
protected RemotingServer remotingServer;
@Override public void start() { if (started) { return;
} synchronized (this) { if (started) { return;
} // 生成阿里基于netty的bolt服务Server对象
remotingServer = initRemotingServer(); try { if (remotingServer.start(serverConfig.getBoundHost())) { if (LOGGER.isInfoEnabled()) {
LOGGER.info("Bolt server has been bind to {}:{}", serverConfig.getBoundHost(),
serverConfig.getPort());
}
} else { throw new SofaRpcRuntimeException("Failed to start bolt server, see more detail from bolt log.");
}
started = true; if (EventBus.isEnable(ServerStartedEvent.class)) {
EventBus.post(new ServerStartedEvent(serverConfig, bizThreadPool));
}
} catch (SofaRpcRuntimeException e) { throw e;
} catch (Exception e) { throw new SofaRpcRuntimeException("Failed to start bolt server!", e);
}
}
}
AbstractHttpServer 提供http服务,底层通信通过ServerTransport类实现的
/**
* 服务端通讯层 */
private ServerTransport serverTransport;
@Override public void init(ServerConfig serverConfig) { this.serverConfig = serverConfig; this.serverTransportConfig = convertConfig(serverConfig); // 启动线程池
this.bizThreadPool = initThreadPool(serverConfig); // 服务端处理器
this.serverHandler = new HttpServerHandler(); // set default transport config
this.serverTransportConfig.setContainer(container); this.serverTransportConfig.setServerHandler(serverHandler);
}
@Override public void start() { if (started) { return;
} synchronized (this) { if (started) { return;
} try { // 启动线程池
this.bizThreadPool = initThreadPool(serverConfig); this.serverHandler.setBizThreadPool(bizThreadPool); //实例化服务,具体代码见
serverTransport = ServerTransportFactory.getServerTransport(serverTransportConfig);
started = serverTransport.start(); if (started) { if (EventBus.isEnable(ServerStartedEvent.class)) {
EventBus.post(new ServerStartedEvent(serverConfig, bizThreadPool));
}
}
} catch (SofaRpcRuntimeException e) { throw e;
} catch (Exception e) { throw new SofaRpcRuntimeException("Failed to start HTTP/2 server!", e);
}
}
}
ServerTransport是个抽象类,具体实现为transport包下AbstractHttp2ServerTransport
/**
* 构造函数
*
* @param transportConfig 服务端配置 */
protected AbstractHttp2ServerTransport(ServerTransportConfig transportConfig) { super(transportConfig);
}
@Override public boolean start() { if (serverBootstrap != null) { return true;
} synchronized (this) { if (serverBootstrap != null) { return true;
} boolean flag = false;
SslContext sslCtx = SslContextBuilder.build(); // Configure the server.
EventLoopGroup bossGroup = NettyHelper.getServerBossEventLoopGroup(transportConfig); //可以看到然是基于Netty
HttpServerHandler httpServerHandler = (HttpServerHandler) transportConfig.getServerHandler();
bizGroup = NettyHelper.getServerBizEventLoopGroup(transportConfig, httpServerHandler.getBizThreadPool());
serverBootstrap = new ServerBootstrap();
serverBootstrap.group(bossGroup, bizGroup)
.channel(transportConfig.isUseEpoll() ? EpollServerSocketChannel.class : NioServerSocketChannel.class)
.option(ChannelOption.SO_BACKLOG, transportConfig.getBacklog())
.option(ChannelOption.SO_REUSEADDR, transportConfig.isReuseAddr())
.option(ChannelOption.RCVBUF_ALLOCATOR, NettyHelper.getRecvByteBufAllocator())
.option(ChannelOption.ALLOCATOR, NettyHelper.getByteBufAllocator())
.childOption(ChannelOption.SO_KEEPALIVE, transportConfig.isKeepAlive())
.childOption(ChannelOption.TCP_NODELAY, transportConfig.isTcpNoDelay())
.childOption(ChannelOption.SO_RCVBUF, 8192 * 128)
.childOption(ChannelOption.SO_SNDBUF, 8192 * 128)
.handler(new LoggingHandler(LogLevel.DEBUG))
.childOption(ChannelOption.ALLOCATOR, NettyHelper.getByteBufAllocator())
.childOption(ChannelOption.WRITE_BUFFER_WATER_MARK, new WriteBufferWaterMark(
transportConfig.getBufferMin(), transportConfig.getBufferMax()))
.childHandler(new Http2ServerChannelInitializer(bizGroup, sslCtx,
httpServerHandler, transportConfig.getPayload())); // 绑定到全部网卡 或者 指定网卡
ChannelFuture future = serverBootstrap.bind( new InetSocketAddress(transportConfig.getHost(), transportConfig.getPort()));
ChannelFuture channelFuture = future.addListener(new ChannelFutureListener() {
@Override public void operationComplete(ChannelFuture future) throws Exception { if (future.isSuccess()) { if (LOGGER.isInfoEnabled()) {
LOGGER.info("HTTP/2 Server bind to {}:{} success!",
transportConfig.getHost(), transportConfig.getPort());
}
} else {
LOGGER.error("HTTP/2 Server bind to {}:{} failed!",
transportConfig.getHost(), transportConfig.getPort());
stop();
}
}
}); try {
channelFuture.await(); if (channelFuture.isSuccess()) {
flag = Boolean.TRUE;
} else { throw new SofaRpcRuntimeException("Server start fail!", future.cause());
}
} catch (InterruptedException e) {
LOGGER.error(e.getMessage(), e);
} return flag;
}
}
RestServer 提供Rest服务,底层通信实现具体可见SofaNettyJaxrsServer。
/**
* Rest服务端 */
protected SofaNettyJaxrsServer httpServer;
@Override public void init(ServerConfig serverConfig) { this.serverConfig = serverConfig;
httpServer = buildServer();
}
SofaNettyJaxrsServer中服务启动的具体代码
@Override public void start() { // CHANGE: 增加线程名字
boolean daemon = serverConfig.isDaemon(); boolean isEpoll = serverConfig.isEpoll();
NamedThreadFactory ioFactory = new NamedThreadFactory("SEV-REST-IO-" + port, daemon);
NamedThreadFactory bizFactory = new NamedThreadFactory("SEV-REST-BIZ-" + port, daemon);
eventLoopGroup = isEpoll ? new EpollEventLoopGroup(ioWorkerCount, ioFactory)
: new NioEventLoopGroup(ioWorkerCount, ioFactory);
eventExecutor = isEpoll ? new EpollEventLoopGroup(executorThreadCount, bizFactory)
: new NioEventLoopGroup(executorThreadCount, bizFactory); // Configure the server.
bootstrap = new ServerBootstrap()
.group(eventLoopGroup)
.channel(isEpoll ? EpollServerSocketChannel.class : NioServerSocketChannel.class)
.childHandler(createChannelInitializer())
.option(ChannelOption.SO_BACKLOG, backlog)
.childOption(ChannelOption.SO_KEEPALIVE, serverConfig.isKeepAlive()); // CHANGE: setKeepAlive
for (Map.Entry entry : channelOptions.entrySet()) {
bootstrap.option(entry.getKey(), entry.getValue());
} for (Map.Entry entry : childChannelOptions.entrySet()) {
bootstrap.childOption(entry.getKey(), entry.getValue());
} final InetSocketAddress socketAddress; if (null == hostname || hostname.isEmpty()) {
socketAddress = new InetSocketAddress(port);
} else {
socketAddress = new InetSocketAddress(hostname, port);
}
bootstrap.bind(socketAddress).syncUninterruptibly();
}
OK,以上就是sofa-rpc服务端启动的一个基本的流程,这里关注的只是简单的服务启动流程,没有深入代码功能进行分析,在此基础上,我们可以进一步探究代码的具体实现。
sofa协议服务器,sofa-rpc 服务端源码流程走读相关推荐
- java sofa rpc_sofa-rpc服务端源码的详细分析(附流程图)
本篇文章给大家带来的内容是关于sofa-rpc服务端源码的详细分析(附流程图),有一定的参考价值,有需要的朋友可以参考一下,希望对你有所帮助. sofa-rpc是阿里开源的一款高性能的rpc框架,这篇 ...
- 畅玩mt3单机游戏服务器维护,【梦幻西游】MT3仿端手工游戏服务端源码[教程+授权物品后台]...
[梦幻西游]MT3仿端手工游戏服务端源码[教程+授权物品后台] 架设教程 系统:CentOS 6.8 64位 1.关闭防火墙 chkconfig iptables off service iptab ...
- Netty源码阅读(2)之——服务端源码梗概
上文我们把客户端源码梗概大致了解了一下,这样再了解服务端源码就轻松一点,我们将从服务端和客户端的区别着手去解析. 目录 区别 ④ ③ ① ⑤ 区别 ④ 客户端:.option(ChannelOptio ...
- 五子棋服务端程序java_9网上五子棋对战(java)服务端源码
9网上五子棋对战(java)服务端源码 网上五子棋对战(java)服务端源码 /* 五子棋游戏是本人在学习java swing时写的一个程序,程序分两部分:服务器端和客户端.运行程序时先运行服务器端, ...
- java计算机毕业设计社区养老综合服务平台服务端源码+系统+数据库+lw文档+mybatis+运行部署
java计算机毕业设计社区养老综合服务平台服务端源码+系统+数据库+lw文档+mybatis+运行部署 java计算机毕业设计社区养老综合服务平台服务端源码+系统+数据库+lw文档+mybatis+运 ...
- java毕业设计融呗智慧金融微资讯移动平台服务端源码+lw文档+mybatis+系统+mysql数据库+调试
java毕业设计融呗智慧金融微资讯移动平台服务端源码+lw文档+mybatis+系统+mysql数据库+调试 java毕业设计融呗智慧金融微资讯移动平台服务端源码+lw文档+mybatis+系统+my ...
- java计算机毕业设计融呗智慧金融微资讯移动平台服务端源码+系统+数据库+lw文档+mybatis+运行部署
java计算机毕业设计融呗智慧金融微资讯移动平台服务端源码+系统+数据库+lw文档+mybatis+运行部署 java计算机毕业设计融呗智慧金融微资讯移动平台服务端源码+系统+数据库+lw文档+myb ...
- java计算机毕业设计教育辅导班信息网服务端源码+mysql数据库+系统+lw文档+部署
java计算机毕业设计教育辅导班信息网服务端源码+mysql数据库+系统+lw文档+部署 java计算机毕业设计教育辅导班信息网服务端源码+mysql数据库+系统+lw文档+部署 本源码技术栈: 项目 ...
- java毕业设计社区养老综合服务平台服务端源码+lw文档+mybatis+系统+mysql数据库+调试
java毕业设计社区养老综合服务平台服务端源码+lw文档+mybatis+系统+mysql数据库+调试 java毕业设计社区养老综合服务平台服务端源码+lw文档+mybatis+系统+mysql数据库 ...
最新文章
- App开放接口api安全:Token签名sign的设计与实现
- 关于Integer.MAX_VALUE + 1 = Integer.MIN_VALUE 问题
- c语言中指,浅析C语言中指与数组.doc
- Visual Studio 2017迎来F# 4.1
- 前端学习(3011):vue+element今日头条管理--关于编辑器代码段
- JVM 调优 2:GC 如何判断对象是否为垃圾,三色标记算法应用原理及存在的问题?
- jquery读取json文件跨域_跨域方法的若干种方式
- python pandas 日期格式_pandas 快速处理 date_time 日期格式方法
- Java设计模式总汇二---MVC、中介者设计模式
- php7 php5 区别,php7与php5的区别有哪些?
- python plot 增加标记线_Python可视化| matplotlib04-掌握标记和线型的使用,一文,marker,linestyle...
- Html - Json转excel文件
- tomcat乱码问题解决
- pdf文件解密去水印加书签
- 钟平---逻辑英语的语法讲解
- 链表C语言和C++两种方式实现
- axure 折线图部件_在Axure中怎么做柱状图、折线图啊?
- ubuntu20.04 跳过grub
- BZOJ 2448: 挖油-区间DP+单调队列
- java计算机毕业设计-数字相册管理系统-源程序+mysql+系统+lw文档+远程调试