node连接kafka2.0
需要安装的npm包:https://www.npmjs.com/package/wisrtoni40-confluent-schema#Quickstart
npm i wisrtoni40-confluent-schema --save
procedurer.ts文件
import { HighLevelProducer, KafkaClient } from 'kafka-node';import { v4 as uuidv4 } from 'uuid';import {ConfluentAvroStrategy,ConfluentMultiRegistry,ConfluentPubResolveStrategy,} from 'wisrtoni40-confluent-schema';/*** -----------------------------------------------------------------------------* Config* -----------------------------------------------------------------------------*/const kafkaHost = '你的kafka host';const topic = '你的topic';const registryHost ='你的kafka注册host';/*** -----------------------------------------------------------------------------* Kafka Client and Producer* -----------------------------------------------------------------------------*/const kafkaClient = new KafkaClient({kafkaHost,clientId: uuidv4(),connectTimeout: 60000,requestTimeout: 60000,connectRetryOptions: {retries: 5,factor: 0,minTimeout: 1000,maxTimeout: 1000,randomize: false,},sasl: {mechanism: 'plain',username: '你的kafka用户名',password: '你的kafka密码',},});const producer = new HighLevelProducer(kafkaClient, {requireAcks: 1,ackTimeoutMs: 100,});/*** -----------------------------------------------------------------------------* Confluent Resolver* -----------------------------------------------------------------------------*/const schemaRegistry = new ConfluentMultiRegistry(registryHost);const avro = new ConfluentAvroStrategy();const resolver = new ConfluentPubResolveStrategy(schemaRegistry, avro, topic);/*** -----------------------------------------------------------------------------* Produce* -----------------------------------------------------------------------------*/(async () => {const data = {evt_dt: 1664446229425,evt_type: 'tower_unload',plant: 'F110',machineName: 'TOWER_01',errorCode: '',description: '',result: 'OK',evt_ns: 'wmy.dx',evt_tp: 'tower.error',evt_pid: 'TOWER_01',evt_pubBy: 'nifi.11142'};const processedData = await resolver.resolve(data);producer.send([{ topic, messages: processedData }], (error, result) => {if (error) {console.error(error);} else {console.log(result);}});})();
procedurer.js文件
var kafka_node_1 = require("kafka-node");
var uuid_1 = require("uuid");
var wisrtoni40_confluent_schema_1 = require("wisrtoni40-confluent-schema");var kafkaHost = '你的kafka host';
var topic = '你的topic';
var registryHost = '你的kafka注册host';const kafkaClient = new kafka_node_1.KafkaClient({kafkaHost,clientId: (0, uuid_1.v4)(),connectTimeout: 60000,requestTimeout: 60000,connectRetryOptions: {retries: 5,factor: 0,minTimeout: 1000,maxTimeout: 1000,randomize: false,},sasl: {mechanism: 'plain',username: '你的kafka用户名',password: '你的kafka密码',},
});const producer = new kafka_node_1.HighLevelProducer(kafkaClient, {requireAcks: 1,ackTimeoutMs: 100,
});const schemaRegistry = new wisrtoni40_confluent_schema_1.ConfluentMultiRegistry(registryHost);
const avro = new wisrtoni40_confluent_schema_1.ConfluentAvroStrategy();
const resolver = new wisrtoni40_confluent_schema_1.ConfluentPubResolveStrategy(schemaRegistry, avro, topic);(async () => {const data = {evt_dt: 1664446229425,evt_type: 'tower_unload',plant: 'F110',machineName: 'TOWER_01',errorCode: '',description: '',result: 'OK',evt_ns: 'wmy.dx',evt_tp: 'tower.error',evt_pid: 'TOWER_01',evt_pubBy: 'nifi.11142'};const processedData = await resolver.resolve(data);producer.send([{ topic, messages: processedData }], (error, result) => {if (error) {console.error(error);} else {console.log(result);}});
})();
consumer.ts文件
import { ConsumerGroup } from 'kafka-node';
import { v4 as uuidv4 } from 'uuid';
import {ConfluentAvroStrategy,ConfluentMultiRegistry,ConfluentSubResolveStrategy,
} from 'wisrtoni40-confluent-schema';/*** -----------------------------------------------------------------------------* Config* -----------------------------------------------------------------------------*/const kafkaHost = '你的kafka host';
const topic = '你的topic';
const registryHost ='你的kafka注册host';/*** -----------------------------------------------------------------------------* Kafka Consumer* -----------------------------------------------------------------------------*/const consumer = new ConsumerGroup({kafkaHost,groupId: uuidv4(),sessionTimeout: 15000,protocol: ['roundrobin'],encoding: 'buffer',fromOffset: 'latest',outOfRangeOffset: 'latest',sasl: {mechanism: 'plain',username: '你的kafka用户名',password: '你的kafka密码',},},topic,
);/*** -----------------------------------------------------------------------------* Confluent Resolver* -----------------------------------------------------------------------------*/const schemaRegistry = new ConfluentMultiRegistry(registryHost);
const avro = new ConfluentAvroStrategy();
const resolver = new ConfluentSubResolveStrategy(schemaRegistry, avro);/*** -----------------------------------------------------------------------------* Consume* -----------------------------------------------------------------------------*/consumer.on('message', async msg => {const result = await resolver.resolve(msg.value);console.log(msg.offset);console.log(result);
});
comsumer.js文件
var kafka_node_1 = require("kafka-node");
var uuid_1 = require("uuid");
var wisrtoni40_confluent_schema_1 = require("wisrtoni40-confluent-schema");var kafkaHost = '你的kafka host';
var topic = '你的topic';
var registryHost = '你的kafka注册host';var consumer = new kafka_node_1.ConsumerGroup({kafkaHost: kafkaHost,groupId: (0, uuid_1.v4)(),sessionTimeout: 15000,protocol: ['roundrobin'],encoding: 'buffer',fromOffset: 'latest',outOfRangeOffset: 'latest',sasl: {mechanism: 'plain',username: '你的kafka用户名',password: '你的kafka密码'}
}, topic);var schemaRegistry = new wisrtoni40_confluent_schema_1.ConfluentMultiRegistry(registryHost);
var avro = new wisrtoni40_confluent_schema_1.ConfluentAvroStrategy();
var resolver = new wisrtoni40_confluent_schema_1.ConfluentSubResolveStrategy(schemaRegistry, avro);consumer.on('message', async function (msg) {const result = await resolver.resolve(msg.value);console.log(msg.offset);console.log(result);
});
node连接kafka2.0相关推荐
- [2021] node连接oracle数据库示例[使用oracle官方组件]
node 连接 oracle 示例 本示例采用的 oracledb 和 instantclient-basic-windows 来源于oracle官方 官方文档 https://oracle.gith ...
- oracle 连接组件,[2021] node连接oracle数据库示例[使用oracle官方组件]
[2021] node连接oracle数据库示例[使用oracle官方组件] node 连接 oracle 示例 本示例采用的 oracledb 和 instantclient-basic-windo ...
- node 连接MySQL
使用node创建一个服务端比java简单多,下面创建一个node服务端,连接MySQL并且将数据在浏览器显示出来 一. node创建服务端案例 var http = require("htt ...
- CentOS7.5下安装Mycat连接MySQL8.0
CentOS7.5下安装Mycat连接MySQL8.0 Posted on 2018-11-08 11:07 许爱琪 阅读(645) 评论(2) 编辑 收藏 MyCat详细介绍,请参考https:// ...
- 微信小程序通过 node 连接 mysql——方法,简要原理,及一些常见问题
前言 博主自己在22年夏天根据课程要求做了一个小程序连接阿里云服务器的案例,在最近又碰到了相应的需求. 原参考文章:微信小程序 Node连接本地MYSQL_微信小程序nodejs连接数据库_JJJen ...
- 微信小程序 Node连接本地MYSQL
微信小程序 Node连接本地MYSQL 搭建Node环境 小程序中js发送请求 原博客基础上略微修改 搭建Node环境 前提:MYSQL已经创建好数据库 + 安装好node 项目中,新建一个文件ser ...
- 拥抱Node.js 8.0,N-API入门极简例子
本文摘录自<Nodejs学习笔记>,更多章节及更新,请访问 github主页地址.欢迎加群交流,群号 197339705. N-API简介 Node.js 8.0 在2017年6月份发布, ...
- dw8与mysql的连接,VS2019连接mysql8.0数据库的教程图文详解
1.首先准备好vs2019以及mysql数据库,两者都可以去官网下载,我们直接描述连接过程. 2.连接: 第一步:打开mysql的安装目录,我本地的安装目录如下:(注意是否有include和lib文件 ...
- 拥抱 Node.js 8.0,N-API 入门极简例子
本文摘录自<Nodejs学习笔记>,更多章节及更新,请访问 github主页地址.欢迎加群交流,群号 197339705. N-API简介 Node.js 8.0 在2017年6月份发布, ...
最新文章
- PATH和path,傻傻分不清
- mac linux 蓝牙键盘,还在纠结Mac版键盘?试试KeyRemap4MacBook吧!
- easyconnect无法在mac上使用_Mac上Python无法输入中文- 2017年
- JavaScript的历史由来及简介
- 选择AWS或Azure?这可能是个错误的问题
- oracle忽略损坏表空间,Oracle表空间文件损坏后的排查及解决
- 恒生校招java笔试数据库语法_2015恒生电子校招笔试详解
- 基于FPGA 的CRC校验码生成器
- 数学建模方法——斯皮尔曼相关系数及其显著性检验 (Spearman’s correlation coefficient for ranked data)
- 铂电阻测温电路c语言程序,pt100检测电路,Pt100铂电阻测温电路经验
- arm怎么运行python_给arm板编译移植python(一)
- 北大MBA夫妇不满现有教育系统 携女隐居终南山
- uva11401:Triangle Counting 递推 数学
- 2021十大黄金理财app平台排行榜
- JS阻止默认行为和Vue阻止默认行为
- C#最小二乘法进行曲线拟合及相关系数
- [易飞]如何实现同单据两种不同凭证设计方式?(只打印单头单尾金额,多页最后一页面显示金额)
- 频率与周期的对应关系
- 为XV6系统扩展一个系统调用需要修改的文件
- Steven-Java-运算符号(简单)