2019独角兽企业重金招聘Python工程师标准>>>

API

一、exchangeDeclare 交换器声明

    /*** Declare an exchange, via an interface that allows the complete set of* arguments.* @see com.rabbitmq.client.AMQP.Exchange.Declare* @see com.rabbitmq.client.AMQP.Exchange.DeclareOk* @param exchange the name of the exchange* @param type the exchange type* @param durable true if we are declaring a durable exchange (the exchange will survive a server restart)* @param autoDelete true if the server should delete the exchange when it is no longer in use* @param internal true if the exchange is internal, i.e. can't be directly* published to by a client.* @param arguments other properties (construction arguments) for the exchange* @return a declaration-confirm method to indicate the exchange was successfully declared* @throws java.io.IOException if an error is encountered*/Exchange.DeclareOk exchangeDeclare(String exchange,String type,boolean durable,boolean autoDelete,boolean internal,Map<String, Object> arguments) throws IOException;

exchange : 交换器名称
type : 交换器类型 参考:https://my.oschina.net/LucasZhu/blog/1838105
durable : true if we are declaring a durable exchange (the exchange will survive a server restart) 是否耐用,如果设置为耐用的,当重启服务,交换器还将继续存在,否则会消失
autoDelete :true if the server should delete the exchange when it is no longer in use 当建立一次链接之后,如果在没有其他队列链接此交换器,则交换器会自动删除。同 队列声明中autoDelete
internal : true if the exchange is internal, i.e. can't be directly  published to by a client. 是否是内部交换器,如果是内部交换器,则不能与外部client-客户端进行直接连接,例如死信队列中设置的exchange。
arguments  如下:

二、channel.queueDeclare 声明队列

channel.queueDeclare(QUEUE_NAME, false, false, false, null);
/*** Declare a queue* @see com.rabbitmq.client.AMQP.Queue.Declare* @see com.rabbitmq.client.AMQP.Queue.DeclareOk* @param queue the name of the queue* @param durable true if we are declaring a durable queue (the queue will survive a server restart)* @param exclusive true if we are declaring an exclusive queue (restricted to this connection)* @param autoDelete true if we are declaring an autodelete queue (server will delete it when no longer in use)* @param arguments other properties (construction arguments) for the queue* @return a declaration-confirm method to indicate the queue was successfully declared* @throws java.io.IOException if an error is encountered*/Queue.DeclareOk queueDeclare(String queue, boolean durable, boolean exclusive, boolean autoDelete,Map<String, Object> arguments) throws IOException;

1. queue:声明队列名称
2. durable :true if we are declaring a durable queue (the queue will survive a server restart) 如果我们声明为持久化 , 则在重新启动的时候该队列还将存在。
3. exclusive true if we are declaring an exclusive queue (restricted to this connection) 排他队列,限于创建此队列的链接进行操作
4. autoDelete true if we are declaring an autodelete queue (server will delete it when no longer in use) 当建立一次链接之后,如果在没有其他消费者链接此队列,则队列会自动删除。
5.队列如下相关的属性信息:

三、basicPublish 基础发送消息:

channel.basicPublish("", QUEUE_NAME, null, (message+"1").getBytes("UTF-8"));/*** Publish a message.** Publishing to a non-existent exchange will result in a channel-level* protocol exception, which closes the channel.** Invocations of <code>Channel#basicPublish</code> will eventually block if a* <a href="http://www.rabbitmq.com/alarms.html">resource-driven alarm</a> is in effect.** @see com.rabbitmq.client.AMQP.Basic.Publish* @see <a href="http://www.rabbitmq.com/alarms.html">Resource-driven alarms</a>.* @param exchange the exchange to publish the message to* @param routingKey the routing key* @param props other properties for the message - routing headers etc* @param body the message body* @throws java.io.IOException if an error is encountered*/void basicPublish(String exchange, String routingKey, BasicProperties props, byte[] body) throws IOException;/*** Publish a message.** Publishing to a non-existent exchange will result in a channel-level* protocol exception, which closes the channel.** Invocations of <code>Channel#basicPublish</code> will eventually block if a* <a href="http://www.rabbitmq.com/alarms.html">resource-driven alarm</a> is in effect.** @see com.rabbitmq.client.AMQP.Basic.Publish* @see <a href="http://www.rabbitmq.com/alarms.html">Resource-driven alarms</a>.* @param exchange the exchange to publish the message to* @param routingKey the routing key* @param mandatory true if the 'mandatory' flag is to be set* @param immediate true if the 'immediate' flag is to be* set. Note that the RabbitMQ server does not support this flag.* @param props other properties for the message - routing headers etc* @param body the message body* @throws java.io.IOException if an error is encountered*/
void basicPublish(String exchange, String routingKey, boolean mandatory, boolean immediate, BasicProperties props, byte[] body)throws IOException;

exchange:交换器名称,如果为空字符串"",则直接发送消息到队列,不经过交换器。
routingKey : 为经过交换器中消息的路由规则,如果没有交换器(""),则它为队列的名称。#匹配0个或多个单词,*匹配一个单词  在topic exchange做消息转发用
props : 设置消息的属性信息,如消息头,消息的超时时间等等。
body : 消息体
mandatory : true 如果exchange根据自身类型和消息routeKey无法找到一个符合条件的queue,那么会调用basic.return方法将消息返回给生产者。false : 出现上述情形broker会直接将消息扔掉。
immediate : true 如果exchange在将消息route到queue(s)时发现对应的queue上没有消费者,那么这条消息不会放入队列中。当与消息routeKey关联的所有queue(一个或多个)都没有消费者时,该消息会通过basic.return方法返还给生产者。

简单来说:mandatory标志告诉服务器至少将该消息route到一个队列中,否则将消息返还给生产者;immediate标志告诉服务器如果该消息关联的queue上有消费者,则马上将消息投递给它,如果所有queue都没有消费者,直接把消息返还给生产者,不用将消息入队列等待消费者了。

四、queueBind 队列和交换器的绑定

channel.queueBind(queueName, EXCHANGE_NAME, severity);/*** Bind a queue to an exchange, with no extra arguments.* @see com.rabbitmq.client.AMQP.Queue.Bind* @see com.rabbitmq.client.AMQP.Queue.BindOk* @param queue the name of the queue* @param exchange the name of the exchange* @param routingKey the routine key to use for the binding* @return a binding-confirm method if the binding was successfully created* @throws java.io.IOException if an error is encountered*/Queue.BindOk queueBind(String queue, String exchange, String routingKey) throws IOException;

queue : 队列名称。
exchange : 交换器名称。
routingKey  : 这里的routingKey也叫bindingKey 是队列和交换器的直接绑定规则。

五、channel.basicConsume

推模式获取消息(消息中间件主动将消息推送给消费者)

推模式将消息提前推送给消费者,消费者必须设置一个缓冲区缓存这些消息。好处很明显,消费者总是有一堆在内存中待处理的消息,所以效率高。缺点是缓冲区可能会溢出

Consumer consumer = new DefaultConsumer(channel) {@Overridepublic void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {String message = new String(body, "UTF-8");System.out.println(" [x] Received '" + envelope.getRoutingKey() + "':'" + message + "'");}
};
channel.basicConsume(queueName, true, consumer);/*** Start a non-nolocal, non-exclusive consumer, with* a server-generated consumerTag.* @param queue the name of the queue* @param autoAck true if the server should consider messages* acknowledged once delivered; false if the server should expect* explicit acknowledgements* @param callback an interface to the consumer object* @return the consumerTag generated by the server* @throws java.io.IOException if an error is encountered* @see com.rabbitmq.client.AMQP.Basic.Consume* @see com.rabbitmq.client.AMQP.Basic.ConsumeOk* @see #basicConsume(String, boolean, String, boolean, boolean, Map, Consumer)*/
String basicConsume(String queue, boolean autoAck, Consumer callback) throws IOException;

round-robin(推-轮询)

推的方式采用的轮询(round-robin)的方式,这里轮询的意思不是消息的轮询,而是推送次数的轮询,推送可能一次推送一条消息也可能一次推送多条消息。实现方式如上。

chanel.basicQos()

/*** Request specific "quality of service" settings.** These settings impose limits on the amount of data the server* will deliver to consumers before requiring acknowledgements.* Thus they provide a means of consumer-initiated flow control.* @see com.rabbitmq.client.AMQP.Basic.Qos* @param prefetchSize maximum amount of content (measured in* octets) that the server will deliver, 0 if unlimited* @param prefetchCount maximum number of messages that the server* will deliver, 0 if unlimited* @param global true if the settings should be applied to the* entire channel rather than each consumer* @throws java.io.IOException if an error is encountered*/
void basicQos(int prefetchSize, int prefetchCount, boolean global) throws IOException;/*** Request a specific prefetchCount "quality of service" settings* for this channel.** @see #basicQos(int, int, boolean)* @param prefetchCount maximum number of messages that the server* will deliver, 0 if unlimited* @param global true if the settings should be applied to the* entire channel rather than each consumer* @throws java.io.IOException if an error is encountered*/
void basicQos(int prefetchCount, boolean global) throws IOException;/*** Request a specific prefetchCount "quality of service" settings* for this channel.** @see #basicQos(int, int, boolean)* @param prefetchCount maximum number of messages that the server* will deliver, 0 if unlimited* @throws java.io.IOException if an error is encountered*/
void basicQos(int prefetchCount) throws IOException;prefetchSize:0
prefetchCount:会告诉RabbitMQ不要同时给一个消费者推送多于N个消息,即一旦有N个消息还没有ack,则该consumer将block掉,直到有消息ack
global:true\false 是否将上面设置应用于channel,简单点说,就是上面限制是channel级别的还是consumer级别
备注:据说prefetchSize 和global这两项,rabbitmq没有实现,暂且不研究basicQos 中 prefetchSize 参数通过消息的总字节数来限制队列推送消息的速度
prefetchSize 与 prefetchCount 可以同时设置,达到任何一个限制,则队列暂停推送消息
global 参数表示前两个参数的作用域,true 表示限制是针对信道的,false 表示限制是针对消费者的(我还没试过一个信道支持多个消费者的例子)
可以对同一个信道同时设置 global 为 true 和 false 的 Qos,表示队列要考虑每个消费者的限制,同时还要考虑整个信道的限制

fair dispatch(推-公平分发)

上面轮询推策略方式在服务器性能不均的情况下,带来消息在某个或某些消费者节点大量堆积的问题。
怎样才能做到按照每个消费者的能力分配消息呢,联合使用Qos和Acknowledge就可以做到。

channel.basicQos(1);
Consumer consumer = new DefaultConsumer(channel) {@Overridepublic void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body)throws IOException {String message = new String(body, "UTF-8");System.out.printf(" [    %2$s<===](%1$s) %3$s\n", name, QUEUE_NAME, message);try {TimeUnit.MILLISECONDS.sleep(2000);} catch (InterruptedException e) {}channel.basicAck(envelope.getDeliveryTag(), false);}
};
channel.basicConsume(QUEUE_NAME, false, consumer);

核心:1.设置通道最多堆积(拥有)一条未提交的消息,当消费完成并且ack之后 channel再进行第二条消息的接收。2.设置ack方式为手动通知。

basicQos 方法设置了当前信道最大预获取(prefetch)消息数量为1,消息从队列异步推送给消费者,消费者的ack也是异步发送给队列,从队列的角度去看,总是会有一批消息已推送但尚未获得ack确认。Qos(Quality of Service)的prefetchCount参数就是来限制这批未确认消息的数量。设为1时,队列只有在收到消费者发回的上一条消息 ack 确认后,才会向该消费者发送下一条消息。prefetchCount 的默认值为0,即没有限制,队列会将所有消息尽快发给消费者。

六、channel.basicGet

拉模式获取消息(消费者主动从消息中间件拉取消息)

拉模式在消费者需要时才去消息中间件拉取消息,这段网络开销会明显增加消息延迟,降低系统吞吐量。

while(true) {GetResponse response = channel.basicGet(queueName, true);if(response == null) {System.out.println("Get Nothing!");TimeUnit.MILLISECONDS.sleep(1000);} else {String message = new String(response.getBody(), "UTF-8");System.out.printf(" [    %2$s<===](%1$s) %3$s\n", "Receiver", queueName, message);TimeUnit.MILLISECONDS.sleep(500);channel.basicAck(response.getEnvelope().getDeliveryTag(),false);}
}/*** Retrieve a message from a queue using {@link com.rabbitmq.client.AMQP.Basic.Get}* @see com.rabbitmq.client.AMQP.Basic.Get* @see com.rabbitmq.client.AMQP.Basic.GetOk* @see com.rabbitmq.client.AMQP.Basic.GetEmpty* @param queue the name of the queue* @param autoAck true if the server should consider messages* acknowledged once delivered; false if the server should expect* explicit acknowledgements* @return a {@link GetResponse} containing the retrieved message data* @throws java.io.IOException if an error is encountered*/
GetResponse basicGet(String queue, boolean autoAck) throws IOException;

GetResponse basicGet(String queue, boolean autoAck) throws IOException;
非阻塞方法,如果队列中没有数据会返回null

七、channel.basicAck();

/*** Acknowledge one or several received* messages. Supply the deliveryTag from the {@link com.rabbitmq.client.AMQP.Basic.GetOk}* or {@link com.rabbitmq.client.AMQP.Basic.Deliver} method* containing the received message being acknowledged.* @see com.rabbitmq.client.AMQP.Basic.Ack* @param deliveryTag the tag from the received {@link com.rabbitmq.client.AMQP.Basic.GetOk} or {@link com.rabbitmq.client.AMQP.Basic.Deliver}* @param multiple true to acknowledge all messages up to and* including the supplied delivery tag; false to acknowledge just* the supplied delivery tag.* @throws java.io.IOException if an error is encountered*/
void basicAck(long deliveryTag, boolean multiple) throws IOException;

deliveryTag : 该消息的index 用来标识信道中投递的消息。RabbitMQ 推送消息给 Consumer 时,会附带一个 Delivery Tag,以便 Consumer 可以在消息确认时告诉 RabbitMQ 到底是哪条消息被确认了。RabbitMQ保证在每个信道中,每条消息的DeliveryTag 从1开始递增。
multiple : 是否批量 true : 将一次性ack所有小于deliveryTag的消息。false 确认当前消息。

八、channel.basicNack(delivery.getEnvelope().getDeliveryTag(), false, true);

/*** Reject one or several received messages.** Supply the <code>deliveryTag</code> from the {@link com.rabbitmq.client.AMQP.Basic.GetOk}* or {@link com.rabbitmq.client.AMQP.Basic.GetOk} method containing the message to be rejected.* @see com.rabbitmq.client.AMQP.Basic.Nack* @param deliveryTag the tag from the received {@link com.rabbitmq.client.AMQP.Basic.GetOk} or {@link com.rabbitmq.client.AMQP.Basic.Deliver}* @param multiple true to reject all messages up to and including* the supplied delivery tag; false to reject just the supplied* delivery tag.* @param requeue true if the rejected message(s) should be requeued rather* than discarded/dead-lettered* @throws java.io.IOException if an error is encountered*/
void basicNack(long deliveryTag, boolean multiple, boolean requeue) throws IOException;

deliveryTag : 该消息的index
multiple : 是否批量. true 将一次性拒绝所有小于deliveryTag的消息。
requeue : 被拒绝的是否重新入队。

九、channel.basicReject(delivery.getEnvelope().getDeliveryTag(), false);

/*** Reject a message. Supply the deliveryTag from the {@link com.rabbitmq.client.AMQP.Basic.GetOk}* or {@link com.rabbitmq.client.AMQP.Basic.Deliver} method* containing the received message being rejected.* @see com.rabbitmq.client.AMQP.Basic.Reject* @param deliveryTag the tag from the received {@link com.rabbitmq.client.AMQP.Basic.GetOk} or {@link com.rabbitmq.client.AMQP.Basic.Deliver}* @param requeue true if the rejected message should be requeued rather than discarded/dead-lettered* @throws java.io.IOException if an error is encountered*/
void basicReject(long deliveryTag, boolean requeue) throws IOException;

deliveryTag : 该消息的index 。
requeue : 被拒绝的是否重新入队。

channel.basicNack 与 channel.basicReject 的区别在于basicNack可以拒绝多条消息,而basicReject一次只能拒绝一条消息

转载于:https://my.oschina.net/LucasZhu/blog/1838514

RabbitMQ Java 基本API相关推荐

  1. (RabbitMQ) Java Client API Guide

    欢迎支持笔者新作:<深入理解Kafka:核心设计与实践原理>和<RabbitMQ实战指南>,同时欢迎关注笔者的微信公众号:朱小厮的博客. 本篇翻译的是RabbitMQ官方文档关 ...

  2. mq 接口 java_Rabbitmq Java Client Api详解

    AMQP AMQP协议是一个高级抽象层消息通信协议,RabbitMQ是AMQP协议的实现. 基础概念快速入门 每个rabbitmq-server叫做一个Broker,等着tcp连接进入. 在rabbi ...

  3. 译:1. RabbitMQ Java Client 之 Hello World

    这些教程介绍了使用RabbitMQ创建消息传递应用程序的基础知识.您需要安装RabbitMQ服务器才能完成教程 1. 打造第一个Hello World 程序 RabbitMQ是一个消息代理:它接受和转 ...

  4. RabbitMQ Management HTTP API的简单封装

    RabbitMQ Management HTTP API的简单封装 文章目录 RabbitMQ Management HTTP API的简单封装 官方相关 代码展示 官方相关 官方文档:https:/ ...

  5. 我也没想到,Java开发 API接口可以不用写 Controller了

    大家好,我是小富~ 今天介绍我正在用的一款高效敏捷开发工具magic-api,顺便分享一点工作中使用它的心得 缘起 先说一下我为什么会使用这个工具? 最近新启动一个项目,业务并不算复杂,那种典型的管理 ...

  6. Java的API及Object类、String类、字符串缓存区

    Java 的API(API: Application(应用) Programming(程序) Interface(接口)) Object:Object类是Java语言中的根类,即所有类的父类. equ ...

  7. Java Persistence API中的FetchType LAZY和EAGER之间的区别?

    我是Java Persistence API和Hibernate的新手. Java Persistence API中的FetchType.LAZY和FetchType.EAGER什么区别? #1楼 我 ...

  8. Java 常用API的运用,效率及技巧

    1.     Java面向对象基本概念 2.     System 3.     String, StringBuffer 4.     数值,字符,布尔对象与简单类型的操作 5.     Class ...

  9. 关于 Java Collections API 您不知道的 5 件事--转

    第 1 部分 http://www.ibm.com/developerworks/cn/java/j-5things2.html 对于很多 Java 开发人员来说,Java Collections A ...

最新文章

  1. fedora12安装小企鹅输入法
  2. tp点一共有多少_致命女人更新时间 致命女人第一季一共多少集在线观看地址
  3. 关于java环境配置问题
  4. list可以存放python中任意类型的数据_Python中常见的数据类型小结
  5. Vue 3 都 RC 了,前端的你还不来看看
  6. Python 第三方模块之 MySQL数据库连接模块 PyMySQL
  7. 14 MM配置-BP业务伙伴-定义供应商科目组和字段选择
  8. 【Recorder.js+百度语音识别】全栈方案技术细节
  9. 在WPF TreeView中使用复选框
  10. Comsenz 核心产品 Discuz! X3.3 正式版【2017-07-01】 -论坛搭建
  11. 深度学习TensorFlow生产环境部署(环境准备篇)
  12. 搜索引擎网页排序算法
  13. 阿里云推出香港高防IP服务 为中国企业出海安全护航
  14. 计算机共享网络热点,手把手教你在win7电脑中设置共享wifi热点
  15. java微信公众号上传永久素材_微信开放平台永久素材视频文件上传
  16. 【微信红包封面】哆啦A梦 x GUCCI古驰限定版!!
  17. linux crontab不执行
  18. GraphSage-TF代码解读
  19. 【UmiJS 3.x入门】
  20. 移动端SEO之用户体验优化提升方法

热门文章

  1. JAVA基础 (二)反射 深入解析反射机制
  2. GdiPlus[6]: 五种画刷总览
  3. 《基于MFC的OpenGL编程》Part 1 A Primer
  4. Linux Named 进程启动、停止脚本
  5. linux ”我的草稿“
  6. 用命令行查看局域网的其他在线的ip
  7. FPGA 开平方方法
  8. 将信息系学生的计算机文化学,计算机学生论文,关于基于职业岗位的计算机文化基础课教学相关参考文献资料-免费论文范文...
  9. html导出excel时换行符,ASP.NET 导出到Excel时保留换行的代码
  10. java笔试题_【干货分享】中兴通讯2021校招笔试测评攻略