目录

业务场景:

过程1(单flume):

过程2(多个flume并行处理):

过程3(多路复用(路由)模式):

下面一个flume的配置,和selector.header的java代码介绍


业务场景:

每五分钟会新生成一个2.4G左右的gz压缩文件,大概1680万条数据, 现在需要通过flume做数据的清洗,处理,然后写入kafka。

服务器环境: 1台  32核 125G内存的服务器。

考虑方案(费脑,疯狂测试)

代码思路:

自定义的source读取目录文件,消费数据解析,转换格式,写入channel,自定义source另外有一篇博客,这里暂不介绍。

自定义source有两种方式: PollableSource和EventDrivenSource。PollableSource自动轮询调用 process() 方法,从中取数据,EventDrivenSource是自己定义回调机制来捕获新数据并将其存储到 Channel 中

开始flume节点的思路:

过程1(单flume):

flume使用1个agent,1个source,1个channel,1个sink,消费数据

开始测试:

最开始channel使用的是file, 测试结果大概1秒几千, channel使用memory,测试性能大概是1秒2-3万的样子

太慢了,不行,需要优化-----------------------------------------------------------------

过程2(多个flume并行处理):

考虑开启多个flume来监控多个目录。这样就好并行处理,来来来开始上手

做法: 将压缩包解压,然后使用linux的split命令分隔成多个文件,然后放到多个目录中,这样就并行处理了,

缺点:但是解压时间长,解压之后2.4G的gz文件会变成10多G,会很占服务器的磁盘。

没磁盘呀,人也太懒,不想这么麻烦--------------------------------------------------

过程3(多路复用(路由)模式):

测试性能的时候,没有sink,只有source写入channel大概能用1秒5万,如果有sink写入kafka,这样1秒只有2万左右,所以并行量的瓶颈是sink写入kafka,

那么我考虑用多个sink消费一个source,这样就可以提升并发量。

开始疯狂百度,查看flume官网: http://flume.apache.org/releases/content/1.9.0/FlumeUserGuide.html

找到了flume有一种机制, 多路复用(路由)模式,如下:

按照自己定义的方式可以将    一个source数据写入到不同的channel

最后长期测试大概1秒5-6万的数据量。

服务器的长期测试期间cpu和内存使用情况如下:

使用multiplexing方式就是,通过selector定义header, 比如数据的header中有一个State, 我将其值定义为AZ,则这个数据就会进入wangwu1Channel这个channel中;值为BZ,则这个数据就会进入wangwu3Channel这个channel中,如果不是AZ,BZ,CZ,最后会进入wangwu1Channel这个channel。

下面是一个flume的配置,和定义selector.header的java代码介绍

flume-conf.properties配置:

agent.sources = wangsu1Source
agent.sinks = wangsu1Sink wangsu2Sink wangsu3Sink
agent.channels = wangsu1Channel wangsu2Channel wangsu3Channelagent.sources.wangsu1Source.type = com.mmtrix.source.MySource
agent.sources.wangsu1Source.channels = wangsu1Channel wangsu2Channel wangsu3Channel
agent.sources.wangsu1Source.check = /tigard/collector/apache-flume-1.6.0-bin/mvod/wangsu1log/checkpoint.conf
agent.sources.wangsu1Source.batchSize = 1048576
agent.sources.wangsu1Source.startTime = 2018041400
agent.sources.wangsu1Source.backupDir = /data1/wangnei
agent.sources.wangsu1Source.errorLog = /usr/local/errorLog.log
agent.sources.wangsu1Source.regex = (^MG_CDN_3004_)[0-9]{12}_(.+)(.log.gz$)
agent.sources.wangsu1Source.userId = 3004
agent.sources.wangsu1Source.logType = migu
agent.sources.wangsu1Source.resourceIds = 65
agent.sources.wangsu1Source.recordFileName = /tigard/collector/apache-flume-1.6.0-bin/mvod/wangsu1log/recordFileName.confagent.sources.wangsu1Source.selector.type = multiplexing
agent.sources.wangsu1Source.selector.header = State
agent.sources.wangsu1Source.selector.mapping.AZ = wangsu1Channel
agent.sources.wangsu1Source.selector.mapping.BZ = wangsu3Channel
agent.sources.wangsu1Source.selector.mapping.CZ = wangsu2Channel
agent.sources.wangsu1Source.selector.default = wangsu1Channelagent.sinks.wangsu1Sink.type = com.mmtrix.sink.test2
agent.sinks.wangsu1Sink.channel = wangsu1Channel
agent.sinks.wangsu1Sink.topic = migulog_2_replica
agent.sinks.wangsu1Sink.requiredAcks = -1
agent.sinks.wangsu1Sink.batchSize = 10000
agent.sinks.wangsu1Sink.brokerList = mg001.mq.tigard.com:19092,mg002.mq.tigard.com:19092,mg003.mq.tigard.com:19092,mg004.mq.tigard.com:19092,mg005.mq.tigard.com:19092,mg006.mq.tigard.com:19092
agent.sinks.wangsu1Sink.kafka.compression.codec = snappy
agent.sinks.wangsu1Sink.kafka.producer.type = sync
agent.sinks.wangsu1Sink.serializer.class=kafka.serializer.StringEncoderagent.channels.wangsu1Channel.type = memory
agent.channels.wangsu1Channel.capacity = 5000000
agent.channels.wangsu1Channel.transactionCapacity = 50000agent.sinks.wangsu2Sink.type = com.mmtrix.sink.test2
agent.sinks.wangsu2Sink.channel = wangsu2Channel
agent.sinks.wangsu2Sink.topic = migulog_2_replica
agent.sinks.wangsu2Sink.requiredAcks = -1
agent.sinks.wangsu2Sink.batchSize = 10000
agent.sinks.wangsu2Sink.brokerList = mg001.mq.tigard.com:19092,mg002.mq.tigard.com:19092,mg003.mq.tigard.com:19092,mg004.mq.tigard.com:19092,mg005.mq.tigard.com:19092,mg006.mq.tigard.com:19092
agent.sinks.wangsu2Sink.kafka.compression.codec = snappy
agent.sinks.wangsu2Sink.kafka.producer.type = sync
agent.sinks.wangsu2Sink.serializer.class=kafka.serializer.StringEncoderagent.channels.wangsu2Channel.type = memory
agent.channels.wangsu2Channel.capacity = 5000000
agent.channels.wangsu2Channel.transactionCapacity = 50000agent.sinks.wangsu3Sink.type = com.mmtrix.sink.test2
agent.sinks.wangsu3Sink.channel = wangsu3Channel
agent.sinks.wangsu3Sink.topic = migulog_2_replica
agent.sinks.wangsu3Sink.requiredAcks = -1
agent.sinks.wangsu3Sink.batchSize = 10000
agent.sinks.wangsu3Sink.brokerList = mg001.mq.tigard.com:19092,mg002.mq.tigard.com:19092,mg003.mq.tigard.com:19092,mg004.mq.tigard.com:19092,mg005.mq.tigard.com:19092,mg006.mq.tigard.com:19092
agent.sinks.wangsu3Sink.kafka.compression.codec = snappy
agent.sinks.wangsu3Sink.kafka.producer.type = sync
agent.sinks.wangsu3Sink.serializer.class=kafka.serializer.StringEncoderagent.channels.wangsu3Channel.type = memory
agent.channels.wangsu3Channel.capacity = 5000000
agent.channels.wangsu3Channel.transactionCapacity = 50000

按照自己定义的方式将一个source数据写入多个channel,我是使用随机的方式写入三个channel。

private List<String> ChannelSelector = Arrays.asList("AZ", "BZ", "CZ");// 随机选择
String selector = this.ChannelSelector.get((int) (Math.random() * this.ChannelSelector.size()));
logger.info("selector: " + selector);
for (byte[] value : mess) {if (value.length != 0) {logger.debug(value);Event event = EventBuilder.withBody(value);Map<String, String> map = new HashMap<String, String>();map.put("State", selector);event.setHeaders(map);events.add(event);}
}
System.out.println("==============" + String.valueOf(System.currentTimeMillis()/1000) + "===========read: " + events.size());
// 循环处理失败的批次,可能失败的原因是channel满了,写不进去
boolean loop = true;
while(loop) {try {getChannelProcessor().processEventBatch(events);} catch(Exception e) {logger.error("processEventBatch err:" + e.getMessage() + " sleeping 10s...");try {Thread.sleep(10000L);} catch(Exception e1) {logger.error(e1);} finally {continue;}}loop = false;
}
mess.clear();
mess = null;
events.clear();

flume设计性能优化相关推荐

  1. 《响应式Web设计性能优化》一2.1 性能度量基础

    本节书摘来异步社区<响应式Web设计性能优化>一书中的第2章,第2.1节,作者: [美]Tom Barker 译者: 余绍亮 , 丁一 , 叶磊 责编: 赵轩,更多章节内容可以访问云栖社区 ...

  2. 《响应式Web设计性能优化》一2.3 Web运行时性能

    本节书摘来异步社区<响应式Web设计性能优化>一书中的第2章,第2.3节,作者: [美]Tom Barker 译者: 余绍亮 , 丁一 , 叶磊 责编: 赵轩,更多章节内容可以访问云栖社区 ...

  3. 《响应式Web设计性能优化》一1.1 响应式设计存在的问题

    本节书摘来异步社区<响应式Web设计性能优化>一书中的第2章,第2.1节,作者: [美]Tom Barker 译者: 余绍亮 , 丁一 , 叶磊 责编: 赵轩,更多章节内容可以访问云栖社区 ...

  4. 短链系统设计性能优化-缓存提速及CDN

    4 Scale 如何提高响应速度,和直接打开原链接一样的效率. 明确,这是个读多写少业务. 4.1 缓存提速(Cache Aside) 缓存需存储两类数据: long2short(生成新 short ...

  5. ArcGIS地图设计性能优化

    制作一幅较高性能的地图检查如下方面是否注意到了 ? 1 内容组织: · 空间参考/Spatial Reference 在ArcMap中配置地图文档,要尽可能将所有数据与数据框使用相同的空间参考,特别是 ...

  6. 计算机网络课程设计性能优化方案,计算机网络课程设计方案(华北电力大学科技学院).doc...

    文档介绍: 膈莇螇蒂芈科技学院螈芄课程设计报告膀(2011--2012年度第2学期)莈膈羆芃名称:计算机网络课程设计莈题目:以太网数据帧分析及网莅院系:信息工程系蒄班级:软件09K2羂学号:09190 ...

  7. Flume 实战开发指南

    Flume 文章目录 Flume Flume介绍 Flume核心概念 Flume NG的体系结构 Source Channel Sink Flume的部署类型 单一流程 多代理流程(多个agent顺序 ...

  8. Flume采集日志数据

    一.为什么选用Flume? Flume vs Logstash vs Filebeat 当时选择数据采集工具时,我们主要参考了市面上热度比较高的Flume和Logstash还有Filebeat,据目前 ...

  9. 字节跳动技术团队提出「AI渲染」方案,手机端也能实现影视级渲染效果

    随着3D技术的高速发展,影视渲染效果的复杂度.精细度都在逐步提升,但高质量的渲染效果和时间成本往往难以兼顾.针对这一行业痛点,字节跳动智能创作团队基于AI技术的优势提出了「AI渲染」方案.这一方案现已 ...

最新文章

  1. Servlet 是线程安全的吗?
  2. Node中的fs模块
  3. 数据结构相关代码-简介
  4. ubuntu apt-get 时遇到waiting for headers的破解办法
  5. 杰·亚伯拉罕的产品营销35种策略完整版
  6. RH850/F1x的PWM-Diag功能分析
  7. 三维场景 WGS84 和街景(百度街景,腾讯街景,google街景,orbitgt街景)联动
  8. 内存、磁盘硬盘、软盘、光盘、磁盘驱动器的介绍
  9. PNG转ICO在线转换
  10. 人人网登陆(selenium、PtantomJS结合)
  11. 使用关键词快速搜索商品代码
  12. 纳德拉时代下的微软开源之路
  13. 基于Idea的Spark大数据分析--scala
  14. vim之快速查找功能
  15. 最近研究的windows API
  16. outputdebugstring函数的封装
  17. 英语单词词性顺口溜_英语词性修饰关系顺口溜
  18. new Date将字符串转化成日期格式 兼容IE,ie8如何通过new Date将字符串转化成日期格式,js中如何进行字符串替换, replace() 方法详解
  19. 我用Python爬取了妹子网200G的套图
  20. Kaldi 和 TIMIT 入门

热门文章

  1. 设计模式:装饰器模式
  2. 文件的名字带有特殊符号下载报错问题
  3. mysql 主键自增的范围_MySQL自增主键知识点总结
  4. 3dMax 车削(坛子)
  5. 【诡秘之主】超凡生物篇
  6. Java 实现系统权限控制思路
  7. Openshift Origin开发日记 1 - 10
  8. 天下3新手攻略——技能解读
  9. FX3U PLC V10.0源代码及设计图。
  10. 飞机机翼升力原因-流体力学