2019独角兽企业重金招聘Python工程师标准>>>

安装node环境:

wget https://nodejs.org/dist/v6.10.3/node-v6.10.3-linux-x64.tar.xz
tar xf node-v6.10.3-linux-x64.tar.xz -C /usr/local
mv /usr/local/node-v6.10.3-linux-x64 /usr/local/node
rm -f node-v6.10.3-linux-x64.tar.xz
echo 'PATH=$PATH:/usr/local/node/bin' >> /etc/profile
source /etc/profilenode -v
npm -vnpm install -g nodemon

定义目录:

mkdir -p /home/node/kafka/test

安装node-kafka库:

cd /home/node/
npm install kafka-node

定义producer.js

var kafka = require('kafka-node'),Producer = kafka.Producer,Client = kafka.Client;var client = new Client('localhost:2181', 'producer-test');var producer = new Producer(client);var topic = 'test01';var payloads = [{ topic: topic, messages: 'this is test message' },
];producer.on('ready', function() {producer.createTopics([topic], function(err, data) {producer.send(payloads, function(err, data) {console.log(err || data);process.exit();})})
})producer.on('error', function(err) {console.log('error', err);
})

定义consumer.js

var async = require('async'),kafka = require('kafka-node'),ConsumerGroup = kafka.ConsumerGroup;var topic = 'test01';var options = {host: 'localhost:2181',groupId: 'group-test',sessionTimeout: 15000,autoCommit: true,
};var c1 = new ConsumerGroup(Object.assign({id: 'c1'}, options), topic);
c1.on('message', onMessage);
c1.on('error', onError);var c2 = new ConsumerGroup(Object.assign({id: 'c2'}, options), topic);
c2.on('message', onMessage);
c2.on('error', onError);var c3 = new ConsumerGroup(Object.assign({id: 'c3'}, options), topic);
c3.on('message', onMessage);
c3.on('error', onError);function onMessage(message) {console.log(this.client.clientId);console.log(message);
}function onError(err) {console.log(err);
}process.once('SIGINT', function() {async.each([c1, c2, c3], function(c, cb) {c.close(true, cb);})
})

测试运行:

cd /home/node/kafka/test/nodemon -w consumer.js consumer.jsnode producer.js

参考:

https://www.npmjs.com/package/kafka-node

转载于:https://my.oschina.net/qiongtaoli/blog/914899

nodejs链接kafka示例(producer、consumer)相关推荐

  1. Kafka设计解析(十三)Kafka消费组(consumer group)

    转载自 huxihx,原文链接 Kafka消费组(consumer group) 一直以来都想写一点关于kafka consumer的东西,特别是关于新版consumer的中文资料很少.最近Kafka ...

  2. Kafka入门教程其一 消息队列基本概念 及常用Producer Consumer配置详解学习笔记

    文章目录 1. 综述 2. 消息队列(Message Queue) 2.1 点对点 2.2 发布/订阅(pub-sub) 3. Kafka基础术语解释 3.1 Broker 3.2 Partition ...

  3. java连接kafka api_Kafka-JavaAPI(Producer And Consumer)

    Kafka--JAVA API(Producer和Consumer) Kafka 版本2.11-0.9.0.0 producer package com.yzy.spark.kafka; import ...

  4. Kafka的producer案例,Kafka的consumer案例

    1.编写所需的pom.xml文件 <?xml version="1.0" encoding="UTF-8"?> <project xmlns= ...

  5. SpringBoot整合kafka(实现producer和consumer)

    转载自 SpringBoot整合kafka(实现producer和consumer) 在Windows环境下安装运行Kafka:https://www.jianshu.com/p/d64798e81f ...

  6. kafka之Producer同步与异步消息发送及事务幂等性案例应用实战

    本套系列博客从真实商业环境抽取案例进行总结和分享,并给出Spark商业应用实战指导,请持续关注本套博客.版权声明:本套Spark商业应用实战归作者(秦凯新)所有,禁止转载,欢迎学习. 秦凯新的技术社区 ...

  7. Kafka:High level consumer vs. Low level consumer

    Kafka中的消费者有两套API,分别是high level的和low level的.两种消费方式在构造和实现上都是不同的,在此记录一下: 一.High level consumer API High ...

  8. 大数据技术之 Kafka (第 4 章 Kafka API ) Producer API

    4.1.1 消息发送流程 Kafka 的 Producer 发送消息采用的是异步发送的方式.在消息发送的过程中,涉及到了两个线程--main 线程和 Sender 线程,以及一个线程共享变量--Rec ...

  9. 057 Java中kafka的Producer程序实现

    1.需要启动的服务 这里启动的端口是9092. bin/kafka-console-consumer.sh --topic beifeng --zookeeper linux-hadoop01.ibe ...

最新文章

  1. 怎样让计算机恢复到桌面上,如何把电脑桌面恢复成原样.怎么办?
  2. 【知识梳理1】Android触摸事件机制
  3. Kubernetes要成为一个企业友好平台,到底还缺啥?
  4. 常量的定义与使用 1006 c#
  5. 高并发中计数器的实现方式有哪些?
  6. 让敏捷的回顾会议变得有趣而高效
  7. SQL SERVER如何通过SQL语句获服务器硬件和系统信息
  8. 拓端tecdat|【数据分享】学生受欢迎程度评价数据集
  9. 游戏建模软件的ZBrush和Mudbox哪个好
  10. 解决Iframe嵌入帆软BI系统后,Chrome升级后跨域出现登录界面,Cookie写入不成功。解决办法
  11. maya2011安装方法图文详细教程及Unity下载
  12. linux基本操作大全centos7
  13. python如何读取uni文件_如何在Python中通过HTTP与UniProt交谈?
  14. VHD轉換VHDX格式
  15. tar.gzh和zip的区别
  16. 全球注意力缺陷多动障碍(ADHD)市场规模2021年大约为796亿元(人民币),预计2028年将达到1259亿元
  17. 89c52单片机控制两个步进电机正反转加减速(程序+仿真)
  18. 【PBR系列六】基于物理的环境光照(上):漫反射辐照度(Diffuse irradiance)
  19. js将数组中相同项放在一个数组
  20. 电脑自动配置了两个IP4解决方法

热门文章

  1. 机器人 沈为民_会变形的机器人
  2. delete 字符数组 []_前端基础扫盲系列 长达8000字的数组总结
  3. 改工作空间_打拼六年换的新房,装修花了17万,飘窗改柜子很实用,谁见过?...
  4. python中0xFFFFFFFFFFFFFFFF这种字符串是什么意思呢
  5. java如何理解继承性_理解 Java 的三大特性之继承
  6. python列表、元组、字典和集合的算法时间_27.Python列表(list)、元组(tuple)、字典(dict)和集合(set)详解...
  7. 电商平台 高并发 微服务 方案_Java生鲜电商平台-秒杀系统微服务架构设计与源码解析实战...
  8. Amazon EC2 AMI实现X11 forwarding【从MobaXterm打开浏览器和图形界面】
  9. 使用yangtools将yang文件转化成java
  10. (7)操作系统安全机制一