将以下存储在kafka的topic中的JSON格式字符串,对接存储到Hive的表中

{"id":1,"name":"小李"}
{"id":2,"name":"小张"}
{"id":3,"name":"小刘"}
{"id":4,"name":"小王"}

1、在hive/conf/hive-site.xml中添加或修改如下内容:

    <property><name>hive.txn.manager</name><value>org.apache.hadoop.hive.ql.lockmgr.DbTxnManager</value></property><property><name>hive.support.concurrency</name><value>true</value></property><property><name>hive.metastore.uris</name><value>thrift://localhost:9083</value></property>

2、创建database、table,其中表有id、name这个两个字段

hive> create database hivetokafka;hive> create table kafkatable(id int,name string)
hive> clustered by(id) into 2 buckets stored as orc tblproperties('transactional'='true');

3、执行 hive --service metastore & 启动元数据服务

 hive --service metastore &

4、配置conf文件,这里文件名和位置可以随意(我的是放在hive/myconf/新建的目录下,名字为kafkatohive.conf),添加如下内容

a.sources=source_from_kafka
a.channels=mem_channel
a.sinks=hive_sink#kafka为souce的配置
a.sources.source_from_kafka.type=org.apache.flume.source.kafka.KafkaSource
a.sources.source_from_kafka.zookeeperConnect=localhost:2181
a.sources.source_from_kafka.bootstrap.servers=localhost:9092
a.sources.source_from_kafka.topic=testtopic
a.sources.source_from_kafka.channels=mem_channel
a.sources.source_from_kafka.consumer.timeout.ms=1000
#hive为sink的配置
a.sinks.hive_sink.type=hive
a.sinks.hive_sink.hive.metastore=thrift://localhost:9083
a.sinks.hive_sink.hive.database=hivetokafka
a.sinks.hive_sink.hive.table=kafkatable
a.sinks.hive_sink.hive.txnsPerBatchAsk=2
a.sinks.hive_sink.batchSize=10
a.sinks.hive_sink.serializer=JSON
a.sinks.hive_sink.serializer.fieldnames=id,name
#channel的配置
a.channels.mem_channel.type=memory
a.channels.mem_channel.capacity=1500
a.channels.mem_channel.transactionCapacity=1000
#三者之间的关系
a.sources.source_from_kafka.channels=mem_channel
a.sinks.hive_sink.channel=mem_channel

5、将/hive/hcatalog/share/hcatalog/hive-hcatalog-streaming-x.x.x.jar拷贝到/flume/lib/下

此外还需要注意/hive/lib/guava-xx.x-jre.jar下与/flume/lib/下的版本是否一致。

6、启动flume,命令格式如下

flume-ng agent --conf conf/ --conf-file conf/….  --name a -Dflume.root.logger=INFO,console;

我这里就是(在flume/路径下 ):

bin/flume-ng agent --conf myconf/ --conf-file myconf/kafkatohive.conf  --name a -Dflume.root.logger=INFO,console;

7、新建终端窗口,创建topic(默认已经启动了zookeeper和kafka服务了)

kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic testtopic

8、启动kafka生产者,进行生产消息

启动命令:

kafka-console-producer.sh --broker-list localhost:9092 --topic testtopic

生产消息:

>{"id":1,"name":"小李"}
>{"id":2,"name":"小张"}
>{"id":3,"name":"小刘"}
>{"id":4,"name":"小王"}

9、查看结果

hive> select * from student;
OK
1   小李
2   小张
3   小刘
4   小王Time taken: 0.589 seconds, Fetched: 10 row(s)

通过Flume简单实现Kafka与Hive对接(Json格式)相关推荐

  1. hive 导出json格式 文件_hive中创建hive-json格式的表及查询

    在hive中对于json的数据格式,可以使用get_json_object或json_tuple先解析然后查询. 也可以直接在hive中创建json格式的表结构,这样就可以直接查询,实战如下(hive ...

  2. hive 导出json格式 文件_hive支持json格式的数据

    Hive 支持完全 json 格式的数据  现有 json 格式的数据 test.txt ,如下 {"name":"zhang","age" ...

  3. hive中json格式字段解析及map使用

    json 如果hive表中有扩展字段,那么扩展字段格式最常见的就是json格式,所以如何解析json字符串相对繁琐(虽然没啥技术

  4. Hive解析Json格式用户日志

    解析json格式数据 Json数据 HIVE SQL解析 第一种方式 第二种方式 第三种方式 string类型的数组形式的行转列拆分 Json数据 第一次写博客,mark一下 --20191025 下 ...

  5. hive 导出json格式 文件_hive存储json格式文件

    hive从0.12版本以后就开始自身支持json文件的格式了 1.文件格式 下面是测试用的文件预览格式,我将其存储为jsonTest.json文件,放在了/root目录下 {"_locati ...

  6. hive 导出json格式 文件_Hive 系列 之 基本操作合集

    下面是本课程概览: (1)hive系列之简介,安装,beeline和hiveserver2 (2)hive系列之基本操作 (3)hive系列之udf (4)hive系列之二级分区和动态分区 (5)hi ...

  7. hive 导出json格式 文件_Magicodes.IE在.NET Core中通过请求头导出多种格式文件

    前言 在2.2里程碑中我们增加了一些新的功能,正如标题所写通过请求头进行导出我们不同格式的文件.下面我们来看一下如何使用.通过这种方式无论是对我们的数据多用途,还是说对我们的数据校验都做到了轻松易配. ...

  8. Storm集成HBase、JDBC、Kafka、Hive测试

    /*** storm集成Kafka.Hive.JDBC.HBase.HDFS* Created by sker on 17-11-13* kafka集成storm,将数据发到JobBolt做中文分词逻 ...

  9. Storm集成HBase、JDBC、Kafka、Hive

    代码参上 /*** storm集成Kafka.Hive.JDBC.HBase.HDFS* Created by sker on 17-11-13* kafka集成storm,将数据发到JobBolt做 ...

最新文章

  1. 如何让自己时刻冷静的方法_4个方法,教你如何真正爱自己
  2. 熟悉Linux实验实训,非常详细的Linux操作系统与实训教程实验(三)
  3. python【数据结构与算法】快速幂and矩阵快速幂取模(看不懂你来打我)
  4. 浅谈Logistic回归及过拟合
  5. License for package Android SDK Build-Tools 28.0.2 not accepted.(MAC)
  6. .Net Core开发日志——Global Tools
  7. Windows编程初步(三)【说明:有敏感字眼已全删,不知道为啥还审核不通过】
  8. (2017.9.27) 自定义列表项 list-style 使用心得
  9. 18 Strings for Mac(Xcode文件翻译工具)
  10. double im2double mat2gray之一二说
  11. 图:最新微软框架的现场计分双屏管理系统终于收工,大家请欣赏界面。
  12. 不同vlan之间如何ping通_如何利用交换机实现不同VLAN、不同网段之间互访?
  13. 基于单片机的电子秤系统设计(电路+流程)
  14. 超详细的VSCode下载和安装教程以及解决VSCode下载速度特别慢的问题
  15. InfoGAN原理PyTorch实现Debug记录
  16. 电脑计算机 系统制造方法是,如何制作车载计算机系统
  17. 开拓海外市场,需要选择怎样的云服务?
  18. 计算简史:什么是计算机?《禅与计算机程序设计艺术》 / 陈光剑
  19. 分数排名 mysql_mysql 分数排名
  20. jar启动方式设置内存参数

热门文章

  1. ipcp协议 Linux,Linux命令Man解释:PPPD(8) :点对点daemon协议
  2. qt求一个区间的随机数
  3. linux应用程序逆向,Linux下查看并下载命令源码包(根据命令/应用程序逆向获取并且安装其所属源码包)...
  4. 手机访问服务器中的数据库文件,手机连接服务器数据库文件在哪里
  5. linux下c语言编程gedit,Ubuntu Linux下实现Gedit支持NesC语法高亮
  6. 项目结构_组织结构对项目影响系列课程---项目式组织结构
  7. java long.max_value,Long + Long不大于Long.MAX_VALUE
  8. java teechart怎么用_TeeChart使用范例
  9. maya python 游戏与影视编程指南_《Maya Python游戏与影视编程指南》【价格 目录 书评 正版】_中国图书网...
  10. linux定时任务每两天执行,Linux定时任务 crontab每秒执行 实现2种方法