1.数据生产

使用java代码往一个文件中写入数据

package com.mobile;import java.io.*;
import java.text.DecimalFormat;
import java.text.ParseException;
import java.text.SimpleDateFormat;
import java.util.*;/*** @author kaiya* @Desc 数据生产* @date 2020/5/6 20:33*/
public class Producer {// 数据要求:caller,callee,buildTime,duration 主叫,被叫,通话建立时间,通话持续时间// 电话号码List<String> phoneNumList = new ArrayList<>();// 电话号码,姓名Map<String, String> phoneNameMap = new HashMap<>();private String startTime = "2020-01-01";private String endTime = "2020-12-31";/*** caller :20个电话号码用List存储,Math.random()*20随机产生0-19的随机数,* 去获取随机的号码,这里获取的随机数箱单与存放号码的索引*//*** 数据初始化*/public void initPhone() {// 电话号码phoneNumList.add("17078388295");phoneNumList.add("13980337439");phoneNumList.add("14575535933");phoneNumList.add("19902496992");phoneNumList.add("18549641558");phoneNumList.add("17005930322");phoneNumList.add("18468618874");phoneNumList.add("18576581848");phoneNumList.add("15978226424");phoneNumList.add("15542823911");phoneNumList.add("17526304161");phoneNumList.add("15422018558");phoneNumList.add("17269452013");phoneNumList.add("17764278604");phoneNumList.add("15711910344");phoneNumList.add("15714728273");phoneNumList.add("16061028454");phoneNumList.add("16264433631");phoneNumList.add("17601615878");phoneNumList.add("15897468949");// 电话号码,姓名phoneNameMap.put("17078388295", "施耐庵");phoneNameMap.put("13980337439", "李世民");phoneNameMap.put("14575535933", "程咬金");phoneNameMap.put("19902496992", "猪八戒");phoneNameMap.put("18549641558", "孙悟空");phoneNameMap.put("17005930322", "唐三藏");phoneNameMap.put("18468618874", "沙僧");phoneNameMap.put("18576581848", "沙悟净");phoneNameMap.put("15978226424", "猪悟能");phoneNameMap.put("15542823911", "观世音");phoneNameMap.put("17526304161", "太白金星");phoneNameMap.put("15422018558", "赤脚大仙");phoneNameMap.put("17269452013", "二郎神");phoneNameMap.put("17764278604", "哮天犬");phoneNameMap.put("15711910344", "嫦娥");phoneNameMap.put("15714728273", "玉皇大帝");phoneNameMap.put("16061028454", "王母娘娘");phoneNameMap.put("16264433631", "如来");phoneNameMap.put("17601615878", "白骨精");phoneNameMap.put("15897468949", "牛魔王");}/*** 生产并返回数据* 数据要求:caller,callee,buildTime,duration 主叫,被叫,通话建立时间,通话持续时间*/public String product() {/*** caller :随机从电话号码中取出一个作为主叫*/int callerIndex = (int)(Math.random() * phoneNumList.size());String caller = phoneNumList.get(callerIndex);/*** callee :随机从电话号码中取出一个作为被叫*/String callee = null;while (true) {int calleeIndex = (int)(Math.random() * phoneNumList.size());callee = phoneNumList.get(calleeIndex);// 需要主叫和被叫不是同一个号码if (!callee.equals(caller)) {break;}}/*** buildTime*/String buildTime = randomBuildTime(startTime, endTime);/*** duration*/DecimalFormat df = new DecimalFormat("0000");String duration = df.format((int)30 * 60 * Math.random());// 拼接最终数据StringBuffer sb = new StringBuffer();sb.append(caller).append(",").append(callee).append(",").append(buildTime).append(",").append(duration);return sb.toString();}/*** 返回一个介于开始时间和结束时间之间的时间* @param startTime 开始时间 yyyy-MM-dd* @param endTime 结束时间 yyyy-MM-dd* @return*/private String randomBuildTime(String startTime, String endTime) {SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd");SimpleDateFormat sdf2 = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");try {Date startDate = sdf.parse(startTime);Date endDate = sdf.parse(endTime);// 结束时间 > 开始时间if (startDate.getTime() >= endDate.getTime()) {return null;}// 获取一个随机时间  (结束时间-开始时间)*随机数+起始时间Long timeMill = (long)((endDate.getTime() - startDate.getTime())* Math.random() + startDate.getTime());Date date = new Date(timeMill);return sdf2.format(date);} catch (ParseException e) {e.printStackTrace();}return null;}/*** 将数据写出到文件* @param filePath*/public  void writeToFile(String filePath) {OutputStreamWriter osw = null;try {osw = new OutputStreamWriter(new FileOutputStream(new File(filePath), true), "UTF-8");while (true) {// 每0.5s写一次Thread.sleep(500);String data = product();System.out.println(data);osw.write(data + "\n");osw.flush();}} catch (UnsupportedEncodingException e) {e.printStackTrace();} catch (FileNotFoundException e) {e.printStackTrace();} catch (IOException e) {e.printStackTrace();} catch (InterruptedException e) {e.printStackTrace();} finally {if (osw != null) {try {osw.close();} catch (IOException e) {e.printStackTrace();}}}}public static void main(String[] args) {Producer producer = new Producer();// 初始化时间producer.initPhone();producer.writeToFile(args[0]);}
}

2.flume+kafka消费数据【flume与kafka的搭建在此省略】

2.1 kafka消费数据代码

maven依赖

<dependency><groupId>org.apache.kafka</groupId><artifactId>kafka-clients</artifactId><version>0.11.0.2</version></dependency>

消费代码 【2.4】

package com.bigdata.kafka;import com.bigdata.hbase.HBaseDAO;
import com.bigdata.utils.PropertiesUtils;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;import java.util.Arrays;
import java.util.Properties;/*** @author kaiya* @Desc kafka消费数据写入Hbase* @date 2020/5/10 12:22*/
public class Kafka2HBase {public static void main(String[] args) {// kafka 消费者KafkaConsumer<String, String> kafkaConsumer = new KafkaConsumer<>(PropertiesUtils.properties);// kafka订阅【subscribe】主题【Topic】kafkaConsumer.subscribe(Arrays.asList(PropertiesUtils.getProperty("kafka.topics")));// 拉取数据while (true) {ConsumerRecords<String, String> poll = kafkaConsumer.poll(100);for (ConsumerRecord<String, String> consumeRecord : poll) {//  14575535933,15422018558,2020-01-25 22:41:22,1551String line = consumeRecord.value();System.out.println(line);}}}
}

配置文件 properties

# 设置kafka的brokerlist
bootstrap.servers=bigdata111:9092,bigdata112:9092,bigdata113:9092
# 设置消费者所属的消费组
group.id=hbase_consumer_group
# 设置是否自动确认offset
enable.auto.commit=true
# 自动确认offset的时间间隔
auto.commit.interval.ms=30000
# 设置key,value的反序列化类的全名
key.deserializer=org.apache.kafka.common.serialization.StringDeserializer
value.deserializer=org.apache.kafka.common.serialization.StringDeserializer# 以下为自定义属性设置
# 设置本次消费的主题
kafka.topics=calllog

读取配置文件

package com.bigdata.utils;import org.slf4j.Logger;
import org.slf4j.LoggerFactory;import java.io.IOException;
import java.io.InputStream;
import java.util.Properties;/*** @author kaiya* @Desc 读取配置文件的工具类* @date 2020/5/8 20:51*/public class PropertiesUtils {public static Properties properties = null;private static final Logger LOG = LoggerFactory.getLogger(PropertiesUtils.class);static {// 读取hbase的配置文件InputStream is = ClassLoader.getSystemResourceAsStream("hbase.properties");properties = new Properties();try {// 装载文件properties.load(is);} catch (IOException e) {e.printStackTrace();}}/*** 获取配置文件中key对应的值* @param key* @return*/public static String getProperty(String key) {return properties.getProperty(key);}
}

2.2 生产数据

将数据生产的代码用maven打包成jar包,上传到liunx环境,运行jar包生产数据

java -cp Producer-1.0-SNAPSHOT.jar com.mobile.Producer /opt/datas/calllog.csv

2.3 flume拉取数据到kafka

配置文件

# define
a1.sources = r1
a1.sinks = k1
a1.channels = c1# source
a1.sources.r1.type = exec
#监控的文件,及数据生产的文件
a1.sources.r1.command = tail -F -c +0 /opt/datas/calllog.csv
a1.sources.r1.shell = /bin/bash -c# sink
a1.sinks.k1.type = org.apache.flume.sink.kafka.KafkaSink
a1.sinks.k1.brokerList = bigdata111:9092,bigdata112:9092,bigdata113:9092
a1.sinks.k1.topic = calllog
a1.sinks.k1.batchSize = 20
a1.sinks.k1.requiredAcks = 1# channel
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100# bind
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1

启动flume

为了方便,直接保存成脚本

#!/bin/bash
/opt/module/flume-1.8.0/bin/flume-ng agent -n a1 -c /opt/module/flume-1.8.0/conf/ -f /opt/module/flume-1.8.0/jobconf/flume-to_kafka.conf

2.4运行kafka消费数据代码,由于我把消费到的数据打印到控制台,可以直接看到数据

flume+kafka消费数据【纯个人笔记】相关推荐

  1. sparkstreaming直接从kafka消费数据

    1.sparkstreaming直接从kafka消费数据 采用createDirectStream,示例: createDirectStream[K, V, KD <: Decoder[K],  ...

  2. 【kafka】kafka 消费数据的时候 报错 (Re-) join group

    文章目录 1.场景1 1.1 概述 2.场景2 3.场景3 1.场景1 1.1 概述 kafka 消费数据的时候 报错 如下 2.场景2 spirng-kafka的多consumer问题困扰了我好久, ...

  3. Kafka学习笔记(3)----Kafka的数据复制(Replica)与Failover

    1. CAP理论 1.1 Cosistency(一致性) 通过某个节点的写操作结果对后面通过其他节点的读操作可见. 如果更新数据后,并发访问的情况下可立即感知该更新,称为强一致性 如果允许之后部分或全 ...

  4. HBase项目实战:HBase+Flume+Kafka+Hive+SSM实现电信大数据通话信息实时读写定位系统

    内容简介 一.项目内容深度分析 1. 项目内容概览 2.数据的大致流向分析 3. 涉及的知识难点分析 二.项目所用到的框架清单 三.项目实战代码 1. 后端开发 1. 构建工程项目模块 2.开发通话记 ...

  5. Flume+Kafka+Storm+Redis构建大数据实时处理系统:实时统计网站PV、UV+展示

    http://blog.51cto.com/xpleaf/2104160?cid=704690 1 大数据处理的常用方法 前面在我的另一篇文章中<大数据采集.清洗.处理:使用MapReduce进 ...

  6. flume+kafka整合采集数据案例

    一.flume简介 1.1.1 概述 Flume是一个分布式.可靠.和高可用的海量日志采集.聚合和传输的系统. Flume可以采集文件,socket数据包.文件.文件夹.kafka等各种形式源数据,又 ...

  7. 离线分析:Flume+Kafka+HBase+Hadoop通话数据统计

    文章目录 项目背景 项目架构 系统环境 系统配置 框架安装 JDK Hadoop Zookeeper Kafka Flume HBase 项目实现 项目结构 表设计 HBase Mysql 功能编写 ...

  8. Kafka消费者不消费数据

    背景: 工作往往是千篇一律,真正能学到点知识都是在上线后.使用Skywalking+Kafka+ES进行应用监控. 现象: 公司使用Skywalking在开发测试环境中Kafka顺利消费数据,到了UA ...

  9. Flume+Kafka双剑合璧玩转大数据平台日志采集

    点击上方蓝色字体,选择"设为星标" 回复"资源"获取更多资源 大数据技术与架构 点击右侧关注,大数据开发领域最强公众号! 大数据真好玩 点击右侧关注,大数据真好 ...

最新文章

  1. 解决数据库自增ID的问题
  2. OSPF(Open Shortest Path First开放式最短路径优先)
  3. f12获取网页文本_怎么获取网页源代码中的文件?
  4. Docker selenium自动化 - 使用python操作docker,python运行、启用、停用和查询容器实例演示
  5. [LevelDB] 写批处理过程详解
  6. java外部接口图解_java代码实现访问网络外部接口并获取数据的工具类详解
  7. 21 SD配置-主数据-客户账户组分配编号范围
  8. Zabbix通过Smokeping检测网络质量并告警
  9. windows和Linux虚拟机或者云主机之间传输文件
  10. 博文视点OpenParty第11期“世界黑客大会那些事儿”成功举办
  11. 如何在Mac上删除其他存储
  12. grub的boot loader安装在磁盘上的位置
  13. golang echo框架案例
  14. CCNA学习指南第三章
  15. Vmprotect 驱动加壳踩坑
  16. python绘制彩色地震剖面断层解释_地震剖面上的断层分析及相关意义
  17. 熟读100句英文,记7000单词
  18. 使用el-tag文字过长超出隐藏
  19. 基于JAVA的农产品销售管理系统【数据库设计、源码、开题报告】
  20. jeecgboot:设置为缓存路由,切换页面,保留数据

热门文章

  1. PHP:通过反射ReflectionClass获取类中的所有常量
  2. 通过FPDF创建中文PDF文档
  3. java如何实现音乐播放
  4. Day10 面向对象 、类和对象的关系
  5. 摩托罗拉里程碑1刷机教程
  6. jQuery 百度地图单点标注 公司简介联系我们 实例
  7. 永恒之蓝(EternalBlue)实例复现
  8. 微信显示时间php,类似于微信的聊天时间显示
  9. WINDOWS中无法查看显示隐藏文件的解决方法
  10. 2021辽宁高考成绩查询具体时间,2021年辽宁高考成绩什么时候出具体时间几点 具体准确时间...