Amazon SQS 消息队列服务
Amazon sqs是亚马逊提供的线上消息队列服务, 可以实现应用程序解耦,以及可靠性保证。 sqs提供了两种消息队列, 一种是标准消息队列, 一种是先进先出队列(FIFO), 其区别是FIFO是严格有序的,即消息接收的顺序是按照消息发送的顺序来的, 而标准队列是尽最大可能有序, 即不保证一定为有序, 此外FIFO还保证了消息在一定时间内不能重复发出,即使是重复发了, 它也不会把消息发送到队列上。
队列操作
创建队列
AmazonSQS sqs = AmazonSQSClientBuilder.defaultClient();
CreateQueueRequest create_request = new CreateQueueRequest(QUEUE_NAME).addAttributesEntry("DelaySeconds", "60").addAttributesEntry("MessageRetentionPeriod", "86400");try {sqs.createQueue(create_request);
} catch (AmazonSQSException e) {if (!e.getErrorCode().equals("QueueAlreadyExists")) {throw e;}
}
列出队列
AmazonSQS sqs = AmazonSQSClientBuilder.defaultClient();
ListQueuesResult lq_result = sqs.listQueues();
System.out.println("Your SQS Queue URLs:");
for (String url : lq_result.getQueueUrls()) {System.out.println(url);
}
获取队列Url
AmazonSQS sqs = AmazonSQSClientBuilder.defaultClient();
String queue_url = sqs.getQueueUrl(QUEUE_NAME).getQueueUrl();
删除队列
AmazonSQS sqs = AmazonSQSClientBuilder.defaultClient();
sqs.deleteQueue(queue_url);
消息操作
发送消息
SendMessageRequest send_msg_request = new SendMessageRequest().withQueueUrl(queueUrl).withMessageBody("hello world").withDelaySeconds(5);
sqs.sendMessage(send_msg_request);
批量发送消息
SendMessageBatchRequest send_batch_request = new SendMessageBatchRequest().withQueueUrl(queueUrl).withEntries(new SendMessageBatchRequestEntry("msg_1", "Hello from message 1"),new SendMessageBatchRequestEntry("msg_2", "Hello from message 2").withDelaySeconds(10));
sqs.sendMessageBatch(send_batch_request);
获取消息
List<Message> messages = sqs.receiveMessage(queueUrl).getMessages();
删除消息
for (Message m : messages) {sqs.deleteMessage(queueUrl, m.getReceiptHandle());
}
使用JMS方法
发送消息
public class TextMessageSender {
public static void main(String args[]) throws JMSException {ExampleConfiguration config = ExampleConfiguration.parseConfig("TextMessageSender", args);ExampleCommon.setupLogging();// Create the connection factory based on the config SQSConnectionFactory connectionFactory = new SQSConnectionFactory(new ProviderConfiguration(),AmazonSQSClientBuilder.standard().withRegion(config.getRegion().getName()).withCredentials(config.getCredentialsProvider()));// Create the connectionSQSConnection connection = connectionFactory.createConnection();// Create the queue if neededExampleCommon.ensureQueueExists(connection, config.getQueueName());// Create the sessionSession session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);MessageProducer producer = session.createProducer( session.createQueue( config.getQueueName() ) );sendMessages(session, producer);// Close the connection. This closes the session automaticallyconnection.close();System.out.println( "Connection closed" );
}private static void sendMessages( Session session, MessageProducer producer ) {BufferedReader inputReader = new BufferedReader(new InputStreamReader( System.in, Charset.defaultCharset() ) );try {String input;while( true ) { System.out.print( "Enter message to send (leave empty to exit): " );input = inputReader.readLine();if( input == null || input.equals("" ) ) break;TextMessage message = session.createTextMessage(input);producer.send(message);System.out.println( "Send message " + message.getJMSMessageID() );}} catch (EOFException e) {// Just return on EOF} catch (IOException e) {System.err.println( "Failed reading input: " + e.getMessage() );} catch (JMSException e) {System.err.println( "Failed sending message: " + e.getMessage() );e.printStackTrace();}
}
}
同步接收消息
public class SyncMessageReceiver {
public static void main(String args[]) throws JMSException {
ExampleConfiguration config = ExampleConfiguration.parseConfig("SyncMessageReceiver", args);ExampleCommon.setupLogging();// Create the connection factory based on the config
SQSConnectionFactory connectionFactory = new SQSConnectionFactory(new ProviderConfiguration(),AmazonSQSClientBuilder.standard().withRegion(config.getRegion().getName()).withCredentials(config.getCredentialsProvider()));// Create the connection
SQSConnection connection = connectionFactory.createConnection();// Create the queue if needed
ExampleCommon.ensureQueueExists(connection, config.getQueueName());// Create the session
Session session = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE);
MessageConsumer consumer = session.createConsumer( session.createQueue( config.getQueueName() ) );connection.start();receiveMessages(session, consumer);// Close the connection. This closes the session automatically
connection.close();
System.out.println( "Connection closed" );
}private static void receiveMessages( Session session, MessageConsumer consumer ) {
try {while( true ) {System.out.println( "Waiting for messages");// Wait 1 minute for a messageMessage message = consumer.receive(TimeUnit.MINUTES.toMillis(1));if( message == null ) {System.out.println( "Shutting down after 1 minute of silence" );break;}ExampleCommon.handleMessage(message);message.acknowledge();System.out.println( "Acknowledged message " + message.getJMSMessageID() );}
} catch (JMSException e) {System.err.println( "Error receiving from SQS: " + e.getMessage() );e.printStackTrace();
}
}
}
异步接收消息
public class AsyncMessageReceiver {
public static void main(String args[]) throws JMSException, InterruptedException {ExampleConfiguration config = ExampleConfiguration.parseConfig("AsyncMessageReceiver", args);ExampleCommon.setupLogging(); // Create the connection factory based on the configSQSConnectionFactory connectionFactory = new SQSConnectionFactory(new ProviderConfiguration(),AmazonSQSClientBuilder.standard().withRegion(config.getRegion().getName()).withCredentials(config.getCredentialsProvider()));// Create the connectionSQSConnection connection = connectionFactory.createConnection();// Create the queue if neededExampleCommon.ensureQueueExists(connection, config.getQueueName());// Create the sessionSession session = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE);MessageConsumer consumer = session.createConsumer( session.createQueue( config.getQueueName() ) );ReceiverCallback callback = new ReceiverCallback();consumer.setMessageListener( callback );// No messages are processed until this is calledconnection.start();callback.waitForOneMinuteOfSilence();System.out.println( "Returning after one minute of silence" );// Close the connection. This closes the session automaticallyconnection.close();System.out.println( "Connection closed" );
}private static class ReceiverCallback implements MessageListener {// Used to listen for message silenceprivate volatile long timeOfLastMessage = System.nanoTime();public void waitForOneMinuteOfSilence() throws InterruptedException {for(;;) {long timeSinceLastMessage = System.nanoTime() - timeOfLastMessage;long remainingTillOneMinuteOfSilence = TimeUnit.MINUTES.toNanos(1) - timeSinceLastMessage;if( remainingTillOneMinuteOfSilence < 0 ) {break;}TimeUnit.NANOSECONDS.sleep(remainingTillOneMinuteOfSilence);}}@Overridepublic void onMessage(Message message) {try {ExampleCommon.handleMessage(message);message.acknowledge();System.out.println( "Acknowledged message " + message.getJMSMessageID() );timeOfLastMessage = System.nanoTime();} catch (JMSException e) {System.err.println( "Error processing message: " + e.getMessage() );e.printStackTrace();}}
}
}
https://docs.aws.amazon.com/sdk-for-java/v1/developer-guide/examples-sqs-messages.html
https://docs.amazonaws.cn/en_us/AWSSimpleQueueService/latest/SQSDeveloperGuide/code-examples.html
转载于:https://www.cnblogs.com/helloz/p/9314915.html
Amazon SQS 消息队列服务相关推荐
- aws sqs java_使用 Amazon SQS 消息队列 - 适用于 Java 的 AWS 开发工具包
本文属于机器翻译版本.若本译文内容与英语原文存在差异,则一律以英文原文为准. 使用 Amazon SQS 消息队列 消息队列 是用于在 Amazon SQS 中可靠地发送消息的逻辑容器.有两种类型的队 ...
- amazon sqs java_Amazon SQS 消息队列服务
Amazon sqs是亚马逊提供的线上消息队列服务, 可以实现应用程序解耦,以及可靠性保证. sqs提供了两种消息队列, 一种是标准消息队列, 一种是先进先出队列(FIFO), 其区别是FIFO是严格 ...
- Amazon SQS 消息相关接口测试用例
Amazon SQS 消息相关接口测试用例 因项目功能需求,对接了Amazon的SQS消息队列,封装了常用的接口,本文是对部分接口测试用例的简要说明. 1.发送消息 1.1.发送单条消息到标准队列 @ ...
- aws sqs java_发送、接收和删除 Amazon SQS 消息 - 适用于 Java 的 AWS 开发工具包
本文属于机器翻译版本.若本译文内容与英语原文存在差异,则一律以英文原文为准. 发送.接收和删除 Amazon SQS 消息 本主题描述了如何发送.接收和删除 Amazon SQS 消息.始终使用 SQ ...
- (四)RabbitMQ消息队列-服务详细配置与日常监控管理
(四)RabbitMQ消息队列-服务详细配置与日常监控管理 原文:(四)RabbitMQ消息队列-服务详细配置与日常监控管理 RabbitMQ服务管理 启动服务:rabbitmq-server -de ...
- 理论修炼之RabbitMQ,消息队列服务的稳健者
????欢迎点赞 :???? 收藏 ⭐留言 ???? 如有错误敬请指正,赐人玫瑰,手留余香! ????本文作者:由webmote 原创,首发于 [掘金] ????作者格言:生活在于折腾,当你不折腾生活 ...
- C#中使用消息队列服务
C#中使用Windows消息队列服务 http://www.cnblogs.com/xinhaijulan/archive/2010/08/22/1805768.html http://h2appy. ...
- java 结合redis队列_在 Java 中使用 redis 的消息队列服务
前言 关于 redis 我们前面已经讨论过了缓存.分布式锁.分布式唯一标识.LBS服务的用法,这里我们来谈谈利用 redis 来实现一个消息服务. 典型的消息服务是一个生产者和消费者模式的服务.一般是 ...
- 快速搭建基于beanstalk的php消息队列服务
本项目实现基于beanstalk的php消息队列服务,包括生产与消费消息案例 一.beanstalk介绍与安装:http://kr.github.io/beanstalkd/ 二.php消息队列处理, ...
最新文章
- 计算机CPU哪家好,2019年电脑cpu排行榜_电脑CPU哪个好 电脑CPU排行榜2019
- Android初学第34天
- chrome自动提交文件_实用!8个 chrome插件玩转GitHub,单个文件下载小意思
- 启动Tomcat服务时,出现org.apache.catalina.startup.VersionLoggerListener报错
- windows安装ffmpeg_免费实用的录屏工具!支持全屏、特定窗口、选定区域录制,支持添加水印、嵌入摄像头(附ffmpeg安装)...
- 北风设计模式课程---代理模式
- Android ListView之setEmptyView的问题
- 小程序样式写了没有用,或许你就差一行代码
- Windows照片图片便捷查看分类软件——照片分类猫
- C++异常处理throw
- 女人喝酸奶要注意什么
- idea maven 仓库 jar 包下载不来下解决方案
- Android事件分发之ViewGroup篇 -- ViewGroup的dispatchTouchEvent、onTouchEvent、onInterceptTouchEvent之间关系
- Android LeakCanary使用详细教程
- jQuery unload事件
- 一元域名真假?一元域名注册有哪些风险?
- Android的警示对话框AlertDialog简单使用实例(附Demo)
- PC实现路由器的基本功能
- 单尺度Retinex(SSR) + 代码实现
- Applovin/App Store/Google Play/Unity/Ironsource/Steam如何收款?
热门文章
- 前排!零基础小白学习3D建模的必经之路
- java中decrement,Java Math decrementExact()用法及代码示例
- php定时执行代码漏洞_在CTF比赛中发现的PHP远程代码执行0day漏洞
- ResNet、Faster RCNN、Mask RCNN 是专利算法吗?盘点何恺明参与发明的专利!
- qt 复制字符串_QT中字符串的转化与拼接
- 好的领导应该是什么脾气
- 华为鸿蒙系统学习笔记4-方舟编译器源码下载及安装
- 如何入门CTF夺旗赛
- unity 查找所以物体_用Unity来实现一下绳子效果——Obi Rope插件介绍
- JS-面向对象--创建具有私有属性的对象(2个方法)