1 Collection Plan

Notes:

D1  log source server 1 — bigdata02.com
D2  log source server 2 — bigdata03.com
A   flume2 — bigdata02.com: log collection
C   flume3 — bigdata03.com: log collection
B   flume1 — bigdata01.com: log aggregation
E   Kafka  — bigdata01.com, bigdata02.com, bigdata03.com: storage in Kafka
F   HBase  — bigdata01.com, bigdata02.com, bigdata03.com: storage in HBase
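
Data flow at a glance (a plain sketch of the same roles listed above):

D1 logs (bigdata02.com) --> A: flume2 --\
                                         +--> B: flume1 (bigdata01.com) --+--> E: Kafka
D2 logs (bigdata03.com) --> C: flume3 --/                                 +--> F: HBase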

2 Versions

  • kafka kafka_2.11-0.10.0.0
  • flume flume-1.7.0-bin
  • hbase hbase-0.98.6-cdh5.3.0

3 Installation

3.1 Kafka installation

vi config/server.properties

# broker.id, listeners and host.name must be changed on each broker
broker.id=1
listeners=PLAINTEXT://bigdata01.com:9092
port=9092
host.name=bigdata01.com
num.network.threads=3
num.io.threads=8
socket.send.buffer.bytes=102400
socket.receive.buffer.bytes=102400
socket.request.max.bytes=104857600
log.dirs=/opt/modules/kafka_2.11-0.10.0.0/kafka-logs
num.partitions=1
num.recovery.threads.per.data.dir=1
log.retention.hours=168
log.segment.bytes=1073741824
message.max.bytes=5242880
default.replication.factor=2
replica.fetch.max.bytes=5242880
log.retention.check.interval.ms=300000
log.cleaner.enable=false
zookeeper.connect=bigdata01.com:2181,bigdata02.com:2181,bigdata03.com:2181
zookeeper.connection.timeout.ms=60000

Copy the installation to the other brokers and adjust server.properties on each of them.
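
A hedged sketch of distributing the install, assuming the same /opt/modules layout on every node; broker.id only needs to be unique per broker, so the values 2 and 3 below are an assumption:

scp -r /opt/modules/kafka_2.11-0.10.0.0 bigdata02.com:/opt/modules/
scp -r /opt/modules/kafka_2.11-0.10.0.0 bigdata03.com:/opt/modules/

# on bigdata02.com edit config/server.properties:
#   broker.id=2
#   listeners=PLAINTEXT://bigdata02.com:9092
#   host.name=bigdata02.com
# on bigdata03.com use broker.id=3 and bigdata03.com accordingly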

3.2 flume1 installation

1 vi conf/flume-env.sh

export JAVA_HOME=/opt/modules/jdk1.7.0_67
export HADOOP_HOME=/opt/modules/hadoop-2.5.0
export HBASE_HOME=/opt/modules/hbase-0.98.6-cdh5.3.0
export JAVA_OPTS="-Xms100m -Xmx2000m -Dcom.sun.management.jmxremote"

2 vi conf/flume-conf.properties

agent1.sources = r1
agent1.channels = kafkaC hbaseC
agent1.sinks = kafkaSink hbaseSink

#********************flume + hbase**************************
agent1.sources.r1.type = avro
agent1.sources.r1.channels = hbaseC kafkaC
agent1.sources.r1.bind = bigdata01.com
agent1.sources.r1.port = 55555
agent1.sources.r1.threads = 5

agent1.channels.hbaseC.type = memory
agent1.channels.hbaseC.capacity = 100000
agent1.channels.hbaseC.transactionCapacity = 100000
agent1.channels.hbaseC.keep-alive = 20

agent1.sinks.hbaseSink.type = asynchbase
agent1.sinks.hbaseSink.table = weblogs
agent1.sinks.hbaseSink.columnFamily = info
agent1.sinks.hbaseSink.serializer = org.apache.flume.sink.hbase.KfkAsyncHbaseEventSerializer
agent1.sinks.hbaseSink.channel = hbaseC
agent1.sinks.hbaseSink.serializer.payloadColumn=datatime,userid,searchname,retorder,cliorder,cliurl

#********************flume + kafka*****************************
agent1.channels.kafkaC.type = memory
agent1.channels.kafkaC.capacity = 100000
agent1.channels.kafkaC.transactionCapacity = 100000
agent1.channels.kafkaC.keep-alive = 10

agent1.sinks.kafkaSink.channel = kafkaC
agent1.sinks.kafkaSink.type = org.apache.flume.sink.kafka.KafkaSink
agent1.sinks.kafkaSink.brokerList = bigdata01.com:9092,bigdata02.com:9092,bigdata03.com:9092
agent1.sinks.kafkaSink.topic = weblogs
agent1.sinks.kafkaSink.zookeeperConnect= bigdata01.com:2181,bigdata02.com:2181,bigdata03.com:2181
agent1.sinks.kafkaSink.requiredAcks = 1
agent1.sinks.kafkaSink.batchSize = 1
agent1.sinks.kafkaSink.serializer.class = kafka.serializer.StringEncoder

3.3 flume2 installation

vi conf/flume-conf.properties

agent2.sources = s1
agent2.channels = c1
agent2.sinks = k1

agent2.sources.s1.inputCharset = GBK
agent2.sources.s1.type = exec
agent2.sources.s1.command = tail -F /opt/datas/flume.log
agent2.sources.s1.channels=c1

#channels configuration
agent2.channels.c1.type = memory
agent2.channels.c1.capacity = 10000
agent2.channels.c1.transactionCapacity = 10000
agent2.channels.c1.keep-alive = 3

#sinks configuration
agent2.sinks.k1.type = avro
agent2.sinks.k1.hostname = bigdata01.com
agent2.sinks.k1.port = 55555
agent2.sinks.k1.channel = c1

3.4 flume3 installation

vi conf/flume-conf.properties

agent3.sources = s1
agent3.channels = c1
agent3.sinks = k1

agent3.sources.s1.inputCharset = GBK
agent3.sources.s1.type = exec
agent3.sources.s1.command = tail -F /opt/datas/flume.log
agent3.sources.s1.channels=c1

#channels configuration
agent3.channels.c1.type = memory
agent3.channels.c1.capacity = 10000
agent3.channels.c1.transactionCapacity = 10000
agent3.channels.c1.keep-alive = 3

#sinks configuration
agent3.sinks.k1.type = avro
agent3.sinks.k1.hostname = bigdata01.com
agent3.sinks.k1.port = 55555
agent3.sinks.k1.channel = c1

3.5 Data download and preprocessing

Download address

Preprocessing: the raw text uses two delimiters, '\t' and " "; normalize both of them to "," during preprocessing.

cat weblog.log |tr "\t" "," >weblog2.log
cat weblog2.log |tr " " "," >weblog3.log
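
The two passes can also be collapsed into one pipeline, producing the final comma-delimited file directly:

cat weblog.log | tr "\t" "," | tr " " "," > weblog3.log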

3.6 Flume -> HBase source modification

Why the source needs changing: in the stock serializer a single Event is written as one column value, whereas here each event carries six column values (datatime, userid, searchname, retorder, cliorder, cliurl), so the serializer has to be modified.

  1. Download the Flume 1.7.0 source
  2. Import the flume-ng-hbase-sink project into IDEA
  3. Create org.apache.flume.sink.hbase.KfkAsyncHbaseEventSerializer.java

package org.apache.flume.sink.hbase;

import com.google.common.base.Charsets;
import org.apache.flume.Context;
import org.apache.flume.Event;
import org.apache.flume.FlumeException;
import org.apache.flume.conf.ComponentConfiguration;
import org.apache.flume.sink.hbase.SimpleHbaseEventSerializer.KeyType;
import org.hbase.async.AtomicIncrementRequest;
import org.hbase.async.PutRequest;

import java.util.ArrayList;
import java.util.List;

public class KfkAsyncHbaseEventSerializer implements AsyncHbaseEventSerializer {

    private byte[] table;
    private byte[] cf;
    private byte[] payload;
    private byte[] payloadColumn;
    private byte[] incrementColumn;
    private String rowPrefix;
    private byte[] incrementRow;
    private KeyType keyType;

    @Override
    public void initialize(byte[] table, byte[] cf) {
        this.table = table;
        this.cf = cf;
    }

    // Modification starts here: one event body holds six comma-separated values,
    // each written as its own column under the configured column family.
    @Override
    public List<PutRequest> getActions() {
        List<PutRequest> actions = new ArrayList<PutRequest>();
        if (payloadColumn != null) {
            byte[] rowKey;
            try {
                // payloadColumn: column names from the sink config; payload: the event body
                String[] colums = new String(this.payloadColumn).split(",");
                String[] columValue = new String(this.payload).split(",");
                for (int i = 0; i < 6; i++) {
                    if (colums.length != columValue.length) {
                        break;
                    }
                    String datetime = columValue[0].toString();
                    String userid = columValue[1].toString();
                    byte[] hColColum = colums[i].getBytes();
                    byte[] values = columValue[i].getBytes(Charsets.UTF_8);
                    // getKfkTimestampKey is a custom row-key helper used by this project
                    // (assumed to be added to SimpleRowKeyGenerator; it is not in stock Flume)
                    rowKey = SimpleRowKeyGenerator.getKfkTimestampKey(datetime, userid);
                    PutRequest putRequest = new PutRequest(table, rowKey, cf, hColColum, values);
                    actions.add(putRequest);
                }
            } catch (Exception e) {
                throw new FlumeException("Could not get row key!", e);
            }
        }
        return actions;
    }

    public List<AtomicIncrementRequest> getIncrements() {
        List<AtomicIncrementRequest> actions = new ArrayList<AtomicIncrementRequest>();
        if (incrementColumn != null) {
            AtomicIncrementRequest inc =
                new AtomicIncrementRequest(table, incrementRow, cf, incrementColumn);
            actions.add(inc);
        }
        return actions;
    }

    @Override
    public void cleanUp() {
        // TODO Auto-generated method stub
    }

    @Override
    public void configure(Context context) {
        String pCol = context.getString("payloadColumn", "pCol");
        String iCol = context.getString("incrementColumn", "iCol");
        rowPrefix = context.getString("rowPrefix", "default");
        String suffix = context.getString("suffix", "uuid");
        if (pCol != null && !pCol.isEmpty()) {
            if (suffix.equals("timestamp")) {
                keyType = KeyType.TS;
            } else if (suffix.equals("random")) {
                keyType = KeyType.RANDOM;
            } else if (suffix.equals("nano")) {
                keyType = KeyType.TSNANO;
            } else {
                keyType = KeyType.UUID;
            }
            payloadColumn = pCol.getBytes(Charsets.UTF_8);
        }
        if (iCol != null && !iCol.isEmpty()) {
            incrementColumn = iCol.getBytes(Charsets.UTF_8);
        }
        incrementRow = context.getString("incrementRow", "incRow").getBytes(Charsets.UTF_8);
    }

    @Override
    public void setEvent(Event event) {
        this.payload = event.getBody();
    }

    @Override
    public void configure(ComponentConfiguration conf) {
        // TODO Auto-generated method stub
    }
}
  4. Rebuild the jar and rename it flume-ng-hbase-sink-1.7.0.jar
  5. Upload it to bigdata01.com and replace the original jar in Flume's lib directory
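
A minimal build-and-replace sketch; the Maven module path and the Flume install directory below are assumptions, so adjust them to your checkout and install locations:

# from the root of the flume-1.7.0 source tree
mvn clean package -pl flume-ng-sinks/flume-ng-hbase-sink -am -DskipTests

# replace the stock jar in Flume's lib directory (back it up first if needed)
cp flume-ng-sinks/flume-ng-hbase-sink/target/flume-ng-hbase-sink-1.7.0.jar \
   /opt/modules/apache-flume-1.7.0-bin/lib/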

3.7 Code for generating simulated user logs

package main.java;

import java.io.*;

public class ReadWrite {

    static String readFileName;
    static String writeFileName;

    public static void main(String args[]) {
        readFileName = args[0];
        writeFileName = args[1];
        try {
            // readInput();
            readFileByLines(readFileName);
        } catch (Exception e) {
        }
    }

    public static void readFileByLines(String fileName) {
        FileInputStream fis = null;
        InputStreamReader isr = null;
        BufferedReader br = null;
        String tempString = null;
        try {
            System.out.println("Reading the file line by line:");
            fis = new FileInputStream(fileName);
            // read bytes from the source file
            isr = new InputStreamReader(fis, "GBK");
            br = new BufferedReader(isr);
            int count = 0;
            while ((tempString = br.readLine()) != null) {
                count++; // line counter
                Thread.sleep(300); // throttle to simulate a live log stream
                String str = new String(tempString.getBytes("UTF8"), "GBK");
                // System.out.println("row:" + count + ">>>>>>>>" + tempString);
                method1(writeFileName, tempString);
                // appendMethodA(writeFileName, tempString);
            }
            isr.close();
        } catch (IOException e) {
            e.printStackTrace();
        } catch (InterruptedException e) {
            e.printStackTrace();
        } finally {
            if (isr != null) {
                try {
                    isr.close();
                } catch (IOException e1) {
                }
            }
        }
    }

    // append one line to the output file consumed by the Flume exec source
    public static void method1(String file, String conent) {
        BufferedWriter out = null;
        try {
            out = new BufferedWriter(new OutputStreamWriter(new FileOutputStream(file, true)));
            out.write("\n");
            out.write(conent);
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            try {
                out.close();
            } catch (IOException e) {
                e.printStackTrace();
            }
        }
    }
}

Package this as a jar and upload it to bigdata02.com:/opt/datas/ and bigdata03.com:/opt/datas/.

Create a start script:

#!/bin/bash
echo "start ..."
java -jar /opt/jar/weblogs.jar /opt/datas/weblog3.log /opt/datas/flume.log

3.8 Starting the components

  1. Start Kafka

 bin/kafka-server-start.sh config/server.properties
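
Run it on each of the three brokers; the ZooKeeper quorum listed in zookeeper.connect must already be running. To keep the broker in the background, the bundled -daemon flag can be used:

bin/kafka-server-start.sh -daemon config/server.properties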

Create the topic:

bin/kafka-topics.sh --create --zookeeper bigdata01.com:2181,bigdata02.com:2181,bigdata03.com:2181 --replication-factor 3 --partitions 1 --topic weblogs
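
To confirm the topic, its partitions and replicas were created as expected:

bin/kafka-topics.sh --describe --zookeeper bigdata01.com:2181,bigdata02.com:2181,bigdata03.com:2181 --topic weblogs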

Create a consumer script:

#!/bin/bash
echo "kfk-kafka-consumer.sh start ..."
bin/kafka-console-consumer.sh --zookeeper bigdata01.com:2181,bigdata02.com:2181,bigdata03.com:2181 --from-beginning --topic weblogs
  2. Start HBase
  3. Start Flume (create three start scripts; launch flume2 and flume3 first, then flume1). HBase startup, table creation, and the flume2/flume3 launch commands are sketched after the flume1 script below.

#!/bin/bash
echo "flume-1 start......"
bin/flume-ng agent --conf conf -f conf/flume-conf.properties -n agent1 -Dflume.root.logger=INFO,console
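
A minimal sketch of the remaining launch steps. The working directories and exact commands are assumptions based on the configs above; the table name and column family must match agent1.sinks.hbaseSink.table / .columnFamily:

# on bigdata01.com (HBase master), start HBase and pre-create the sink's target table
bin/start-hbase.sh
echo "create 'weblogs','info'" | bin/hbase shell

# on bigdata02.com / bigdata03.com, start the collectors before flume1
bin/flume-ng agent --conf conf -f conf/flume-conf.properties -n agent2 -Dflume.root.logger=INFO,console
bin/flume-ng agent --conf conf -f conf/flume-conf.properties -n agent3 -Dflume.root.logger=INFO,console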

  4. Start weblogs.jar

4 Verification

1 Data in HBase


hbase(main):001:0> count 'weblogs'
Current count: 1000, row: 6062969462004942-00:00:07-1525921408064
1559 row(s) in 1.2250 seconds
=> 1559
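
To spot-check actual rows rather than just the count, a scan with a small limit can be piped into the HBase shell (run from the HBase install directory; the limit value is arbitrary):

echo "scan 'weblogs', {LIMIT => 3}" | bin/hbase shell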

2 Data in Kafka

00:00:32,269736677015411,[itfm],2,11,chanye.finance.sina.com.cn/fz/2007-09-10/334786.shtml
00:00:32,393693921083884,[奥运圣火河南路线],1,3,yanziha.pengpeng.com/bbs/thread/1148790.html
00:00:31,14386146687311085,[PSP游戏《怪物猎人2G》中文版下载],3,2,bbs.2u.com.cn/archiver/tid-93698.html
00:00:31,6747965581699283,[韩国首都为什么改名],1,1,ks.cn.yahoo.com/question/1406120803100.html
00:00:31,5540122643843461,[感恩的心+试听],4,1,www.yymp3.com/Play/7326/92974.htm
00:00:31,9874717412370105,[小马过河的博客],5,5,gaoshanliuyun200.blog.163.com/blog/static/2448501200692303238515/
00:00:31,3978551963724469,[3.44x33.com/],1,1,3.44x33.com/
00:00:31,6345435406335671,[李成儒+离婚],1,1,ent.sina.com.cn/2004-12-28/0646612073.html
00:00:31,5275533831056154,[华国峰同志逝世],6,1,www.meizu.com/bbs/showerr.asp?BoardID=10&ErrCodes=29&action=%BB%AA%B9%FA%B7%E5%CD%AC%D6%BE%CA%C5%CA%C0
00:00:31,3949828035015059,[old+woman],3,21,www.xxxmovieforum.com/
00:00:31,19186326774082868,[张雅],5,5,tv.mofile.com/tags/???\xa8\xa6??:0,1,20,1,0,0,audittime,0,
00:00:31,6009454949181303,[缅甸第三特区],9,13,www.xzqh.org/bbs/read.php?tid=31074
00:00:31,9472812716405814,[软件],6,12,www.onlinedown.net/
00:00:32,9311412621037496,[哄抢救灾物资],2,1,pic.news.mop.com/gs/2008/0528/12985.shtml
00:00:32,3691729199504175,[哭泣的星空+MP3],2,2,yr0201.blog.sohu.com/22352924.html
00:00:32,40320548674212914,[杨丞琳辱华事件],1,1,you.video.sina.com.cn/b/1084004-1261359184.html
00:00:32,8561366108033201,[哄抢救灾物资],1,3,news.21cn.com/social/daqian/2008/05/29/4777194_1.shtml
00:00:32,141278734311103,[网站建设],1,1,www.qicaispace.com/
00:00:32,056513944508728375,[黎姿],2,1,news.baidu.com/f/17/lizi.html
00:00:32,269736677015411,[itfm],2,11,chanye.finance.sina.com.cn/fz/2007-09-10/334786.shtml
00:00:32,393693921083884,[奥运圣火河南路线],1,3,yanziha.pengpeng.com/bbs/thread/1148790.html
00:00:32,9994672352241558,[高级妓女],6,216,lady.anhuinews.com/system/2003/01/07/000213154.shtml
00:00:32,9994672352241558,[高级妓女],6,216,lady.anhuinews.com/system/2003/01/07/000213154.shtml
00:00:32,7954374672498488,[台湾空军叛逃大陆],6,4,www.hanhuncn.com/Html/Twsj/20060921074835205_2.html
00:00:32,2896977267956338,[荔枝核深加工],4,4,www.ilib.cn/A-spkj200603040.html
00:00:33,41800714861954374,[月见草油],7,13,www.hisuppliers.com/remen/list/yuejiancaoyou/yuejiancaoyou.html
00:00:33,2699849326058153,[见过这样的另类吗],9,1,bbs.vogue.com.cn/archiver/?tid-92752.html
00:00:33,12931893747701723,[美军审讯越南女战俘],15,59,xzd.2000y.net/mb/1/ReadNews.asp?NewsID=547170
00:00:33,4554795388682654,[宁王府],1,4,www.cdol.net/html/29/109929-15687.html
00:00:33,9921372392180088,[尺寸链],12,55,www.ilib.cn/A-kjqbkfyjj200307090.html
00:00:33,14386146687311085,[PSP游戏《怪物猎人2G》中文版下载],1,3,games.qq.com/a/20080401/000413.htm
00:00:33,9700485503618976,[如何让头发快速长长],2,12,zhidao.baidu.com/question/24246694.html
00:00:33,6242029922450475,[扫地车报价],19,40,liqixianjin.b2b.hc360.com/supply/27323118.html
00:00:33,8480586467887667,[科比81分视频],1,1,www.tudou.com/programs/view/cZMRnhWcGtw/
00:00:33,9378259159932798,[隆武帝],120,46,www.openow.net/details/e2007.html
00:00:33,8933412496786006,[沈国放间谍事件],1,9,news.qq.com/a/20060425/
00:00:33,48530765688455246,[胡其美],27,4,bbs1.hxsd.com.cn/user/info?uid=182634
00:00:33,28250791446280643,[命名],10,7,www.namers.cn/
00:00:33,21071231987753036,[莎朗斯通],3,6,ent.qq.com/a/20060214/000136.htm
00:00:33,9586356230570776,[学有所教、劳有所得、老有所养、病有所医、住有所居],1,5,cpc.people.com.cn/GB/67481/94156/105719/105723/106451/6738281.html
00:00:34,2199783436347869,[如何下载56视频],1,1,wenwen.soso.com/z/q8527818.htm
