nodejs链接kafka示例(producer、consumer)
2019独角兽企业重金招聘Python工程师标准>>>
安装node环境:
wget https://nodejs.org/dist/v6.10.3/node-v6.10.3-linux-x64.tar.xz
tar xf node-v6.10.3-linux-x64.tar.xz -C /usr/local
mv /usr/local/node-v6.10.3-linux-x64 /usr/local/node
rm -f node-v6.10.3-linux-x64.tar.xz
echo 'PATH=$PATH:/usr/local/node/bin' >> /etc/profile
source /etc/profilenode -v
npm -vnpm install -g nodemon
定义目录:
mkdir -p /home/node/kafka/test
安装node-kafka库:
cd /home/node/
npm install kafka-node
定义producer.js
var kafka = require('kafka-node'),Producer = kafka.Producer,Client = kafka.Client;var client = new Client('localhost:2181', 'producer-test');var producer = new Producer(client);var topic = 'test01';var payloads = [{ topic: topic, messages: 'this is test message' },
];producer.on('ready', function() {producer.createTopics([topic], function(err, data) {producer.send(payloads, function(err, data) {console.log(err || data);process.exit();})})
})producer.on('error', function(err) {console.log('error', err);
})
定义consumer.js
var async = require('async'),kafka = require('kafka-node'),ConsumerGroup = kafka.ConsumerGroup;var topic = 'test01';var options = {host: 'localhost:2181',groupId: 'group-test',sessionTimeout: 15000,autoCommit: true,
};var c1 = new ConsumerGroup(Object.assign({id: 'c1'}, options), topic);
c1.on('message', onMessage);
c1.on('error', onError);var c2 = new ConsumerGroup(Object.assign({id: 'c2'}, options), topic);
c2.on('message', onMessage);
c2.on('error', onError);var c3 = new ConsumerGroup(Object.assign({id: 'c3'}, options), topic);
c3.on('message', onMessage);
c3.on('error', onError);function onMessage(message) {console.log(this.client.clientId);console.log(message);
}function onError(err) {console.log(err);
}process.once('SIGINT', function() {async.each([c1, c2, c3], function(c, cb) {c.close(true, cb);})
})
测试运行:
cd /home/node/kafka/test/nodemon -w consumer.js consumer.jsnode producer.js
参考:
https://www.npmjs.com/package/kafka-node
转载于:https://my.oschina.net/qiongtaoli/blog/914899
nodejs链接kafka示例(producer、consumer)相关推荐
- Kafka设计解析(十三)Kafka消费组(consumer group)
转载自 huxihx,原文链接 Kafka消费组(consumer group) 一直以来都想写一点关于kafka consumer的东西,特别是关于新版consumer的中文资料很少.最近Kafka ...
- Kafka入门教程其一 消息队列基本概念 及常用Producer Consumer配置详解学习笔记
文章目录 1. 综述 2. 消息队列(Message Queue) 2.1 点对点 2.2 发布/订阅(pub-sub) 3. Kafka基础术语解释 3.1 Broker 3.2 Partition ...
- java连接kafka api_Kafka-JavaAPI(Producer And Consumer)
Kafka--JAVA API(Producer和Consumer) Kafka 版本2.11-0.9.0.0 producer package com.yzy.spark.kafka; import ...
- Kafka的producer案例,Kafka的consumer案例
1.编写所需的pom.xml文件 <?xml version="1.0" encoding="UTF-8"?> <project xmlns= ...
- SpringBoot整合kafka(实现producer和consumer)
转载自 SpringBoot整合kafka(实现producer和consumer) 在Windows环境下安装运行Kafka:https://www.jianshu.com/p/d64798e81f ...
- kafka之Producer同步与异步消息发送及事务幂等性案例应用实战
本套系列博客从真实商业环境抽取案例进行总结和分享,并给出Spark商业应用实战指导,请持续关注本套博客.版权声明:本套Spark商业应用实战归作者(秦凯新)所有,禁止转载,欢迎学习. 秦凯新的技术社区 ...
- Kafka:High level consumer vs. Low level consumer
Kafka中的消费者有两套API,分别是high level的和low level的.两种消费方式在构造和实现上都是不同的,在此记录一下: 一.High level consumer API High ...
- 大数据技术之 Kafka (第 4 章 Kafka API ) Producer API
4.1.1 消息发送流程 Kafka 的 Producer 发送消息采用的是异步发送的方式.在消息发送的过程中,涉及到了两个线程--main 线程和 Sender 线程,以及一个线程共享变量--Rec ...
- 057 Java中kafka的Producer程序实现
1.需要启动的服务 这里启动的端口是9092. bin/kafka-console-consumer.sh --topic beifeng --zookeeper linux-hadoop01.ibe ...
最新文章
- 怎样让计算机恢复到桌面上,如何把电脑桌面恢复成原样.怎么办?
- 【知识梳理1】Android触摸事件机制
- Kubernetes要成为一个企业友好平台,到底还缺啥?
- 常量的定义与使用 1006 c#
- 高并发中计数器的实现方式有哪些?
- 让敏捷的回顾会议变得有趣而高效
- SQL SERVER如何通过SQL语句获服务器硬件和系统信息
- 拓端tecdat|【数据分享】学生受欢迎程度评价数据集
- 游戏建模软件的ZBrush和Mudbox哪个好
- 解决Iframe嵌入帆软BI系统后,Chrome升级后跨域出现登录界面,Cookie写入不成功。解决办法
- maya2011安装方法图文详细教程及Unity下载
- linux基本操作大全centos7
- python如何读取uni文件_如何在Python中通过HTTP与UniProt交谈?
- VHD轉換VHDX格式
- tar.gzh和zip的区别
- 全球注意力缺陷多动障碍(ADHD)市场规模2021年大约为796亿元(人民币),预计2028年将达到1259亿元
- 89c52单片机控制两个步进电机正反转加减速(程序+仿真)
- 【PBR系列六】基于物理的环境光照(上):漫反射辐照度(Diffuse irradiance)
- js将数组中相同项放在一个数组
- 电脑自动配置了两个IP4解决方法
热门文章
- 机器人 沈为民_会变形的机器人
- delete 字符数组 []_前端基础扫盲系列 长达8000字的数组总结
- 改工作空间_打拼六年换的新房,装修花了17万,飘窗改柜子很实用,谁见过?...
- python中0xFFFFFFFFFFFFFFFF这种字符串是什么意思呢
- java如何理解继承性_理解 Java 的三大特性之继承
- python列表、元组、字典和集合的算法时间_27.Python列表(list)、元组(tuple)、字典(dict)和集合(set)详解...
- 电商平台 高并发 微服务 方案_Java生鲜电商平台-秒杀系统微服务架构设计与源码解析实战...
- Amazon EC2 AMI实现X11 forwarding【从MobaXterm打开浏览器和图形界面】
- 使用yangtools将yang文件转化成java
- (7)操作系统安全机制一