在前面的“Kafka配置1~Kafka配置6”文章中,我们详细的介绍了Kafka的安装、集群、SASL、SSL和账户权限的配置。该篇文章主要介绍使用C#向Kafka中生产和消费消息。

在C#中,我们使用Confluent.Kafka库作为连接Kafka服务的“桥梁”,可自行在NuGet中下载,这里我们只做在C#中使用SASL+SSL的使用例子,更详细的使用方法可参考官网https://github.com/confluentinc/confluent-kafka-dotnet/。

1、使用VS创建一个解决方案,名称如Quber.Test.Kafka,然后分别创建生产者项目(Quber.Test.KafkaProducer,Windows应用程序)和消费者项目(Quber.Test.KafkaConsumer,控制台应用程序)用于测试,项目结构如下所示:

2、生产者

在Quber.Test.KafkaProducer项目中新建一个生产窗体FrmMain,窗体中增加一个输入框和发送按钮,然后在发送按钮事件中实现生产消息的逻辑,代码如下所示:

/// <summary>
/// 生产消息事件
/// </summary>
/// <param name="sender"></param>
/// <param name="e"></param>
private async void btnSend_Click(object sender, EventArgs e)
{var msg = txtMain.Text.Trim();if (string.IsNullOrWhiteSpace(msg)){MessageBox.Show("发送内容不能为空!", "提示信息");txtMain.Focus();return;}//生产的主题var topicName = "TestTopic1";var config = new ProducerConfig{BootstrapServers = "192.168.3.200:9092,192.168.3.200:9093,192.168.3.200:9094",SecurityProtocol = SecurityProtocol.SaslSsl,SaslMechanism = SaslMechanism.Plain,SaslUsername = "quber2",                                   //SASL账户SaslPassword = "quber123456",                              //SASL密码     SslCaLocation = @"D:\Net_Program\Net_Kafka\04_SSL\ca-cert" //SSL证书};using (var p = new ProducerBuilder<Null, string>(config).Build()){try{//异步生产消息var dr = await p.ProduceAsync(topicName, new Message<Null, string> { Value = msg });}catch (ProduceException<Null, string> pe){Console.ForegroundColor = ConsoleColor.Red;Console.WriteLine(pe.Error.Reason);}}
}

说明:上述代码中的quber2账户就是前面文章中提到的具有“写”权限的账户

3、消费者

在Quber.Test.KafkaConsumer项目的Main函数中实现如下消费信息的代码逻辑即可:

static void Main(string[] args)
{//消费的主题var topicName = "TestTopic1";var conf = new ConsumerConfig{GroupId = "TestGroup2",                                   //消费的分组BootstrapServers = "192.168.3.200:9092,192.168.3.200:9093,192.168.3.200:9094",SecurityProtocol = SecurityProtocol.SaslSsl,SaslMechanism = SaslMechanism.Plain,AutoOffsetReset = AutoOffsetReset.Earliest,EnableAutoCommit = false,SaslUsername = "quber1",                                   //SASL账户SaslPassword = "quber123456",                              //SASL密码     SslCaLocation = @"D:\Net_Program\Net_Kafka\04_SSL\ca-cert" //SSL证书};using (var c = new ConsumerBuilder<Ignore, string>(conf).SetErrorHandler((_, e) =>{Console.ForegroundColor = ConsoleColor.Yellow;Console.WriteLine("连接出错:" + e.Reason);}).Build()){c.Subscribe(topicName);CancellationTokenSource cts = new CancellationTokenSource();Console.CancelKeyPress += (_, e) =>{e.Cancel = true;cts.Cancel();};try{while (true){try{var cr = c.Consume(cts.Token);Console.ForegroundColor = ConsoleColor.Green;Console.WriteLine("收取成功:" + cr.Value);}catch (ConsumeException e){Console.ForegroundColor = ConsoleColor.Red;Console.WriteLine("收取失败:" + e.Error.Reason);}}}catch (OperationCanceledException e){Console.ForegroundColor = ConsoleColor.Red;Console.WriteLine("收取失败:" + e.Message);c.Close();}}
}

说明:上述代码中的quber1账户就是前面文章中提到的具有“读”权限的账户

4、测试

在之前介绍的配置文章中,我们配置了3个账号(quber[读和写]、quber1[读]、quber2[写]),为了方便测试,后续我们又增加了1个账号(quber3[读]),这样就相当于有3个账号具有消费信息的权限(quber、quber1和quber3),然后我们分别设置这3个具有读权限的账户去消费消息,并编译消费者项目,将编译好的文件分别复制到3个不同的文件夹中,然后双击打开文件夹中的Quber.Test.KafkaConsumer.exe文件,此时运行生产者客户端,发送几条测试的数据,具体演示效果如下图所示:

说明:

上述代码中的SslCaLocation = @"D:\Net_Program\Net_Kafka\04_SSL\ca-cert"就是前面文章Kafka配置4--Windows下配置Kafka的SSL证书中提到的证书生成的地址。

Kafka开发--C#生产和消费消息相关推荐

  1. kafka设置起止时间消费消息

    消费kafka消息时,有时可能需要消费某个时间段的消息,写个demo记录下: public class KafkaConsumerByTime {public static void main(Str ...

  2. 利用Kafka发送/消费消息-Java示例

    利用Kafka发送/消费消息-Java示例 当使用命令行工具把基本的组件运行起来后,再使用Java client就很简单,这里是入门的第一个Java客户端程序,有很多需要深入理解的地方. 依赖配置 & ...

  3. Kafka2.12安装与配置/生产与消费

    Kafka2.12安装与配置/生产与消费 一.Kafka安装与配置 1.1 Java环境为前提 jdk下载地址链接:jdk1.8 提取码:9plz zookeeper下载地址链接:zookeeper3 ...

  4. Kafka实现消息生产和消费

    文章目录 一.Kafka测试消息生产与消费 二.Java程序进行Kafka收发消息 1.消息生产者 2.消息消费者 一.Kafka测试消息生产与消费 # 首先创建一个主题 [root@192 kafk ...

  5. kafka console 生产消费消息

    producer 生产消息 consumer 消费消息 consumer 消费消息  --from-beginning (生产消息时未启动)

  6. kafka java_Kafka 使用Java实现数据的生产和消费demo

    前言 在上一篇中讲述如何搭建kafka集群,本篇则讲述如何简单的使用 kafka .不过在使用kafka的时候,还是应该简单的了解下kafka. Kafka的介绍 Kafka是一种高吞吐量的分布式发布 ...

  7. 大数据开发hadoop核心的分布式消息系统:Apache Kafka 你知道吗

    简介 Apache Kafka是分布式发布-订阅消息系统.它最初由LinkedIn公司开发,之后成为Apache项目的一部分.Kafka是一种快速.可扩展的.设计内在就是分布式的,分区的和可复制的提交 ...

  8. KAFKA 最新版 Shell API单机生产与消费

    文章目录 一.KAFKA 启动与监控 二.KAFKA 主题创建于查看生产与消费 2.1. 查看主题列表 2.2. 创建主题 2.3. 查看主题信息 2.4. 主题信息分析 三.KAFKA 主题创建于查 ...

  9. Kafka消费消息自动提交与手动提交

    消费者poll消息得过程(poll的意思是从broker拿消息,并不代表拿到就消费成功了) 消费者建立了与broker之间的⻓连接,开始poll消息. 默认一次poll 500条消息 props.pu ...

  10. RocketMQ事务消息从生产到消费原理详解(包括回查过程)

    名词解释 half消息(生产者发送的Prepare消息):发送到MQ Server但无法被consumer消费的消息,暂时存在MQ Server,需要收到生产者二次确认后才能被消费 消息回查:一些意外 ...

最新文章

  1. 一个能从别人的观念来看事情,能了解别人心灵活动的人,永远不必为自己的前途担...
  2. 面向 Photoshop 的英特尔® Texture Works 插件
  3. HDU-2079 选课时间(题目已修改,注意读题) -母函数
  4. windows远程桌面_如何使用Windows远程桌面连接Ubuntu 干货
  5. ASP.NET MVC5总结(二)@HTML扩展
  6. 【zabbix系列】报警系统的设置和排除
  7. NSCalendar日历
  8. 软件需求说明书/ 概要设计说明书/项目开发计划/详细设计说明书(说明要点及要点解释)
  9. 物联网行业应用前景分析
  10. 环境保护设施运营组织服务认证 认证专业分类及运营设施范围
  11. 系统思考:智猪博弈(变革)
  12. 空间相册显示服务器错误,空间相册服务器繁忙
  13. Hough变换直线检测
  14. 小程序识别企业微信二维码功能
  15. (一)框架搭建,前端路由设置,自定义寻找指定路径(Django+Vue+Mysql,数据库管理数据分析网站)
  16. 前端设计稿转代码现状,会不会失业?
  17. ThinkPad安装Mac
  18. 网络安全工程师待遇 网络安全工程师需要学什么
  19. 以太网、令牌环、FDDI、ATM、WLAN
  20. sql求中位数、四分位数

热门文章

  1. jquery判断页面标签是否存在
  2. Hosting WCF in SharePoint 2007 (Part 1) 基本部署(转)
  3. 【Prufer Sequence +简单排列组合】bzoj 1005: [HNOI2008]明明的烦恼
  4. python2 与 python3的区别
  5. 尝试造了个工具类库,名为 Diana
  6. BZOJ - 2783 树
  7. 一步步完成FastDFS + Spring MVC上传下载整合示例
  8. C#编程总结(十一)数字证书
  9. AutoCAD VBA创建椭圆和样条曲线
  10. Eclipse里做JBPM工作流gpd.xml中文乱码问题解决