连接 NATS

文章目录

  • 连接 NATS
    • 可复用的 NATS Listener
    • Listener 的 Abstract Class
    • 继承 Listener 类
    • 重构 Listener 代码
    • 使用 TypeScript 进行 Listener Validation
    • Subjects 的枚举
    • TicketCreatedEvent 的 interface
    • 强制 Listener 应用自定义的 Event 和 Event 内部的参数
    • Quick Note: 'readonly' in Typescript
    • 在 Create Listener 中 对 Create Event 的 data 进行约束
    • 现在的架构
    • Publisher 的 abstract 和 extends
    • 使用 Publisher
    • Event Publication 的 异步操作
    • 对于该项目通用事件模块缺点分析
    • 更新 npm 的 Common Module
    • 重启 NATS

可复用的 NATS Listener

  • Listener 响应目前我们还只是 console.log
// listener.tssubscription.on("message", (msg: Message) => {const data = msg.getData();if (typeof data === "string") {console.log(`Received event #${msg.getSequence()}, with data: ${data}`);}msg.ack();});
  • 有很多样式和配置来 publish/receive 我们的 event 信息
  • 所以我们想重新写一个通用的 NATS Listener 抽象类 来重构 NATS Listener,实现定制化 publish/receive,并处理之后的操作

⬆ back to top

Listener 的 Abstract Class

abstract class Listener {abstract subject: string;abstract queueGroupName: string;abstract onMessage(data: any, msg: Message): void;private client: Stan;protected ackWait = 5 * 1000;constructor(client: Stan) {this.client = client;}subscriptionOptions() {return this.client.subscriptionOptions().setDeliverAllAvailable().setManualAckMode(true).setAckWait(this.ackWait).setDurableName(this.queueGroupName);}listen() {const subscription = this.client.subscribe(this.subject,this.queueGroupName,this.subscriptionOptions())subscription.on('message', (msg: Message) => {console.log(`Message received: ${this.subject} / ${this.queueGroupName}`);const parsedData = this.parseMessage(msg);this.onMessage(parsedData, msg);})}parseMessage(msg: Message) {const data = msg.getData();return typeof data === 'string'? JSON.parse(data): JSON.parse(data.toString('utf8'))}
}

⬆ back to top

继承 Listener 类

class TicketCreatedListener extends Listener {subject = 'ticket:created';queueGroupName = 'payments-service';onMessage(data: any, msg: Message) {console.log('Event data!', data);msg.ack();}
}

⬆ back to top

重构 Listener 代码

// listener.ts
import nats from 'node-nats-streaming';
import { randomBytes } from 'crypto';
import { TicketCreatedListener } from './events/ticket-created-listener';console.clear();const stan = nats.connect('ticketing', randomBytes(4).toString('hex'), {url: 'http://localhost:4222',
});stan.on('connect', () => {console.log('Listener connected to NATS');stan.on('close', () => {console.log('NATS connection closed!');process.exit();});new TicketCreatedListener(stan).listen();
});process.on('SIGINT', () => stan.close());
process.on('SIGTERM', () => stan.close());

⬆ back to top

使用 TypeScript 进行 Listener Validation

  • 所以在 TicketCreatedListener 中,需要 subject 和 data 强关联

  • 并且如果 subject 和 data 并不关联,TypeScript 还需要进行报错

⬆ back to top

Subjects 的枚举

enum 类型,用自定义的 key 记录值,以后 访问 key、或者[xxx] 就行了

// events/subjects.ts
export enum Subjects {TicketCreated = 'ticket:created',OrderUpdated = 'order:updated',
}// const printSubject = (subject: Subjects) => {//
// }
// printSubject(Subjects.TicketCreated)
// printSubject(Subjects[0])

⬆ back to top

TicketCreatedEvent 的 interface

// ticket-created-event.ts
import { Subjects } from "./subjects";export interface TicketCreatedEvent {subject: Subjects.TicketCreated;data: {id: string;title: string;price: number;};
}

⬆ back to top

强制 Listener 应用自定义的 Event 和 Event 内部的参数

// base-listener.ts
import { Subjects } from './subjects';interface Event {subject: Subjects;data: any;
}export abstract class Listener<T extends Event> {abstract subject: T['subject'];abstract onMessage(data: T['data'], msg: Message): void;
}
// ticket-created-listener.ts
import { TicketCreatedEvent } from './ticket-created-event'
import { Subjects } from './subjects';export class TicketCreatedListener extends Listener<TicketCreatedEvent> {subject: Subjects.TicketCreated = Subjects.TicketCreated;...
}

⬆ back to top

Quick Note: ‘readonly’ in Typescript

  • subject = Subjects.TicketCreated;会报错,为什么?
  • 因为 TypeScript 会认为我们以后会对 subject 进行修改
  • 而现在其实是 Subject 类型,abstract subject: T['subject'];
  • 如果不带上 type 的话,可能以后会被我们强制 this.subject = Subjects.OrderCreated;
  • 所以,要么就带上 type,要么就需要用 readonly 表示不能被修改
export class TicketCreatedListener extends Listener<TicketCreatedEvent> {readonly subject = Subjects.TicketCreated;// ...everything else
}

⬆ back to top

在 Create Listener 中 对 Create Event 的 data 进行约束

data: TicketCreatedEvent['data']

// ticket-created-listener.ts
export class TicketCreatedListener extends Listener<TicketCreatedEvent> {onMessage(data: TicketCreatedEvent['data'], msg: Message) {console.log('Event data!', data);console.log(data.id);console.log(data.title);console.log(data.price);msg.ack();}
}

⬆ back to top

现在的架构


⬆ back to top

Publisher 的 abstract 和 extends

// base-publisher.ts
import { Stan } from 'node-nats-streaming';
import { Subjects } from './subjects';interface Event {subject: Subjects;data: any;
}export abstract class Publisher<T extends Event> {abstract subject: T['subject'];private client: Stan;constructor(client: Stan) {this.client = client;}publish(data: T['data']) {this.client.publish(this.subject, JSON.stringify(data), () => {console.log('Event published.')})}
}
// ticket-created-publisher.ts
import { Publisher } from './base-publisher';
import { TicketCreatedEvent } from './ticket-created-event'
import { Subjects } from './subjects';export class TicketCreatedPublisher extends Publisher<TicketCreatedEvent> {readonly subject = Subjects.TicketCreated;
}

⬆ back to top

使用 Publisher

// publisher.ts
stan.on('connect', () => {console.log('Publisher connected to NATS');const publisher = new TicketCreatedPublisher(stan);publisher.publish({id: '123',title: 'concert',price: 20});
});

⬆ back to top

Event Publication 的 异步操作

  • 目前使用异步的原因是为了 publish 过后,捕获 Error 的信息
  • 封装 Promise
// base-publisher.tspublish(data: T['data']): Promise<void> {return new Promise((resolve, reject) => {this.client.publish(this.subject, JSON.stringify(data), (err) => {if (err) {return reject(err);}console.log('Event published to subject', this.subject);resolve();});});}
  • await 执行 resolve()
// publisher.ts
stan.on('connect', async () => {console.log('Publisher connected to NATS');const publisher = new TicketCreatedPublisher(stan);try {await publisher.publish({id: '123',title: 'concert',price: 20});}catch (err) {console.error(err);}
});

⬆ back to top

对于该项目通用事件模块缺点分析


⬆ back to top

更新 npm 的 Common Module

  • base-listener.ts
  • base-publisher.ts
  • subjects.ts
  • ticket-created-event.ts
  • ticket-updated-event.ts

⬆ back to top

重启 NATS

kubectl get pods
kubectl delete pod nats-depl-786b8cff8d-xd4tn

⬆ back to top

【深入浅出 Node + React 的微服务项目】15.连接 NATS相关推荐

  1. 【深入浅出 Node + React 的微服务项目】14. NATS Streaming Server

    文章目录 现在该做什么 NATS Streaming Server 介绍 创建 NATS Streaming 的 Deployment NATS Streaming 的工作流程 创建一个 NATS 测 ...

  2. 【深入浅出 Node + React 的微服务项目】1.微服务的基本知识

    [深入浅出 Node + React 的微服务项目] 微服务的基本知识 目录 [该目录用于 Github 的 md,故 CSDN 上不能目录跳转和 back to top,sry] 第一步: 微服务的 ...

  3. 【深入浅出 Node + React 的微服务项目】16. 对 NATS Client 进行管理

    文章目录 发布 TicketCreate 将 NATS Client 独立出来 回顾 mongoose 的实现 nats client 独立的实现 访问 NATS Client 优雅的关闭 成功监听 ...

  4. 微服务项目后台技术栈

    微服务项目后台相关技术整理 主要技术 ORM框架-Mybatis Plus Mybatis Plus核心功能 MyBatis Plus与SpringBoot集成 MyBatis Plus集成Sprin ...

  5. 微服务项目的整合与测试

    实验目的 掌握微服务项目的整合使用 掌握Swagger-UI的简单使用 练习内容 1.微服务项目整合 1.1.项目预览 1.1.1.在 https://github.com/shi469391tou/ ...

  6. Docker Compose配置springboot微服务项目

    [Docker那些事]系列文章 docker 安装 与 卸载 centos Dockerfile 文件结构.docker镜像构建过程详细介绍 Dockerfile文件中CMD指令与ENTRYPOINT ...

  7. k8s部署微服务项目

    之前用docker-compose部署微服务项目,但是只能单节点的(那你用微服务架构干啥?),所以想搞一下k8s集群,网上找了下资料没有视频专门讲这一块,自己找了很多资料,搞了蛮长时间的,所以记录一下 ...

  8. SpringCloud入门总结 + 使用SpringCloud搭建微服务项目

    SpringCloud 1.认识微服务 2.认识spring Cloud 3.Spring Cloud Eureka 服务发现框架 3.1认识Eureka 3.2 实战--开发并部署Eureka Se ...

  9. 原生K8S部署pig微服务项目

    原生K8S部署pig微服务项目 简介 项目地址:码云 Pig微服务项目 基于 Spring Cloud 2021 .Spring Boot 2.7. OAuth2 的 RBAC 权限管理系统 基于数据 ...

最新文章

  1. AVAssetExportSession导出MP4视频失败
  2. 论文笔记:Inception v1
  3. [系统底层] x86和x64下ssdt的差异
  4. 关于tcmalloc\malloc和new
  5. idea ssm打war包_IDEA下从零开始搭建SpringBoot工程
  6. 此计算机无法设置密码,电脑该怎么设置密码
  7. # RSA 公钥加密算法
  8. 代写python作业费用标准_代做159.272作业、代写Programming Paradigms作业、代做Python实验作业、代写Java/c++编程作业代写Database|代做R...
  9. 使用layui 做后台管理界面,在Tab中的链接点击后添加一个新TAB的解决方法
  10. PHP获取对象的hashcode_php 字符串转hashcode(包括中文)
  11. 手把手教你强化学习(十) 基于Stochastic Policy的深度强化学习方法
  12. asp.net dev xtraReporting(一)静态页面
  13. kubernetes视频教程笔记 (17)-Job和CronJob
  14. ffmpeg完全教程
  15. win10通过pe修复uefi引导后bootmanage出现两个引导项很烦人
  16. MySQL临时表的使用
  17. 地图学:专题地图制作详细步骤
  18. 选择服务器系统,服务器系统选择
  19. 统一社会信用代码正则表达式
  20. 【mysql】 踩坑记录之derived(派生表)

热门文章

  1. 上海科技大学计算机科学与技术录取分数线,这所新高水平大学2021年全国招生300人,设5个专业!分数线如何?...
  2. canvas绘制坐标系
  3. 社招面经: 联易融Java开发一面2021.04.15
  4. 给yml配置文件的密码加密(SpringBoot)
  5. 退役选手的自我修养——《千与千寻》杂感
  6. kaldi基础介绍(一)在说话人识别中的数据准备
  7. PCV安装+报错解决
  8. 关闭钉钉的开机自启动
  9. ACS724LLCTR-30AB-T优点
  10. 中国5G无人仓现场运行披露