Flume + HBase + Kafka Integration and Development
First, download the Flume 1.7 source package from
http://archive.apache.org/dist/flume/1.7.0/
After downloading and extracting it, open the project with IntelliJ IDEA.
After clicking OK, choose to open the project in a new window.
The default import takes a long time to load, so importing it as a Maven project is recommended.
Once the import is done, take a look at the class in question.
Our data source is the Sogou Labs search data downloaded earlier, which has already been uploaded to node 1.
This is the Flume topology we are going to set up.
Next, configure Flume on node 1.
Configure the absolute path of the JDK.
Leave the configuration below as it is for now; it may be changed later.
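For reference, the JDK path is set in conf/flume-env.sh; the path below is an assumption and should be replaced with your own installation path:

```
# conf/flume-env.sh (the JDK path is an assumption, use your own)
export JAVA_HOME=/opt/modules/jdk1.8.0_60
```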
Next, preprocess the downloaded data, because the raw format is rather messy.
First, replace the tab on each line with a comma, producing weblog2.log.
Then replace the space on each line with a comma, producing weblog3.log.
Now every field is uniformly separated by commas.
Delete the files that are no longer needed.
Rename the result.
Distribute the preprocessed weblog.log to node 2 and node 3.
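The preprocessing steps above can be sketched as follows; the exact commands are an assumption (the original post only shows screenshots), and the sample line is made up:

```shell
# A tiny sample line standing in for the Sogou data; the real file mixes
# tabs and spaces as separators (field values here are made up)
printf '00:00:00\t123456 keyword 1 1 http://example.com\n' > weblog.log
# Step 1: replace tabs with commas -> weblog2.log
sed 's/\t/,/g' weblog.log > weblog2.log
# Step 2: replace spaces with commas -> weblog3.log
sed 's/ /,/g' weblog2.log > weblog3.log
# Remove the intermediate files and rename the result back to weblog.log
rm weblog.log weblog2.log
mv weblog3.log weblog.log
cat weblog.log
```

The cleaned file can then be pushed to the other nodes with scp; the target directory depends on your own layout.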
Now we do some secondary development on the imported Flume source code.
Instead of touching the original class, we create a new one.
Copy the original class's contents into it, then change the file name and class name.
/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied. See the License for the
 * specific language governing permissions and limitations
 * under the License.
 */
package org.apache.flume.sink.hbase;

import com.google.common.base.Charsets;
import org.apache.flume.Context;
import org.apache.flume.Event;
import org.apache.flume.FlumeException;
import org.apache.flume.conf.ComponentConfiguration;
import org.hbase.async.AtomicIncrementRequest;
import org.hbase.async.PutRequest;

import java.util.ArrayList;
import java.util.List;

/**
 * A simple serializer to be used with the AsyncHBaseSink
 * that returns puts from an event, by writing the event
 * body into it. The headers are discarded. It also updates a row in hbase
 * which acts as an event counter.
 *
 * Takes optional parameters:<p>
 * <tt>rowPrefix:</tt> The prefix to be used. Default: <i>default</i><p>
 * <tt>incrementRow</tt> The row to increment. Default: <i>incRow</i><p>
 * <tt>suffix:</tt> <i>uuid/random/timestamp.</i> Default: <i>uuid</i><p>
 *
 * Mandatory parameters: <p>
 * <tt>cf:</tt> Column family.<p>
 * Components that have no defaults and will not be used if absent:
 * <tt>payloadColumn:</tt> Which column to put payload in. If it is not present,
 * event data will not be written.<p>
 * <tt>incrementColumn:</tt> Which column to increment. If this is absent, it
 * means no column is incremented.
 */
public class KfkAsyncHbaseEventSerializer implements AsyncHbaseEventSerializer {
  private byte[] table;
  private byte[] cf;
  private byte[] payload;
  private byte[] payloadColumn;
  private byte[] incrementColumn;
  private String rowPrefix;
  private byte[] incrementRow;
  private SimpleHbaseEventSerializer.KeyType keyType;

  @Override
  public void initialize(byte[] table, byte[] cf) {
    this.table = table;
    this.cf = cf;
  }

  @Override
  public List<PutRequest> getActions() {
    List<PutRequest> actions = new ArrayList<PutRequest>();
    if (payloadColumn != null) {
      byte[] rowKey;
      try {
        switch (keyType) {
          case TS:
            rowKey = SimpleRowKeyGenerator.getTimestampKey(rowPrefix);
            break;
          case TSNANO:
            rowKey = SimpleRowKeyGenerator.getNanoTimestampKey(rowPrefix);
            break;
          case RANDOM:
            rowKey = SimpleRowKeyGenerator.getRandomKey(rowPrefix);
            break;
          default:
            rowKey = SimpleRowKeyGenerator.getUUIDKey(rowPrefix);
            break;
        }
        PutRequest putRequest = new PutRequest(table, rowKey, cf,
            payloadColumn, payload);
        actions.add(putRequest);
      } catch (Exception e) {
        throw new FlumeException("Could not get row key!", e);
      }
    }
    return actions;
  }

  public List<AtomicIncrementRequest> getIncrements() {
    List<AtomicIncrementRequest> actions = new ArrayList<AtomicIncrementRequest>();
    if (incrementColumn != null) {
      AtomicIncrementRequest inc = new AtomicIncrementRequest(table,
          incrementRow, cf, incrementColumn);
      actions.add(inc);
    }
    return actions;
  }

  @Override
  public void cleanUp() {
    // TODO Auto-generated method stub
  }

  @Override
  public void configure(Context context) {
    String pCol = context.getString("payloadColumn", "pCol");
    String iCol = context.getString("incrementColumn", "iCol");
    rowPrefix = context.getString("rowPrefix", "default");
    String suffix = context.getString("suffix", "uuid");
    if (pCol != null && !pCol.isEmpty()) {
      if (suffix.equals("timestamp")) {
        keyType = SimpleHbaseEventSerializer.KeyType.TS;
      } else if (suffix.equals("random")) {
        keyType = SimpleHbaseEventSerializer.KeyType.RANDOM;
      } else if (suffix.equals("nano")) {
        keyType = SimpleHbaseEventSerializer.KeyType.TSNANO;
      } else {
        keyType = SimpleHbaseEventSerializer.KeyType.UUID;
      }
      payloadColumn = pCol.getBytes(Charsets.UTF_8);
    }
    if (iCol != null && !iCol.isEmpty()) {
      incrementColumn = iCol.getBytes(Charsets.UTF_8);
    }
    incrementRow = context.getString("incrementRow", "incRow").getBytes(Charsets.UTF_8);
  }

  @Override
  public void setEvent(Event event) {
    this.payload = event.getBody();
  }

  @Override
  public void configure(ComponentConfiguration conf) {
    // TODO Auto-generated method stub
  }
}
We make small changes on top of the original.
Hold Ctrl and click to navigate into the class.
Add the following content:
/* Apache License 2.0 header, identical to the one above. */
package org.apache.flume.sink.hbase;

import java.io.UnsupportedEncodingException;
import java.util.Random;
import java.util.UUID;

/**
 * Utility class for users to generate their own keys. Any key can be used,
 * this is just a utility that provides a set of simple keys.
 */
public class SimpleRowKeyGenerator {

  public static byte[] getUUIDKey(String prefix) throws UnsupportedEncodingException {
    return (prefix + UUID.randomUUID().toString()).getBytes("UTF8");
  }

  public static byte[] getRandomKey(String prefix) throws UnsupportedEncodingException {
    return (prefix + String.valueOf(new Random().nextLong())).getBytes("UTF8");
  }

  public static byte[] getTimestampKey(String prefix) throws UnsupportedEncodingException {
    return (prefix + String.valueOf(System.currentTimeMillis())).getBytes("UTF8");
  }

  public static byte[] getNanoTimestampKey(String prefix) throws UnsupportedEncodingException {
    return (prefix + String.valueOf(System.nanoTime())).getBytes("UTF8");
  }

  // Added for this project: row key = userid + datetime + current timestamp
  public static byte[] getKfkRowKey(String userid, String datetime) throws UnsupportedEncodingException {
    return (userid + datetime + String.valueOf(System.currentTimeMillis())).getBytes("UTF8");
  }
}
Continue with the changes; the modified code is shown below.
KfkAsyncHbaseEventSerializer.java
/* Apache License 2.0 header and class Javadoc, identical to the first version above. */
package org.apache.flume.sink.hbase;

import com.google.common.base.Charsets;
import org.apache.flume.Context;
import org.apache.flume.Event;
import org.apache.flume.FlumeException;
import org.apache.flume.conf.ComponentConfiguration;
import org.hbase.async.AtomicIncrementRequest;
import org.hbase.async.PutRequest;

import java.util.ArrayList;
import java.util.List;

public class KfkAsyncHbaseEventSerializer implements AsyncHbaseEventSerializer {
  private byte[] table;
  private byte[] cf;
  private byte[] payload;
  private byte[] payloadColumn;
  private byte[] incrementColumn;
  private String rowPrefix;
  private byte[] incrementRow;
  private SimpleHbaseEventSerializer.KeyType keyType;

  @Override
  public void initialize(byte[] table, byte[] cf) {
    this.table = table;
    this.cf = cf;
  }

  @Override
  public List<PutRequest> getActions() {
    List<PutRequest> actions = new ArrayList<PutRequest>();
    if (payloadColumn != null) {
      byte[] rowKey;
      try {
        // Decode the configured column list and the event body explicitly;
        // String.valueOf(byte[]) would only return the array reference
        String[] columns = new String(payloadColumn, Charsets.UTF_8).split(",");
        String[] values = new String(this.payload, Charsets.UTF_8).split(",");
        for (int i = 0; i < columns.length; i++) {
          byte[] colColumn = columns[i].getBytes(Charsets.UTF_8);
          byte[] colValue = values[i].getBytes(Charsets.UTF_8);
          if (colColumn.length != colValue.length) break; // continue;
          // if (colValue.length < 3) continue;
          String datetime = values[0];
          String userid = values[1];
          rowKey = SimpleRowKeyGenerator.getKfkRowKey(userid, datetime);
          // take the values of the six columns and load them into HBase,
          // one PutRequest per column
          PutRequest putRequest = new PutRequest(table, rowKey, cf,
              colColumn, colValue);
          actions.add(putRequest);
        }
      } catch (Exception e) {
        throw new FlumeException("Could not get row key!", e);
      }
    }
    return actions;
  }

  public List<AtomicIncrementRequest> getIncrements() {
    List<AtomicIncrementRequest> actions = new ArrayList<AtomicIncrementRequest>();
    if (incrementColumn != null) {
      AtomicIncrementRequest inc = new AtomicIncrementRequest(table,
          incrementRow, cf, incrementColumn);
      actions.add(inc);
    }
    return actions;
  }

  @Override
  public void cleanUp() {
    // TODO Auto-generated method stub
  }

  @Override
  public void configure(Context context) {
    String pCol = context.getString("payloadColumn", "pCol");
    String iCol = context.getString("incrementColumn", "iCol");
    rowPrefix = context.getString("rowPrefix", "default");
    String suffix = context.getString("suffix", "uuid");
    if (pCol != null && !pCol.isEmpty()) {
      if (suffix.equals("timestamp")) {
        keyType = SimpleHbaseEventSerializer.KeyType.TS;
      } else if (suffix.equals("random")) {
        keyType = SimpleHbaseEventSerializer.KeyType.RANDOM;
      } else if (suffix.equals("nano")) {
        keyType = SimpleHbaseEventSerializer.KeyType.TSNANO;
      } else {
        keyType = SimpleHbaseEventSerializer.KeyType.UUID;
      }
      payloadColumn = pCol.getBytes(Charsets.UTF_8);
    }
    if (iCol != null && !iCol.isEmpty()) {
      incrementColumn = iCol.getBytes(Charsets.UTF_8);
    }
    incrementRow = context.getString("incrementRow", "incRow").getBytes(Charsets.UTF_8);
  }

  @Override
  public void setEvent(Event event) {
    this.payload = event.getBody();
  }

  @Override
  public void configure(ComponentConfiguration conf) {
    // TODO Auto-generated method stub
  }
}
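To see what the modified getActions() does with one preprocessed line, here is a quick shell mock of the column/value pairing (the sample field values are made up; the column names come from the payloadColumn setting used later in the agent configuration):

```shell
# One preprocessed log line (field values are made up) and the
# payloadColumn list from the agent configuration
line='00:00:00,123456,keyword,1,2,http://example.com'
cols='datatime,userid,searchname,retorder,cliorder,cliurl'
# Split both strings on commas and pair each column name with its value,
# mirroring the loop in the modified getActions()
echo "$cols" | tr ',' '\n' > cols.txt
echo "$line" | tr ',' '\n' > vals.txt
paste -d'=' cols.txt vals.txt
```

Each printed pair corresponds to one PutRequest: same row key, same column family, one qualifier per configured column.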
Now package the code into a jar.
You can see quite a few dependency jars in the artifact; remove the ones that are not needed.
Then simply click Build.
The built jar can be found in the local project's output path.
Now upload this jar to Flume's lib directory,
that is, this directory.
You can tell from the upload date that it was uploaded today.
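The upload can be done with scp; the jar name and the Flume install path below are assumptions, adjust them to your own layout:

```
# jar name and target path are assumptions
scp flume-ng-hbase-sink-1.7.0.jar bigdata-pro01.kfk.com:/opt/modules/flume-1.7.0-bin/lib/
```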
Next, configure Flume + Kafka.
agent1.sources = r1
agent1.channels = kafkaC hbaseC
agent1.sinks = kafkaSink hbaseSink

#*********** flume + hbase ************
agent1.sources.r1.type = avro
agent1.sources.r1.channels = hbaseC
agent1.sources.r1.bind = bigdata-pro01.kfk.com
agent1.sources.r1.port = 5555
agent1.sources.r1.threads = 5

agent1.channels.hbaseC.type = memory
agent1.channels.hbaseC.capacity = 100000
agent1.channels.hbaseC.transactionCapacity = 100000
agent1.channels.hbaseC.keep-alive = 20

agent1.sinks.hbaseSink.type = asynchbase
agent1.sinks.hbaseSink.table = weblogs
agent1.sinks.hbaseSink.columnFamily = info
agent1.sinks.hbaseSink.serializer = org.apache.flume.sink.hbase.KfkAsyncHbaseEventSerializer
agent1.sinks.hbaseSink.channel = hbaseC
agent1.sinks.hbaseSink.serializer.payloadColumn = datatime,userid,searchname,retorder,cliorder,cliurl

#************** flume + kafka ***************
agent1.channels.kafkaC.type = memory
agent1.channels.kafkaC.capacity = 100000
agent1.channels.kafkaC.transactionCapacity = 100000
agent1.channels.kafkaC.keep-alive = 20

agent1.sinks.kafkaSink.channel = kafkaC
agent1.sinks.kafkaSink.type = org.apache.flume.sink.kafka.KafkaSink
agent1.sinks.kafkaSink.kafka.brokerList = bigdata-pro01.kfk.com:9092,bigdata-pro02.kfk.com:9092,bigdata-pro03.kfk.com:9092
agent1.sinks.kafkaSink.topic = test
agent1.sinks.kafkaSink.zookeeperConnect = bigdata-pro01.kfk.com:2181,bigdata-pro02.kfk.com:2181,bigdata-pro03.kfk.com:2181
agent1.sinks.kafkaSink.requiredAcks = 1
agent1.sinks.kafkaSink.batchSize = 1
agent1.sinks.kafkaSink.serializer.class = kafka.serializer.StringEncoder
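Note that the agent expects the HBase table and the Kafka topic referenced above to already exist. They can be created roughly like this (the replication factor and partition count are assumptions):

```
# In the HBase shell: table 'weblogs' with column family 'info'
create 'weblogs', 'info'

# From the Kafka install directory: create the 'test' topic
bin/kafka-topics.sh --create \
  --zookeeper bigdata-pro01.kfk.com:2181,bigdata-pro02.kfk.com:2181,bigdata-pro03.kfk.com:2181 \
  --replication-factor 1 --partitions 1 --topic test
```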
Reposted from: https://www.cnblogs.com/braveym/p/8320904.html