原文转载自「刘悦的技术博客」https://v3u.cn/a_id_163

首先得明确一点,和Django一样,在2020年Flask 1.1.1以后的版本都不需要所谓的三方库支持,即Flask-Celery或者Flask-Celery-Help这些库,直接使用Celery原生库即可。

一般情况下,Celery被用来处理耗时任务,比如千篇一律的发邮件或者文件上传之类,本次使用Celery实时或者定时发送基于Websocket的消息队列,因为如果前端已经摒弃老旧的轮询策略,使用Websocket,后端则需要相应的配合Celery进行对持久化的Websocket链接主动推送消息,这种场景在生产环境中还是很常见的,但是网上却鲜有文章阐述,而Celery官方对此的说明是:

If using multiple processes, a message queue service is used by the processes to coordinate operations such as broadcasting. The supported queues are Redis, RabbitMQ, and any other message queues supported by the Kombu package

大体上的意思是:因为 Celery 和 前端Web 是分开的 Process 所以需要有一个共同的后端来触发消息的推送,这是一个能否用Celery触发Websocket消息推送的重点。

第一步,安装必须的库

pip3 install flask-cors
pip3 install flask-socketio
pip3 install celery

flask-cors库是用来规避浏览器同源策略的库,flask-socketio用来建立全双工websocket链接,celery承担异步任务队列的职责。

实例化app对象

from flask_cors import CORS
from flask_socketio import SocketIO,send,emit,join_room, leave_room
import urllib.parse
from celery import Celery
from datetime import timedelta  app = Flask(__name__)  app.config['BROKER_URL'] = 'redis://localhost:6379'
app.config['CELERY_RESULT_BACKEND'] = 'redis://localhost:6379'
app.config['CELERY_ACCEPT_CONTENT'] = ['json', 'pickle']
app.config['REDIS_URL'] = 'redis://localhost:6379'

这里消息队列容器还是使用redis

随后利用初始化的app队列,初始化socket对象,这样才能让基于wsgi的Flask支持websocket

socketio = SocketIO(app,cors_allowed_origins='*',async_mode="threading",message_queue=app.config['CELERY_RESULT_BACKEND'])

这里注意下,加上跨域参数,并且指定异步模式为线程。

第三步,就是初始化celery对象

celery = Celery(app.name)
celery.conf.update(app.config)

之后就可以声明一些必要的方法和视图,并且运行实例

@celery.task()
def get_sendback():  socketio.emit('sendback','message',broadcast=True)  @app.route('/task')
def start_background_task():  get_sendback.delay()  return '开始'  @socketio.on('join')
def on_join(data):  username = 'user1'  room = 'room1'  join_room(room)  send(username + ' has entered the room.', room=room)  @socketio.on('message')
def handle_message(message):  message = urllib.parse.unquote(message)  print(message)  send(message,broadcast=True)  @socketio.on('connect', namespace='/chat')
def test_connect():  emit('my response', {'data': 'Connected'})  @socketio.on('disconnect', namespace='/chat')
def test_disconnect():  print('Client disconnected')  @app.route("/sendback",methods=['GET'])
def sendback():  socketio.emit('sendback','message')  return 'ok'  if __name__ == '__main__':  app.config['JSON_AS_ASCII'] = False  socketio.run(app,debug=True,host="0.0.0.0",port=5000)

可以看到异步调用任务使用@celery.task()来声明,而基于websocket的视图则用@socketio.on来声明,在Flask项目的目录下,分别开启两个命令行,启动Web服务和Celery服务

python manage.py

启动celery服务

celery worker -A manage.celery --loglevel=info -P eventlet

这里celery服务还是基于协程库eventlet

前端使用市面上比较流行的Vue.js,需要安装http://socket.io的支持

npm install vue-socket.io@2.1.0

编写一个用来测试的组件client.vue

<template>  <div>  <div v-for="item in log_list"  >  {{item}}  </div>  <input v-model="msg" />  <button @click="send">发送消息</button>  </div>  </template>  <script>  export default {  data () {  return {  msg: "",  log_list:[]  }  },  //注册组件标签  components:{  },  sockets:{  connect: function(){  console.log('socket 连接成功')  },  message: function(val){  console.log('返回:'+val);  alert(val);  this.log_list.push(val);  },
sendback: function(val){  console.log('返回:'+val);  alert(val);  }
},  mounted:function(){  },  methods:{  send(){  this.$socket.emit('join',encodeURI("加入房间"))  this.$socket.emit('message',encodeURI("用户:"+this.msg));  },  }
}  </script>  <style>  </style>

通过监听和后端相同的键“sendback”来展示后台推送的消息。

测试一下异步推送

访问url触发异步任务:http://localhost:5000/sendback

前端立刻受到了后端异步推送的消息。

下面我们来测试一下定时任务,基于Celery的Crontab好处就是支持秒级定时,在上面celery初始化之后,就可以通过配置的方式定义定时任务

celery = Celery(app.name)
celery.conf.update(app.config)  celery.conf.CELERYBEAT_SCHEDULE = {  "test":{  "task":"get_cron",  "schedule":timedelta(seconds=10)  }  }

这里我们增加一个测试任务,定时每10秒推送一条消息

@celery.task(name="get_cron")
def get_cron():  get_sendback.delay()

直接异步调用刚刚写好的推送方法即可,这样就可以和前端共用一个后端websocket链接,否则定时任务就无法触发消息推送。

同一目录下启动第三个服务,注意web服务和异步服务不要停

celery -A manage.celery beat --loglevel=debug

可以看到定时推送websocket消息也实现了。

这个功能本质上就是一个应用层面的解耦,用Celery特有的task方式来基于websocket推送emit消息,二者相辅相成。

最后奉上这个demo的版本库:https://gitee.com/QiHanXiBei/myflask

原文转载自「刘悦的技术博客」 https://v3u.cn/a_id_163

flask websocket json_Win10环境下使用Flask配合Celery异步推送实时/定时消息/2020年最新攻略...相关推荐

  1. Win10系统下安装编辑器之神(The God of Editor)Vim并且构建Python生态开发环境(2020年最新攻略)

    众神殿内,依次坐着Editplus.Atom.Sublime.Vscode.JetBrains家族.Comodo等等一众编辑器界的大佬们,偌大的殿堂内几无立锥之地,然而在殿内的金漆雕龙宝座上,端坐着一 ...

  2. vim配置python开发环境_Win10系统下安装编辑器之神(The God of Editor)Vim并且构建Python生态开发环境(2020年最新攻略)...

    众神殿内,依次坐着Editplus.Atom.Sublime.Vscode.JetBrains家族.Comodo等等一众编辑器界的大佬们,偌大的殿堂内几无立锥之地,然而在殿内的金漆雕龙宝座上,端坐着一 ...

  3. Unity环境下实现Camera高帧率RTMP推送

    Unity下RTMP直播背景方面不再赘述,今天主要讨论的是,Unity环境下,如何实现Camera高帧率RTMP推送,这里提到的高帧率,不再局限于常规环境下的30帧,以VR头显为例,更高的帧率(比如5 ...

  4. Python语言学习:创建/删除文件/文件夹、获取当前文件/文件夹路径(系统环境路径/目录)、获取当前文件夹下的所有子文件路径等代码(os系列用法)实现之详细攻略

    Python语言学习:创建/删除文件/文件夹.获取当前文件/文件夹路径(系统环境路径/目录).获取当前文件夹下的所有子文件路径等代码(os系列用法)实现之详细攻略 目录 系统环境路径的设置 1.sys ...

  5. Python:Python多种集成开发环境(IDE,编译器)的简介、安装、入门、使用方法之详细攻略

    Python:Python多种集成开发环境(IDE,编译器)的简介.安装.入门.使用方法之详细攻略 目录 Python多种编译器的简介.安装.入门.使用方法 1.Anaconda 2.ipython ...

  6. Windows系统下使用protobuf:protobuf的简介、安装、使用方法之详细攻略

    Windows系统下使用protobuf:protobuf的简介.安装.使用方法之详细攻略 目录 protobuf的简介 protobuf的安装 protobuf的使用方法 protobuf的简介 P ...

  7. Ubuntu:Ubuntu下安装Anaconda和Tensorflow的简介、入门、安装流程之详细攻略

    Ubuntu:Ubuntu下安装Anaconda和Tensorflow的简介.入门.安装流程之详细攻略 目录 安装流程 1.安装nvidia显卡驱动 2.安装cuda8 3.安装Cudnn 4.Ana ...

  8. flask keras 多线程环境下加载模型

    keras 多线程环境下加载模型 Tensor Tensor is not an element of this graph. 问题场景 keras 使用flask 发布深度学习模型服务,模型有一个定 ...

  9. window环境下创建Flask项目需要安装常见模块命令

    安装Flask环境 pip install flask==0.10.1 使用命令行操作 pip install flask-script 创建表单 pip install flask-wtf 操作数据 ...

最新文章

  1. CISCO交换机配置命令大全
  2. 吐血,经过4个小时,终于发现这个可以解决虚拟机ubuntu不能联网的问题
  3. java的调试工具_2020年最佳Java调试工具(翻译)
  4. JUC队列-ArrayBlockingQueue(一)
  5. 在win7环境下使用网络无线共享把电脑变成一台无线路由器
  6. 深入并发包-ConcurrentHashMap
  7. Java实现redis管道
  8. python excel 填充颜色_“利用python将图填充到excel案例”
  9. 配电室配套设施轨道巡检机器人及辅助监控系统
  10. 架构设计 例子和实践 系统设计说明书
  11. c语言小学期大作业学生管理系统,小学期完成
  12. leetcode题目-最小栈和用两个栈实现队列
  13. GOOGLE 手机定位厘米挑战赛选手提到的技巧、方法总结
  14. 如何运行app和exe程序
  15. 结构方程模型的R语言实现
  16. 河南省高中计算机会考难不难,关于河南省的高中会考我想说。。。
  17. 最小树形图 之 朱刘算法【模板】
  18. 《秘密》卷一:秘密-吸引力法则
  19. tl494c封装区别_TL494的特点与引脚功能
  20. 麒麟桌面系统安全中心介绍

热门文章

  1. 【Docker】Mac下Docker启动Kubernetes处于一直启动中(卡死)
  2. 05-Java通过Executors提供四种线程池
  3. 二叉树的递归遍历算法c语言 数据结构,递归创建二叉树c语言实现+详细解释
  4. 还在用Swagger?我推荐这款零代码侵入的接口管理神器!
  5. (很全面)SpringBoot 使用 Caffeine 本地缓存
  6. Java-重定向输出流实现程序日志
  7. 【ECharts 置图表同序列不同数据点的独立颜色值】
  8. Hibernate的执行流程——SessionFactory的创建
  9. Servlet入门1
  10. web应用程序servlet的映射名称的规则及请求过程