1. Create the topics in the Kafka directory

cd /usr/hdp/current/kafka-broker/

Eight topics are created in total (one topic per data file).

(users topic)
(1) Run the following command in the Kafka directory to create the topic:

bin/kafka-topics.sh --zookeeper sandbox-hdp.hortonworks.com:2181 --create --topic users --partitions 1 --replication-factor 1

(2) Set the message retention of the users topic to one week (604800000 ms):

bin/kafka-topics.sh --zookeeper sandbox-hdp.hortonworks.com:2181 --alter --topic users --config retention.ms=604800000
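To confirm that the topic exists and that the retention override took effect, you can describe the topic; a quick check against the same ZooKeeper address:

# Shows partition count, replication factor and any overridden configs (e.g. retention.ms)
bin/kafka-topics.sh --zookeeper sandbox-hdp.hortonworks.com:2181 --describe --topic users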

The remaining topics are created in the same way; the commands are listed below.
(user_friends topic)

bin/kafka-topics.sh --zookeeper sandbox-hdp.hortonworks.com:2181 --create --topic user_friends --partitions 3 --replication-factor 1
bin/kafka-topics.sh --zookeeper sandbox-hdp.hortonworks.com:2181 --alter --topic user_friends --config retention.ms=604800000

(events topic)

bin/kafka-topics.sh --zookeeper sandbox-hdp.hortonworks.com:2181 --create --topic events --partitions 3 --replication-factor 1
bin/kafka-topics.sh --zookeeper sandbox-hdp.hortonworks.com:2181 --alter --topic events --config retention.ms=604800000

(event_attendees_raw topic)

bin/kafka-topics.sh --zookeeper sandbox-hdp.hortonworks.com:2181 --create --topic event_attendees_raw --partitions 1 --replication-factor 1
bin/kafka-topics.sh --zookeeper sandbox-hdp.hortonworks.com:2181 --alter --topic event_attendees_raw --config retention.ms=604800000

(event_attendees topic)

bin/kafka-topics.sh --zookeeper sandbox-hdp.hortonworks.com:2181 --create --topic event_attendees --partitions 3 --replication-factor 1
bin/kafka-topics.sh --zookeeper sandbox-hdp.hortonworks.com:2181 --alter --topic event_attendees --config retention.ms=604800000

(train topic)

bin/kafka-topics.sh --zookeeper sandbox-hdp.hortonworks.com:2181 --create --topic train --partitions 1 --replication-factor 1
bin/kafka-topics.sh --zookeeper sandbox-hdp.hortonworks.com:2181 --alter --topic train --config retention.ms=604800000

(test topic)

bin/kafka-topics.sh --zookeeper sandbox-hdp.hortonworks.com:2181 --create --topic test --partitions 1 --replication-factor 1
bin/kafka-topics.sh --zookeeper sandbox-hdp.hortonworks.com:2181 --alter --topic test --config retention.ms=604800000

You can check the number of messages in each partition of a topic with the following command:

bin/kafka-run-class.sh kafka.tools.GetOffsetShell --broker-list sandbox-hdp.hortonworks.com:6667 --topic users --time -1 --offsets 1
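The --time -1 option prints each partition's latest offset. For a topic that has never had segments deleted, that value is the message count; otherwise subtract the earliest offset, which --time -2 returns. A sketch of both queries:

# Latest offset per partition
bin/kafka-run-class.sh kafka.tools.GetOffsetShell --broker-list sandbox-hdp.hortonworks.com:6667 --topic users --time -1 --offsets 1
# Earliest offset per partition; message count = latest - earliest
bin/kafka-run-class.sh kafka.tools.GetOffsetShell --broker-list sandbox-hdp.hortonworks.com:6667 --topic users --time -2 --offsets 1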


2. Create the directories

mkdir -p /var/flume/checkpoint/users
mkdir -p /var/flume/data/users
chmod 777 -R /var/flume
mkdir -p /events/input/intra/users
chmod 777 -R /events/

3. Create the Flume agents

(1) Create the agent configuration file at the following location.

(2) The configuration is as follows:
# (users agent)

#Deploy the following content into Flume
#Initialize agent's source, channel and sink
users.sources = usersSource
users.channels = usersChannel
users.sinks = usersSink
# Use a channel which buffers events in a directory
users.channels.usersChannel.type = file
users.channels.usersChannel.checkpointDir = /var/flume/checkpoint/users
users.channels.usersChannel.dataDirs = /var/flume/data/users
# Setting the source to spool directory where the file exists
users.sources.usersSource.type = spooldir
users.sources.usersSource.deserializer = LINE
users.sources.usersSource.deserializer.maxLineLength = 6400
users.sources.usersSource.spoolDir = /events/input/intra/users
users.sources.usersSource.includePattern = users_[0-9]{4}-[0-9]{2}-[0-9]{2}\.csv
users.sources.usersSource.interceptors = head_filter
users.sources.usersSource.interceptors.head_filter.type = regex_filter
users.sources.usersSource.interceptors.head_filter.regex = ^user_id,locale,birthyear,gender,joinedAt,location,timezone$
users.sources.usersSource.interceptors.head_filter.excludeEvents = true
users.sources.usersSource.channels = usersChannel
# Define / Configure sink
users.sinks.usersSink.type = org.apache.flume.sink.kafka.KafkaSink
users.sinks.usersSink.batchSize = 640
users.sinks.usersSink.brokerList = sandbox-hdp.hortonworks.com:6667
users.sinks.usersSink.topic = users
users.sinks.usersSink.channel = usersChannel

(3) After saving, restart Flume and confirm the agent starts successfully.
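On the HDP sandbox, Flume is normally restarted through Ambari. If you run the agent by hand instead, a minimal sketch looks like this; the configuration file path and name are assumptions, since the actual location is not shown above:

# Start the users agent in the foreground with console logging (paths assumed)
flume-ng agent --name users --conf /etc/flume/conf --conf-file /etc/flume/conf/users.conf -Dflume.root.logger=INFO,console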

4. Import the files

Run the following command from the directory where the CSV file is located:

install -m 777 users.csv /events/input/intra/users/users_2021-01-25.csv

You can check the import status with the following command:

 ll /events/input/intra/users/
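Once Flume has processed the file, the spooling-directory source renames it with a .COMPLETED suffix. To spot-check that the records actually reached Kafka, you can read a few of them back from the Kafka directory; depending on the Kafka version you may need --zookeeper instead of --bootstrap-server:

# Read the first 10 records from the users topic
bin/kafka-console-consumer.sh --bootstrap-server sandbox-hdp.hortonworks.com:6667 --topic users --from-beginning --max-messages 10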


The remaining agents are handled the same way and are not described in detail; only the commands and configuration are given.

Create the directories

mkdir -p /var/flume/checkpoint/events
mkdir -p /var/flume/data/events
chmod 777 -R /var/flume
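By analogy with the users agent, the spool directory referenced by this agent's configuration presumably has to exist as well; the same applies to the train, test, user_friends and event_attendees agents below:

mkdir -p /events/input/intra/events
chmod 777 -R /events/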

# (events agent)

# Deploy the following content into Flume
# Initialize agent's source, channel and sink
events.sources = eventsSource
events.channels = eventsChannel
events.sinks = eventsSink1 eventsSink2 eventsSink3
events.sinkgroups = grpEvents
events.sinkgroups.grpEvents.sinks = eventsSink1 eventsSink2 eventsSink3
events.sinkgroups.grpEvents.processor.type = load_balance
events.sinkgroups.grpEvents.processor.backoff = true
events.sinkgroups.grpEvents.processor.selector = round_robin
# Use a channel which buffers events in a directory
events.channels.eventsChannel.type = file
events.channels.eventsChannel.checkpointDir = /var/flume/checkpoint/events
events.channels.eventsChannel.dataDirs = /var/flume/data/events
events.channels.eventsChannel.transactionCapacity = 5000
# Setting the source to spool directory where the file exists
events.sources.eventsSource.type = spooldir
events.sources.eventsSource.deserializer = LINE
events.sources.eventsSource.deserializer.maxLineLength = 32000
events.sources.eventsSource.spoolDir = /events/input/intra/events
events.sources.eventsSource.includePattern = events_[0-9]{4}-[0-9]{2}-[0-9]{2}\.csv
events.sources.eventsSource.interceptors = head_filter
events.sources.eventsSource.interceptors.head_filter.type = regex_filter
events.sources.eventsSource.interceptors.head_filter.regex = ^event_id,user_id,start_time,city,state,zip,country,lat,lng,c_1,c_2,c_3,c_4,c_5,c_6,c_7,c_8,c_9,c_10,c_11,c_12,c_13,c_14,c_15,c_16,c_17,c_18,c_19,c_20,c_21,c_22,c_23,c_24,c_25,c_26,c_27,c_28,c_29,c_30,c_31,c_32,c_33,c_34,c_35,c_36,c_37,c_38,c_39,c_40,c_41,c_42,c_43,c_44,c_45,c_46,c_47,c_48,c_49,c_50,c_51,c_52,c_53,c_54,c_55,c_56,c_57,c_58,c_59,c_60,c_61,c_62,c_63,c_64,c_65,c_66,c_67,c_68,c_69,c_70,c_71,c_72,c_73,c_74,c_75,c_76,c_77,c_78,c_79,c_80,c_81,c_82,c_83,c_84,c_85,c_86,c_87,c_88,c_89,c_90,c_91,c_92,c_93,c_94,c_95,c_96,c_97,c_98,c_99,c_100,c_other$
events.sources.eventsSource.interceptors.head_filter.excludeEvents = true
events.sources.eventsSource.channels = eventsChannel
# Define / Configure sinks
events.sinks.eventsSink1.type = org.apache.flume.sink.kafka.KafkaSink
events.sinks.eventsSink1.batchSize = 1280
events.sinks.eventsSink1.brokerList = sandbox-hdp.hortonworks.com:6667
events.sinks.eventsSink1.topic = events
events.sinks.eventsSink1.channel = eventsChannel
events.sinks.eventsSink2.type = org.apache.flume.sink.kafka.KafkaSink
events.sinks.eventsSink2.batchSize = 1280
events.sinks.eventsSink2.brokerList = sandbox-hdp.hortonworks.com:6667
events.sinks.eventsSink2.topic = events
events.sinks.eventsSink2.channel = eventsChannel
events.sinks.eventsSink3.type = org.apache.flume.sink.kafka.KafkaSink
events.sinks.eventsSink3.batchSize = 1280
events.sinks.eventsSink3.brokerList = sandbox-hdp.hortonworks.com:6667
events.sinks.eventsSink3.topic = events
events.sinks.eventsSink3.channel = eventsChannel
install -m 777 events.csv /events/input/intra/events/events_2021-01-25.csv
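This agent fans one file channel out to three Kafka sinks in a load_balance sink group, so batches are distributed round-robin across the sinks, all writing to the three-partition events topic. After the import you can reuse the offset check from section 1 to see the per-partition counts:

# Latest offset of each partition of the events topic
bin/kafka-run-class.sh kafka.tools.GetOffsetShell --broker-list sandbox-hdp.hortonworks.com:6667 --topic events --time -1 --offsets 1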

Create the directories

mkdir -p /var/flume/checkpoint/train
mkdir -p /var/flume/data/train
hdfs dfs -mkdir -p /user/events/driver

Change the directory permissions

chmod 777 -R /var/flume
hdfs dfs -chmod -R 777 /user/events/driver

# (train agent)

# Initialize agent's source, channel and sink
train.sources = trainSource
train.channels = trainChannel driverChannel
train.sinks = trainSink driverSink
# Use a channel which buffers events in a directory
train.channels.trainChannel.type = file
train.channels.trainChannel.checkpointDir = /var/flume/checkpoint/train
train.channels.trainChannel.dataDirs = /var/flume/data/train
# Setting the channel to memory
train.channels.driverChannel.type = memory
train.channels.driverChannel.capacity = 64000
train.channels.driverChannel.transactionCapacity = 16000
# Setting the source to spool directory where the file exists
train.sources.trainSource.type = spooldir
train.sources.trainSource.deserializer = LINE
train.sources.trainSource.deserializer.maxLineLength = 3200
train.sources.trainSource.spoolDir = /events/input/intra/train
train.sources.trainSource.includePattern = train_[0-9]{4}-[0-9]{2}-[0-9]{2}\.csv
train.sources.trainSource.interceptors = head_filter
train.sources.trainSource.interceptors.head_filter.type = regex_filter
train.sources.trainSource.interceptors.head_filter.regex = ^user,event,invited,timestamp,interested,not_interested$
train.sources.trainSource.interceptors.head_filter.excludeEvents = true
train.sources.trainSource.channels = trainChannel driverChannel
# Define / Configure sink
train.sinks.trainSink.type = org.apache.flume.sink.kafka.KafkaSink
train.sinks.trainSink.batchSize = 640
train.sinks.trainSink.brokerList = sandbox-hdp.hortonworks.com:6667
train.sinks.trainSink.topic = train
train.sinks.trainSink.channel = trainChannel
# Setting the sink to HDFS
train.sinks.driverSink.type = hdfs
train.sinks.driverSink.hdfs.fileType = DataStream
train.sinks.driverSink.hdfs.filePrefix = train
train.sinks.driverSink.hdfs.fileSuffix = .csv
train.sinks.driverSink.hdfs.path = /user/events/driver/%Y-%m-%d
train.sinks.driverSink.hdfs.useLocalTimeStamp = true
train.sinks.driverSink.hdfs.batchSize = 6400
# Number of events written to file before it is rolled
train.sinks.driverSink.hdfs.rollCount = 3200
# File size to trigger roll, in bytes
train.sinks.driverSink.hdfs.rollSize = 640000
# Number of seconds to wait before rolling current file
train.sinks.driverSink.hdfs.rollInterval = 300
train.sinks.driverSink.channel = driverChannel
install -m 777 train.csv /events/input/intra/train/train_2021-01-25.csv
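The train agent feeds two channels: the file channel goes to the Kafka sink and the memory channel goes to the HDFS sink, which rolls CSV files under a date-named directory. A quick way to confirm the HDFS side:

# List the date-partitioned output written by the HDFS sink
hdfs dfs -ls -R /user/events/driver/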

Create the directories

mkdir -p /var/flume/checkpoint/test
mkdir -p /var/flume/data/test
chmod 777 -R /var/flume

# (test agent)

# Initialize agent's source, channel and sink
test.sources = testSource
test.channels = testChannel
test.sinks = testSink
# Use a channel which buffers events in a directory
test.channels.testChannel.type = file
test.channels.testChannel.checkpointDir = /var/flume/checkpoint/test
test.channels.testChannel.dataDirs = /var/flume/data/test
# Setting the source to spool directory where the file exists
test.sources.testSource.type = spooldir
test.sources.testSource.deserializer = LINE
test.sources.testSource.deserializer.maxLineLength = 6400
test.sources.testSource.spoolDir = /events/input/intra/test
test.sources.testSource.includePattern = test_[0-9]{4}-[0-9]{2}-[0-9]{2}\.csv
test.sources.testSource.interceptors = head_filter
test.sources.testSource.interceptors.head_filter.type = regex_filter
test.sources.testSource.interceptors.head_filter.regex = ^user,event,invited,timestamp$
test.sources.testSource.interceptors.head_filter.excludeEvents = true
test.sources.testSource.channels = testChannel
# Define / Configure sink
test.sinks.testSink.type = org.apache.flume.sink.kafka.KafkaSink
test.sinks.testSink.batchSize = 640
test.sinks.testSink.brokerList = sandbox-hdp.hortonworks.com:6667
test.sinks.testSink.topic = test
test.sinks.testSink.channel = testChannel
install -m 777 test.csv /events/input/intra/test/test_2021-01-25.csv

Create the directories

mkdir -p /var/flume/checkpoint/user_friends
mkdir -p /var/flume/data/user_friends
chmod 777 -R /var/flume

# (user_friends agent)

# Initialize agent's source, channel and sink
user_friends.sources = user_friendsSource
user_friends.channels = user_friendsChannel
user_friends.sinks = user_friendsSink
# Use a channel which buffers events in a directory
user_friends.channels.user_friendsChannel.type = file
user_friends.channels.user_friendsChannel.checkpointDir = /var/flume/checkpoint/user_friends
user_friends.channels.user_friendsChannel.dataDirs = /var/flume/data/user_friends
# Setting the source to spool directory where the file exists
user_friends.sources.user_friendsSource.type = spooldir
user_friends.sources.user_friendsSource.deserializer = LINE
user_friends.sources.user_friendsSource.deserializer.maxLineLength = 128000
user_friends.sources.user_friendsSource.spoolDir = /events/input/intra/user_friends
user_friends.sources.user_friendsSource.includePattern = user_friends_[0-9]{4}-[0-9]{2}-[0-9]{2}\.csv
user_friends.sources.user_friendsSource.interceptors = head_filter
user_friends.sources.user_friendsSource.interceptors.head_filter.type = regex_filter
user_friends.sources.user_friendsSource.interceptors.head_filter.regex = ^user,friends$
user_friends.sources.user_friendsSource.interceptors.head_filter.excludeEvents = true
user_friends.sources.user_friendsSource.channels = user_friendsChannel
# Define / Configure sink
user_friends.sinks.user_friendsSink.type = org.apache.flume.sink.kafka.KafkaSink
user_friends.sinks.user_friendsSink.batchSize = 640
user_friends.sinks.user_friendsSink.brokerList = sandbox-hdp.hortonworks.com:6667
user_friends.sinks.user_friendsSink.topic = user_friends_raw
user_friends.sinks.user_friendsSink.channel = user_friendsChannel
install -m 777 user_friends.csv /events/input/intra/user_friends/user_friends_2021-01-26.csv
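Note that this sink publishes to user_friends_raw rather than user_friends, and no creation command for that topic appears in section 1. Unless automatic topic creation is enabled on the broker, it presumably has to be created the same way; the partition count below is an assumption:

bin/kafka-topics.sh --zookeeper sandbox-hdp.hortonworks.com:2181 --create --topic user_friends_raw --partitions 1 --replication-factor 1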

Create the directories

mkdir -p /var/flume/checkpoint/event_attendees
mkdir -p /var/flume/data/event_attendees

Change the permissions

chmod 777 -R /var/flume

# (event_attendees agent)

# Initialize agent's source, channel and sink
event_attendees.sources = eventAttendeesSource
event_attendees.channels = eventAttendeesChannel
event_attendees.sinks = eventAttendeesSink
# Use a channel which buffers events in a directory
event_attendees.channels.eventAttendeesChannel.type = file
event_attendees.channels.eventAttendeesChannel.checkpointDir = /var/flume/checkpoint/event_attendees
event_attendees.channels.eventAttendeesChannel.dataDirs = /var/flume/data/event_attendees
# Setting the source to spool directory where the file exists
event_attendees.sources.eventAttendeesSource.type = spooldir
event_attendees.sources.eventAttendeesSource.deserializer = LINE
event_attendees.sources.eventAttendeesSource.deserializer.maxLineLength = 12800
event_attendees.sources.eventAttendeesSource.spoolDir = /events/input/intra/event_attendees
event_attendees.sources.eventAttendeesSource.includePattern = eventAttendees_[0-9]{4}-[0-9]{2}-[0-9]{2}\.csv
event_attendees.sources.eventAttendeesSource.interceptors = head_filter
event_attendees.sources.eventAttendeesSource.interceptors.head_filter.type = regex_filter
event_attendees.sources.eventAttendeesSource.interceptors.head_filter.regex = ^event,yes,maybe,invited,no$
event_attendees.sources.eventAttendeesSource.interceptors.head_filter.excludeEvents = true
event_attendees.sources.eventAttendeesSource.channels = eventAttendeesChannel
# Define / Configure sink
event_attendees.sinks.eventAttendeesSink.type = org.apache.flume.sink.kafka.KafkaSink
event_attendees.sinks.eventAttendeesSink.batchSize = 640
event_attendees.sinks.eventAttendeesSink.brokerList = sandbox-hdp.hortonworks.com:6667
event_attendees.sinks.eventAttendeesSink.topic = event_attendees_raw
event_attendees.sinks.eventAttendeesSink.channel = eventAttendeesChannel
install -m 777 event_attendees.csv /events/input/intra/event_attendees/eventAttendees_2021-01-26.csv
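After all the agents have run, you can list every topic and check that data arrived in each one, for example:

# List all topics on the broker
bin/kafka-topics.sh --zookeeper sandbox-hdp.hortonworks.com:2181 --list
# Per-partition offsets for one of the imported topics
bin/kafka-run-class.sh kafka.tools.GetOffsetShell --broker-list sandbox-hdp.hortonworks.com:6667 --topic event_attendees_raw --time -1 --offsets 1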
