1。 首先先引入相关的lib包,重点需引用activemq-client-5.8.0.jar,activemq-core-5.7.0.jar,activemq-pool-5.8.0.jar,activemq-protobuf-1.1.jar等包,其他包

自行配置。

2。 一些公共工具类的代码:

JMSProducer.java

package com.ffcs.icity.jms;import java.util.Map;
import java.util.Set;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;import javax.jms.Connection;
import javax.jms.DeliveryMode;
import javax.jms.Destination;
import javax.jms.ExceptionListener;
import javax.jms.JMSException;
import javax.jms.MapMessage;
import javax.jms.Message;
import javax.jms.MessageProducer;
import javax.jms.Session;import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.pool.PooledConnectionFactory;/*** JMS消息生产者* @author linwei**/
public class JMSProducer implements ExceptionListener{//设置连接的最大连接数public final static int DEFAULT_MAX_CONNECTIONS=5;private int maxConnections = DEFAULT_MAX_CONNECTIONS;//设置每个连接中使用的最大活动会话数private int maximumActiveSessionPerConnection = DEFAULT_MAXIMUM_ACTIVE_SESSION_PER_CONNECTION;public final static int DEFAULT_MAXIMUM_ACTIVE_SESSION_PER_CONNECTION=300;//线程池数量private int threadPoolSize = DEFAULT_THREAD_POOL_SIZE;public final static int DEFAULT_THREAD_POOL_SIZE=50;//强制使用同步返回数据的格式private boolean useAsyncSendForJMS = DEFAULT_USE_ASYNC_SEND_FOR_JMS;public final static boolean DEFAULT_USE_ASYNC_SEND_FOR_JMS=true;//是否持久化消息private boolean isPersistent = DEFAULT_IS_PERSISTENT;public final static boolean DEFAULT_IS_PERSISTENT=true; //连接地址private String brokerUrl;private String userName;private String password;private ExecutorService threadPool;private PooledConnectionFactory connectionFactory;public JMSProducer(String brokerUrl, String userName, String password) {this(brokerUrl, userName, password, DEFAULT_MAX_CONNECTIONS, DEFAULT_MAXIMUM_ACTIVE_SESSION_PER_CONNECTION, DEFAULT_THREAD_POOL_SIZE, DEFAULT_USE_ASYNC_SEND_FOR_JMS, DEFAULT_IS_PERSISTENT);}public JMSProducer(String brokerUrl, String userName, String password, int maxConnections, int maximumActiveSessionPerConnection, int threadPoolSize,boolean useAsyncSendForJMS, boolean isPersistent) {this.useAsyncSendForJMS = useAsyncSendForJMS;this.isPersistent = isPersistent;this.brokerUrl = brokerUrl;this.userName = userName;this.password = password;this.maxConnections = maxConnections;this.maximumActiveSessionPerConnection = maximumActiveSessionPerConnection;this.threadPoolSize = threadPoolSize;init();}private void init() {//设置JAVA线程池this.threadPool = Executors.newFixedThreadPool(this.threadPoolSize);//ActiveMQ的连接工厂ActiveMQConnectionFactory actualConnectionFactory = new ActiveMQConnectionFactory(this.userName, this.password, this.brokerUrl);actualConnectionFactory.setUseAsyncSend(this.useAsyncSendForJMS);//Active中的连接池工厂this.connectionFactory = new PooledConnectionFactory(actualConnectionFactory);this.connectionFactory.setCreateConnectionOnStartup(true);this.connectionFactory.setMaxConnections(this.maxConnections);this.connectionFactory.setMaximumActiveSessionPerConnection(this.maximumActiveSessionPerConnection);}/*** 执行发送消息的具体方法* @param queue* @param map*/public void send(final String queue, final Map<String, Object> map) {//直接使用线程池来执行具体的调用this.threadPool.execute(new Runnable(){@Overridepublic void run() {try {sendMsg(queue,map);} catch (Exception e) {e.printStackTrace();}}});}/*** 真正的执行消息发送* @param queue* @param map* @throws Exception*/private void sendMsg(String queue, Map<String, Object> map) throws Exception {Connection connection = null;Session session = null;try {//从连接池工厂中获取一个连接connection = this.connectionFactory.createConnection();/*createSession(boolean transacted,int acknowledgeMode)transacted - indicates whether the session is transacted acknowledgeMode - indicates whether the consumer or the client will acknowledge any messages it receives; ignored if the session is transacted. Legal values are Session.AUTO_ACKNOWLEDGE, Session.CLIENT_ACKNOWLEDGE, and Session.DUPS_OK_ACKNOWLEDGE.*///false 参数表示 为非事务型消息,后面的参数表示消息的确认类型session = connection.createSession(Boolean.FALSE, Session.AUTO_ACKNOWLEDGE);//Destination is superinterface of Queue//PTP消息方式     Destination destination = session.createQueue(queue);//Creates a MessageProducer to send messages to the specified destinationMessageProducer producer = session.createProducer(destination);//set delevery modeproducer.setDeliveryMode(this.isPersistent ? DeliveryMode.PERSISTENT : DeliveryMode.NON_PERSISTENT);//map convert to javax messageMessage message = getMessage(session, map);producer.send(message);} finally {closeSession(session);closeConnection(connection);}}private Message getMessage(Session session, Map<String, Object> map) throws JMSException {MapMessage message = session.createMapMessage();if (map != null && !map.isEmpty()) {Set<String> keys = map.keySet();for (String key : keys) {message.setObject(key, map.get(key));}}return message;}private void closeSession(Session session) {try {if (session != null) {session.close();}} catch (Exception e) {e.printStackTrace();}}private void closeConnection(Connection connection) {try {if (connection != null) {connection.close();}} catch (Exception e) {e.printStackTrace();}}@Overridepublic void onException(JMSException e) {e.printStackTrace();}}

JMSConsumer.java

package com.ffcs.icity.jms;import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.Destination;
import javax.jms.ExceptionListener;
import javax.jms.JMSException;
import javax.jms.MessageConsumer;
import javax.jms.MessageListener;
import javax.jms.Session;import org.apache.activemq.ActiveMQConnection;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.ActiveMQPrefetchPolicy;/*** JMS消息消费者* @author linwei**/
public class JMSConsumer implements ExceptionListener {//队列预取策略private int queuePrefetch=DEFAULT_QUEUE_PREFETCH;public final static int DEFAULT_QUEUE_PREFETCH=10;private String brokerUrl;private String userName;private String password;private MessageListener messageListener;private Connection connection;private Session session;//队列名private String queue;/*** 执行消息获取的操作* @throws Exception*/public void start() throws Exception {//ActiveMQ的连接工厂ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(this.userName, this.password, this.brokerUrl);connection = connectionFactory.createConnection();//activeMQ预取策略ActiveMQPrefetchPolicy prefetchPolicy = new ActiveMQPrefetchPolicy();prefetchPolicy.setQueuePrefetch(queuePrefetch);((ActiveMQConnection) connection).setPrefetchPolicy(prefetchPolicy);connection.setExceptionListener(this);connection.start();//会话采用非事务级别,消息到达机制使用自动通知机制session = connection.createSession(Boolean.FALSE, Session.AUTO_ACKNOWLEDGE);Destination destination = session.createQueue(this.queue);MessageConsumer consumer = session.createConsumer(destination);consumer.setMessageListener(this.messageListener);}/*** 关闭连接*/public void shutdown(){try {if (session != null) {session.close();session=null;}if (connection != null) {connection.close();connection=null;}} catch (Exception e) {e.printStackTrace();}}@Overridepublic void onException(JMSException e) {e.printStackTrace();}public String getBrokerUrl() {return brokerUrl;}public void setBrokerUrl(String brokerUrl) {this.brokerUrl = brokerUrl;}public String getUserName() {return userName;}public void setUserName(String userName) {this.userName = userName;}public String getPassword() {return password;}public void setPassword(String password) {this.password = password;}public String getQueue() {return queue;}public void setQueue(String queue) {this.queue = queue;}public MessageListener getMessageListener() {return messageListener;}public void setMessageListener(MessageListener messageListener) {this.messageListener = messageListener;}public int getQueuePrefetch() {return queuePrefetch;}public void setQueuePrefetch(int queuePrefetch) {this.queuePrefetch = queuePrefetch;}}

MessageHandler.java

package com.ffcs.icity.jms;import javax.jms.Message;/*** 提供消息操作的回调接口* @author linwei**/
public interface MessageHandler {/*** 消息回调提供的调用方法* @param message*/public void handle(Message message);
}

MultiThreadMessageListener.java

package com.ffcs.icity.jms;import java.util.concurrent.ExecutorService;import javax.jms.Message;
import javax.jms.MessageListener;/*** 消息消费者中使用的多线程消息监听服务* @author linwei**/
public class MultiThreadMessageListener implements MessageListener {//默认线程池数量public final static int DEFAULT_HANDLE_THREAD_POOL=10;//最大的处理线程数.private int maxHandleThreads;//提供消息回调调用接口private MessageHandler messageHandler;private ExecutorService handleThreadPool;public MultiThreadMessageListener(MessageHandler messageHandler){this(DEFAULT_HANDLE_THREAD_POOL, messageHandler);}public MultiThreadMessageListener(int maxHandleThreads,MessageHandler messageHandler){this.maxHandleThreads=maxHandleThreads;this.messageHandler=messageHandler;//支持阻塞的固定大小的线程池(自行手动创建的)this.handleThreadPool = new FixedAndBlockedThreadPoolExecutor(this.maxHandleThreads);}/*** 监听程序中自动调用的方法*/@Overridepublic void onMessage(final Message message) {//使用支持阻塞的固定大小的线程池来执行操作this.handleThreadPool.execute(new Runnable() {public void run() {try {MultiThreadMessageListener.this.messageHandler.handle(message);} catch (Exception e) {e.printStackTrace();}}});}}

FixedAndBlockedThreadPoolExecutor.java

package com.ffcs.icity.jms;import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;/*** 支持阻塞的固定大小的线程池* @author linwei**/
public class FixedAndBlockedThreadPoolExecutor extends ThreadPoolExecutor {//一个可重入的互斥锁 Lock,它具有与使用 synchronized 方法和语句所访问的隐式监视器锁相同的一些基本行为和语义,但功能更强大。//使用 lock 块来调用 try,在之前/之后的构造中private ReentrantLock lock = new ReentrantLock();private Condition condition = this.lock.newCondition();public FixedAndBlockedThreadPoolExecutor(int size) {super(size, size, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>());}/*** 当线程池中没有空闲线程时,会挂起此方法的调用线程.直到线程池中有线程有空闲线程.*/@Overridepublic void execute(Runnable command) {//进行同步锁定this.lock.lock();super.execute(command);try {//如果线程池的数量已经达到最大线程池的数量,则进行挂起操作if (getPoolSize() == getMaximumPoolSize()) {this.condition.await();}} catch (InterruptedException e) {e.printStackTrace();} finally {this.lock.unlock();}}@Overrideprotected void afterExecute(Runnable r, Throwable t) {super.afterExecute(r, t);try {this.lock.lock();this.condition.signal();} finally {this.lock.unlock();}}}

3. 调用例子说明:

生产者调用代码,JMSProducerTest.java

package com.ffcs.icity.test;import java.util.HashMap;
import java.util.Map;import com.ffcs.icity.jms.JMSProducer;public class JMSProducerTest {public static void main(String[] args) {locationTest();System.out.println("over.");}private static void locationTest() {//**  JMSProducer 可以设置成全局的静态变量,只需实例化一次即可使用,禁止循环重复实例化JMSProducer(因为其内部存在一个线程池)//支持openwire协议的默认连接为 tcp://localhost:61616,支持 stomp协议的默认连接为tcp://localhost:61613。 //tcp和nio的区别//nio://localhost:61617 以及 tcp://localhost:61616均可在 activemq.xml配置文件中进行配置JMSProducer producer = new JMSProducer("nio://localhost:61617", "system", "manager");Map<String, Object> map = new HashMap<String, Object>();map.put("id", "1");map.put("name", "sss1113333");map.put("password", "password");producer.send("test", map);}}

消费者调用代码,JMSConsumerTest.java

package com.ffcs.icity.test;import javax.jms.MapMessage;
import javax.jms.Message;import com.ffcs.icity.jms.JMSConsumer;
import com.ffcs.icity.jms.MessageHandler;
import com.ffcs.icity.jms.MultiThreadMessageListener;public class JMSConsumerTest {public static void main(String[] args) throws Exception {//**  JMSConsumer 可以设置成全局的静态变量,只需实例化一次即可使用,禁止循环重复实例化JMSConsumer(因为其内部存在一个线程池)JMSConsumer consumer = new JMSConsumer();consumer.setBrokerUrl("tcp://localhost:61616");consumer.setQueue("test");consumer.setUserName("system");consumer.setPassword("manager");consumer.setQueuePrefetch(500);consumer.setMessageListener(new MultiThreadMessageListener(50,new MessageHandler() {public void handle(Message message) {try {System.out.println("name is " + ((MapMessage)message).getString("name"));Thread.sleep(5000);} catch (Exception e) {e.printStackTrace();}}}));consumer.start();//        Thread.sleep(5000);
//      consumer.shutdown();}}

ActiveMQ使用线程池实现消息的生产与消费相关推荐

  1. ActiveMQ——消息的生产和消费

    一.ActiveMQ中消息的管理机制  使用ActiveMQ的目的必然是处理消息,大体步骤如下:   ①通过ConnectionFactory连接到ActiveMQ服务器   ②通过Connectio ...

  2. C10k破局(一)——线程池和消息队列实现高并发服务器

    一.C10k的由来 互联网的基础就是网络通信,早期的互联网可以说是一个小群体的集合.互联网还不够普及,用户也不多,一台服务器同时在线100个用户估计在当时已经算是大型应用了,所以并不存在什么 C10K ...

  3. 线程池和消息队列的一些理解

    1.两者内部都使用了队列,如阻塞队列.优先级队列: 2.使用线程池时应用服务器既充当生产者又充当消费者,也是消息队列中间件的实现者,使用消息队列时中间件.生产者.消费者可以部署在不同的应用机器上(当然 ...

  4. 一次因线程池使用不当造成生产事故的排查记录与思考

    美好的一天从bug结束 某日当我点开熟悉的界面,一个又一个请求失败的提示赫然出现在屏幕上,不会是昨晚上线的代码有问题吧? 吓得我急忙按F12查看了响应--"exception":& ...

  5. RocketMQ事务消息从生产到消费原理详解(包括回查过程)

    名词解释 half消息(生产者发送的Prepare消息):发送到MQ Server但无法被consumer消费的消息,暂时存在MQ Server,需要收到生产者二次确认后才能被消费 消息回查:一些意外 ...

  6. 不恰当使用线程池处理 MQ 消息引起的故障

    现状 业务部门反应网站访问特别慢,负责运维监控的同事说MQ消息队列积压了,中间件的说应用服务器内存占用很高,GC 一直回收不了内存,GC 线程占了近 100% 的 CPU,其他的基本上都在等待,数据库 ...

  7. JDK 伪异步编程(线程池)

    伪异步IO编程 BIO主要的问题在于每当有一个新的客户端请求接入时,服务端必须创建一个新的线程处理新接入的客户端链路,一个线程只能处理一个客户端连接.在高性能服务器应用领域,往往需要面向成千上万个客户 ...

  8. videojs如何获取请求消息_中通消息平台 Kafka 顺序消费线程模型的实践与优化

    各类消息中间件对顺序消息实现的做法是将具有顺序性的一类消息发往相同的主题分区中,只需要将这类消息设置相同的 Key 即可,而 Kafka 会在任意时刻保证一个消费组同时只能有一个消费者监听消费,因此可 ...

  9. Java多线程系列(三):Java线程池的使用方式,及核心运行原理

    之前谈过多线程相关的4种常用Java线程锁的特点,性能比较.使用场景,今天主要分享线程池相关的内容,这些都是属于Java面试的必考点. 为什么需要线程池 java中为了提高并发度,可以使用多线程共同执 ...

最新文章

  1. pytorch 多GPU训练总结(DataParallel的使用)
  2. Day19-File操作-创建 删除,文件过滤
  3. iPhone开发四剑客之《iPhone开发秘籍》
  4. android中进程的优先级
  5. Django(models中字段+参数)
  6. Redpine Signals RS9110-N-11-02 Wi-Fi解决方案
  7. Python学习,装饰器,元类
  8. CSS3动画大全(附源码)flex布局,grid布局3d旋转,图像模糊,文字发光
  9. linux 启动rsyslog服务_linux rsyslog服务部署
  10. 工程中选择数据结构和算法的依据
  11. @selector 如何调用在另一个类中的静态函数?
  12. TortoiseSVN文件夹操作
  13. 十五、Oracle学习笔记:序列(用于表字段值自增)
  14. 5G无线技术基础自学系列 | 传统无线网络架构
  15. latex3的基本介绍
  16. 原始分布式架构服务探索的得与失
  17. 正交匹配追踪算法OMP(Orthogonal Matching Pursuit)
  18. 晦涩难懂的c语言语句,【c/c 学习心得】晦涩难懂的const关键词,const v.s. 指标值...
  19. Android 第三方QQ分享功能实现
  20. bbr中的缩放因子BW_SCALE/BBR_SCALE

热门文章

  1. 从微店到小程序店,微商仍是电商途中的苦行僧?
  2. 痞子衡嵌入式:极易上手的可视化wxPython GUI构建工具(wxFormBuilder)
  3. 【SQL 提示 之二】index_ss Index Skip Hint
  4. dhcpd.conf配置的有关说明
  5. 21个演示展示强大的jQuery特效
  6. poj1019(打表预处理+数学)
  7. 解决树莓派的gpio口不能读取ds18b20的设备文件
  8. Poi实现Excel的导入
  9. iptables 垫脚石之 NAT DNAT SNAT 代理 深度理解
  10. Java 运行程序,并获得结果