Amazon sqs是亚马逊提供的线上消息队列服务, 可以实现应用程序解耦,以及可靠性保证。 sqs提供了两种消息队列, 一种是标准消息队列, 一种是先进先出队列(FIFO), 其区别是FIFO是严格有序的,即消息接收的顺序是按照消息发送的顺序来的, 而标准队列是尽最大可能有序, 即不保证一定为有序, 此外FIFO还保证了消息在一定时间内不能重复发出,即使是重复发了, 它也不会把消息发送到队列上。

队列操作

创建队列

AmazonSQS sqs = AmazonSQSClientBuilder.defaultClient();

CreateQueueRequest create_request = new CreateQueueRequest(QUEUE_NAME)

.addAttributesEntry("DelaySeconds", "60")

.addAttributesEntry("MessageRetentionPeriod", "86400");

try {

sqs.createQueue(create_request);

} catch (AmazonSQSException e) {

if (!e.getErrorCode().equals("QueueAlreadyExists")) {

throw e;

}

}

列出队列

AmazonSQS sqs = AmazonSQSClientBuilder.defaultClient();

ListQueuesResult lq_result = sqs.listQueues();

System.out.println("Your SQS Queue URLs:");

for (String url : lq_result.getQueueUrls()) {

System.out.println(url);

}

获取队列Url

AmazonSQS sqs = AmazonSQSClientBuilder.defaultClient();

String queue_url = sqs.getQueueUrl(QUEUE_NAME).getQueueUrl();

删除队列

AmazonSQS sqs = AmazonSQSClientBuilder.defaultClient();

sqs.deleteQueue(queue_url);

消息操作

发送消息

SendMessageRequest send_msg_request = new SendMessageRequest()

.withQueueUrl(queueUrl)

.withMessageBody("hello world")

.withDelaySeconds(5);

sqs.sendMessage(send_msg_request);

批量发送消息

SendMessageBatchRequest send_batch_request = new SendMessageBatchRequest()

.withQueueUrl(queueUrl)

.withEntries(

new SendMessageBatchRequestEntry(

"msg_1", "Hello from message 1"),

new SendMessageBatchRequestEntry(

"msg_2", "Hello from message 2")

.withDelaySeconds(10));

sqs.sendMessageBatch(send_batch_request);

获取消息

List messages = sqs.receiveMessage(queueUrl).getMessages();

删除消息

for (Message m : messages) {

sqs.deleteMessage(queueUrl, m.getReceiptHandle());

}

使用JMS方法

发送消息

public class TextMessageSender {

public static void main(String args[]) throws JMSException {

ExampleConfiguration config = ExampleConfiguration.parseConfig("TextMessageSender", args);

ExampleCommon.setupLogging();

// Create the connection factory based on the config

SQSConnectionFactory connectionFactory = new SQSConnectionFactory(

new ProviderConfiguration(),

AmazonSQSClientBuilder.standard()

.withRegion(config.getRegion().getName())

.withCredentials(config.getCredentialsProvider())

);

// Create the connection

SQSConnection connection = connectionFactory.createConnection();

// Create the queue if needed

ExampleCommon.ensureQueueExists(connection, config.getQueueName());

// Create the session

Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);

MessageProducer producer = session.createProducer( session.createQueue( config.getQueueName() ) );

sendMessages(session, producer);

// Close the connection. This closes the session automatically

connection.close();

System.out.println( "Connection closed" );

}

private static void sendMessages( Session session, MessageProducer producer ) {

BufferedReader inputReader = new BufferedReader(

new InputStreamReader( System.in, Charset.defaultCharset() ) );

try {

String input;

while( true ) {

System.out.print( "Enter message to send (leave empty to exit): " );

input = inputReader.readLine();

if( input == null || input.equals("" ) ) break;

TextMessage message = session.createTextMessage(input);

producer.send(message);

System.out.println( "Send message " + message.getJMSMessageID() );

}

} catch (EOFException e) {

// Just return on EOF

} catch (IOException e) {

System.err.println( "Failed reading input: " + e.getMessage() );

} catch (JMSException e) {

System.err.println( "Failed sending message: " + e.getMessage() );

e.printStackTrace();

}

}

}

同步接收消息

public class SyncMessageReceiver {

public static void main(String args[]) throws JMSException {

ExampleConfiguration config = ExampleConfiguration.parseConfig("SyncMessageReceiver", args);

ExampleCommon.setupLogging();

// Create the connection factory based on the config

SQSConnectionFactory connectionFactory = new SQSConnectionFactory(

new ProviderConfiguration(),

AmazonSQSClientBuilder.standard()

.withRegion(config.getRegion().getName())

.withCredentials(config.getCredentialsProvider())

);

// Create the connection

SQSConnection connection = connectionFactory.createConnection();

// Create the queue if needed

ExampleCommon.ensureQueueExists(connection, config.getQueueName());

// Create the session

Session session = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE);

MessageConsumer consumer = session.createConsumer( session.createQueue( config.getQueueName() ) );

connection.start();

receiveMessages(session, consumer);

// Close the connection. This closes the session automatically

connection.close();

System.out.println( "Connection closed" );

}

private static void receiveMessages( Session session, MessageConsumer consumer ) {

try {

while( true ) {

System.out.println( "Waiting for messages");

// Wait 1 minute for a message

Message message = consumer.receive(TimeUnit.MINUTES.toMillis(1));

if( message == null ) {

System.out.println( "Shutting down after 1 minute of silence" );

break;

}

ExampleCommon.handleMessage(message);

message.acknowledge();

System.out.println( "Acknowledged message " + message.getJMSMessageID() );

}

} catch (JMSException e) {

System.err.println( "Error receiving from SQS: " + e.getMessage() );

e.printStackTrace();

}

}

}

异步接收消息

public class AsyncMessageReceiver {

public static void main(String args[]) throws JMSException, InterruptedException {

ExampleConfiguration config = ExampleConfiguration.parseConfig("AsyncMessageReceiver", args);

ExampleCommon.setupLogging();

// Create the connection factory based on the config

SQSConnectionFactory connectionFactory = new SQSConnectionFactory(

new ProviderConfiguration(),

AmazonSQSClientBuilder.standard()

.withRegion(config.getRegion().getName())

.withCredentials(config.getCredentialsProvider())

);

// Create the connection

SQSConnection connection = connectionFactory.createConnection();

// Create the queue if needed

ExampleCommon.ensureQueueExists(connection, config.getQueueName());

// Create the session

Session session = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE);

MessageConsumer consumer = session.createConsumer( session.createQueue( config.getQueueName() ) );

ReceiverCallback callback = new ReceiverCallback();

consumer.setMessageListener( callback );

// No messages are processed until this is called

connection.start();

callback.waitForOneMinuteOfSilence();

System.out.println( "Returning after one minute of silence" );

// Close the connection. This closes the session automatically

connection.close();

System.out.println( "Connection closed" );

}

private static class ReceiverCallback implements MessageListener {

// Used to listen for message silence

private volatile long timeOfLastMessage = System.nanoTime();

public void waitForOneMinuteOfSilence() throws InterruptedException {

for(;;) {

long timeSinceLastMessage = System.nanoTime() - timeOfLastMessage;

long remainingTillOneMinuteOfSilence =

TimeUnit.MINUTES.toNanos(1) - timeSinceLastMessage;

if( remainingTillOneMinuteOfSilence < 0 ) {

break;

}

TimeUnit.NANOSECONDS.sleep(remainingTillOneMinuteOfSilence);

}

}

@Override

public void onMessage(Message message) {

try {

ExampleCommon.handleMessage(message);

message.acknowledge();

System.out.println( "Acknowledged message " + message.getJMSMessageID() );

timeOfLastMessage = System.nanoTime();

} catch (JMSException e) {

System.err.println( "Error processing message: " + e.getMessage() );

e.printStackTrace();

}

}

}

}

amazon sqs java_Amazon SQS 消息队列服务相关推荐

  1. Amazon Lambda支持以简单队列服务作为事件源了

    Amazon发布更新其简单队列服务(SQS)--开发人员现在可以使用SQS触发AWS Lambda函数了.而且,开发人员不再需要运行轮询服务或创建SQS到SNS的映射. \\ Amazon SQS是一 ...

  2. Amazon SQS 消息队列服务

    Amazon sqs是亚马逊提供的线上消息队列服务, 可以实现应用程序解耦,以及可靠性保证. sqs提供了两种消息队列, 一种是标准消息队列, 一种是先进先出队列(FIFO), 其区别是FIFO是严格 ...

  3. (四)RabbitMQ消息队列-服务详细配置与日常监控管理

    (四)RabbitMQ消息队列-服务详细配置与日常监控管理 原文:(四)RabbitMQ消息队列-服务详细配置与日常监控管理 RabbitMQ服务管理 启动服务:rabbitmq-server -de ...

  4. 理论修炼之RabbitMQ,消息队列服务的稳健者

    ????欢迎点赞 :???? 收藏 ⭐留言 ???? 如有错误敬请指正,赐人玫瑰,手留余香! ????本文作者:由webmote 原创,首发于 [掘金] ????作者格言:生活在于折腾,当你不折腾生活 ...

  5. C#中使用消息队列服务

    C#中使用Windows消息队列服务 http://www.cnblogs.com/xinhaijulan/archive/2010/08/22/1805768.html http://h2appy. ...

  6. java 结合redis队列_在 Java 中使用 redis 的消息队列服务

    前言 关于 redis 我们前面已经讨论过了缓存.分布式锁.分布式唯一标识.LBS服务的用法,这里我们来谈谈利用 redis 来实现一个消息服务. 典型的消息服务是一个生产者和消费者模式的服务.一般是 ...

  7. 快速搭建基于beanstalk的php消息队列服务

    本项目实现基于beanstalk的php消息队列服务,包括生产与消费消息案例 一.beanstalk介绍与安装:http://kr.github.io/beanstalkd/ 二.php消息队列处理, ...

  8. 滴滴出行基于RocketMQ构建企业级消息队列服务的实践

    \n 本文整理自滴滴出行消息队列负责人 江海挺 在Apache RocketMQ开发者沙龙北京站的分享. \n \n 滴滴出行的消息技术选型 \n 历史 \n 初期,公司内部没有专门的团队维护消息队列 ...

  9. aws sqs java_Amazon SQS

    特点.功能和接口 问:是否可以将 Amazon SQS 与其他 AWS 服务结合使用? 能.您可以将 Amazon SQS 与 Amazon EC2.Amazon EC2 Container Serv ...

最新文章

  1. Sealed,new,virtual,abstract与override的区别
  2. Android Logcat的使用
  3. 为什么nodejs是单进程的_nodejs真的是单线程吗?
  4. 为什么大型科技公司更会发生人员流失 标准 ceo 软件 技术 图 阅读2479 原文:Why Good People Leave Large Tech Companies 作者:steve
  5. 关于在 Ubuntu 上安装 SteamOS session
  6. IPC--进程间通信三(共享内存)
  7. win8经典开始菜单计算机,Win8.1/win8开始菜单工具大盘点
  8. 【CloudXNS教您几招】如何让多ip域名配置游刃有余?(2)
  9. 个个都在比赚钱,有没比不赚钱的
  10. Macaca-iOS入门那些事2
  11. 苹果 macOS Monterey 桌面抽象风格不好看,如何换成自己喜欢的照片?
  12. mysql concat 能否返回数字_关于Mysql中GROUP_CONCAT函数返回值长度的坑
  13. storm 使用外部配置文件提交拓扑
  14. Matlab——指派问题
  15. .log 合并或 .txt 合并
  16. 数电和模电的区别和联系
  17. JS中经纬度的正则表达式(亲测有效)
  18. 杰出人物的四大法宝——与成功学大师对话
  19. centos7是linux内核,在CentOS 7上用源代码编译最新的Linux内核
  20. 字节跳动2020秋招笔试题

热门文章

  1. Sklearn(scikit-learn)
  2. 安装Fedora系统和一些系统配置
  3. AI加持下的安防监控智能化展现——EasyGBS/EasyCVR的AI识别
  4. 关于数据库的外模式 内模式 和 模式
  5. 高房屋空置率反映了三大问题的性质?
  6. 【华为OD机试 2023】 字符串解密(C++ Java JavaScript Python 100%)
  7. FreeMaker模板引擎
  8. Apache Jute
  9. oracle 数据库自动备份
  10. 如何才能屏蔽迅雷版本低的提示啊?