python用stomp扩展连接activemq(千千万万要注意,默认端口是61613,不是62613)
一、下载activemq
http://archive.apache.org/dist/activemq/5.10.0/
二、安装启动
http://activemq.apache.org/version-5-getting-started.html
activemq依赖java环境,如果没有请先安装
yum -y install java-1.8.0-openjdk
bin/activemq start
bin/activemq stop
三、访问管理界面
http://192.168.243.101:8161/admin/
admin/admin
四、安装stomp
我用的py2.7版本,所以只能装4.1.23
https://pypi.org/project/stomp.py/4.1.23/
五、编写代码
官方文档地址
https://stomppy.readthedocs.io/en/latest/quickstart.html
https://stomppy.readthedocs.io/en/latest/api.html
千千万万要注意,默认端口是61613,不是62613
官方文档写的不规范,针对非常坑人。加上地址参数就连接不上,然后我就尝试telnet,发现根本没有62613。然后就查看所有开放的端口,发现有个61613。
附上查看端口的方法,总是记不得参数
netstat -nupl (UDP类型的端口)
netstat -ntpl (TCP类型的端口)
a 表示所有
n表示不查询dns
t表示tcp协议
u表示udp协议
p表示查询占用的程序
l表示查询正在监听的程序
dir(stomp)
['Connection', 'Connection10', 'Connection11', 'Connection12', 'ConnectionListener', 'PrintingListener', 'StatsListener', 'StompConnection10', 'StompConnection11', 'StompConnection12', 'WaitingListener', '__builtins__', '__doc__', '__file__', '__name__', '__package__', '__path__', '__version__', 'adapter', 'backward', 'backward2', 'backwardsock', 'backwardsock26', 'connect', 'constants', 'exception', 'listener', 'protocol', 'transport', 'utils']
dir(conn)
['_HeartbeatListener__heartbeat_loop', '_HeartbeatListener__update_heartbeat', '__class__', '__delattr__', '__dict__', '__doc__', '__enter__', '__exit__', '__format__', '__getattribute__', '__hash__', '__init__', '__module__', '__new__', '__reduce__', '__reduce_ex__', '__repr__', '__setattr__', '__sizeof__', '__str__', '__subclasshook__', '__weakref__', '_escape_headers', 'abort', 'ack', 'auto_content_length', 'begin', 'commit', 'connect', 'disconnect', 'get_listener', 'get_ssl', 'heart_beat_receive_scale', 'heartbeat_terminate_event', 'heartbeat_thread', 'heartbeats', 'is_connected', 'nack', 'next_outbound_heartbeat', 'on_before_message', 'on_connected', 'on_connecting', 'on_disconnected', 'on_error', 'on_heartbeat', 'on_heartbeat_timeout', 'on_message', 'on_receipt', 'on_receiver_loop_completed', 'on_send', 'received_heartbeat', 'remove_listener', 'running', 'send', 'send_frame', 'set_listener', 'set_receipt', 'set_ssl', 'start', 'stop', 'subscribe', 'transport', 'unsubscribe', 'version']
stomp.Connection 调用的是StompConnection11类,走的1.1协议
stomp.Connection10 走的就是1.0协议
stomp.Connection12 走的就是1.2协议
这里重点介绍StompConnection11这个类,其他的可以自行help。
这个类中有以下方法。
【__init__】
__init__(self, host_and_ports=None, prefer_localhost=True, try_loopback_connect=True, reconnect_sleep_initial=0.1, reconnect_sleep_increase=0.5, reconnect_sleep_jitter=0.1, reconnect_sleep_max=60.0, reconnect_attempts_max=3, use_ssl=False, ssl_key_file=None, ssl_cert_file=None, ssl_ca_certs=None, ssl_cert_validator=None, ssl_version=3, timeout=None, heartbeats=(0, 0), keepalive=None, vhost=None, auto_decode=True, encoding='utf-8', auto_content_length=True, heart_beat_receive_scale=1.5, recv_byte=1024)
host_and_ports:IP和端口组成元祖,支持集群,所以外面就是列表。
形如[('192.168.1.100', 61613), ('192.168.1.101', 61613)]
prefer_localhost:是否首选本地机器,布尔值。True或False。保持默认值即可。
try_loopback_connect:尝试回环连接,布尔值。True或False。保持默认值即可。
reconnect_sleep_initial
reconnect_sleep_increase
reconnect_sleep_jitter
reconnect_sleep_max
reconnect_attempts_max
这5个参数都是重连的,需要的自行研究。无法就是第一次重连,间隔重连增加时间,最大重连次数这些。
use_ssl=False,
ssl_key_file=None,
ssl_cert_file=None,
ssl_ca_certs=None,
ssl_cert_validator=None,
ssl_version=3
ssl参数,这个也不需要解释了吧。
timeout=None,
heartbeats=(0, 0),
心跳间隔。单位为毫秒。这个参数只有在Connection和Connection12有效。元组。
keepalive=None,
vhost=None,
auto_decode=True,
encoding='utf-8',
auto_content_length=True,
编码相关参数。重点关注一下auto_content_length参数,有人踩过坑,见下方链接。
https://blog.csdn.net/qq_28859405/article/details/91382228
heart_beat_receive_scale=1.5,
recv_byte=1024
【connect】
connect(self, *args, **kwargs)
【disconnect】
disconnect(self, receipt=None, headers=None, **keyword_headers)
调用协议断开连接,然后停止传输本身。
:param str receipt: the receipt to use with the disconnect
:param dict headers: a map of any additional headers to send with the disconnection
:param keyword_headers: any additional headers to send with the disconnection
----------------------------------------------------------------------
Methods inherited from BaseConnection:
__enter__(self)
__exit__(self, exc_type, exc_val, exc_tb)
get_listener(self, name)
:param str name:
:rtype: ConnectionListener
get_ssl(self, *args, **kwargs)
is_connected(self)
:rtype: bool
remove_listener(self, name)
:param str name:
set_listener(self, name, lstnr)
:param str name:
:param ConnectionListener lstnr:
set_receipt(self, receipt_id, value)
set_ssl(self, *args, **kwargs)
start(self)
Deprecated. Still included to provide backwards compatibility with previous versions of stomp.py (no longer required)
已弃用。仍然包括在内是为了兼容历史版本(不再需要)
stop(self)
Deprecated. Still included to provide backwards compatibility with previous versions of stomp.py (no longer required)
已弃用。仍然包括在内是为了兼容历史版本(不再需要)
----------------------------------------------------------------------
Data descriptors inherited from stomp.listener.Publisher:
__dict__
dictionary for instance variables (if defined)
__weakref__
list of weak references to the object (if defined)
----------------------------------------------------------------------
Methods inherited from stomp.protocol.Protocol11:
abort(self, transaction, headers=None, **keyword_headers)
Abort a transaction.
中止一个事务。
:param str transaction: the identifier of the transaction
:param dict headers: a map of any additional headers the broker requires
:param keyword_headers: any additional headers the broker requires
ack(self, id, subscription, transaction=None, receipt=None)
Acknowledge 'consumption' of a message by id.
通过id确认消息的“消费”。
:param str id: identifier of the message
:param str subscription: the subscription this message is associated with
:param str transaction: include the acknowledgement in the specified transaction
begin(self, transaction=None, headers=None, **keyword_headers)
Begin a transaction.
开启一个事务。
:param str transaction: the identifier for the transaction (optional - if not specified
a unique transaction id will be generated)
:param dict headers: a map of any additional headers the broker requires
:param keyword_headers: any additional headers the broker requires
:return: the transaction id
:rtype: str
commit(self, transaction=None, headers=None, **keyword_headers)
Commit a transaction.
提交一个事务。
:param str transaction: the identifier for the transaction
:param dict headers: a map of any additional headers the broker requires
:param keyword_headers: any additional headers the broker requires
nack(self, id, subscription, transaction=None, receipt=None)
Let the server know that a message was not consumed.
让服务器知道消息没有被使用。
:param str id: the unique id of the message to nack
:param str subscription: the subscription this message is associated with
:param str transaction: include this nack in a named transaction
【send】
send(self, destination, body, content_type=None, headers=None, **keyword_headers)
Send a message to a destination in the messaging system (as per https://stomp.github.io/stomp-specification-1.2.html#SEND)
:param str destination: the destination (such as a message queue - for example '/queue/test' - or a message topic)
消息队列名称
:param body: the content of the message
发送的消息内容
:param str content_type: the MIME type of message
内容类型,比如text/plain ; application/json;
:param dict headers: additional headers to send in the message frame
头部信息,注意是字典类型的
:param keyword_headers: any additional headers the broker requires
代理需要的任何附加头。搞不清楚。
send_frame(self, cmd, headers=None, body='')
Encode and send a stomp frame
through the underlying transport:
:param str cmd: the protocol command
:param dict headers: a map of headers to include in the frame
:param body: the content of the message
subscribe(self, destination, id, ack='auto', headers=None, **keyword_headers)
Subscribe to a destination
订阅一个消息队列
:param str destination: the topic or queue to subscribe to
要订阅的主题或队列
:param str id: the identifier to uniquely identify the subscription
订阅标识符以唯一标识订阅的主题或队列
:param str ack: either auto, client or client-individual (see https://stomp.github.io/stomp-specification-1.2.html#SUBSCRIBE for more info)
接收到了,自动回复。
:param dict headers: a map of any additional headers to send with the subscription
头部信息,注意是字典类型的
:param keyword_headers: any additional headers to send with the subscription
代理需要的任何附加头。搞不清楚。
unsubscribe(self, id, headers=None, **keyword_headers)
Unsubscribe from a destination by its unique identifier
根据id取消订阅消息队列
:param str id: the unique identifier to unsubscribe from
:param dict headers: additional headers to send with the unsubscribe
:param keyword_headers: any additional headers to send with the subscription
----------------------------------------------------------------------
Methods inherited from stomp.listener.HeartbeatListener:
on_connected(self, headers, body)
Once the connection is established, and 'heart-beat' is found in the headers, we calculate the real
heartbeat numbers (based on what the server sent and what was specified by the client) - if the heartbeats are not 0, we start up the heartbeat loop accordingly.
一旦建立了连接,并且在标题中找到了“心跳”,我们就计算真实值心跳编号(基于服务器发送的内容和客户端指定的内容)-如果心跳不是0,我们相应地启动心跳循环。
:param dict headers: headers in the connection message
:param body: the message body
on_disconnected(self)
on_error(self, *_)
Reset the last received time whenever an error is received.
on_heartbeat(self)
Reset the last received time whenever a heartbeat message is received.
on_message(self, headers, body)
Reset the last received time whenever a message is received.
:param dict headers: headers in the message
:param body: the message content
on_receipt(self, *_)
Reset the last received time whenever a receipt is received.
on_send(self, frame)
Add the heartbeat header to the frame when connecting, and bump
next outbound heartbeat timestamp.
:param Frame frame: the Frame object
----------------------------------------------------------------------
Methods inherited from stomp.listener.ConnectionListener:
on_before_message(self, headers, body)
Called by the STOMP connection before a message is returned to the client app. Returns a tuple
containing the headers and body (so that implementing listeners can pre-process the content).
:param dict headers: the message headers
:param body: the message body
on_connecting(self, host_and_port)
Called by the STOMP connection once a TCP/IP connection to the
STOMP server has been established or re-established. Note that
at this point, no connection has been established on the STOMP
protocol level. For this, you need to invoke the "connect"
method on the connection.
:param (str,int) host_and_port: a tuple containing the host name and port number to which the connection
has been established.
on_heartbeat_timeout(self)
Called by the STOMP connection when a heartbeat message has not been
received beyond the specified period.
on_receiver_loop_completed(self, headers, body)
Called when the connection receiver_loop has finished.
python用stomp扩展连接activemq(千千万万要注意,默认端口是61613,不是62613)相关推荐
- php 通过stomp协议连接ActiveMQ
2019独角兽企业重金招聘Python工程师标准>>> php 通过stomp协议连接ActiveMQ 一.安装php的stomp扩展1 http://pecl.php.net/pa ...
- PHP stomp 连接判断,php实现通过stomp协议连接ActiveMQ操作示例
本文实例讲述了php实现通过stomp协议连接ActiveMQ操作.分享给大家供大家参考,具体如下: 前面介绍了php ActiveMQ的安装与使用,这里再来讲述一下php通过stomp协议连接Act ...
- php stomp rabbitmq,php实现通过stomp协议连接ActiveMQ操作示例
本文实例讲述了php实现通过stomp协议连接ActiveMQ操作.分享给大家供大家参考,具体如下: 前面介绍了php ActiveMQ的安装与使用,这里再来讲述一下php通过stomp协议连接Act ...
- python基础-C扩展
写python的c扩展简介 使用C/C++编写Python模块扩展 Python - 用C扩展编程 使用 C 或 C++ 扩展 Python 原因 添加额外的非python功能. 性能瓶颈的效率提升 ...
- python官方的扩展库索引是什么_python扩展列表
广告关闭 腾讯云11.11云上盛惠 ,精选热门产品助力上云,云服务器首年88元起,买的越多返的越多,最高返5000元! python扩展内容阅读本文需要3分钟? ① python中yield关键字的使 ...
- Python局域网socket无法连接的问题解决
Python局域网socket无法连接的问题解决 服务器端的socket监听的是否是服务器的IP地址(并不是什么'localhost'或者'127.0.0.1'这种).即客户端需要连接的IP地址填入即 ...
- Python正则表达式之扩展语法(5)
非捕获组和命名组 精心设计的正则表达式可能会划分很多组,这些组不仅可以匹配相关的子串,还能够对正则表达式本身进行分组和结构化.在复杂的正则表达式中,由于有太多的组,因此通过组的序号来跟踪和使用会变得困 ...
- 写python的c扩展简介
写python的c扩展简介 2012 年 10 月 05 日 isnowfy programGo to comment python是一门非常方便的动态语言,很多你用c或者java要很多行的代码,可能 ...
- python程序文件扩展名有_python程序文件的扩展名称是什么
python程序文件的扩展名称是什么 python程序的扩展名有.py..pyc..pyo和.pyd..py是源文件,.pyc是源文件编译后的文件,.pyo是源文件优化编译后的文件,.pyd是其他语言 ...
最新文章
- win2003 shutdown命令
- 完整mes代码(含客户端和server端_200行代码实现基于paxos的kv存储
- 《数据结构与算法分析—Java语言描述》pdf
- 俄罗斯机器人雄鹿_在雄鹿无球可打,在火箭重获新生!哈登,你又让一人打出身价...
- python3字符串拼接_Python3基础 str + 字符串变量拼接
- java 自动类型_java类型自动转换
- 服务器虚拟光驱无法加载,Proxmox/创建PVE/安装windows 2012r2系统无法识别硬盘/如何添加virtio驱动/...
- 高质量程序设计指南c++/c语言(30)--引用
- 重温WCF之数据契约和序列化(四)
- Python生成1000个随机字符的字符串,统计每个字符的出现次数(choices函数和Counter的使用)
- 解决:安装R包时,经常提示“package ‘readr’ is not available (for R version 3.5.1)”的问题
- 马克思知识点总结(一)
- Zune无法连接手机的解决办法
- Live2D 博客页面添加板娘
- 数据结构与算法系列 目录(摘抄自“skywang12345”)
- 安装docker环境报错:Could not resolve host
- python爬虫GUI工具,tkinter网易云歌单歌曲下载器
- You can find the Nike LeBron Soldier 11 now at kd10sale.com
- 计算机辅助英语训练新方法,一种新的计算机辅助英语教学模式
- android 录屏方案 VFR和CFR