First, download the Flume 1.7 source package:

http://archive.apache.org/dist/flume/1.7.0/
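For reference, a rough sketch of the download and extraction steps (the archive name and working directory are assumptions):

# grab the Flume 1.7.0 source archive from the Apache archive and unpack it
wget http://archive.apache.org/dist/flume/1.7.0/apache-flume-1.7.0-src.tar.gz
tar -zxf apache-flume-1.7.0-src.tar.gz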

After downloading and extracting it,

open the project in IntelliJ IDEA.

After clicking OK, choose to open it in a new window.

The default import takes a long time to load, so importing the project as a Maven project is recommended.

After the import finishes, take a look at the class we will use as a starting point.

Our data source is the Sogou Labs dataset downloaded earlier, which has already been uploaded to node 1.

This is the Flume data-flow model we are going to configure.

Next, configure Flume on node 1.

Set the absolute path of the JDK.
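As a rough sketch, the JDK path is normally set in Flume's conf/flume-env.sh; the path used below is only an assumption for illustration:

# conf/flume-env.sh -- point Flume at the locally installed JDK (path is an assumption)
export JAVA_HOME=/opt/modules/jdk1.8.0_60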

Leave the following setting as it is for now; it may be changed later.

Next, preprocess the downloaded data, since the raw format is fairly messy.

First, replace the tabs on each line with commas, producing weblog2.log.

Then replace the spaces on each line with commas, producing weblog3.log.

Now all fields are uniformly comma-separated.

Delete the files that are no longer needed,

then rename the result to weblog.log.
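A minimal sketch of the preprocessing described above; the name of the raw Sogou file (SogouQ.reduced here) is an assumption:

# replace tabs with commas, producing weblog2.log (raw file name is an assumption)
cat SogouQ.reduced | tr "\t" "," > weblog2.log
# replace spaces with commas, producing weblog3.log
cat weblog2.log | tr " " "," > weblog3.log
# drop the intermediate file and rename the final result to weblog.log
rm -f weblog2.log
mv weblog3.log weblog.log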

Distribute the preprocessed weblog.log file to node 2 and node 3.
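For example (the host names match the Flume configuration later in this article; the target directory is an assumption):

# push the preprocessed log to node 2 and node 3
scp weblog.log bigdata-pro02.kfk.com:/opt/datas/
scp weblog.log bigdata-pro03.kfk.com:/opt/datas/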

Now we do some secondary development on the imported Flume source code.

Rather than touching the original class, create a new one.

Then copy the original class's content into it and change the file name and class name:

/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied.  See the License for the
 * specific language governing permissions and limitations
 * under the License.
 */
package org.apache.flume.sink.hbase;

import com.google.common.base.Charsets;
import org.apache.flume.Context;
import org.apache.flume.Event;
import org.apache.flume.FlumeException;
import org.apache.flume.conf.ComponentConfiguration;
import org.apache.flume.sink.hbase.SimpleHbaseEventSerializer.KeyType;
import org.hbase.async.AtomicIncrementRequest;
import org.hbase.async.PutRequest;

import java.util.ArrayList;
import java.util.List;

/**
 * A simple serializer to be used with the AsyncHBaseSink
 * that returns puts from an event, by writing the event
 * body into it. The headers are discarded. It also updates a row in hbase
 * which acts as an event counter.
 *
 * Takes optional parameters:<p>
 * <tt>rowPrefix:</tt> The prefix to be used. Default: <i>default</i><p>
 * <tt>incrementRow</tt> The row to increment. Default: <i>incRow</i><p>
 * <tt>suffix:</tt> <i>uuid/random/timestamp.</i>Default: <i>uuid</i><p>
 *
 * Mandatory parameters: <p>
 * <tt>cf:</tt>Column family.<p>
 * Components that have no defaults and will not be used if absent:
 * <tt>payloadColumn:</tt> Which column to put payload in. If it is not present,
 * event data will not be written.<p>
 * <tt>incrementColumn:</tt> Which column to increment. If this is absent, it
 * means no column is incremented.
 */
public class KfkAsyncHbaseEventSerializer implements AsyncHbaseEventSerializer {
  private byte[] table;
  private byte[] cf;
  private byte[] payload;
  private byte[] payloadColumn;
  private byte[] incrementColumn;
  private String rowPrefix;
  private byte[] incrementRow;
  private SimpleHbaseEventSerializer.KeyType keyType;

  @Override
  public void initialize(byte[] table, byte[] cf) {
    this.table = table;
    this.cf = cf;
  }

  @Override
  public List<PutRequest> getActions() {
    List<PutRequest> actions = new ArrayList<PutRequest>();
    if (payloadColumn != null) {
      byte[] rowKey;
      try {
        switch (keyType) {
          case TS:
            rowKey = SimpleRowKeyGenerator.getTimestampKey(rowPrefix);
            break;
          case TSNANO:
            rowKey = SimpleRowKeyGenerator.getNanoTimestampKey(rowPrefix);
            break;
          case RANDOM:
            rowKey = SimpleRowKeyGenerator.getRandomKey(rowPrefix);
            break;
          default:
            rowKey = SimpleRowKeyGenerator.getUUIDKey(rowPrefix);
            break;
        }
        PutRequest putRequest = new PutRequest(table, rowKey, cf,
            payloadColumn, payload);
        actions.add(putRequest);
      } catch (Exception e) {
        throw new FlumeException("Could not get row key!", e);
      }
    }
    return actions;
  }

  public List<AtomicIncrementRequest> getIncrements() {
    List<AtomicIncrementRequest> actions = new ArrayList<AtomicIncrementRequest>();
    if (incrementColumn != null) {
      AtomicIncrementRequest inc = new AtomicIncrementRequest(table,
          incrementRow, cf, incrementColumn);
      actions.add(inc);
    }
    return actions;
  }

  @Override
  public void cleanUp() {
    // TODO Auto-generated method stub
  }

  @Override
  public void configure(Context context) {
    String pCol = context.getString("payloadColumn", "pCol");
    String iCol = context.getString("incrementColumn", "iCol");
    rowPrefix = context.getString("rowPrefix", "default");
    String suffix = context.getString("suffix", "uuid");
    if (pCol != null && !pCol.isEmpty()) {
      if (suffix.equals("timestamp")) {
        keyType = SimpleHbaseEventSerializer.KeyType.TS;
      } else if (suffix.equals("random")) {
        keyType = SimpleHbaseEventSerializer.KeyType.RANDOM;
      } else if (suffix.equals("nano")) {
        keyType = SimpleHbaseEventSerializer.KeyType.TSNANO;
      } else {
        keyType = SimpleHbaseEventSerializer.KeyType.UUID;
      }
      payloadColumn = pCol.getBytes(Charsets.UTF_8);
    }
    if (iCol != null && !iCol.isEmpty()) {
      incrementColumn = iCol.getBytes(Charsets.UTF_8);
    }
    incrementRow = context.getString("incrementRow", "incRow").getBytes(Charsets.UTF_8);
  }

  @Override
  public void setEvent(Event event) {
    this.payload = event.getBody();
  }

  @Override
  public void configure(ComponentConfiguration conf) {
    // TODO Auto-generated method stub
  }
}

We will make some small modifications on top of the original.

 

Hold down the Ctrl key and click to jump into the SimpleRowKeyGenerator class.

Add the following content (the getKfkRowKey method at the end is the new part):

/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied.  See the License for the
 * specific language governing permissions and limitations
 * under the License.
 */
package org.apache.flume.sink.hbase;

import java.io.UnsupportedEncodingException;
import java.util.Random;
import java.util.UUID;

/**
 * Utility class for users to generate their own keys. Any key can be used,
 * this is just a utility that provides a set of simple keys.
 */
public class SimpleRowKeyGenerator {

  public static byte[] getUUIDKey(String prefix) throws UnsupportedEncodingException {
    return (prefix + UUID.randomUUID().toString()).getBytes("UTF8");
  }

  public static byte[] getRandomKey(String prefix) throws UnsupportedEncodingException {
    return (prefix + String.valueOf(new Random().nextLong())).getBytes("UTF8");
  }

  public static byte[] getTimestampKey(String prefix) throws UnsupportedEncodingException {
    return (prefix + String.valueOf(System.currentTimeMillis())).getBytes("UTF8");
  }

  public static byte[] getNanoTimestampKey(String prefix) throws UnsupportedEncodingException {
    return (prefix + String.valueOf(System.nanoTime())).getBytes("UTF8");
  }

  // newly added: build a row key from userid + datetime + the current timestamp
  public static byte[] getKfkRowKey(String userid, String datetime) throws UnsupportedEncodingException {
    return (userid + datetime + String.valueOf(System.currentTimeMillis())).getBytes("UTF8");
  }
}

Continue with the modifications; the code after the changes is shown below.

 KfkAsyncHbaseEventSerializer.java
/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied.  See the License for the
 * specific language governing permissions and limitations
 * under the License.
 */
package org.apache.flume.sink.hbase;

import com.google.common.base.Charsets;
import org.apache.flume.Context;
import org.apache.flume.Event;
import org.apache.flume.FlumeException;
import org.apache.flume.conf.ComponentConfiguration;
import org.apache.flume.sink.hbase.SimpleHbaseEventSerializer.KeyType;
import org.hbase.async.AtomicIncrementRequest;
import org.hbase.async.PutRequest;

import java.util.ArrayList;
import java.util.List;

/**
 * A simple serializer to be used with the AsyncHBaseSink
 * that returns puts from an event, by writing the event
 * body into it. The headers are discarded. It also updates a row in hbase
 * which acts as an event counter.
 *
 * Takes optional parameters:<p>
 * <tt>rowPrefix:</tt> The prefix to be used. Default: <i>default</i><p>
 * <tt>incrementRow</tt> The row to increment. Default: <i>incRow</i><p>
 * <tt>suffix:</tt> <i>uuid/random/timestamp.</i>Default: <i>uuid</i><p>
 *
 * Mandatory parameters: <p>
 * <tt>cf:</tt>Column family.<p>
 * Components that have no defaults and will not be used if absent:
 * <tt>payloadColumn:</tt> Which column to put payload in. If it is not present,
 * event data will not be written.<p>
 * <tt>incrementColumn:</tt> Which column to increment. If this is absent, it
 * means no column is incremented.
 */
public class KfkAsyncHbaseEventSerializer implements AsyncHbaseEventSerializer {
  private byte[] table;
  private byte[] cf;
  private byte[] payload;
  private byte[] payloadColumn;
  private byte[] incrementColumn;
  private String rowPrefix;
  private byte[] incrementRow;
  private SimpleHbaseEventSerializer.KeyType keyType;

  @Override
  public void initialize(byte[] table, byte[] cf) {
    this.table = table;
    this.cf = cf;
  }

  @Override
  public List<PutRequest> getActions() {
    List<PutRequest> actions = new ArrayList<PutRequest>();
    if (payloadColumn != null) {
      byte[] rowKey;
      try {
        // decode the configured column list and the event body from bytes before splitting
        String[] columns = new String(payloadColumn, Charsets.UTF_8).split(",");
        String[] values = new String(this.payload, Charsets.UTF_8).split(",");
        for (int i = 0; i < columns.length; i++) {
          byte[] colColumn = columns[i].getBytes();
          byte[] colValue = values[i].getBytes(Charsets.UTF_8);
          if (colColumn.length != colValue.length) break;  //continue;
          // if (colValue.length < 3) continue;
          String datetime = values[0].toString();
          String userid = values[1].toString();
          rowKey = SimpleRowKeyGenerator.getKfkRowKey(userid, datetime);
          // write the value of each of the six columns into HBase
          PutRequest putRequest = new PutRequest(table, rowKey, cf,
              colColumn, colValue);
          actions.add(putRequest);
        }
      } catch (Exception e) {
        throw new FlumeException("Could not get row key!", e);
      }
    }
    return actions;
  }

  public List<AtomicIncrementRequest> getIncrements() {
    List<AtomicIncrementRequest> actions = new ArrayList<AtomicIncrementRequest>();
    if (incrementColumn != null) {
      AtomicIncrementRequest inc = new AtomicIncrementRequest(table,
          incrementRow, cf, incrementColumn);
      actions.add(inc);
    }
    return actions;
  }

  @Override
  public void cleanUp() {
    // TODO Auto-generated method stub
  }

  @Override
  public void configure(Context context) {
    String pCol = context.getString("payloadColumn", "pCol");
    String iCol = context.getString("incrementColumn", "iCol");
    rowPrefix = context.getString("rowPrefix", "default");
    String suffix = context.getString("suffix", "uuid");
    if (pCol != null && !pCol.isEmpty()) {
      if (suffix.equals("timestamp")) {
        keyType = SimpleHbaseEventSerializer.KeyType.TS;
      } else if (suffix.equals("random")) {
        keyType = SimpleHbaseEventSerializer.KeyType.RANDOM;
      } else if (suffix.equals("nano")) {
        keyType = SimpleHbaseEventSerializer.KeyType.TSNANO;
      } else {
        keyType = SimpleHbaseEventSerializer.KeyType.UUID;
      }
      payloadColumn = pCol.getBytes(Charsets.UTF_8);
    }
    if (iCol != null && !iCol.isEmpty()) {
      incrementColumn = iCol.getBytes(Charsets.UTF_8);
    }
    incrementRow = context.getString("incrementRow", "incRow").getBytes(Charsets.UTF_8);
  }

  @Override
  public void setEvent(Event event) {
    this.payload = event.getBody();
  }

  @Override
  public void configure(ComponentConfiguration conf) {
    // TODO Auto-generated method stub
  }
}


Now package the code into a jar.

We can see many related dependency jars; remove the ones that are not needed.

Then just click Build.

The built jar can be found under the project's local output path.

Now upload this jar to Flume's lib directory,

that is, this directory.
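For example, roughly as follows (the jar name produced by the flume-ng-hbase-sink module and the Flume installation path are assumptions):

# copy the rebuilt HBase-sink jar into Flume's lib directory on node 1
scp flume-ng-hbase-sink-1.7.0.jar bigdata-pro01.kfk.com:/opt/modules/flume-1.7.0/lib/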

You can see from the file's date that it was uploaded today.

Next, configure Flume + Kafka (the configuration below also includes the HBase sink):

agent1.sources = r1
agent1.channels = kafkaC hbaseC
agent1.sinks = kafkaSink hbaseSink

#*********** flume + hbase ************
agent1.sources.r1.type = avro
agent1.sources.r1.channels = hbaseC
agent1.sources.r1.bind = bigdata-pro01.kfk.com
agent1.sources.r1.port = 5555
agent1.sources.r1.threads = 5

agent1.channels.hbaseC.type = memory
agent1.channels.hbaseC.capacity = 100000
agent1.channels.hbaseC.transactionCapacity = 100000
agent1.channels.hbaseC.keep-alive = 20

agent1.sinks.hbaseSink.type = asynchbase
agent1.sinks.hbaseSink.table = weblogs
agent1.sinks.hbaseSink.columnFamily = info
agent1.sinks.hbaseSink.serializer = org.apache.flume.sink.hbase.KfkAsyncHbaseEventSerializer
agent1.sinks.hbaseSink.channel = hbaseC
agent1.sinks.hbaseSink.serializer.payloadColumn = datatime,userid,searchname,retorder,cliorder,cliurl

#*********** flume + kafka ************
agent1.channels.kafkaC.type = memory
agent1.channels.kafkaC.capacity = 100000
agent1.channels.kafkaC.transactionCapacity = 100000
agent1.channels.kafkaC.keep-alive = 20

agent1.sinks.kafkaSink.channel = kafkaC
agent1.sinks.kafkaSink.type = org.apache.flume.sink.kafka.KafkaSink
agent1.sinks.kafkaSink.kafka.brokerList = bigdata-pro01.kfk.com:9092,bigdata-pro02.kfk.com:9092,bigdata-pro03.kfk.com:9092
agent1.sinks.kafkaSink.topic = test
agent1.sinks.kafkaSink.zookeeperConnect = bigdata-pro01.kfk.com:2181,bigdata-pro02.kfk.com:2181,bigdata-pro03.kfk.com:2181
agent1.sinks.kafkaSink.requiredAcks = 1
agent1.sinks.kafkaSink.batchSize = 1
agent1.sinks.kafkaSink.serializer.class = kafka.serializer.StringEncoder
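Once the configuration is saved, the agent can be started roughly as follows; the HBase table-creation step and the configuration file name (flume-conf.properties) are assumptions based on the settings above:

# create the HBase table referenced by the sink, if it does not exist yet
echo "create 'weblogs','info'" | hbase shell
# start the agent defined above
bin/flume-ng agent --conf conf --conf-file conf/flume-conf.properties --name agent1 -Dflume.root.logger=INFO,console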

Reposted from: https://www.cnblogs.com/braveym/p/8320904.html
