Amazon sqs是亚马逊提供的线上消息队列服务, 可以实现应用程序解耦,以及可靠性保证。 sqs提供了两种消息队列, 一种是标准消息队列, 一种是先进先出队列(FIFO), 其区别是FIFO是严格有序的,即消息接收的顺序是按照消息发送的顺序来的, 而标准队列是尽最大可能有序, 即不保证一定为有序, 此外FIFO还保证了消息在一定时间内不能重复发出,即使是重复发了, 它也不会把消息发送到队列上。

队列操作

创建队列
AmazonSQS sqs = AmazonSQSClientBuilder.defaultClient();
CreateQueueRequest create_request = new CreateQueueRequest(QUEUE_NAME).addAttributesEntry("DelaySeconds", "60").addAttributesEntry("MessageRetentionPeriod", "86400");try {sqs.createQueue(create_request);
} catch (AmazonSQSException e) {if (!e.getErrorCode().equals("QueueAlreadyExists")) {throw e;}
}
列出队列
AmazonSQS sqs = AmazonSQSClientBuilder.defaultClient();
ListQueuesResult lq_result = sqs.listQueues();
System.out.println("Your SQS Queue URLs:");
for (String url : lq_result.getQueueUrls()) {System.out.println(url);
}
获取队列Url
AmazonSQS sqs = AmazonSQSClientBuilder.defaultClient();
String queue_url = sqs.getQueueUrl(QUEUE_NAME).getQueueUrl();
删除队列
AmazonSQS sqs = AmazonSQSClientBuilder.defaultClient();
sqs.deleteQueue(queue_url);

消息操作

发送消息
SendMessageRequest send_msg_request = new SendMessageRequest().withQueueUrl(queueUrl).withMessageBody("hello world").withDelaySeconds(5);
sqs.sendMessage(send_msg_request);
批量发送消息
SendMessageBatchRequest send_batch_request = new SendMessageBatchRequest().withQueueUrl(queueUrl).withEntries(new SendMessageBatchRequestEntry("msg_1", "Hello from message 1"),new SendMessageBatchRequestEntry("msg_2", "Hello from message 2").withDelaySeconds(10));
sqs.sendMessageBatch(send_batch_request);
获取消息
List<Message> messages = sqs.receiveMessage(queueUrl).getMessages();
删除消息
for (Message m : messages) {sqs.deleteMessage(queueUrl, m.getReceiptHandle());
}

使用JMS方法

发送消息
public class TextMessageSender {
public static void main(String args[]) throws JMSException {ExampleConfiguration config = ExampleConfiguration.parseConfig("TextMessageSender", args);ExampleCommon.setupLogging();// Create the connection factory based on the config       SQSConnectionFactory connectionFactory = new SQSConnectionFactory(new ProviderConfiguration(),AmazonSQSClientBuilder.standard().withRegion(config.getRegion().getName()).withCredentials(config.getCredentialsProvider()));// Create the connectionSQSConnection connection = connectionFactory.createConnection();// Create the queue if neededExampleCommon.ensureQueueExists(connection, config.getQueueName());// Create the sessionSession session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);MessageProducer producer = session.createProducer( session.createQueue( config.getQueueName() ) );sendMessages(session, producer);// Close the connection. This closes the session automaticallyconnection.close();System.out.println( "Connection closed" );
}private static void sendMessages( Session session, MessageProducer producer ) {BufferedReader inputReader = new BufferedReader(new InputStreamReader( System.in, Charset.defaultCharset() ) );try {String input;while( true ) { System.out.print( "Enter message to send (leave empty to exit): " );input = inputReader.readLine();if( input == null || input.equals("" ) ) break;TextMessage message = session.createTextMessage(input);producer.send(message);System.out.println( "Send message " + message.getJMSMessageID() );}} catch (EOFException e) {// Just return on EOF} catch (IOException e) {System.err.println( "Failed reading input: " + e.getMessage() );} catch (JMSException e) {System.err.println( "Failed sending message: " + e.getMessage() );e.printStackTrace();}
}
}
同步接收消息
public class SyncMessageReceiver {
public static void main(String args[]) throws JMSException {
ExampleConfiguration config = ExampleConfiguration.parseConfig("SyncMessageReceiver", args);ExampleCommon.setupLogging();// Create the connection factory based on the config
SQSConnectionFactory connectionFactory = new SQSConnectionFactory(new ProviderConfiguration(),AmazonSQSClientBuilder.standard().withRegion(config.getRegion().getName()).withCredentials(config.getCredentialsProvider()));// Create the connection
SQSConnection connection = connectionFactory.createConnection();// Create the queue if needed
ExampleCommon.ensureQueueExists(connection, config.getQueueName());// Create the session
Session session = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE);
MessageConsumer consumer = session.createConsumer( session.createQueue( config.getQueueName() ) );connection.start();receiveMessages(session, consumer);// Close the connection. This closes the session automatically
connection.close();
System.out.println( "Connection closed" );
}private static void receiveMessages( Session session, MessageConsumer consumer ) {
try {while( true ) {System.out.println( "Waiting for messages");// Wait 1 minute for a messageMessage message = consumer.receive(TimeUnit.MINUTES.toMillis(1));if( message == null ) {System.out.println( "Shutting down after 1 minute of silence" );break;}ExampleCommon.handleMessage(message);message.acknowledge();System.out.println( "Acknowledged message " + message.getJMSMessageID() );}
} catch (JMSException e) {System.err.println( "Error receiving from SQS: " + e.getMessage() );e.printStackTrace();
}
}
}
异步接收消息
public class AsyncMessageReceiver {
public static void main(String args[]) throws JMSException, InterruptedException {ExampleConfiguration config = ExampleConfiguration.parseConfig("AsyncMessageReceiver", args);ExampleCommon.setupLogging();          // Create the connection factory based on the configSQSConnectionFactory connectionFactory = new SQSConnectionFactory(new ProviderConfiguration(),AmazonSQSClientBuilder.standard().withRegion(config.getRegion().getName()).withCredentials(config.getCredentialsProvider()));// Create the connectionSQSConnection connection = connectionFactory.createConnection();// Create the queue if neededExampleCommon.ensureQueueExists(connection, config.getQueueName());// Create the sessionSession session = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE);MessageConsumer consumer = session.createConsumer( session.createQueue( config.getQueueName() ) );ReceiverCallback callback = new ReceiverCallback();consumer.setMessageListener( callback );// No messages are processed until this is calledconnection.start();callback.waitForOneMinuteOfSilence();System.out.println( "Returning after one minute of silence" );// Close the connection. This closes the session automaticallyconnection.close();System.out.println( "Connection closed" );
}private static class ReceiverCallback implements MessageListener {// Used to listen for message silenceprivate volatile long timeOfLastMessage = System.nanoTime();public void waitForOneMinuteOfSilence() throws InterruptedException {for(;;) {long timeSinceLastMessage = System.nanoTime() - timeOfLastMessage;long remainingTillOneMinuteOfSilence = TimeUnit.MINUTES.toNanos(1) - timeSinceLastMessage;if( remainingTillOneMinuteOfSilence < 0 ) {break;}TimeUnit.NANOSECONDS.sleep(remainingTillOneMinuteOfSilence);}}@Overridepublic void onMessage(Message message) {try {ExampleCommon.handleMessage(message);message.acknowledge();System.out.println( "Acknowledged message " + message.getJMSMessageID() );timeOfLastMessage = System.nanoTime();} catch (JMSException e) {System.err.println( "Error processing message: " + e.getMessage() );e.printStackTrace();}}
}
}

https://docs.aws.amazon.com/sdk-for-java/v1/developer-guide/examples-sqs-messages.html
https://docs.amazonaws.cn/en_us/AWSSimpleQueueService/latest/SQSDeveloperGuide/code-examples.html

转载于:https://www.cnblogs.com/helloz/p/9314915.html

Amazon SQS 消息队列服务相关推荐

  1. aws sqs java_使用 Amazon SQS 消息队列 - 适用于 Java 的 AWS 开发工具包

    本文属于机器翻译版本.若本译文内容与英语原文存在差异,则一律以英文原文为准. 使用 Amazon SQS 消息队列 消息队列 是用于在 Amazon SQS 中可靠地发送消息的逻辑容器.有两种类型的队 ...

  2. amazon sqs java_Amazon SQS 消息队列服务

    Amazon sqs是亚马逊提供的线上消息队列服务, 可以实现应用程序解耦,以及可靠性保证. sqs提供了两种消息队列, 一种是标准消息队列, 一种是先进先出队列(FIFO), 其区别是FIFO是严格 ...

  3. Amazon SQS 消息相关接口测试用例

    Amazon SQS 消息相关接口测试用例 因项目功能需求,对接了Amazon的SQS消息队列,封装了常用的接口,本文是对部分接口测试用例的简要说明. 1.发送消息 1.1.发送单条消息到标准队列 @ ...

  4. aws sqs java_发送、接收和删除 Amazon SQS 消息 - 适用于 Java 的 AWS 开发工具包

    本文属于机器翻译版本.若本译文内容与英语原文存在差异,则一律以英文原文为准. 发送.接收和删除 Amazon SQS 消息 本主题描述了如何发送.接收和删除 Amazon SQS 消息.始终使用 SQ ...

  5. (四)RabbitMQ消息队列-服务详细配置与日常监控管理

    (四)RabbitMQ消息队列-服务详细配置与日常监控管理 原文:(四)RabbitMQ消息队列-服务详细配置与日常监控管理 RabbitMQ服务管理 启动服务:rabbitmq-server -de ...

  6. 理论修炼之RabbitMQ,消息队列服务的稳健者

    ????欢迎点赞 :???? 收藏 ⭐留言 ???? 如有错误敬请指正,赐人玫瑰,手留余香! ????本文作者:由webmote 原创,首发于 [掘金] ????作者格言:生活在于折腾,当你不折腾生活 ...

  7. C#中使用消息队列服务

    C#中使用Windows消息队列服务 http://www.cnblogs.com/xinhaijulan/archive/2010/08/22/1805768.html http://h2appy. ...

  8. java 结合redis队列_在 Java 中使用 redis 的消息队列服务

    前言 关于 redis 我们前面已经讨论过了缓存.分布式锁.分布式唯一标识.LBS服务的用法,这里我们来谈谈利用 redis 来实现一个消息服务. 典型的消息服务是一个生产者和消费者模式的服务.一般是 ...

  9. 快速搭建基于beanstalk的php消息队列服务

    本项目实现基于beanstalk的php消息队列服务,包括生产与消费消息案例 一.beanstalk介绍与安装:http://kr.github.io/beanstalkd/ 二.php消息队列处理, ...

最新文章

  1. 计算机CPU哪家好,2019年电脑cpu排行榜_电脑CPU哪个好 电脑CPU排行榜2019
  2. Android初学第34天
  3. chrome自动提交文件_实用!8个 chrome插件玩转GitHub,单个文件下载小意思
  4. 启动Tomcat服务时,出现org.apache.catalina.startup.VersionLoggerListener报错
  5. windows安装ffmpeg_免费实用的录屏工具!支持全屏、特定窗口、选定区域录制,支持添加水印、嵌入摄像头(附ffmpeg安装)...
  6. 北风设计模式课程---代理模式
  7. Android ListView之setEmptyView的问题
  8. 小程序样式写了没有用,或许你就差一行代码
  9. Windows照片图片便捷查看分类软件——照片分类猫
  10. C++异常处理throw
  11. 女人喝酸奶要注意什么
  12. idea maven 仓库 jar 包下载不来下解决方案
  13. Android事件分发之ViewGroup篇 -- ViewGroup的dispatchTouchEvent、onTouchEvent、onInterceptTouchEvent之间关系
  14. Android LeakCanary使用详细教程
  15. jQuery unload事件
  16. 一元域名真假?一元域名注册有哪些风险?
  17. Android的警示对话框AlertDialog简单使用实例(附Demo)
  18. PC实现路由器的基本功能
  19. 单尺度Retinex(SSR) + 代码实现
  20. Applovin/App Store/Google Play/Unity/Ironsource/Steam如何收款?

热门文章

  1. 前排!零基础小白学习3D建模的必经之路
  2. java中decrement,Java Math decrementExact()用法及代码示例
  3. php定时执行代码漏洞_在CTF比赛中发现的PHP远程代码执行0day漏洞
  4. ResNet、Faster RCNN、Mask RCNN 是专利算法吗?盘点何恺明参与发明的专利!
  5. qt 复制字符串_QT中字符串的转化与拼接
  6. 好的领导应该是什么脾气
  7. 华为鸿蒙系统学习笔记4-方舟编译器源码下载及安装
  8. 如何入门CTF夺旗赛
  9. unity 查找所以物体_用Unity来实现一下绳子效果——Obi Rope插件介绍
  10. JS-面向对象--创建具有私有属性的对象(2个方法)