python调用activateMQ进行数据传输
网上的很多代码都不适用了,同时存在一个问题就是基于stomp协议的activateMQ的接收端,存在一个问题,就是接收到的数据队列是通过回调函数(类)进行更新的,网上大多数都是这种方面,但是这样做对后面的数据处理很不友好,那么怎么处理呢?
其实很简单自己去看官方的代码和例子即可,都是最新更新的
http://jasonrbriggs.github.io/stomp.py/index.html这个是教程
https://github.com/jasonrbriggs/stomp.py这个是源码
源码中的tests文件夹里又大量的事例如:
例如 activateMQ的代码:
import stomp
from stomp.listener import TestListener
from .testutils import *@pytest.fixture()
def conn():conn = stomp.Connection11(get_default_host())conn.set_listener("testlistener", TestListener("123", print_to_log=True))conn.connect(get_default_user(), get_default_password(), wait=True)yield connconn.disconnect(receipt=None)class TestActiveMQ(object):def test_send_to_activemq(self, conn):conn.subscribe(destination="/queue/test", id=1, ack="auto")conn.send(body="this is a test", destination="/queue/test", content_type="text/blah", receipt="123")validate_send(conn)logging.info(conn.get_listener("testlistener").get_latest_message())
在数据传输的时候存在一个问题,就是java那么无法接受我这里发送的字符串,接受的是二进制类型的数据,这是方面怎么解决呢?(网上有一篇教程很好,直接拿过来)
python发送消息到activeMQ后java接收到BinaryMessage的坑
和另一个系统进行对接,使用activemq进行消息通信。对方使用java客户端监听一个topic,我们需要发送TextMessage消息,对方接收后进行处理。而我们因为系统架构的原因只能使用python进行推送,也就只能通过stomp协议发送消息。然后就遇到了问题,发送的消息在java消费者端只能解析成BinaryMessage,而发送的时候根本没有办法指定消息类型。网上搜了很久没有找到相同的情况。
根据官方通过python往ActiveMQ发送message的demo编写如下代码。
# -*-coding:utf-8-*-
import stomp
import timequeue_name = '/queue/SampleQueue'
topic_name = '/topic/SampleTopic'
listener_name = 'SampleListener'
test_name = "springBootMqQueue"
springBootMqQueue = '/queue/springBootMqQueue'class SampleListener(object):def on_message(self, headers, message):print('headers: %s' % headers)print('message: %s' % message)# 推送到队列queue
def send_to_queue(msg):conn = stomp.Connection10([('192.168.36.213', 61613)], auto_content_length=False)conn.connect('admin', 'admin', wait=True)conn.send(springBootMqQueue, msg)conn.disconnect()##从队列接收消息
def receive_from_queue():conn = stomp.Connection10([('192.168.36.213', 61613)], auto_content_length=False)conn.set_listener(listener_name, SampleListener())conn.connect('admin', 'admin', wait=True)conn.subscribe(springBootMqQueue)time.sleep(1) # secsconn.disconnect()if __name__ == '__main__':send_to_queue('{"content":{"flow":{"network":"5","times":"1-1","url":"http://www.baidu.com","way":"5"},"sms":{"direction":"0","text":"短信内容详情"},"voice":{"connect":"5","key":"挂断"}},"form":"13901295021","formPort":"com4","interval":"2-2","network":"5","taskId":"1dsf3641212434g","times":"1-3","to":"18611010269","type":"1"}')receive_from_queue()
Stomp是一个很简单的协议,协议中不携带TextMessage和BytesMessage相关的信息,而是通过content-length header判断消息类型的。header中有content-length则说明是BytesMessage,否则是TextMessage。
接下来的问题就简单了,发送的时候不在header中携带content-length就可以了,查看send方法的源码发现
def __init__(self, transport, auto_content_length=True):self.transport = transportself.auto_content_length = auto_content_lengthtransport.set_listener('protocol-listener', self)self.version = '1.0'def send(self, destination, body, content_type=None, headers=None, **keyword_headers):"""Send a message to a destination.:param str destination: the destination of the message (e.g. queue or topic name):param body: the content of the message:param str content_type: the content type of the message:param dict headers: a map of any additional headers the broker requires:param keyword_headers: any additional headers the broker requires"""assert destination is not None, "'destination' is required"assert body is not None, "'body' is required"headers = utils.merge_headers([headers, keyword_headers])headers[HDR_DESTINATION] = destinationif content_type:headers[HDR_CONTENT_TYPE] = content_typebody = encode(body)if self.auto_content_length and body and HDR_CONTENT_LENGTH not in headers:headers[HDR_CONTENT_LENGTH] = len(body)self.send_frame(CMD_SEND, headers, body)
三个条件都为true则会填充content-length,而auto_content_length是在__init__方法中传入的,默认值为True,所以只需要在创建对象的时候将该值设置为False即可。
# 推送到队列queue
def send_to_queue(msg):conn = stomp.Connection10([('192.168.36.213', 61613)], auto_content_length=False)conn.connect('admin', 'admin', wait=True)conn.send(springBootMqQueue, msg)conn.disconnect()
python调用activateMQ进行数据传输相关推荐
- python调用动态链接库传送protobuf数据。
什么是protobuf protobuf是Google提供的一个开源序列化框架,类似于XML,JSON这样的数据表示语言,其最大的特点是基于二进制,因此比传统的XML表示高效短小得多.开发者定义类似于 ...
- python编程(python调用dll程序)
[ 声明:版权所有,欢迎转载,请勿用于商业用途. 联系信箱:feixiaoxing @163.com] 很多人说python的效率比较低,主要是没有分清什么时候用python.什么时候用c.对于网络. ...
- python有道翻译接口-Python调用有道翻译api实现翻译
通过调用有道翻译的api,实现中译英.其他语言译中文 代码: # coding=utf-8 import urllib import urllib2 import json import time i ...
- python 图表_Python入门学习系列——使用Python调用Web API实现图表统计
使用Python调用Web API实现图表统计 Web API:Web应用编程接口,用于URL请求特定信息的程序交互,请求的数据大多以非常易于处理的格式返回,比如JSON或CSV等. 本文将使用Pyt ...
- python调用cmd命令会弹出黑框_python 调用cmd,不显示cmd黑框
python 调用系统命令的方式有很多 1.1 os.system(command) 在一个子shell中运行command命令,并返回command命令执行完毕后的退出状态.这实际上是使用C标准 ...
- python调用js库中的函数_Python 调用JS文件中的函数
Python 调用JS文件中的函数 1.安装PyExecJS第三方库 2.导入库:import execjs 3.调用JS文件中的方法 Passwd = execjs.compile(open(r&q ...
- python 调用 javascript函数
python 调用 javascript函数 # pip install pyexecjs import execjs # 直接执行 print('execjs.eval:', execjs.eval ...
- python调用dll报错:ValueError: Procedure called with not enough arguments (4 bytes missing) or wrong call
python调用dll报错:ValueError: Procedure called with not enough arguments (4 bytes missing) or wrong call ...
- Windows使用MSVC,命令行编译,链接64位dll,Python调用
文章目录 代码 编译 链接 Python调用 前一篇博客: Windows下使用Visual Studio自带的MSVC,命令行编译C/C++程序 代码 mylib.h代码如下: #ifndef MY ...
最新文章
- java添加窗体_添加的窗体
- 项目开发中使用IDEA创建多个maven子模块
- 【自动驾驶】16.计算机视觉:相机成像原理:世界坐标系、相机坐标系、图像坐标系、像素坐标系之间的转换
- Flex 常见问题解答(from MM)
- 不得不学的http协议
- Windows Server_2008下搭建个人下载服务器(FTP)
- C#和NET Framework的定义
- python wav转pcm
- mysql 两表关联查询 group by having
- Mac系统接移动硬盘进行读写软件Mounty
- 网络数据采集分析工具tcpdump定义抓包过滤器
- 管理的本质是协调还是决策?看看孙权是怎么做到的。
- stm32 + ESP8266 wifi获取网络时间和天气 stm32天气预报
- SpringBoot整合Mybatis(配置文件)
- JAVA扫雷小游戏(待改进)
- APP超级签名分发系统 企业签名免签封装微信多开自助分发多合一系统
- Exception UserExistException is not compatible with throws clause in UserService.findUserByName(Stri
- Python 资源大全中文版
- 抛硬币的两种思维方式
- rap2安装以及启动
热门文章
- Visual Studio 2012正式版官方下载地址
- Visual Studio中工具--》选项--》源代码管理器--》插件选择说明
- Q95:纹理映射(Texture Mapping)(2)——圆柱面
- 数据挖掘-二手车价格预测 Task02:数据分析
- 大数据系统架构的通用模块有哪些
- workbench求解闭合状态尼龙槽环
- python爬取天气数据的header_[python爬虫]爬取天气网全国所有县市的天气数据
- 华南农业大学c语言上机实验答案,华南农业大学C语言上机实验答案.doc
- mysql5.7.22并行回放_技术分享 | 从库 MTS 多线程并行回放(二)
- python3 socket 接收 bytes 长度 会变,Python解析Socket数据流异常bytes的问题(详细)...