一般在KafKa消费程序中消费可以设置多个主题,那在同一程序中需要向KafKa发送不同主题的消息,如异常需要发到异常主题,正常的发送到正常的主题,这时候就需要实例化多个主题,然后逐个发送。

  在NET中用RdKafka组件来做消息处理,在Nuget中引用。

  在程序中初始化Producer,并创建多个Topic

private string comtopic = "topic1";

private string errtopic = "topic2";

private string kfkip = "192.168.80.32:9092";

Topic topic = null;

Topic errTopic = null;

public ExcuteFlow()

{

try

{

Producer producer = new Producer(kfkip);

topic = producer.Topic(comtopic);

errTopic = producer.Topic(errtopic);

}

catch (RdKafkaException ex)

{

LogHelper.Error("KafKa初始化KafKa异常 ", ex);

}

catch (Exception ex)

{

LogHelper.Error("KafKa初始化异常", ex);

}

}

在程序中发送其中一个主题:

try

{

if (topic != null)

{

byte[] datas = Encoding.UTF8.GetBytes(JsonHelper.ToJson(flowCommond));

Task<DeliveryReport> deliveryReport = topic.Produce(datas);

var unused = deliveryReport.ContinueWith(task =>

{

LogHelper.Info("内容:{flowCommond.ID} 发送到分区:{task.Result.Partition}, Offset 为: {task.Result.Offset}");

});

}

else

{

throw new Exception("发送消息到KafKa topic 为空");

}

}

catch (RdKafkaException ex)

{

LogHelper.Error("发送消息到KafKa  KafKa异常", ex);

}

catch (Exception ex)

{

LogHelper.Error("发送消息到KafKa异常", ex);

}

 flowCommond为要发送的对象内容,格式化为Json字符串再发送。

  另一个主题一样处理。

  这里实现一个线程里面发送多个主题,那下面实现多个线程中如何发送多个主题。

  多线程中如果每个线程都new Producer(kfkip) 一次,那KafKa的连接很快会被占满。

  那这里就用单例模式来解决这个问题,每次要用到Producer时检查一下是否已经存在Producer实例,若存在则直接用不用再生成。

/// <summary>

/// 单例模式的实现

/// </summary>

public class SingleProduct : Producer

{

// 定义一个静态变量来保存类的实例

private static SingleProduct uniqueInstance;

// 定义一个标识确保线程同步

private static readonly object locker = new object();

// 定义私有构造函数,使外界不能创建该类实例

private SingleProduct(string brokerList) : base(brokerList)

{

}

/// <summary>

/// 定义公有方法提供一个全局访问点,同时你也可以定义公有属性来提供全局访问点

/// </summary>

/// <returns></returns>

public static SingleProduct GetInstance()

{

// 当第一个线程运行到这里时,此时会对locker对象 "加锁",

// 当第二个线程运行该方法时,首先检测到locker对象为"加锁"状态,该线程就会挂起等待第一个线程解锁

// lock语句运行完之后(即线程运行完之后)会对该对象"解锁"

if (uniqueInstance == null)

{

lock (locker)

{

// 如果类的实例不存在则创建,否则直接返回

if (uniqueInstance == null)

{

string kfkip = System.Configuration.ConfigurationManager.AppSettings["KfkIP"];

try

{

uniqueInstance = new SingleProduct(kfkip);

LogHelper.Error("单例模式 实例化 SingleProduct");

}

catch (RdKafkaException ex)

{

LogHelper.Error("单例模式 KafKa初始化KafKa异常 ", ex);

}

catch (Exception ex)

{

LogHelper.Error("单例模式 KafKa初始化异常", ex);

}

}

}

}

return uniqueInstance;

}

}

然后在初始化的代码中替换Producer producer = new Producer(kfkip);为 Producer producer = SingleProduct.GetInstance();

  OK!以上就完成了多线程多主题的消息发送。

相关文章:

  • 消息队列 Kafka 的基本知识及 .NET Core 客户端

  • .net Kafka.Client多个Consumer Group对Topic消费不能完全覆盖研究总结(一)

  • .net Kafka.Client多个Consumer Group对Topic消费不能完全覆盖研究总结(二)

原文地址:http://www.cnblogs.com/zhangs1986/p/7285525.html


.NET社区新闻,深度好文,微信中搜索dotNET跨平台或扫描二维码关注

NET中解决KafKa多线程发送多主题的问题相关推荐

  1. kafka 解决大消息发送和接收报错问题

    kafka消息超过一定大小会报错如下: The message is 2044510 bytes when serialized which is larger than the maximum re ...

  2. kafka 脚本发送_NWPC消息平台:在ecFlow系统中发送产品事件消息

    本文属于介绍 NWPC 消息平台 系列文章. 本文介绍如何在基于 ecFlow 构建的数值预报业务系统中发送 NWPC 消息平台的 产品事件消息. 介绍 数值预报业务系统产品制作一般分为三个步骤: 监 ...

  3. 【UDP通过多线程改进,在一个窗口中同时接收又发送】

    package com.yjf.esupplier.common.test;import java.net.DatagramSocket; import java.net.SocketExceptio ...

  4. kafka 异步发送阻塞_Kafka学习一

    一.github下载kafka的源码 可以看到kafka的源码开源社区是非常活跃的. 二.搭建kafka环境 构建kafka环境,首先需要安装Scala和gradle,再安装的scala插件需要和你的 ...

  5. Springboot项目中使用Kafka

    Springboot项目中使用Kafka 第一步:安装好Kafka服务器 具体可参考: https://blog.csdn.net/weixin_40990818/article/details/10 ...

  6. 【Kafka笔记】5.Kafka 多线程消费消息

    Kafka多线程消费理解 Kafka Java Consumer设计 Kafka Java Consumer采用的是单线程的设计.其入口类KafkaConsumer是一个双线程的设计,即用户主线程和心 ...

  7. 如何在优雅地Spring 中实现消息的发送和消费

    本文将对rocktmq-spring-boot的设计实现做一个简单的介绍,读者可以通过本文了解将RocketMQ Client端集成为spring-boot-starter框架的开发细节,然后通过一个 ...

  8. go 实现 kafka 消息发送、接收

    引言 网络上关于 go 实现 kafka 消息发送和接收的文章很多,但是实际操作起来又不是很清楚,本文在网络资源的基础上,结合自己搭建过程中遇到的问题进行了总结. 本文的实验主机:Mac笔记本. 一. ...

  9. 正确处理kafka多线程消费的姿势

    最近项目开发过程使用kafka作为项目模块间负载转发器,实现实时接收不同产品线消息,分布式准实时消费产品线消息.通过kafka作为模块间的转换器,不仅有MQ的几大好处:异步. 解耦. 削峰等几大好处, ...

最新文章

  1. 设计模式之代理模式(Proxy)摘录
  2. 清华北大,已经没人本科就找工作了
  3. Mac下Android配置及unity3d的导出Android
  4. 前端利用JS导出数据到Excel表 数字是文本类型 无法计算
  5. Redis的分布式锁详解
  6. 使用Maven把项目打包成可执行jar在Idea里
  7. 如何将安卓数据同步到Mac电脑上
  8. linux 主机支持远程唤醒_linux 通过wol远程开机【转】
  9. 《C++(一)--类》
  10. 人工智能基础——2.3.2产生式系统
  11. 如何使用计算机来线性拟合,Excel2019使用教程:绘制线性回归图
  12. 吉他学习笔记--更新中
  13. Linux的隐匿技巧【渗透测试】
  14. 把一个内含7个元素的数组中的第3~第5个元素拷贝到内含3个元素的数组中
  15. POJ_1849 Two
  16. php mysql 点餐系统_laravel实现点餐系统,快来点餐吧!
  17. linux usb3.0移动硬盘,希捷(Seagate)1TB USB3.0移动硬盘使用评测
  18. 宝宝生活点滴(2010.05)
  19. IT之家,这不是个案
  20. ALLHiC: 辅助组装简单的二倍体基因组

热门文章

  1. 一个脚本实现全量增量备份,并推送到远端备份中心服务器
  2. Spring Boot 入门小目标 3 --- 先来试着热部署
  3. java之for循环
  4. c#使用PdfiumViewer展示、打印pdf文档
  5. 单元测试 | 如何Mock IHttpClientFactory
  6. 关于Asp.net core配置信息读取的源码分析梳理
  7. 用BenchmarkDotNet看Property
  8. 通过Dapr实现一个简单的基于.net的微服务电商系统
  9. 怎样使用C# 获取WIFI的连接状态?
  10. 收好这张MySQL导图,全是知识点!