需要安装的npm包:https://www.npmjs.com/package/wisrtoni40-confluent-schema#Quickstart

npm i wisrtoni40-confluent-schema --save

procedurer.ts文件

import { HighLevelProducer, KafkaClient } from 'kafka-node';import { v4 as uuidv4 } from 'uuid';import {ConfluentAvroStrategy,ConfluentMultiRegistry,ConfluentPubResolveStrategy,} from 'wisrtoni40-confluent-schema';/*** -----------------------------------------------------------------------------* Config* -----------------------------------------------------------------------------*/const kafkaHost = '你的kafka host';const topic = '你的topic';const registryHost ='你的kafka注册host';/*** -----------------------------------------------------------------------------* Kafka Client and Producer* -----------------------------------------------------------------------------*/const kafkaClient = new KafkaClient({kafkaHost,clientId: uuidv4(),connectTimeout: 60000,requestTimeout: 60000,connectRetryOptions: {retries: 5,factor: 0,minTimeout: 1000,maxTimeout: 1000,randomize: false,},sasl: {mechanism: 'plain',username: '你的kafka用户名',password: '你的kafka密码',},});const producer = new HighLevelProducer(kafkaClient, {requireAcks: 1,ackTimeoutMs: 100,});/*** -----------------------------------------------------------------------------* Confluent Resolver* -----------------------------------------------------------------------------*/const schemaRegistry = new ConfluentMultiRegistry(registryHost);const avro = new ConfluentAvroStrategy();const resolver = new ConfluentPubResolveStrategy(schemaRegistry, avro, topic);/*** -----------------------------------------------------------------------------* Produce* -----------------------------------------------------------------------------*/(async () => {const data = {evt_dt: 1664446229425,evt_type: 'tower_unload',plant: 'F110',machineName: 'TOWER_01',errorCode: '',description: '',result: 'OK',evt_ns: 'wmy.dx',evt_tp: 'tower.error',evt_pid: 'TOWER_01',evt_pubBy: 'nifi.11142'};const processedData = await resolver.resolve(data);producer.send([{ topic, messages: processedData }], (error, result) => {if (error) {console.error(error);} else {console.log(result);}});})();

procedurer.js文件

var kafka_node_1 = require("kafka-node");
var uuid_1 = require("uuid");
var wisrtoni40_confluent_schema_1 = require("wisrtoni40-confluent-schema");var kafkaHost = '你的kafka host';
var topic = '你的topic';
var registryHost = '你的kafka注册host';const kafkaClient = new kafka_node_1.KafkaClient({kafkaHost,clientId: (0, uuid_1.v4)(),connectTimeout: 60000,requestTimeout: 60000,connectRetryOptions: {retries: 5,factor: 0,minTimeout: 1000,maxTimeout: 1000,randomize: false,},sasl: {mechanism: 'plain',username: '你的kafka用户名',password: '你的kafka密码',},
});const producer = new kafka_node_1.HighLevelProducer(kafkaClient, {requireAcks: 1,ackTimeoutMs: 100,
});const schemaRegistry = new wisrtoni40_confluent_schema_1.ConfluentMultiRegistry(registryHost);
const avro = new wisrtoni40_confluent_schema_1.ConfluentAvroStrategy();
const resolver = new wisrtoni40_confluent_schema_1.ConfluentPubResolveStrategy(schemaRegistry, avro, topic);(async () => {const data = {evt_dt: 1664446229425,evt_type: 'tower_unload',plant: 'F110',machineName: 'TOWER_01',errorCode: '',description: '',result: 'OK',evt_ns: 'wmy.dx',evt_tp: 'tower.error',evt_pid: 'TOWER_01',evt_pubBy: 'nifi.11142'};const processedData = await resolver.resolve(data);producer.send([{ topic, messages: processedData }], (error, result) => {if (error) {console.error(error);} else {console.log(result);}});
})();

consumer.ts文件

import { ConsumerGroup } from 'kafka-node';
import { v4 as uuidv4 } from 'uuid';
import {ConfluentAvroStrategy,ConfluentMultiRegistry,ConfluentSubResolveStrategy,
} from 'wisrtoni40-confluent-schema';/*** -----------------------------------------------------------------------------* Config* -----------------------------------------------------------------------------*/const kafkaHost = '你的kafka host';
const topic = '你的topic';
const registryHost ='你的kafka注册host';/*** -----------------------------------------------------------------------------* Kafka Consumer* -----------------------------------------------------------------------------*/const consumer = new ConsumerGroup({kafkaHost,groupId: uuidv4(),sessionTimeout: 15000,protocol: ['roundrobin'],encoding: 'buffer',fromOffset: 'latest',outOfRangeOffset: 'latest',sasl: {mechanism: 'plain',username: '你的kafka用户名',password: '你的kafka密码',},},topic,
);/*** -----------------------------------------------------------------------------* Confluent Resolver* -----------------------------------------------------------------------------*/const schemaRegistry = new ConfluentMultiRegistry(registryHost);
const avro = new ConfluentAvroStrategy();
const resolver = new ConfluentSubResolveStrategy(schemaRegistry, avro);/*** -----------------------------------------------------------------------------* Consume* -----------------------------------------------------------------------------*/consumer.on('message', async msg => {const result = await resolver.resolve(msg.value);console.log(msg.offset);console.log(result);
});

comsumer.js文件

var kafka_node_1 = require("kafka-node");
var uuid_1 = require("uuid");
var wisrtoni40_confluent_schema_1 = require("wisrtoni40-confluent-schema");var kafkaHost = '你的kafka host';
var topic = '你的topic';
var registryHost = '你的kafka注册host';var consumer = new kafka_node_1.ConsumerGroup({kafkaHost: kafkaHost,groupId: (0, uuid_1.v4)(),sessionTimeout: 15000,protocol: ['roundrobin'],encoding: 'buffer',fromOffset: 'latest',outOfRangeOffset: 'latest',sasl: {mechanism: 'plain',username: '你的kafka用户名',password: '你的kafka密码'}
}, topic);var schemaRegistry = new wisrtoni40_confluent_schema_1.ConfluentMultiRegistry(registryHost);
var avro = new wisrtoni40_confluent_schema_1.ConfluentAvroStrategy();
var resolver = new wisrtoni40_confluent_schema_1.ConfluentSubResolveStrategy(schemaRegistry, avro);consumer.on('message', async function (msg) {const result = await resolver.resolve(msg.value);console.log(msg.offset);console.log(result);
});

node连接kafka2.0相关推荐

  1. [2021] node连接oracle数据库示例[使用oracle官方组件]

    node 连接 oracle 示例 本示例采用的 oracledb 和 instantclient-basic-windows 来源于oracle官方 官方文档 https://oracle.gith ...

  2. oracle 连接组件,[2021] node连接oracle数据库示例[使用oracle官方组件]

    [2021] node连接oracle数据库示例[使用oracle官方组件] node 连接 oracle 示例 本示例采用的 oracledb 和 instantclient-basic-windo ...

  3. node 连接MySQL

    使用node创建一个服务端比java简单多,下面创建一个node服务端,连接MySQL并且将数据在浏览器显示出来 一. node创建服务端案例 var http = require("htt ...

  4. CentOS7.5下安装Mycat连接MySQL8.0

    CentOS7.5下安装Mycat连接MySQL8.0 Posted on 2018-11-08 11:07 许爱琪 阅读(645) 评论(2) 编辑 收藏 MyCat详细介绍,请参考https:// ...

  5. 微信小程序通过 node 连接 mysql——方法,简要原理,及一些常见问题

    前言 博主自己在22年夏天根据课程要求做了一个小程序连接阿里云服务器的案例,在最近又碰到了相应的需求. 原参考文章:微信小程序 Node连接本地MYSQL_微信小程序nodejs连接数据库_JJJen ...

  6. 微信小程序 Node连接本地MYSQL

    微信小程序 Node连接本地MYSQL 搭建Node环境 小程序中js发送请求 原博客基础上略微修改 搭建Node环境 前提:MYSQL已经创建好数据库 + 安装好node 项目中,新建一个文件ser ...

  7. 拥抱Node.js 8.0,N-API入门极简例子

    本文摘录自<Nodejs学习笔记>,更多章节及更新,请访问 github主页地址.欢迎加群交流,群号 197339705. N-API简介 Node.js 8.0 在2017年6月份发布, ...

  8. dw8与mysql的连接,VS2019连接mysql8.0数据库的教程图文详解

    1.首先准备好vs2019以及mysql数据库,两者都可以去官网下载,我们直接描述连接过程. 2.连接: 第一步:打开mysql的安装目录,我本地的安装目录如下:(注意是否有include和lib文件 ...

  9. 拥抱 Node.js 8.0,N-API 入门极简例子

    本文摘录自<Nodejs学习笔记>,更多章节及更新,请访问 github主页地址.欢迎加群交流,群号 197339705. N-API简介 Node.js 8.0 在2017年6月份发布, ...

最新文章

  1. PATH和path,傻傻分不清
  2. mac linux 蓝牙键盘,还在纠结Mac版键盘?试试KeyRemap4MacBook吧!
  3. easyconnect无法在mac上使用_Mac上Python无法输入中文- 2017年
  4. JavaScript的历史由来及简介
  5. 选择AWS或Azure?这可能是个错误的问题
  6. oracle忽略损坏表空间,Oracle表空间文件损坏后的排查及解决
  7. 恒生校招java笔试数据库语法_2015恒生电子校招笔试详解
  8. 基于FPGA 的CRC校验码生成器
  9. 数学建模方法——斯皮尔曼相关系数及其显著性检验 (Spearman’s correlation coefficient for ranked data)
  10. 铂电阻测温电路c语言程序,pt100检测电路,Pt100铂电阻测温电路经验
  11. arm怎么运行python_给arm板编译移植python(一)
  12. 北大MBA夫妇不满现有教育系统 携女隐居终南山
  13. uva11401:Triangle Counting 递推 数学
  14. 2021十大黄金理财app平台排行榜
  15. JS阻止默认行为和Vue阻止默认行为
  16. C#最小二乘法进行曲线拟合及相关系数
  17. [易飞]如何实现同单据两种不同凭证设计方式?(只打印单头单尾金额,多页最后一页面显示金额)
  18. 频率与周期的对应关系
  19. 为XV6系统扩展一个系统调用需要修改的文件
  20. Steven-Java-运算符号(简单)

热门文章

  1. pundit的使用ruby on rails
  2. 机票预订系统活动图_机票预订系统(概要设计说明书)
  3. 北京公户京牌指标相关问题详解
  4. 记录一次alignment fault
  5. (20)打鸡儿教你Vue.js
  6. 智慧零售时代,苏宁如何迎接双十一“大考”?
  7. 视频教程-在Vue中使用GraphQL实现聊天室-Vue
  8. 用我的沃土编织你的穹顶:华为的欧洲告白
  9. 若依前端vue角色、权限判断
  10. Dragon Balls