Flink + Kafka + Doris Hands-on Demo

Environment:

  • Flink 1.12
  • Doris 0.12
  • Kafka 1.0.1+kafka3.1.1

1. Building Doris

Build with Docker as described in the official guide: https://github.com/apache/incubator-doris/wiki/Doris-Install

1.1 Notes

The download repository URLs in fe/pom.xml need to be changed:

cloudera-thirdparty
https://repository.cloudera.com/content/repositories/third-party/ becomes:

cloudera-public https://repository.cloudera.com/artifactory/public/

cloudera-plugins
https://repository.cloudera.com/content/groups/public/ becomes:

cloudera-public https://repository.cloudera.com/artifactory/public/
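For reference, the replacement entry looks roughly like this (the exact id and placement inside fe/pom.xml depend on your checkout):

```xml
<!-- Replaces both the old cloudera-thirdparty and cloudera-plugins entries -->
<repository>
    <id>cloudera-public</id>
    <url>https://repository.cloudera.com/artifactory/public/</url>
</repository>
```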

1.2 Building

With the repository URLs fixed, the build runs straight through.

1.3 If you would rather not build from source, download a prebuilt package (0.12 or 0.11):

https://download.csdn.net/download/leng91060404/16661347
https://download.csdn.net/download/leng91060404/16655427

2. Deploying Doris (single-node test)

Reference: https://github.com/apache/incubator-doris/wiki/Doris-Install#3-%E9%83%A8%E7%BD%B2

2.1 Notes

FE:

Create a doris-meta directory under the fe directory.

Adjust the priority_networks = <ip> setting in fe/conf/fe.conf as needed.

BE:

Set storage_root_path = /home/disk1/doris;/home/disk2/doris in be/conf/be.conf and create those directories.

Adjust the priority_networks = <ip> setting in be/conf/be.conf as needed.
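The FE/BE edits above can be scripted; a rough sketch (every path and the 172.16.0.0/16 CIDR are placeholders, and it writes throwaway copies under /tmp rather than touching a real install):

```shell
# Placeholders -- point these at your actual unpacked fe/ and be/ directories.
FE_HOME=${FE_HOME:-/tmp/doris/fe}
BE_HOME=${BE_HOME:-/tmp/doris/be}

mkdir -p "$FE_HOME/conf" "$BE_HOME/conf"

# FE: the metadata directory must exist before the first start
mkdir -p "$FE_HOME/doris-meta"
# FE: on multi-NIC hosts, tell Doris which network to bind to
echo 'priority_networks = 172.16.0.0/16' >> "$FE_HOME/conf/fe.conf"

# BE: storage directories are semicolon-separated and must be created up front
mkdir -p "$BE_HOME/storage1" "$BE_HOME/storage2"
echo "storage_root_path = $BE_HOME/storage1;$BE_HOME/storage2" >> "$BE_HOME/conf/be.conf"
echo 'priority_networks = 172.16.0.0/16' >> "$BE_HOME/conf/be.conf"
```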

2.2 Startup

If startup fails, check the log files to diagnose and fix the problem.

2.3 Check status

A BE node must first be added through the FE before it can join the cluster.

You can connect to the FE with the mysql client: ./mysql-client -h host -P port -uroot, where host is the IP of the FE node and port is the query_port from fe/conf/fe.conf; by default you log in as root with no password.
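Adding the BE is done from that mysql session; a sketch, assuming the default heartbeat port 9050, with be_host standing in for your BE's IP:

```sql
ALTER SYSTEM ADD BACKEND "be_host:9050";
```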

Checking FE status:
Log in to the master FE with the mysql client and run:
SHOW PROC '/frontends';
to list the current FE nodes.
The web UI works too: http://fe_hostname:fe_http_port/frontend or http://fe_hostname:fe_http_port/system?path=//frontends.
mysql> SHOW PROC '/frontends';
+--------------------------------+-------------+-----------+-------------+----------+-----------+---------+----------+----------+-----------+------+-------+-------------------+---------------------+----------+--------+
| Name                           | IP          | HostName  | EditLogPort | HttpPort | QueryPort | RpcPort | Role     | IsMaster | ClusterId | Join | Alive | ReplayedJournalId | LastHeartbeat       | IsHelper | ErrMsg |
+--------------------------------+-------------+-----------+-------------+----------+-----------+---------+----------+----------+-----------+------+-------+-------------------+---------------------+----------+--------+
| 172.16.3.76_9011_1618481783037 | xxxxxxxxx | xxxxxxxx | 9011        | 8030     | 9030      | 9020    | FOLLOWER | true     | 502881269 | true | true  | 24505             | 2021-04-16 16:03:57 | true     |        |
+--------------------------------+-------------+-----------+-------------+----------+-----------+---------+----------+----------+-----------+------+-------+-------------------+---------------------+----------+--------+
1 row in set (0.02 sec)
Checking BE status:
Log in to the leader FE with the mysql client and run:
SHOW PROC '/backends';
to list the current BE nodes.
The web UI works too: http://fe_hostname:fe_http_port/backend or http://fe_hostname:fe_http_port/system?path=//backends.
mysql> SHOW PROC '/backends';
+-----------+-----------------+-------------+-----------+---------------+--------+----------+----------+---------------------+---------------------+-------+----------------------+-----------------------+-----------+------------------+---------------+---------------+---------+----------------+--------+---------------------+
| BackendId | Cluster         | IP          | HostName  | HeartbeatPort | BePort | HttpPort | BrpcPort | LastStartTime       | LastHeartbeat       | Alive | SystemDecommissioned | ClusterDecommissioned | TabletNum | DataUsedCapacity | AvailCapacity | TotalCapacity | UsedPct | MaxDiskUsedPct | ErrMsg | Version             |
+-----------+-----------------+-------------+-----------+---------------+--------+----------+----------+---------------------+---------------------+-------+----------------------+-----------------------+-----------+------------------+---------------+---------------+---------+----------------+--------+---------------------+
| 10002     | default_cluster | xxxxxxxx | xxxxxx | 9050          | 9060   | 8040     | 8060     | 2021-04-15 18:17:38 | 2021-04-16 16:02:57 | true  | false                | false                 | 13        | 55.108 KB        | 64.740 GB     | 165.915 GB    | 60.98 % | 60.98 %        |        | 0.12.0-rc03-Unknown |
+-----------+-----------------+-------------+-----------+---------------+--------+----------+----------+---------------------+---------------------+-------+----------------------+-----------------------+-----------+------------------+---------------+---------------+---------+----------------+--------+---------------------+
1 row in set (0.00 sec)

3. The Flink job (source: Kafka, transform: Flink, sink: Kafka)

3.1 Data entity class

@Data
@AllArgsConstructor
@NoArgsConstructor
public class Item {
    private int id;
    private String name;
    private int counts;
    private Timestamp ts;

    @Override
    public String toString() {
        // hand-rolled JSON: every field is serialized as a quoted string
        return "{" +
                "\"id\":\"" + id + "\"" +
                ",\"name\":\"" + name + "\"" +
                ",\"counts\":\"" + counts + "\"" +
                ",\"ts\":\"" + ts + "\"" +
                "}";
    }
}
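The class leans on Lombok for its constructors and accessors; to sanity-check the hand-rolled JSON in toString(), here is a minimal Lombok-free sketch with made-up field values (ItemDemo is not part of the demo project):

```java
import java.sql.Timestamp;

public class ItemDemo {
    private final int id;
    private final String name;
    private final int counts;
    private final Timestamp ts;

    public ItemDemo(int id, String name, int counts, Timestamp ts) {
        this.id = id;
        this.name = name;
        this.counts = counts;
        this.ts = ts;
    }

    @Override
    public String toString() {
        // same shape as Item.toString(): every field becomes a quoted string
        return "{" +
                "\"id\":\"" + id + "\"" +
                ",\"name\":\"" + name + "\"" +
                ",\"counts\":\"" + counts + "\"" +
                ",\"ts\":\"" + ts + "\"" +
                "}";
    }

    public static void main(String[] args) {
        ItemDemo item = new ItemDemo(75392, "TIE4", 77098,
                Timestamp.valueOf("2021-04-16 11:40:48.423"));
        System.out.println(item);
    }
}
```

The printed line has the same shape as the sample records shown under 3.2 below.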

3.2 Producing mock data to the source Kafka topic

while (true) {
    Item item = generateItem();
    long startTime = System.currentTimeMillis();
    if (isAsync) {
        // asynchronous send: the callback logs the broker acknowledgement
        producer.send(new ProducerRecord<>(topic, item.toString()),
                new DemoCallBack(startTime, item.toString()));
        System.out.println("Sent message: (" + item + ")");
        try {
            Thread.sleep(2000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    } else {
        // synchronous send: block on get() until the broker acknowledges;
        // serialize the record the same way as the async branch
        try {
            producer.send(new ProducerRecord<>(topic, item.toString())).get();
            System.out.println("Sent message: (" + item + ")");
        } catch (InterruptedException | ExecutionException e) {
            e.printStackTrace();
        }
    }
}

Sample data:

{"id":"75392","name":"TIE4","counts":"77098","ts":"2021-04-16 11:40:48.423"}
{"id":"40904","name":"TIE3","counts":"26613","ts":"2021-04-16 11:40:52.436"}
{"id":"18363","name":"SHOE4","counts":"56329","ts":"2021-04-16 11:40:56.446"}
{"id":"17240","name":"TIE4","counts":"32312","ts":"2021-04-16 11:41:00.461"}

3.3 Flink processing (source + transform + sink)

Source table DDL:

public static final String KAFKA_SQL_EVEN_CREATE =
        "CREATE TABLE even (\n" +
        "  `id` INTEGER,\n" +
        "  `name` STRING,\n" +
        "  `counts` INTEGER,\n" +
        "  `ts` TIMESTAMP(3),\n" +
        "  WATERMARK FOR ts AS ts - INTERVAL '5' SECOND\n" +
        ") WITH (\n" +
        "  'connector' = 'kafka',\n" +
        "  'topic' = 'flink_sql_table_test_datas',\n" +
        "  'properties.bootstrap.servers' = 'xxxxxxxxxxxxx:9092',\n" +
        "  'properties.group.id' = 'testGroup',\n" +
        "  'scan.startup.mode' = 'earliest-offset',\n" +
        "  'format' = 'json'\n" +
        ")";

Aggregation SQL:

public static final String QUERY_EVEN_AGG_SQL =
        "SELECT\n" +
        "  id AS even_id, name AS even_name,\n" +
        "  COUNT(counts) AS even_cnt,\n" +
        // expose the window end as ts so the downstream join can select it
        "  TUMBLE_END(ts, INTERVAL '20' SECOND) AS ts\n" +
        "FROM even\n" +
        "GROUP BY TUMBLE(ts, INTERVAL '20' SECOND), id, name";

Pipeline driver:

// set up the execution environment
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
EnvironmentSettings settings = EnvironmentSettings.newInstance()
        .inStreamingMode()
        .useBlinkPlanner()
        .build();
StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env, settings);

// create the source tables
tableEnv.executeSql(KAFKA_SQL_EVEN_CREATE);
tableEnv.executeSql(KAFKA_SQL_ODD_CREATE);

// windowed aggregate counts
Table resultEven = tableEnv.sqlQuery(QUERY_EVEN_AGG_SQL);
resultEven.printSchema();
Table resultOdd = tableEnv.sqlQuery(QUERY_ODD_AGG_SQL);
resultOdd.printSchema();

// join the two aggregates
Table joinTable = resultEven.join(resultOdd)
        .where($("even_name").isEqual($("odd_name")))
        .select($("even_id").as("id"),
                $("even_name").as("name"),
                $("even_cnt").as("counts"),
                $("ts"));
joinTable.printSchema();

// convert to an append stream for the Kafka sink
DataStream<ItemSink> sinkResultStream = tableEnv.toAppendStream(joinTable, ItemSink.class);

// Kafka producer sink
Properties props = new Properties();
props.put("bootstrap.servers", KafkaProperties.KAFKA_SERVER_URL + ":" + KafkaProperties.KAFKA_SERVER_PORT);
props.put("client.id", KafkaProperties.CLIENT_ID);
FlinkKafkaProducer<ItemSink> myProducer = new FlinkKafkaProducer<>(
        KafkaProperties.TOPIC_SINK,
        new ProducerStringSerializationSchema(KafkaProperties.TOPIC_SINK),
        props,
        FlinkKafkaProducer.Semantic.EXACTLY_ONCE);
sinkResultStream.addSink(myProducer);

// run
env.execute("Streaming Window SQL Job");

Data in the sink Kafka topic:

41219,HAT9,1,2021-04-16 11:32:08.524
41219,HAT9,1,2021-04-16 11:33:30.846
41219,HAT9,1,2021-04-16 11:40:36.387
41219,HAT9,1,2021-04-14 19:21:46.738

4. Loading from Kafka into Doris (the Flink job above already sinks to Kafka)

4.1 Create the database

CREATE DATABASE example_db;

4.2 Create the table

CREATE TABLE item
(
    id INTEGER,
    name VARCHAR(256) DEFAULT '',
    ts DATETIME,
    counts BIGINT SUM DEFAULT '0'
)
AGGREGATE KEY(id, name, ts)
DISTRIBUTED BY HASH(id) BUCKETS 3
PROPERTIES("replication_num" = "1");
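A note on semantics: with the AGGREGATE KEY model, Doris keeps one row per (id, name, ts) value and applies the SUM aggregate to counts on load. A rough, self-contained sketch of that merge rule (AggregateKeyDemo and its sample rows are illustrative only):

```java
import java.util.LinkedHashMap;
import java.util.Map;

public class AggregateKeyDemo {
    // Doris keeps one row per AGGREGATE KEY value and applies SUM to counts
    static Map<String, Long> merge(String[][] rows) {
        Map<String, Long> table = new LinkedHashMap<>();
        for (String[] row : rows) {
            table.merge(row[0], Long.parseLong(row[1]), Long::sum);
        }
        return table;
    }

    public static void main(String[] args) {
        // key = "id|name|ts", value = counts (made-up sample rows)
        String[][] rows = {
                {"41219|HAT9|2021-04-16 11:32:08", "1"},
                {"41219|HAT9|2021-04-16 11:32:08", "1"},  // same key -> merged, counts summed
                {"41219|HAT9|2021-04-16 11:33:30", "1"},  // different ts -> separate row
        };
        merge(rows).forEach((k, v) -> System.out.println(k + " -> counts=" + v));
    }
}
```

This is why the query in 4.5 below shows one row per distinct ts with an aggregated counts value.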

4.3 Create the Kafka-to-Doris Routine Load job

CREATE ROUTINE LOAD example_db.task1 ON item
COLUMNS TERMINATED BY ",",
COLUMNS(id, name, counts, ts)
PROPERTIES
(
    "desired_concurrent_number" = "1",
    "max_batch_interval" = "20",
    "max_batch_rows" = "300000",
    "max_batch_size" = "209715200",
    "strict_mode" = "false"
)
FROM KAFKA
(
    "kafka_broker_list" = "xxxxxxx:9092",
    "kafka_topic" = "sinkTopic",
    "property.group.id" = "1234",
    "property.client.id" = "12345",
    "kafka_partitions" = "0,1,2",
    "kafka_offsets" = "0,0,0"
);
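For later management, a Routine Load job can be paused, resumed, or stopped with the matching statements:

```sql
PAUSE ROUTINE LOAD FOR example_db.task1;
RESUME ROUTINE LOAD FOR example_db.task1;
-- STOP is permanent: a stopped job cannot be resumed
STOP ROUTINE LOAD FOR example_db.task1;
```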

4.4 Check the job

SHOW ROUTINE LOAD FOR example_db.task1;
mysql> SHOW ROUTINE LOAD FOR example_db.task1;
+-------+-------+---------------------+-----------+---------+--------------------------------+-----------+---------+----------------+----------------+---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+---------------------------------------------------------------------------------------------------------+-----------------------------------------+---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+------------------------------------+----------------------+--------------+----------+
| Id    | Name  | CreateTime          | PauseTime | EndTime | DbName                         | TableName | State   | DataSourceType | CurrentTaskNum | JobProperties                                                                                                                                                                                                                         | DataSourceProperties                                                                                    | CustomProperties                        | Statistic                                                                                                                                                                                                     | Progress                           | ReasonOfStateChanged | ErrorLogUrls | OtherMsg |
+-------+-------+---------------------+-----------+---------+--------------------------------+-----------+---------+----------------+----------------+---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+---------------------------------------------------------------------------------------------------------+-----------------------------------------+---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+------------------------------------+----------------------+--------------+----------+
| 10037 | task1 | 2021-04-16 11:27:39 | N/A       | N/A     | default_cluster:flink_kafka_db | item      | RUNNING | KAFKA          | 1              | {"partitions":"*","columnToColumnExpr":"id,name,counts,ts","maxBatchIntervalS":"20","whereExpr":"*","maxBatchSizeBytes":"209715200","columnSeparator":"','","maxErrorNum":"0","currentTaskConcurrentNum":"1","maxBatchRows":"300000"} | {"topic":"flink_sql_table_test_sink","currentKafkaPartitions":"0,1,2","brokerList":"172.16.2.148:9092"} | {"group.id":"1234","client.id":"12345"} | {"receivedBytes":726452,"errorRows":0,"committedTaskNum":159,"loadedRows":20118,"loadRowsRate":0,"abortedTaskNum":629,"totalRows":20118,"unselectedRows":0,"receivedBytesRate":0,"taskExecuteTimeMs":3247070} | {"0":"6059","1":"6915","2":"7141"} |                      |              |          |
+-------+-------+---------------------+-----------+---------+--------------------------------+-----------+---------+----------------+----------------+---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+---------------------------------------------------------------------------------------------------------+-----------------------------------------+---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+------------------------------------+----------------------+--------------+----------+
1 row in set (0.00 sec)
SHOW ROUTINE LOAD TASK WHERE JobName = "task1";
mysql> SHOW ROUTINE LOAD TASK WHERE JobName = "task1";
+-----------------------------------+-------+-----------+-------+---------------------+---------------------+---------+-------+------------------------------+
| TaskId                            | TxnId | TxnStatus | JobId | CreateTime          | ExecuteStartTime    | Timeout | BeId  | DataSourceProperties         |
+-----------------------------------+-------+-----------+-------+---------------------+---------------------+---------+-------+------------------------------+
| 56543bac022d4fdb-bb5944617d6480ac | 788   | UNKNOWN   | 10037 | 2021-04-16 15:56:50 | 2021-04-16 15:56:50 | 40      | 10002 | {"0":6060,"1":6916,"2":7142} |
+-----------------------------------+-------+-----------+-------+---------------------+---------------------+---------+-------+------------------------------+

4.5 Query the data with the mysql client

./mysql-client -h host -P port -uroot
mysql> select * from item order by id desc limit 10;
+-------+-------+---------------------+--------+
| id    | name  | ts                  | counts |
+-------+-------+---------------------+--------+
| 99995 | SHOE2 | 2021-04-16 11:34:45 |      3 |
| 99995 | SHOE2 | 2021-04-15 19:05:59 |      4 |
| 99995 | SHOE2 | 2021-04-16 11:35:59 |      3 |
| 99995 | SHOE2 | 2021-04-16 11:35:05 |      3 |
| 99995 | SHOE2 | 2021-04-14 19:22:06 |      4 |
| 99995 | SHOE2 | 2021-04-16 11:35:57 |      3 |
| 99995 | SHOE2 | 2021-04-16 11:38:45 |      3 |
| 99995 | SHOE2 | 2021-04-16 11:38:11 |      3 |
| 99995 | SHOE2 | 2021-04-14 19:22:24 |      4 |
| 99995 | SHOE2 | 2021-04-14 19:21:12 |      4 |
+-------+-------+---------------------+--------+
10 rows in set (0.01 sec)

5. Demo code

https://gitee.com/suntyu_admin/flink-kafka-doris
