1. Client端主要用来采集各种消息,本文采用官方的例子

  Transaction t = Cat.newTransaction("your transaction type", "your transaction name");try {yourBusinessOperation();Cat.logEvent("your event type", "your event name", Event.SUCCESS, "keyValuePairs")t.setStatus(Transaction.SUCCESS);} catch (Exception e) {Cat.logError(e);//用log4j记录系统异常,以便在Logview中看到此信息t.setStatus(e);throw e; (CAT所有的API都可以单独使用,也可以组合使用,比如Transaction中嵌套Event或者Metric。)(注意如果这里希望异常继续向上抛,需要继续向上抛出,往往需要抛出异常,让上层应用知道。)(如果认为这个异常在这边可以被吃掉,则不需要在抛出异常。)} finally {t.complete();}

首先需要初始化容器,Cat采用的容器是PlexusContainer,加载配置文件/META-INF/plexus/plexus.xml,初始化完后加载对应的模块CatClientModule,加载META-INF/plexus/components-cat-client.xml对应的配置,各个类的属性通过@Inject标签注入

private static void checkAndInitialize() {if (!s_init) {synchronized (s_instance) {if (!s_init) {initialize(new File(getCatHome(), "client.xml"));log("WARN", "Cat is lazy initialized!");s_init = true;}}}}
// this should be called during application initialization timepublic static void initialize(File configFile) {PlexusContainer container = ContainerLoader.getDefaultContainer();initialize(container, configFile);}public static void initialize(PlexusContainer container, File configFile) {ModuleContext ctx = new DefaultModuleContext(container);Module module = ctx.lookup(Module.class, CatClientModule.ID);if (!module.isInitialized()) {ModuleInitializer initializer = ctx.lookup(ModuleInitializer.class);ctx.setAttribute("cat-client-config-file", configFile);initializer.execute(ctx, module);}}

初始化毫秒获取器MilliSecondTimer,设置线程监听器CatThreadListener,初始化DefaultMessageProducer,PlainTextMessageCodec编解码器,通过ID = "plain-text"定位接口实现类,初始化DefaultMessageStatistics数据统计类

protected void execute(final ModuleContext ctx) throws Exception {ctx.info("Current working directory is " + System.getProperty("user.dir"));// initialize milli-second resolution level timerMilliSecondTimer.initialize();// tracking thread start/stopThreads.addListener(new CatThreadListener(ctx));// warm up CatCat.getInstance().setContainer(((DefaultModuleContext) ctx).getContainer());// bring up TransportManagerctx.lookup(TransportManager.class);ClientConfigManager clientConfigManager = ctx.lookup(ClientConfigManager.class);if (clientConfigManager.isCatEnabled()) {// start status update taskStatusUpdateTask statusUpdateTask = ctx.lookup(StatusUpdateTask.class);Threads.forGroup("cat").start(statusUpdateTask);LockSupport.parkNanos(10 * 1000 * 1000L); // wait 10 ms// MmapConsumerTask mmapReaderTask = ctx.lookup(MmapConsumerTask.class);// Threads.forGroup("cat").start(mmapReaderTask);}}

2. 初始化DefaultClientConfigManager,从"/data/appdatas/cat/client.xml"中加载全局客户端配置文件,从"/META-INF/app.properties"加载工程名,进而组建本地配置文件,如果不存在的话就从"/META-INF/cat/client.xml"加载,最后合并两个文件,并且合并里面的相应属性Server,Domain,Property等。最后生成对应的ClientConfig。

3. 初始化DefaultMessageManager,保存配置的第一个domain和主机名和ip,初始化id生成器MessageIdFactory,创建一个保存当前消息id和时间的文件,用MappedByteBuffer进行一对一映射文件,

public void initialize() throws InitializationException {m_domain = m_configManager.getDomain();m_hostName = NetworkInterfaceManager.INSTANCE.getLocalHostName();if (m_domain.getIp() == null) {m_domain.setIp(NetworkInterfaceManager.INSTANCE.getLocalHostAddress());}// initialize domain and IP addresstry {m_factory.initialize(m_domain.getId());} catch (IOException e) {throw new InitializationException("Error while initializing MessageIdFactory!", e);}}

4. 初始化DefaultTransportManager,获取对应的Server地址信息,该信息在数据tcp交互时需要

m_tcpSocketSender.setServerAddresses(addresses);m_tcpSocketSender.initialize();

5. 初始化TcpSocketSender,设置两个链式阻塞队列LinkedBlockingQueue,m_queue存储普通消息,m_atomicTrees存储原子消息,也就是type以"Cache."开头或者"SQL"类型的消息。

public void initialize() {int len = getQueueSize();m_queue = new DefaultMessageQueue(len);m_atomicTrees = new DefaultMessageQueue(len);m_manager = new ChannelManager(m_logger, m_serverAddresses, m_queue, m_configManager, m_factory);Threads.forGroup("cat").start(this);Threads.forGroup("cat").start(m_manager);Threads.forGroup("cat").start(new MergeAtomicTask());}

初始化tcp通讯管理类ChannelManager,设置netty客户端的相关信息,获取服务端ip端口的信息,当为空的时候才使用之前传过来的serverAddresses信息,

public ChannelManager(Logger logger, List<InetSocketAddress> serverAddresses, MessageQueue queue,ClientConfigManager configManager, MessageIdFactory idFactory) {m_logger = logger;m_queue = queue;m_configManager = configManager;m_idfactory = idFactory;EventLoopGroup group = new NioEventLoopGroup(1, new ThreadFactory() {@Overridepublic Thread newThread(Runnable r) {Thread t = new Thread(r);t.setDaemon(true);return t;}});Bootstrap bootstrap = new Bootstrap();bootstrap.group(group).channel(NioSocketChannel.class);bootstrap.option(ChannelOption.SO_KEEPALIVE, true);bootstrap.handler(new ChannelInitializer<Channel>() {@Overrideprotected void initChannel(Channel ch) throws Exception {}});m_bootstrap = bootstrap;String serverConfig = loadServerConfig();if (StringUtils.isNotEmpty(serverConfig)) {List<InetSocketAddress> configedAddresses = parseSocketAddress(serverConfig);ChannelHolder holder = initChannel(configedAddresses, serverConfig);if (holder != null) {m_activeChannelHolder = holder;} else {m_activeChannelHolder = new ChannelHolder();m_activeChannelHolder.setServerAddresses(configedAddresses);}} else {ChannelHolder holder = initChannel(serverAddresses, null);if (holder != null) {m_activeChannelHolder = holder;} else {m_activeChannelHolder = new ChannelHolder();m_activeChannelHolder.setServerAddresses(serverAddresses);m_logger.error("error when init cat module due to error config xml in /data/appdatas/cat/client.xml");}}}

初始化tcp通道信息,创建通道ChannelFuture,保存到ChannelHolder中,并且记录相应的服务列表下标主机名等信息。

private ChannelHolder initChannel(List<InetSocketAddress> addresses, String serverConfig) {try {int len = addresses.size();for (int i = 0; i < len; i++) {InetSocketAddress address = addresses.get(i);String hostAddress = address.getAddress().getHostAddress();ChannelHolder holder = null;if (m_activeChannelHolder != null && hostAddress.equals(m_activeChannelHolder.getIp())) {holder = new ChannelHolder();holder.setActiveFuture(m_activeChannelHolder.getActiveFuture()).setConnectChanged(false);} else {ChannelFuture future = createChannel(address);if (future != null) {holder = new ChannelHolder();holder.setActiveFuture(future).setConnectChanged(true);}}if (holder != null) {holder.setActiveIndex(i).setIp(hostAddress);holder.setActiveServerConfig(serverConfig).setServerAddresses(addresses);m_logger.info("success when init CAT server, new active holder" + holder.toString());return holder;}}} catch (Exception e) {m_logger.error(e.getMessage(), e);}try {StringBuilder sb = new StringBuilder();for (InetSocketAddress address : addresses) {sb.append(address.toString()).append(";");}m_logger.info("Error when init CAT server " + sb.toString());} catch (Exception e) {// ignore}return null;}

6. 启动三个守护线程ChannelManager,每10s执行一次,在文件中保存消息ID索引信息,检查服务器列表信息是否有改变,检查当前的通道是否还在存活状态,按顺序链接服务器信息。比如本次链接的是第二个服务器,当这次执行的时候会看看第一个服务器是否能链接成功,是的话结束之前连接,保存新连接。

public void run() {while (m_active) {// make save message id index asycm_idfactory.saveMark();checkServerChanged();ChannelFuture activeFuture = m_activeChannelHolder.getActiveFuture();List<InetSocketAddress> serverAddresses = m_activeChannelHolder.getServerAddresses();doubleCheckActiveServer(activeFuture);reconnectDefaultServer(activeFuture, serverAddresses);try {Thread.sleep(10 * 1000L); // check every 10 seconds} catch (InterruptedException e) {// ignore}}}

MergeAtomicTask线程,5s执行一次。当m_atomicTrees队列中的消息最长的已经超过了30s或者长度已经超过200个,就可以把这些消息进行合并处理,保存成一条消息MessageTree,最后放进消息队列m_queue,回收已经合并过的消息id,当消息数量超过队列容量时,会记录相关的溢出数据,每1000次打印一次error日志

public void run() {while (true) {if (shouldMerge(m_atomicTrees)) {MessageTree tree = mergeTree(m_atomicTrees);boolean result = m_queue.offer(tree);if (!result) {logQueueFullInfo(tree);}} else {try {Thread.sleep(5);} catch (InterruptedException e) {break;}}}}

TcpSocketSender消息发送线程,检查通道是否正常可写,从消息队列m_queue取出消息,编码并且写入到ByteBuf,写入并且刷新通道,统计消息大小以及数量,当通道不可写时,查看消息队列中的消息生成时间,超过一个小时的清理掉,记录溢出数。

public void run() {m_active = true;try {while (m_active) {ChannelFuture channel = m_manager.channel();if (channel != null && checkWritable(channel)) {try {MessageTree tree = m_queue.poll();if (tree != null) {sendInternal(tree);tree.setMessage(null);}} catch (Throwable t) {m_logger.error("Error when sending message over TCP socket!", t);}} else {long current = System.currentTimeMillis();long oldTimestamp = current - HOUR;while (true) {try {MessageTree tree = m_queue.peek();if (tree != null && tree.getMessage().getTimestamp() < oldTimestamp) {MessageTree discradTree = m_queue.poll();if (discradTree != null) {m_statistics.onOverflowed(discradTree);}} else {break;}} catch (Exception e) {m_logger.error(e.getMessage(), e);break;}}TimeUnit.MILLISECONDS.sleep(5);}}} catch (InterruptedException e) {// ignore itm_active = false;}}
private void sendInternal(MessageTree tree) {ChannelFuture future = m_manager.channel();ByteBuf buf = PooledByteBufAllocator.DEFAULT.buffer(10 * 1024); // 10Km_codec.encode(tree, buf);int size = buf.readableBytes();Channel channel = future.channel();channel.writeAndFlush(buf);if (m_statistics != null) {m_statistics.onBytes(size);}}

7. 启动守护线程StatusUpdateTask,定时发送心跳消息和本机的线程内存JVM等信息。

public void run() {// try to wait cat client init successtry {Thread.sleep(10 * 1000);} catch (InterruptedException e) {return;}while (true) {Calendar cal = Calendar.getInstance();int second = cal.get(Calendar.SECOND);// try to avoid send heartbeat at 59-01 secondif (second < 2 || second > 58) {try {Thread.sleep(1000);} catch (InterruptedException e) {// ignore it}} else {break;}}try {buildClasspath();} catch (Exception e) {e.printStackTrace();}MessageProducer cat = Cat.getProducer();Transaction reboot = cat.newTransaction("System", "Reboot");reboot.setStatus(Message.SUCCESS);cat.logEvent("Reboot", NetworkInterfaceManager.INSTANCE.getLocalHostAddress(), Message.SUCCESS, null);reboot.complete();while (m_active) {long start = MilliSecondTimer.currentTimeMillis();if (m_manager.isCatEnabled()) {Transaction t = cat.newTransaction("System", "Status");Heartbeat h = cat.newHeartbeat("Heartbeat", m_ipAddress);StatusInfo status = new StatusInfo();t.addData("dumpLocked", m_manager.isDumpLocked());try {StatusInfoCollector statusInfoCollector = new StatusInfoCollector(m_statistics, m_jars);status.accept(statusInfoCollector.setDumpLocked(m_manager.isDumpLocked()));buildExtensionData(status);h.addData(status.toString());h.setStatus(Message.SUCCESS);} catch (Throwable e) {h.setStatus(e);cat.logError(e);} finally {h.complete();}t.setStatus(Message.SUCCESS);t.complete();}long elapsed = MilliSecondTimer.currentTimeMillis() - start;if (elapsed < m_interval) {try {Thread.sleep(m_interval - elapsed);} catch (InterruptedException e) {break;}}}}

CAT的Client端初始化相关推荐

  1. 轻量级Rpc框架设计--motan源码解析六:client端服务发现

    一, Client端初始化工作 client端通过RefererConfigBean类实现InitializingBean接口的afterPropertiesSet方法, 进行下面三项检查配置工作: ...

  2. codeblock socket 编译错误_从Linux源码看Socket(TCP)Client端的Connect

    从Linux源码看Socket(TCP)Client端的Connect 前言 笔者一直觉得如果能知道从应用到框架再到操作系统的每一处代码,是一件Exciting的事情. 今天笔者就来从Linux源码的 ...

  3. oracle 与 client端执行结果不一致_不同模式下Spark应用的执行过程

    根据应用执行的3个阶段,不同执行模式下各个阶段的执行逻辑不相同,本文分析不同模式下的执行逻辑. Yarn-Client模式的执行流程 Yarn的组成 Yarn是hadoop自带的资源管理框架,它的设计 ...

  4. elasticsearch源码分析之search模块(client端)

    elasticsearch源码分析之search模块(client端) 注意,我这里所说的都是通过rest api来做的搜索,所以对于接收到请求的节点,我姑且将之称之为client端,其主要的功能我们 ...

  5. Client端异步Callback的应用与介绍

    转载:http://blog.csdn.net/goalbell/archive/2007/09/16/1787213.aspx 1.Client端异步Callback的介绍:它是通过前端Client ...

  6. mbedTLS(PolarSSL)简单思路和函数笔记(Client端)

    转自: OpenSSL一直以来各种被诟病,具体挑了哪些刺,本文就不深究.作为OpenSSL有很多替代,我了解到的有cyaSSL(WolfSSL)和PolorSSL.其中PolarSSL已经被ARM收购 ...

  7. T100 WebService与Client端开发

    T100 WebService Server端开发: 开发流程:1.服务注册:2.服务程序签出:3.服务程序撰写:4.服务程序上传 1.azzi700注册服务规格编号,然后签出就可以写程序了: 2.宣 ...

  8. 64位系统上使用*** Client端

    最近在工作中遇到一个问题,由于想用大于4G的内存,便安装了64位的操作系统,但无论是Windows 2003或是Windows 7等,凡是64位的便不能安装Cisco *** CLient软件. 公司 ...

  9. Oracle监听器Server端与Client端配置实例

    Listener.ora.tnsnames.ora这两个文件常常因为格式问题而不好用,我平时都是配置好了留个备份,以后都是拷贝过去改改就好了!嘿嘿~~~ 因为平时使用linux的时候较多,所以有时还会 ...

  10. 【Apache Mina2.0开发之二】自定义实现Server/Client端的编解码工厂(自定义编码与解码器)!...

    本站文章均为 李华明Himi 原创,转载务必在明显处注明: 转载自[黑米GameDev街区] 原文链接: http://www.himigame.com/apache-mina/831.html ☞ ...

最新文章

  1. js作用域链以及全局变量和局部变量
  2. 解题报告——习题2-5 分数化小数(decimal) 输入正整数a,b,c,输出a/b的小数形式,精确到小数点后c位。
  3. gridview添加header
  4. 深入Java类型信息:RTTI和反射
  5. Android平台和java平台 DES加密解密互通程序及其不能互通的原因
  6. 100万并发连接服务器笔记之准备篇
  7. 无人机-2多翼无人机的结构与硬件
  8. 课程《设计模式之美》笔记之关于面向对象与面向过程
  9. php 自定义字段erp,在SuiteCRM中创建自定义字段类型
  10. python excel 空值_Python/Excel/SPSS/SQL数据处理方法比较之4 - 空值处理
  11. LeetCode-Hot100-两数相加
  12. 基于JSP的运动会综合管理系统
  13. 【学术期刊】2023CCF推荐的A,B,C类英文科技期刊目录最新发布
  14. [推荐] Chrome谷歌浏览器实时英文字幕插件
  15. 索骥馆-网络创业之《网上赚钱从入门到精通》扫描版[PDF]
  16. Geodetic集合 c++
  17. imshow与显示图像时的全白问题
  18. matlab变步长龙格库塔法,matlab 龙格库塔法 变步长龙格库塔法
  19. 秒杀系统防止超卖解决方案
  20. 垃圾回收机制?垃圾回收的流程?

热门文章

  1. Vue中Class和Style几种v-bind绑定的用法-详解案例
  2. 什么是 MAC 地址?
  3. 《Detecting Adversarial Examples through Image Transformation》和CW attack的阅读笔记
  4. FindBugs 汇总(持续修改)
  5. 暴走英雄坛服务器维修,暴走英雄坛采集位置及注意事项一览
  6. 木兰当事人回应!承认部分基于 Python 二次开发
  7. 【processing】追
  8. python tokenize()_Python tokenize-rt包_程序模块 - PyPI - Python中文网
  9. pandas 行列转换
  10. 分析网易云用户运营的指标监控和召回机制