NET中解决KafKa多线程发送多主题的问题
一般在KafKa消费程序中消费可以设置多个主题,那在同一程序中需要向KafKa发送不同主题的消息,如异常需要发到异常主题,正常的发送到正常的主题,这时候就需要实例化多个主题,然后逐个发送。
在NET中用RdKafka组件来做消息处理,在Nuget中引用。
在程序中初始化Producer,并创建多个Topic
private string comtopic = "topic1";
private string errtopic = "topic2";
private string kfkip = "192.168.80.32:9092";
Topic topic = null;
Topic errTopic = null;
public ExcuteFlow()
{
try
{
Producer producer = new Producer(kfkip);
topic = producer.Topic(comtopic);
errTopic = producer.Topic(errtopic);
}
catch (RdKafkaException ex)
{
LogHelper.Error("KafKa初始化KafKa异常 ", ex);
}
catch (Exception ex)
{
LogHelper.Error("KafKa初始化异常", ex);
}
}
在程序中发送其中一个主题:
try
{
if (topic != null)
{
byte[] datas = Encoding.UTF8.GetBytes(JsonHelper.ToJson(flowCommond));
Task<DeliveryReport> deliveryReport = topic.Produce(datas);
var unused = deliveryReport.ContinueWith(task =>
{
LogHelper.Info("内容:{flowCommond.ID} 发送到分区:{task.Result.Partition}, Offset 为: {task.Result.Offset}");
});
}
else
{
throw new Exception("发送消息到KafKa topic 为空");
}
}
catch (RdKafkaException ex)
{
LogHelper.Error("发送消息到KafKa KafKa异常", ex);
}
catch (Exception ex)
{
LogHelper.Error("发送消息到KafKa异常", ex);
}
flowCommond为要发送的对象内容,格式化为Json字符串再发送。
另一个主题一样处理。
这里实现一个线程里面发送多个主题,那下面实现多个线程中如何发送多个主题。
多线程中如果每个线程都new Producer(kfkip) 一次,那KafKa的连接很快会被占满。
那这里就用单例模式来解决这个问题,每次要用到Producer时检查一下是否已经存在Producer实例,若存在则直接用不用再生成。
/// <summary>
/// 单例模式的实现
/// </summary>
public class SingleProduct : Producer
{
// 定义一个静态变量来保存类的实例
private static SingleProduct uniqueInstance;
// 定义一个标识确保线程同步
private static readonly object locker = new object();
// 定义私有构造函数,使外界不能创建该类实例
private SingleProduct(string brokerList) : base(brokerList)
{
}
/// <summary>
/// 定义公有方法提供一个全局访问点,同时你也可以定义公有属性来提供全局访问点
/// </summary>
/// <returns></returns>
public static SingleProduct GetInstance()
{
// 当第一个线程运行到这里时,此时会对locker对象 "加锁",
// 当第二个线程运行该方法时,首先检测到locker对象为"加锁"状态,该线程就会挂起等待第一个线程解锁
// lock语句运行完之后(即线程运行完之后)会对该对象"解锁"
if (uniqueInstance == null)
{
lock (locker)
{
// 如果类的实例不存在则创建,否则直接返回
if (uniqueInstance == null)
{
string kfkip = System.Configuration.ConfigurationManager.AppSettings["KfkIP"];
try
{
uniqueInstance = new SingleProduct(kfkip);
LogHelper.Error("单例模式 实例化 SingleProduct");
}
catch (RdKafkaException ex)
{
LogHelper.Error("单例模式 KafKa初始化KafKa异常 ", ex);
}
catch (Exception ex)
{
LogHelper.Error("单例模式 KafKa初始化异常", ex);
}
}
}
}
return uniqueInstance;
}
}
然后在初始化的代码中替换Producer producer = new Producer(kfkip);为 Producer producer = SingleProduct.GetInstance();
OK!以上就完成了多线程多主题的消息发送。
相关文章:
消息队列 Kafka 的基本知识及 .NET Core 客户端
.net Kafka.Client多个Consumer Group对Topic消费不能完全覆盖研究总结(一)
.net Kafka.Client多个Consumer Group对Topic消费不能完全覆盖研究总结(二)
原文地址:http://www.cnblogs.com/zhangs1986/p/7285525.html
.NET社区新闻,深度好文,微信中搜索dotNET跨平台或扫描二维码关注
NET中解决KafKa多线程发送多主题的问题相关推荐
- kafka 解决大消息发送和接收报错问题
kafka消息超过一定大小会报错如下: The message is 2044510 bytes when serialized which is larger than the maximum re ...
- kafka 脚本发送_NWPC消息平台:在ecFlow系统中发送产品事件消息
本文属于介绍 NWPC 消息平台 系列文章. 本文介绍如何在基于 ecFlow 构建的数值预报业务系统中发送 NWPC 消息平台的 产品事件消息. 介绍 数值预报业务系统产品制作一般分为三个步骤: 监 ...
- 【UDP通过多线程改进,在一个窗口中同时接收又发送】
package com.yjf.esupplier.common.test;import java.net.DatagramSocket; import java.net.SocketExceptio ...
- kafka 异步发送阻塞_Kafka学习一
一.github下载kafka的源码 可以看到kafka的源码开源社区是非常活跃的. 二.搭建kafka环境 构建kafka环境,首先需要安装Scala和gradle,再安装的scala插件需要和你的 ...
- Springboot项目中使用Kafka
Springboot项目中使用Kafka 第一步:安装好Kafka服务器 具体可参考: https://blog.csdn.net/weixin_40990818/article/details/10 ...
- 【Kafka笔记】5.Kafka 多线程消费消息
Kafka多线程消费理解 Kafka Java Consumer设计 Kafka Java Consumer采用的是单线程的设计.其入口类KafkaConsumer是一个双线程的设计,即用户主线程和心 ...
- 如何在优雅地Spring 中实现消息的发送和消费
本文将对rocktmq-spring-boot的设计实现做一个简单的介绍,读者可以通过本文了解将RocketMQ Client端集成为spring-boot-starter框架的开发细节,然后通过一个 ...
- go 实现 kafka 消息发送、接收
引言 网络上关于 go 实现 kafka 消息发送和接收的文章很多,但是实际操作起来又不是很清楚,本文在网络资源的基础上,结合自己搭建过程中遇到的问题进行了总结. 本文的实验主机:Mac笔记本. 一. ...
- 正确处理kafka多线程消费的姿势
最近项目开发过程使用kafka作为项目模块间负载转发器,实现实时接收不同产品线消息,分布式准实时消费产品线消息.通过kafka作为模块间的转换器,不仅有MQ的几大好处:异步. 解耦. 削峰等几大好处, ...
最新文章
- 设计模式之代理模式(Proxy)摘录
- 清华北大,已经没人本科就找工作了
- Mac下Android配置及unity3d的导出Android
- 前端利用JS导出数据到Excel表 数字是文本类型 无法计算
- Redis的分布式锁详解
- 使用Maven把项目打包成可执行jar在Idea里
- 如何将安卓数据同步到Mac电脑上
- linux 主机支持远程唤醒_linux 通过wol远程开机【转】
- 《C++(一)--类》
- 人工智能基础——2.3.2产生式系统
- 如何使用计算机来线性拟合,Excel2019使用教程:绘制线性回归图
- 吉他学习笔记--更新中
- Linux的隐匿技巧【渗透测试】
- 把一个内含7个元素的数组中的第3~第5个元素拷贝到内含3个元素的数组中
- POJ_1849 Two
- php mysql 点餐系统_laravel实现点餐系统,快来点餐吧!
- linux usb3.0移动硬盘,希捷(Seagate)1TB USB3.0移动硬盘使用评测
- 宝宝生活点滴(2010.05)
- IT之家,这不是个案
- ALLHiC: 辅助组装简单的二倍体基因组