【深入浅出 Node + React 的微服务项目】15.连接 NATS
连接 NATS
文章目录
- 连接 NATS
- 可复用的 NATS Listener
- Listener 的 Abstract Class
- 继承 Listener 类
- 重构 Listener 代码
- 使用 TypeScript 进行 Listener Validation
- Subjects 的枚举
- TicketCreatedEvent 的 interface
- 强制 Listener 应用自定义的 Event 和 Event 内部的参数
- Quick Note: 'readonly' in Typescript
- 在 Create Listener 中 对 Create Event 的 data 进行约束
- 现在的架构
- Publisher 的 abstract 和 extends
- 使用 Publisher
- Event Publication 的 异步操作
- 对于该项目通用事件模块缺点分析
- 更新 npm 的 Common Module
- 重启 NATS
可复用的 NATS Listener
- Listener 响应目前我们还只是 console.log
// listener.tssubscription.on("message", (msg: Message) => {const data = msg.getData();if (typeof data === "string") {console.log(`Received event #${msg.getSequence()}, with data: ${data}`);}msg.ack();});
- 有很多样式和配置来 publish/receive 我们的 event 信息
- 所以我们想重新写一个通用的 NATS Listener 抽象类 来重构 NATS Listener,实现定制化 publish/receive,并处理之后的操作
⬆ back to top
Listener 的 Abstract Class
abstract class Listener {abstract subject: string;abstract queueGroupName: string;abstract onMessage(data: any, msg: Message): void;private client: Stan;protected ackWait = 5 * 1000;constructor(client: Stan) {this.client = client;}subscriptionOptions() {return this.client.subscriptionOptions().setDeliverAllAvailable().setManualAckMode(true).setAckWait(this.ackWait).setDurableName(this.queueGroupName);}listen() {const subscription = this.client.subscribe(this.subject,this.queueGroupName,this.subscriptionOptions())subscription.on('message', (msg: Message) => {console.log(`Message received: ${this.subject} / ${this.queueGroupName}`);const parsedData = this.parseMessage(msg);this.onMessage(parsedData, msg);})}parseMessage(msg: Message) {const data = msg.getData();return typeof data === 'string'? JSON.parse(data): JSON.parse(data.toString('utf8'))}
}
⬆ back to top
继承 Listener 类
class TicketCreatedListener extends Listener {subject = 'ticket:created';queueGroupName = 'payments-service';onMessage(data: any, msg: Message) {console.log('Event data!', data);msg.ack();}
}
⬆ back to top
重构 Listener 代码
// listener.ts
import nats from 'node-nats-streaming';
import { randomBytes } from 'crypto';
import { TicketCreatedListener } from './events/ticket-created-listener';console.clear();const stan = nats.connect('ticketing', randomBytes(4).toString('hex'), {url: 'http://localhost:4222',
});stan.on('connect', () => {console.log('Listener connected to NATS');stan.on('close', () => {console.log('NATS connection closed!');process.exit();});new TicketCreatedListener(stan).listen();
});process.on('SIGINT', () => stan.close());
process.on('SIGTERM', () => stan.close());
⬆ back to top
使用 TypeScript 进行 Listener Validation
所以在 TicketCreatedListener 中,需要 subject 和 data 强关联
并且如果 subject 和 data 并不关联,TypeScript 还需要进行报错
⬆ back to top
Subjects 的枚举
enum 类型,用自定义的 key 记录值,以后 访问 key、或者[xxx] 就行了
// events/subjects.ts
export enum Subjects {TicketCreated = 'ticket:created',OrderUpdated = 'order:updated',
}// const printSubject = (subject: Subjects) => {//
// }
// printSubject(Subjects.TicketCreated)
// printSubject(Subjects[0])
⬆ back to top
TicketCreatedEvent 的 interface
// ticket-created-event.ts
import { Subjects } from "./subjects";export interface TicketCreatedEvent {subject: Subjects.TicketCreated;data: {id: string;title: string;price: number;};
}
⬆ back to top
强制 Listener 应用自定义的 Event 和 Event 内部的参数
// base-listener.ts
import { Subjects } from './subjects';interface Event {subject: Subjects;data: any;
}export abstract class Listener<T extends Event> {abstract subject: T['subject'];abstract onMessage(data: T['data'], msg: Message): void;
}
// ticket-created-listener.ts
import { TicketCreatedEvent } from './ticket-created-event'
import { Subjects } from './subjects';export class TicketCreatedListener extends Listener<TicketCreatedEvent> {subject: Subjects.TicketCreated = Subjects.TicketCreated;...
}
⬆ back to top
Quick Note: ‘readonly’ in Typescript
subject = Subjects.TicketCreated;
会报错,为什么?- 因为 TypeScript 会认为我们以后会对 subject 进行修改
- 而现在其实是 Subject 类型,
abstract subject: T['subject'];
- 如果不带上 type 的话,可能以后会被我们强制
this.subject = Subjects.OrderCreated;
- 所以,要么就带上 type,要么就需要用 readonly 表示不能被修改
export class TicketCreatedListener extends Listener<TicketCreatedEvent> {readonly subject = Subjects.TicketCreated;// ...everything else
}
⬆ back to top
在 Create Listener 中 对 Create Event 的 data 进行约束
data: TicketCreatedEvent['data']
// ticket-created-listener.ts
export class TicketCreatedListener extends Listener<TicketCreatedEvent> {onMessage(data: TicketCreatedEvent['data'], msg: Message) {console.log('Event data!', data);console.log(data.id);console.log(data.title);console.log(data.price);msg.ack();}
}
⬆ back to top
现在的架构
⬆ back to top
Publisher 的 abstract 和 extends
// base-publisher.ts
import { Stan } from 'node-nats-streaming';
import { Subjects } from './subjects';interface Event {subject: Subjects;data: any;
}export abstract class Publisher<T extends Event> {abstract subject: T['subject'];private client: Stan;constructor(client: Stan) {this.client = client;}publish(data: T['data']) {this.client.publish(this.subject, JSON.stringify(data), () => {console.log('Event published.')})}
}
// ticket-created-publisher.ts
import { Publisher } from './base-publisher';
import { TicketCreatedEvent } from './ticket-created-event'
import { Subjects } from './subjects';export class TicketCreatedPublisher extends Publisher<TicketCreatedEvent> {readonly subject = Subjects.TicketCreated;
}
⬆ back to top
使用 Publisher
// publisher.ts
stan.on('connect', () => {console.log('Publisher connected to NATS');const publisher = new TicketCreatedPublisher(stan);publisher.publish({id: '123',title: 'concert',price: 20});
});
⬆ back to top
Event Publication 的 异步操作
- 目前使用异步的原因是为了 publish 过后,捕获 Error 的信息
- 封装 Promise
// base-publisher.tspublish(data: T['data']): Promise<void> {return new Promise((resolve, reject) => {this.client.publish(this.subject, JSON.stringify(data), (err) => {if (err) {return reject(err);}console.log('Event published to subject', this.subject);resolve();});});}
- await 执行 resolve()
// publisher.ts
stan.on('connect', async () => {console.log('Publisher connected to NATS');const publisher = new TicketCreatedPublisher(stan);try {await publisher.publish({id: '123',title: 'concert',price: 20});}catch (err) {console.error(err);}
});
⬆ back to top
对于该项目通用事件模块缺点分析
⬆ back to top
更新 npm 的 Common Module
- base-listener.ts
- base-publisher.ts
- subjects.ts
- ticket-created-event.ts
- ticket-updated-event.ts
⬆ back to top
重启 NATS
kubectl get pods
kubectl delete pod nats-depl-786b8cff8d-xd4tn
⬆ back to top
【深入浅出 Node + React 的微服务项目】15.连接 NATS相关推荐
- 【深入浅出 Node + React 的微服务项目】14. NATS Streaming Server
文章目录 现在该做什么 NATS Streaming Server 介绍 创建 NATS Streaming 的 Deployment NATS Streaming 的工作流程 创建一个 NATS 测 ...
- 【深入浅出 Node + React 的微服务项目】1.微服务的基本知识
[深入浅出 Node + React 的微服务项目] 微服务的基本知识 目录 [该目录用于 Github 的 md,故 CSDN 上不能目录跳转和 back to top,sry] 第一步: 微服务的 ...
- 【深入浅出 Node + React 的微服务项目】16. 对 NATS Client 进行管理
文章目录 发布 TicketCreate 将 NATS Client 独立出来 回顾 mongoose 的实现 nats client 独立的实现 访问 NATS Client 优雅的关闭 成功监听 ...
- 微服务项目后台技术栈
微服务项目后台相关技术整理 主要技术 ORM框架-Mybatis Plus Mybatis Plus核心功能 MyBatis Plus与SpringBoot集成 MyBatis Plus集成Sprin ...
- 微服务项目的整合与测试
实验目的 掌握微服务项目的整合使用 掌握Swagger-UI的简单使用 练习内容 1.微服务项目整合 1.1.项目预览 1.1.1.在 https://github.com/shi469391tou/ ...
- Docker Compose配置springboot微服务项目
[Docker那些事]系列文章 docker 安装 与 卸载 centos Dockerfile 文件结构.docker镜像构建过程详细介绍 Dockerfile文件中CMD指令与ENTRYPOINT ...
- k8s部署微服务项目
之前用docker-compose部署微服务项目,但是只能单节点的(那你用微服务架构干啥?),所以想搞一下k8s集群,网上找了下资料没有视频专门讲这一块,自己找了很多资料,搞了蛮长时间的,所以记录一下 ...
- SpringCloud入门总结 + 使用SpringCloud搭建微服务项目
SpringCloud 1.认识微服务 2.认识spring Cloud 3.Spring Cloud Eureka 服务发现框架 3.1认识Eureka 3.2 实战--开发并部署Eureka Se ...
- 原生K8S部署pig微服务项目
原生K8S部署pig微服务项目 简介 项目地址:码云 Pig微服务项目 基于 Spring Cloud 2021 .Spring Boot 2.7. OAuth2 的 RBAC 权限管理系统 基于数据 ...
最新文章
- AVAssetExportSession导出MP4视频失败
- 论文笔记:Inception v1
- [系统底层] x86和x64下ssdt的差异
- 关于tcmalloc\malloc和new
- idea ssm打war包_IDEA下从零开始搭建SpringBoot工程
- 此计算机无法设置密码,电脑该怎么设置密码
- # RSA 公钥加密算法
- 代写python作业费用标准_代做159.272作业、代写Programming Paradigms作业、代做Python实验作业、代写Java/c++编程作业代写Database|代做R...
- 使用layui 做后台管理界面,在Tab中的链接点击后添加一个新TAB的解决方法
- PHP获取对象的hashcode_php 字符串转hashcode(包括中文)
- 手把手教你强化学习(十) 基于Stochastic Policy的深度强化学习方法
- asp.net dev xtraReporting(一)静态页面
- kubernetes视频教程笔记 (17)-Job和CronJob
- ffmpeg完全教程
- win10通过pe修复uefi引导后bootmanage出现两个引导项很烦人
- MySQL临时表的使用
- 地图学:专题地图制作详细步骤
- 选择服务器系统,服务器系统选择
- 统一社会信用代码正则表达式
- 【mysql】 踩坑记录之derived(派生表)