前言

之前有文章 《从0到1学习Flink》—— Flink 写入数据到 Kafka 写过 Flink 将处理后的数据后发到 Kafka 消息队列中去,当然我们常用的消息队列可不止这一种,还有 RocketMQ、RabbitMQ 等,刚好 Flink 也支持将数据写入到 RabbitMQ,所以今天我们就来写篇文章讲讲如何将 Flink 处理后的数据写入到 RabbitMQ。

前提准备

安装 RabbitMQ

这里我直接用 docker 命令安装吧,先把 docker 在 mac 上启动起来。

在命令行中执行下面的命令:

docker run -d  -p 15672:15672  -p  5672:5672  -e RABBITMQ_DEFAULT_USER=admin -e RABBITMQ_DEFAULT_PASS=admin --name rabbitmq rabbitmq:3-management

对这个命令不懂的童鞋可以看看我以前的文章:http://www.54tianzhisheng.cn/2018/01/26/SpringBoot-RabbitMQ/

登录用户名和密码分别是:admin / admin ,登录进去是这个样子就代表安装成功了:

依赖

pom.xml 中添加 Flink connector rabbitmq 的依赖如下:

<dependency><groupId>org.apache.flink</groupId><artifactId>flink-connector-rabbitmq_${scala.binary.version}</artifactId><version>${flink.version}</version>
</dependency>

生产者

这里我们依旧自己写一个工具类一直的往 RabbitMQ 中的某个 queue 中发数据,然后由 Flink 去消费这些数据。

注意按照我的步骤来一步步操作,否则可能会出现一些错误!

RabbitMQProducerUtil.java

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;public class RabbitMQProducerUtil {public final static String QUEUE_NAME = "zhisheng";public static void main(String[] args) throws Exception {//创建连接工厂ConnectionFactory factory = new ConnectionFactory();//设置RabbitMQ相关信息factory.setHost("localhost");factory.setUsername("admin");factory.setPassword("admin");factory.setPort(5672);//创建一个新的连接Connection connection = factory.newConnection();//创建一个通道Channel channel = connection.createChannel();// 声明一个队列
//        channel.queueDeclare(QUEUE_NAME, false, false, false, null);//发送消息到队列中String message = "Hello zhisheng";//我们这里演示发送一千条数据for (int i = 0; i < 1000; i++) {channel.basicPublish("", QUEUE_NAME, null, (message + i).getBytes("UTF-8"));System.out.println("Producer Send +'" + message + i);}//关闭通道和连接channel.close();connection.close();}
}

Flink 主程序

import com.zhisheng.common.utils.ExecutionEnvUtil;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.api.java.utils.ParameterTool;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.rabbitmq.RMQSource;
import org.apache.flink.streaming.connectors.rabbitmq.common.RMQConnectionConfig;/*** 从 rabbitmq 读取数据*/
public class Main {public static void main(String[] args) throws Exception {final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();ParameterTool parameterTool = ExecutionEnvUtil.PARAMETER_TOOL;//这些配置建议可以放在配置文件中,然后通过 parameterTool 来获取对应的参数值final RMQConnectionConfig connectionConfig = new RMQConnectionConfig.Builder().setHost("localhost").setVirtualHost("/").setPort(5672).setUserName("admin").setPassword("admin").build();DataStreamSource<String> zhisheng = env.addSource(new RMQSource<>(connectionConfig,"zhisheng",true,new SimpleStringSchema())).setParallelism(1);zhisheng.print();//如果想保证 exactly-once 或 at-least-once 需要把 checkpoint 开启
//        env.enableCheckpointing(10000);env.execute("flink learning connectors rabbitmq");}
}

运行 RabbitMQProducerUtil 类,再运行 Main 类!

注意⚠️:

1、RMQConnectionConfig 中设置的用户名和密码要设置成 admin/admin,如果你换成是 guest/guest,其实是在 RabbitMQ 里面是没有这个用户名和密码的,所以就会报这个错误:

nested exception is com.rabbitmq.client.AuthenticationFailureException: ACCESS_REFUSED - Login was refused using authentication mechanism PLAIN. For details see the broker logfile.

不出意外的话应该你运行 RabbitMQProducerUtil 类后,立马两个运行的结果都会出来,速度还是很快的。

2、如果你在 RabbitMQProducerUtil 工具类中把注释的那行代码打开的话:

// 声明一个队列
//        channel.queueDeclare(QUEUE_NAME, false, false, false, null);

就会出现这种错误:

Caused by: com.rabbitmq.client.ShutdownSignalException: channel error; protocol method: #method<channel.close>(reply-code=406, reply-text=PRECONDITION_FAILED - inequivalent arg 'durable' for queue 'zhisheng' in vhost '/': received 'true' but current is 'false', class-id=50, method-id=10)

这是因为你打开那个注释的话,一旦你运行了该类就会创建一个叫做 zhisheng 的 Queue,当你再运行 Main 类中的时候,它又会创建这样一个叫 zhisheng 的 Queue,然后因为已经有同名的 Queue 了,所以就有了冲突,解决方法就是把那行代码注释就好了。

3、该 connector(连接器)中提供了 RMQSource 类去消费 RabbitMQ queue 中的消息和确认 checkpoints 上的消息,它提供了三种不一样的保证:

  • Exactly-once(只消费一次): 前提条件有,1 是要开启 checkpoint,因为只有在 checkpoint 完成后,才会返回确认消息给 RabbitMQ(这时,消息才会在 RabbitMQ 队列中删除);2 是要使用 Correlation ID,在将消息发往 RabbitMQ 时,必须在消息属性中设置 Correlation ID。数据源根据 Correlation ID 把从 checkpoint 恢复的数据进行去重;3 是数据源不能并行,这种限制主要是由于 RabbitMQ 将消息从单个队列分派给多个消费者。
  • At-least-once(至少消费一次): 开启了 checkpoint,但未使用相 Correlation ID 或 数据源是并行的时候,那么就只能保证数据至少消费一次了
  • No guarantees(无法保证): Flink 接收到数据就返回确认消息给 RabbitMQ

Sink 数据到 RabbitMQ

RabbitMQ 除了可以作为数据源,也可以当作下游,Flink 消费数据做了一些处理之后也能把数据发往 RabbitMQ,下面演示下 Flink 消费 Kafka 数据后写入到 RabbitMQ。

public class Main1 {public static void main(String[] args) throws Exception {final ParameterTool parameterTool = ExecutionEnvUtil.createParameterTool(args);StreamExecutionEnvironment env = ExecutionEnvUtil.prepare(parameterTool);DataStreamSource<Metrics> data = KafkaConfigUtil.buildSource(env);final RMQConnectionConfig connectionConfig = new RMQConnectionConfig.Builder().setHost("localhost").setVirtualHost("/").setPort(5672).setUserName("admin").setPassword("admin").build();//注意,换一个新的 queue,否则也会报错data.addSink(new RMQSink<>(connectionConfig, "zhisheng001", new MetricSchema()));env.execute("flink learning connectors rabbitmq");}
}

是不是很简单?但是需要注意的是,要换一个之前不存在的 queue,否则是会报错的。

不出意外的话,你可以看到 RabbitMQ 的监控页面会出现新的一个 queue 出来,如下图:

总结

本文先把 RabbitMQ 作为数据源,写了个 Flink 消费 RabbitMQ 队列里面的数据进行打印出来,然后又写了个 Flink 消费 Kafka 数据后写入到 RabbitMQ 的例子!

本文原创地址是: http://www.54tianzhisheng.cn/2019/01/20/Flink-RabbitMQ-sink/ , 未经允许禁止转载。

关注我

微信公众号:zhisheng

另外我自己整理了些 Flink 的学习资料,目前已经全部放到微信公众号了。你可以加我的微信:zhisheng_tian,然后回复关键字:Flink 即可无条件获取到。

更多私密资料请加入知识星球!

Github 代码仓库

https://github.com/zhisheng17/flink-learning/

以后这个项目的所有代码都将放在这个仓库里,包含了自己学习 flink 的一些 demo 和博客。

本文的项目代码在 https://github.com/zhisheng17/flink-learning/tree/master/flink-learning-connectors/flink-learning-connectors-rabbitmq

相关文章

1、《从0到1学习Flink》—— Apache Flink 介绍

2、《从0到1学习Flink》—— Mac 上搭建 Flink 1.6.0 环境并构建运行简单程序入门

3、《从0到1学习Flink》—— Flink 配置文件详解

4、《从0到1学习Flink》—— Data Source 介绍

5、《从0到1学习Flink》—— 如何自定义 Data Source ?

6、《从0到1学习Flink》—— Data Sink 介绍

7、《从0到1学习Flink》—— 如何自定义 Data Sink ?

8、《从0到1学习Flink》—— Flink Data transformation(转换)

9、《从0到1学习Flink》—— 介绍Flink中的Stream Windows

10、《从0到1学习Flink》—— Flink 中的几种 Time 详解

11、《从0到1学习Flink》—— Flink 写入数据到 ElasticSearch

12、《从0到1学习Flink》—— Flink 项目如何运行?

13、《从0到1学习Flink》—— Flink 写入数据到 Kafka

14、《从0到1学习Flink》—— Flink JobManager 高可用性配置

15、《从0到1学习Flink》—— Flink parallelism 和 Slot 介绍

16、《从0到1学习Flink》—— Flink 读取 Kafka 数据批量写入到 MySQL

17、《从0到1学习Flink》—— Flink 读取 Kafka 数据写入到 RabbitMQ

18、《从0到1学习Flink》—— 你上传的 jar 包藏到哪里去了?

kafka学习_《从0到1学习Flink》—— Flink 读取 Kafka 数据写入到 RabbitMQ相关推荐

  1. flink 不设置水印_从0到1学习Flink—— Flink parallelism 和 Slot 介绍

    前言 之所以写这个是因为前段时间自己的项目出现过这样的一个问题: 1Caused by: akka.pattern.AskTimeoutException: 2Ask timed out on [Ac ...

  2. 两本电子书 |Flink 最佳学习实践 | 从 0 到 1 学会 Apache Flink

    最近接连几天的加班,每次下班基本是晚上 10 点之后了,越发感觉到自己学习的时间并不是很多.所以,要给自己定一个目标,保证一天 2 个小时的学习时间,过程中不被要其他事情打扰而分心. 根据我的经验和观 ...

  3. Flink 最佳学习实践 | 从 0 到 1 学会 Apache Flink

    精选30+云产品,助力企业轻松上云!>>> 最近接连几天的加班,每次下班基本是晚上 10 点之后了,越发感觉到自己学习的时间并不是很多.所以,要给自己定一个目标,保证一天 2 个小时 ...

  4. 数据分析(六)之pandas学习【Series创建、切片、索引和读取外部数据】

    数据分析学习线路图 为什么要学习pandas? 那么问题来了:numpy已经能够帮助我们处理数据,能够结合matplotlib解决我们数据分析的问题,那么pandas学习的目的在什么地方呢? nump ...

  5. flink DDL读取kafka数据-Scala嵌入DDL形式

    步驟: service firewalld stop(关闭防火墙) 啓動hadoop 離開安全模式 啓動zookeeper與kafka集羣 操作 命令 备注 查看topic $KAFKA/bin/ka ...

  6. 损失能收敛到0么 深度学习_人工智能-Tensorflow进行深度学习的一些损失函数的可视化...

    TensorFlow目前是数值计算的最佳开源库,它使机器学习更快更容易.在这篇文章中,您将学习机器学习中使用的一些损失函数.损失函数在机器学习领域非常重要.它们用作测量模型预测输出Y_out与grou ...

  7. python变量快速学习_如何快速的复习学习过的Python

    在持续学习Python的过程中,我们可能会因为某些因素而在一段时间内没有接触Python.那么我们如何快速的复习一下曾经学过的 python 呢? 语法/变量/表达式 也许我们能找到一个小小例子来快速 ...

  8. 微软python在线学习_微软再推免费在线 Python 教程,面向数据科学和机器学习初学者...

    去年九月,微软曾面向 Python 初学者,推出了一套免费的教程视频.从基本介绍和 VS Code 的配置讲起,循序渐进语法概念等基础内容讲解.目前为止,该系列视频播放量已达到将近 180 万次. 近 ...

  9. 电脑硬件知识学习_编程入门书籍:大学学习计算机基础必读 5 本经典入门书籍,收藏...

    新手学习计算机并非易事,作为一个自学编程的过来人,深知打好计算机基础的重要性,缺少了坚实的计算机的基础,往往你也难以往上走,即使学了再多高大上的技术,也都是在沙台上筑高楼,缺少根基,摇摇欲坠. 学好计 ...

最新文章

  1. 微软研究院开源DialoGPT:你有什么梦想?「让世界充满机器人」
  2. 【AD】破解WindowsServer2008R2 AD域控目录还原模式密码及域管理员账号密码
  3. java读取欧姆龙plc_欧姆龙CJ2M系列PLC与PLC之间的数据相互读取设定
  4. 位操作——整数用位存储
  5. 预警展示样式html,纯css3 Tooltip工具提示样式
  6. mysql iscsi_iscsi共享存储的简单配置和应用
  7. jQuery心得5--jQuery深入了解串讲1
  8. C语言,向函数传递一维数组,计算最高分,平均分,人数(要求输入负值时输入结束,且不能超过40人)
  9. Linux网络配置的基本方法
  10. Mac电脑如何调整鼠标灵敏度
  11. 字体的成本:按字算,微软是100美元
  12. 电脑关机后键盘灯和风扇还在转的解决方案
  13. 软件工程——软件维护
  14. Python学习教程公开课:好玩的Python
  15. Safari无法验证网站身份
  16. 华硕电脑改光驱启动计算机,华硕笔记本win7系统如何设置光驱为第一启动项
  17. 【阿柟碎碎念】暑期集训篇
  18. Matlab实现滤波器,进行ASK、FSK、多音信号的滤波
  19. 正规的打码网站使用方法有哪些
  20. ISIS几个命令的区别

热门文章

  1. mysql jdbc连接 优化_java+mysql连接的优化
  2. CTF(pwn) Fastbin Attack
  3. Python 中的hash
  4. Python教程:Sys 与 Import 模块
  5. python 的filter()函数
  6. Python叠加装饰器,三元表达,生成,调用,递归
  7. Python类中的self到底是干啥的
  8. Python的locals()函数
  9. php mysql建表_mysql建表测试
  10. 如何绘制计算机软件程序流程图?