目录

1 背景需求

2 技术方案

2.1 消息队列

2.2 进程守护

3 源码介绍

3.1 supervisor部分

3.1.1 supervisord.conf 内容

3.1.2 MM3D.conf 和 MM3D_2.conf 内容

3.2 算法程序(也就是我的主函数)


1 背景需求

某C端产品,前端嵌入式(安卓)将采集的数据发送给后端,后端服务器(Java)要负责将数据交到算法服务器(python,C++),算法服务器收到数据并处理完后将结果再返回给后端,后端拿着结果二次加工后再发给前端显示。

基本要求:

  1. 算法服务器有多台,要充分利用,要满足并发。
  2. 异常崩溃、关机等,算法要自启动。

2 技术方案

  1. 应对并发问题:后端和算法端采用RabbitMQ消息订阅方案。
  2. 应对异常自启动问题:采用Supervisor进程守护。

架构图如下:

2.1 消息队列

消息队列(Message queue)原理比较简单(当然细节很多),主要作用就是把所有生产者的数据放到一个队列中,所有消费者从从这个队列里取,确保每个数据仅被消费一次,互相不冲突。

详细原理可参考:

消息队列(mq)是什么? - 知乎

什么是消息队列啊? - 知乎

RabbitMQ 入门系列(9)— Python 的 pika 库常用函数及参数说明_wohu1104的专栏-CSDN博客

2.2 进程守护

进程守护的目的是让异常崩溃的程序能自动重启。

Supervisor是用Python开发的一套通用的进程管理程序,能将一个普通的命令行进程变为后台daemon,并监控进程状态,异常退出时能自动重启。

几个要点的解释:

  1. Supervisor为什么能启动程序?

    1. 答:Supervisor自己本身是某种程序,它能在Linux系统,通过自定义的配置去指定任意个子程序(每个子程序要定义一个唯一名称),而每个子进程被启动后会去执行一个shell文件(.sh文件),而你可以在这个shell文件中自定义任何命令行代码,所以你能以任何方式去启动任意位置的任意多个程序。
  2. Supervisor为什么能自动启动崩溃的程序?
    1. 答:由于supervisor的子进程会通过指定的shell脚本去启动其他“孙”进程(也就是你想启动的程序),并且子进程能和孙进程通信,所以,当你的程序崩溃时,其所属的supervisor子进程会重新执行一次shell脚本,把这个崩溃的程序再启动。这里重启的规则和配置有很多方式,很灵活。

更多信息,我看了比较好的参考如下(推荐级分先后顺序):

​​​​​​Supervisor使用详解 - 浪淘沙& - 博客园

详解Supervisor进程守护监控 - 请叫我头头哥 - 博客园

supervisor 使用详解_11111-CSDN博客_supervisor

3 源码介绍

算法服务器部分运行的逻辑是:

  1. 算法服务器开机。
  2. supervisor程序自动启动,通过配置文件,自动开启相应的子进程。每个子进程启动后再去调用一个shell文件,把算法程序逐一启动起来。
  3. 众多算法程序开始实时订阅唯一的文件服务器消息。
  4. 某个算法程序从MQ队列中拿到一个文件包路径和名字后,就会通过FTP去文件服务器下载数据到算法服务器本地,然后算法模块开始处理数据、返回数据给后端,然后重新监听。

3.1 supervisor部分

supervisor安装好后,配置文件一般放在/etc/supervisor文件夹内,里面有如下两个文件:

  • supervisord.conf:supervisor的基本配置文件
  • conf.d:一个文件夹,里面存放supervisor每个子进程的配置文件。(我有个疑问是为什么一个文件夹要用.d起名字,看起来还以为是个文件)
    • MM3D.conf:我定义的一个子进程配置。这个conf文件的名字可以随便取。
    • MM3D_2.conf:我定义的第二个子进程配置。

3.1.1 supervisord.conf 内容

; supervisor config file[unix_http_server]
file=/var/run/supervisor.sock   ; (the path to the socket file)
chmod=0700                       ; sockef file mode (default 0700)[supervisord]
logfile=/var/log/supervisor/supervisord.log ; (main log file;default $CWD/supervisord.log)
pidfile=/var/run/supervisord.pid ; (supervisord pidfile;default supervisord.pid)
childlogdir=/var/log/supervisor            ; ('AUTO' child log dir, default $TEMP); the below section must remain in the config file for RPC
; (supervisorctl/web interface) to work, additional interfaces may be
; added by defining them in separate rpcinterface: sections
[rpcinterface:supervisor]
supervisor.rpcinterface_factory = supervisor.rpcinterface:make_main_rpcinterface[supervisorctl]
serverurl=unix:///var/run/supervisor.sock ; use a unix:// URL  for a unix socket; The [include] section can just contain the "files" setting.  This
; setting can list multiple files (separated by whitespace or
; newlines).  It can also contain wildcards.  The filenames are
; interpreted as relative to this file.  Included files *cannot*
; include files themselves.[include]
files = /etc/supervisor/conf.d/*.conf

3.1.2 MM3D.conf 和 MM3D_2.conf 内容

MM3D.conf内容如下:

[program:MM3D]
directory=/mm3d/RV2_magic_mirror_algo/MM3D
command=sh /mm3d/RV2_magic_mirror_algo/MM3D/excute.sh
autostart=true
autorestart=true
startretries=100
redirect_stderr=true
stdout_logfile=/mm3d/RV2_magic_mirror_algo/MM3D/supervisor_out.log

MM3D_2.conf内容如下:

[program:MM3D_2]
directory=/mm3d/RV2_magic_mirror_algo/MM3D
command=sh /mm3d/RV2_magic_mirror_algo/MM3D/excute.sh
autostart=true
autorestart=true
startretries=100
redirect_stderr=true
stdout_logfile=/mm3d/RV2_magic_mirror_algo/MM3D/supervisor_2out.log

注意:conf文件中,比较重要的参数感觉有两个:

  1. 唯一的进程名。也就是:[program:XXX]里面的XXX。后面使用supervisorctl 各种命令操控子进程需要用到这些名字。
  2. 生成日志的位置和名字。多个子进程不要把日志搞一起了。

3.2 算法程序(也就是我的主函数)

算法程序主要包括两块:

  1. MQ通信模块(包括FTP拉取数据流)。
  2. 算法处理模块以及数据上传模块。

代码如下:

(config_MQ.py就省略了,里面是一些SDK、模型等地址,以及MQ的IP地址和密码等)

import os
import numpy as npfrom pathlib import Path
from config_MQ import Config
import time
from loguru import logger
import sysimport pika
from ftplib import FTP
import jsondef ftp_connect():try:"""连接ftp:return:"""ftp = FTP()logger.debug('config.ftp_host: {}', config.ftp_host)logger.debug('config.ftp_port: {}', config.ftp_port)ftp.connect(config.ftp_host, config.ftp_port)  # 连接远程服务器IP地址ftp.encoding = 'utf-8'ftp.set_debuglevel(1)  # 不开启调试模式ftp.login(config.ftp_user, config.ftp_pwd)  # 登录ftp# print(ftp.getwelcome()) # ftp服务器欢迎语except Exception as e:#print(e)logger.exception('ftp_connect error: {}', e)return Noneelse:return ftpdef read_file(file_path, target_dir, filename):ftp = ftp_connect()  # 连接ftp# ftp服务器上文件的路径# 本地文件下载保存的路径# 本地文件下载写入的路径文件# writefile = '%s/%s' % (write_path, filename)write_path = target_dir + '/%s' % (filename + '.zip')with open(write_path, "wb") as f:ftp.retrbinary('RETR %s' % file_path, f.write)ftp.close();def callbackTry(ch, method, properties, body):print(body.decode())ch.basic_ack(delivery_tag=method.delivery_tag)## 拿到消息转jsonbodyJson = json.loads(body.decode())filepath = bodyJson['filepath']user_id = bodyJson['keypair']callback_url = bodyJson['callbackUrl'] # 回调云端地址sample_raw_dir = os.path.join(raw_data_root, user_id) #../../MM3D_RAW/B16XXXXXXXXsample_result_dir = os.path.join(result_data_root, user_id) # ../../MM3D_Result/B16XXXXXXX# 拿到ftp url下载文件并保存sample_raw_dirif not os.path.isdir(sample_raw_dir):try:os.mkdir(sample_raw_dir)except Exception as e:logger.exception('Fail to mkdir to raw data: {}', e)#print('Fail to mkdir to raw data', e)if not os.path.isdir(sample_result_dir):try:os.mkdir(sample_result_dir)except Exception as e:logger.exception('Fail to mkdir to result data: {}', e)#print('Fail to mkdir to result data', e)try:# zip_file = user_id + '.zip'# file.save(os.path.join(sample_raw_dir, zip_file))read_file(filepath, sample_raw_dir, user_id) #通过FTP拉取数据包并保存在本地except Exception as e:logger.exception('Fail to save raw data: {}', e)#print("Fail to save raw data", e)start_time = int(round(time.time() * 1000))sample_key_pair = sample_raw_dir.split('/')[-1]# 识别文件的路劲logger.debug("sample_raw_dir :{}",  sample_raw_dir)logger.debug("callback_url :{}",  callback_url)############################ 算法部分 ############################## TODO 调用算法程序识别def callback(ch, method, properties, body):try:callbackTry(ch, method, properties, body)except Exception as e:logger.exception('algo error: {}', e)#print("algo error:", e)def init_rabbitmq():# 创建连接时的登录凭证。 username: MQ 账号, password: MQ 密码credentials = pika.PlainCredentials(config.rabbitmq_user, config.rabbitmq_pwd)# 阻塞式连接 MQ# parameters: 连接参数(包含主机/端口/虚拟主机/账号/密码等凭证信息)connection = pika.BlockingConnection(pika.ConnectionParameters(host=config.rabbitmq_host, port=config.rabbitmq_port, virtual_host='/',credentials=credentials))# 获取与 rabbitmq 通信的通道channel = connection.channel()# 声明交换器exchange = "algoExchange"channel.exchange_declare(exchange=exchange, durable=True, exchange_type='topic')# 声明队列ex_queue = "algoExchange_queue"channel.queue_declare(queue=ex_queue, durable=True, auto_delete=True)# 通过路由键将队列和交换器绑定channel.queue_bind(exchange=exchange, queue=ex_queue, routing_key='algoConfigRoutingKey')# 从队列中拿到消息开始消费#(当要消费时,调用该回调函数 callback)channel.basic_consume(ex_queue, callback,auto_ack=True)  # auto_ack设置成 False,在调用callback函数时,未收到确认标识,消息会重回队列。True,无论调用callback成功与否,消息都被消费掉# 处理 I/O 事件和 basic_consume 的回调, 直到所有的消费者被取消# (开始循环,直到发送退出消息)channel.start_consuming()if __name__ == "__main__":'''configue logger rotation="00:00:00",'''logger.add('../log/log-{time:YYYY-MM-DD}-PID='+ str(os.getpid()) +'.log', level="DEBUG",encoding="utf-8",  colorize=True, format="<green>{time}</green> <level>{message}</level>" )config = Config()raw_data_root = config.raw_data_rootresult_data_root = config.result_data_rootraw_data_backup_root = config.raw_data_backupraw_data_backup_root_path = Path(raw_data_backup_root)if not raw_data_backup_root_path.is_dir():os.mkdir(config.raw_data_backup)############################ 算法初始化部分 ############################## TODO 调用初始化############################ rabbitmq部分 ############################init_rabbitmq()

【部署类】专题:消息队列MQ、进程守护Supervisor相关推荐

  1. 消息队列 MQ 入门理解

    转载于阿里官方文档,把一些基础部分提炼整理了一下 功能特性: 应用场景: 消息队列 MQ 可应用于如下几个场景: 分布式事务 在传统的事务处理中,多个系统之间的交互耦合到一个事务中,响应时间长,影响系 ...

  2. 消息队列MQ 之 Kafka

    目录 前言 一.消息队列 MQ 为什么需要消息队列(MQ) 使用消息队列的好处 消息队列的两种模式 二.Kafka 概述 Kafka 简介 Kafka 的特性 三 实验 前言 一.消息队列 MQ MQ ...

  3. linux mq清空消息队列,MQ消息队列搭建命令及方法

    MQ 是一款稳定.安全又可靠的消息传递中间件.它使用消息和队列来支持应用程序.系统.服务和文件之间的信息交换.它可以简化和加速多个平台中不同应用程序和业务数据的集成.支持各种 API 和语言,并可以在 ...

  4. java队列_RPC远程调用和消息队列MQ的区别

    RPC和MQ同样都是用于分布式系统的两个很重要的技术,都有服务提供者.消费者的概念,可在一定程度上对系统进行解耦.但两者之间还是有区别的,本篇简单介绍~ 一.RPC RPC(Remote Proced ...

  5. 消息队列的使用场景_消息队列MQ的特点、选型及应用场景

    一.什么是消息队列 消息队列(Message Queue,简称MQ),指保存消息的一个容器,本质是个队列. 消息(Message)是指在应用之间传送的数据,消息可以非常简单,比如只包含文本字符串,也可 ...

  6. 后端技术:消息队列MQ/JMS/Kafka相关知识介绍

    ?今天给大家分享消息队列MQ/JMS/Kafka相关知识介绍 1.消息队列介绍 首先举个收快递的栗子,传统的收快递,快递小哥把我们的快递送到我们的手里.他需要什么条件嗯? 快递小哥有时间送, 我们有时 ...

  7. 详解RPC远程调用和消息队列MQ的区别

    谈到分布式架构,就不得不谈到分布式架构的基石RPC. 什么是RPC RPC(Remote Procedure Call)远程过程调用,主要解决远程通信间的问题,不需要了解底层网络的通信机制. RPC服 ...

  8. 消息队列MQ夺命连环11问:kafka、rabbitmq、rocketmq、activemq

    <消息队列MQ如何保证消息的幂等性> <RabbitMQ架构> <ZeroMQ简介:一种高性能的异步消息传递库> <Rocketmq原理&最佳实践&g ...

  9. 消息队列MQ与微消息队列MQTT

    文章目录 参考文章 什么是消息队列,什么是RPC 为什么要使用MQ消息队列 1. 解耦(可用性) 2. 流量削峰 3. 数据分发 消息队列的缺点 多种主流传统消息队列MQ对比 传统消息队列Rocket ...

  10. 消息中间件系列(四):消息队列MQ的特点、选型、及应用场景详解

    前面集中谈了分布式缓存Redis系列: 高并发架构系列:分布式锁的由来.特点.及Redis分布式锁的实现详解 高并发架构系列:Redis并发竞争key的解决方案详解 高并发架构系列:Redis缓存和M ...

最新文章

  1. SQL Server Management Studio清除历史登陆记录
  2. [机器学习笔记]Note11--聚类
  3. checkbox wpf 改变框的大小_WPF样式取决于复选框状态
  4. 前端----表格的具体使用(jquery)
  5. 双机高可用、负载均衡、MySQL (读写分离、主从自动切换)架构设计
  6. 网络拓扑图自动生成_SAP ABAP关键字语法图和ABAP代码自动生成工具Code Composer
  7. 电信光猫华硕路由器端口转发
  8. 小学计算机组成的说课PPT,小学说课课件.ppt
  9. 24h的编程比赛总结
  10. matlab生成指数分布,如何在matlab中生成负指数分布和均匀分布的随机数
  11. 织梦DedeCMS从入门到精通建站视频教程全集
  12. 22考研初试410数一145上岸上海交通大学819经验分享
  13. 抖音研发效率负责人:抖音能做到每周迭代,离不开飞书项目
  14. 邮箱大师手机版服务器异常,网易邮箱大师无法收信怎么办 网易邮箱大师无法收信的解决办法...
  15. Leetcode-数据结构-53.最大子数组和
  16. linux中|管道符的作用
  17. Wifi无法自动连接的问题
  18. Android录音下————AudioRecord源码分析
  19. 【网站支付PHP篇】thinkPHP集成汇潮支付(ecpss)
  20. USB接口无法识别设备

热门文章

  1. python 字符串格式化是打印不同类型更简单一些
  2. Java设计模式:观察者模式
  3. The form contains the following errors
  4. TMS Scripter importtool的使用
  5. ASP.NET MVC Identity 兩個多個連接字符串問題解決一例
  6. 2022-2028年中国摩托车行业投资分析及前景预测报告(全卷)
  7. Android消息机制学习笔记
  8. Hive Metastore 连接报错
  9. 2014 Super Training #8 C An Easy Game --DP
  10. ASP.NET中 RequiredFieldValidator(非空验证)的使用