标题: ‘说说Flink的连接器connector有哪些,怎么用?’
日期: 2021-07-31 10:26:51
标签: [flink,connector]
分类: 数据仓库

flink作为一个计算引擎,是缺少存储介质的,那么数据从哪儿来,到哪儿去,就需要连接器了,链接各种类型数据库,各种类型组件进行数据的抽取、计算、存储等,下面来看看flink都有哪些connector,怎么使用的?

介绍

看看目前支持的connector:
这是官方给出的:

有些支持数据源,有些不支持数据源,有些支持无边界流式处理,有些不支持,具体看上图。

我们目前市面上用的比较多的数据库,大概是以下几种:

# 支持jdbc
mysql mongodb postgresql oracle db2 sybase sqlserver hive
# 不支持jdbc
hbase es 文件 消息队列(kafka rabbitmq rocketmq)

使用

kafka

CREATE TABLE MyUserTable (-- declare the schema of the table`user` BIGINT,`message` STRING
) WITH (-- declare the external system to connect to'connector' = 'kafka','topic' = 'topic_name','scan.startup.mode' = 'earliest-offset', -- 还有可选从最近offset开始消费:latest-offset'properties.bootstrap.servers' = 'localhost:9092', --kafka broker连接串'format' = 'json'   -- declare a format for this system
)

hbase

注意hbase目前只支持1.4和2.2版本

-- register the HBase table 'mytable' in Flink SQL
CREATE TABLE hTable (rowkey INT,family1 ROW<q1 INT>,family2 ROW<q2 STRING, q3 BIGINT>,family3 ROW<q4 DOUBLE, q5 BOOLEAN, q6 STRING>,PRIMARY KEY (rowkey) NOT ENFORCED
) WITH ('connector' = 'hbase-1.4','table-name' = 'mytable','zookeeper.quorum' = 'localhost:2181'
);-- use ROW(...) construction function construct column families and write data into the HBase table.
-- assuming the schema of "T" is [rowkey, f1q1, f2q2, f2q3, f3q4, f3q5, f3q6]
INSERT INTO hTable
SELECT rowkey, ROW(f1q1), ROW(f2q2, f2q3), ROW(f3q4, f3q5, f3q6) FROM T;

jdbc

jdbc连接需要添加对应的driver到flink lib里
mysql:点这里
postgresql:点这里
oracle:点这里下载ojdbc8.jar
这是常用的,其他的在网上都能搜得到

-- register a MySQL table 'users' in Flink SQL
CREATE TABLE MyUserTable (id BIGINT,name STRING,age INT,status BOOLEAN,PRIMARY KEY (id) NOT ENFORCED
) WITH ('connector' = 'jdbc','url' = 'jdbc:mysql://localhost:3306/mydatabase','table-name' = 'users','driver' = 'com.jdbc.mysql.Driver','username' = 'xxx','password' = 'xxx'
);

es

es只能做sink不能做source

CREATE TABLE myUserTable (user_id STRING,user_name STRINGuv BIGINT,pv BIGINT,PRIMARY KEY (user_id) NOT ENFORCED
) WITH ('connector' = 'elasticsearch-7','hosts' = 'http://localhost:9200','index' = 'users'
);

文件

可以是服务器本地文件,也可以是hdfs文件,区别就是文件路径描述符的区别:

CREATE TABLE MyUserTable (column_name1 INT,column_name2 STRING,...part_name1 INT,part_name2 STRING
) PARTITIONED BY (part_name1, part_name2) WITH ('connector' = 'filesystem',           -- required: specify the connector'path' = 'file:///path/to/whatever',  -- required: path to a directory'path' = 'hdfs:///path/to/whatever',  -- required: path to a directory'format' = '...',                     -- required: file system connector requires to specify a format,-- Please refer to Table Formats-- section for more details'partition.default-name' = '...',     -- optional: default partition name in case the dynamic partition-- column value is null/empty string-- optional: the option to enable shuffle data by dynamic partition fields in sink phase, this can greatly-- reduce the number of file for filesystem sink but may lead data skew, the default value is false.'sink.shuffle-by-partition.enable' = '...',...
)

另外还有几种特殊的connector:

datagen

datagen会按照字段指定的类型,随机生成对应的数据

CREATE TABLE Orders (order_number BIGINT,price        DECIMAL(32,2),buyer        ROW<first_name STRING, last_name STRING>,order_time   TIMESTAMP(3)
) WITH ('connector' = 'datagen'
)

print

每一个写入该表的数据,都会标准输出到日志里

CREATE TABLE print_table (f0 INT,f1 INT,f2 STRING,f3 DOUBLE
) WITH ('connector' = 'print'
);

blackhole

这个connector会吞噬一切数据,往这个表里写的数据都会消失,主要用于测试性能。

CREATE TABLE blackhole_table (f0 INT,f1 INT,f2 STRING,f3 DOUBLE
) WITH ('connector' = 'blackhole'
);

参考官网链接:
flink connectors

其实每个connector都支持指定类型的format格式方式,下期文章介绍如何指定格式化,可以指定那些格式化。
点个关注呗。


不积跬步无以至千里,不积小流无以成江海。

欢迎关注我的微信公众号,比较喜欢分享知识,也喜欢宠物,所以做了这2个公众号:

喜欢宠物的朋友可以关注:【电巴克宠物Pets】

想知道狗狗怕蚊子吗?扫二维码查看,有惊喜。

一起学习,一起进步。

说说Flink的连接器connector有哪些,怎么用?相关推荐

  1. Flink SQL自定义connector

    本文翻译自:Flink v1.11官方文档 动态表是Flink的Table&SQL API的核心概念,用于以统一方式处理有界和无界数据. 因为动态表只是一个逻辑概念,所以Flink并不拥有数据 ...

  2. Flink: FlieSystem SQL Connector

    Flink: FlieSystem SQL Connector Flink:1.13 基于https://nightlies.apache.org/flink/flink-docs-release-1 ...

  3. Flink FileSystem的connector分析

    文章目录 前言 FileSystem的RollingSink FileSystem连接器的Exactly Once恢复语义 FileSystem Sink的文件状态转换 Checkpoint下的Exa ...

  4. [转]Loadrunner Error code 10053 Tomcat 连接器(connector)优化

    LoadRunner提示错误:Error : socket0 - Software caused connection abort. Error code : 10053. 在今天的测试过程中发现,s ...

  5. 企业搜索引擎开发之连接器connector(二十七)

    ChangeQueue类实现ChangeSource接口,声明了拉取下一条Change对象的方法 * A source of {@link Change} objects.** @since 2.8* ...

  6. 企业搜索引擎开发之连接器connector(二十九)

    在哪里调用监控器管理对象snapshotRepositoryMonitorManager的start方法及stop方法,然后又在哪里调用CheckpointAndChangeQueue对象的resum ...

  7. 33、连接器(connector)

    描述 camunda工作流设计的与外部信息沟通的方法有外部任务(external task),业务任务(service task),执行监听器(execution listener), 以上都是当外部 ...

  8. Flink CDC 系列(3)—— Flink CDC MySQL Connector 与 Flink SQL 的结合使用案例Demo

    Flink CDC 系列文章: <Flink CDC 系列(1)-- 什么是 Flink CDC> <Flink CDC 系列(2)-- Flink CDC 源码编译> < ...

  9. 1.4 Flink HDFS Connector /Flink HDFS连接器

    在上一章节已经翻译了Flink Kafka Connector,但由于HDFS作为大多数研究大数据者日常用到的,此章节并添加翻译HDFS的连接器. 此连接器提供了一个Sink,将分区文件写入Hadoo ...

最新文章

  1. 软件工程技术基础-(软件复用技术)
  2. Redis 安装详细过程(redis基本使用(服务端和客户端)、修改密码)
  3. 如何管理项目中外包开发人员、测试人员
  4. jQuery JavaScript库达到新的里程碑
  5. android输出log,Android开发 Release情况下也能输出log
  6. 四年后十大最赚钱行业
  7. 构造全自动计算的CPU
  8. 谷歌利用OKR系统考核 脑力行业或可借鉴
  9. 【撸啊撸 Docker】搭建 Jenkins
  10. centos 对已有卷扩容_centos7下对原有磁盘分区进行在线扩容
  11. 总结:8.9 模拟(枚举搜索)
  12. 如何:在Spring中使用@Conditional和Condition注册组件
  13. 一个程序员如何做到结构上胸有成竹
  14. 服务器不删档的设置_不删档预捏脸开启,快来体验次世代黑科技!
  15. mod_rewrite
  16. 常见的问题:https://localhost:1158/em 无法打开
  17. Flex显示图片的常用方式
  18. ORACLE11G在LINUX6下安装及报错 C [ld-linux-x86-64.so.2+0x14d70]的解决方法
  19. 工厂模式在 Calendar 类中的应用
  20. tomcat多实例的端口设置

热门文章

  1. javaweb网上宠物商城管理系统分前后台(源码+数据库+开题报告+ppt+文档)
  2. 反调试技巧总结-原理和实现
  3. python 桑基图 地理坐标_利用Python+Excel制作桑基(Sankey)图
  4. 一篇文让你秒懂CDN
  5. 背景与小鸟--FlappyBird游戏开发教程之二
  6. 狗是好狗,正不正经就不知道了
  7. JS方式实现隐藏手机号码中间4位数
  8. 编写Java程序,从键盘输入一个正整数n(n≥2),随机生成一个n阶方阵,每个元素均为整数,其取值范围为[100,999]。然后输出该方阵元素中的最大值和最小值。
  9. 当前时间显示器(代码屏显)
  10. 一维数组中重复元素的去除