scala akka

在本文中,我们将研究如何从Scala连接到RabbitMQ,以便可以从应用程序中支持AMQP协议。 在此示例中,我将使用Play Framework 2.0作为容器(有关更多信息,请参阅我在该主题上的其他文章 )在其中运行应用程序,因为Play使得使用Scala进行开发变得容易得多。 本文还将使用Akka actor发送和接收RabbitMQ的消息。

什么是AMQP

首先,快速介绍AMQP。 AMQP代表“高级消息队列协议”,并且是消息传递的开放标准。 AMQP 主页将其愿景陈述为:“成为所有消息中间件之间互操作性的标准协议”。 AMQP定义了用于交换消息的传输级别协议,该协议可用于集成来自许多不同平台,语言和技术的应用程序。
有许多工具可以实现此协议,但是RabbitMQ越来越引起人们的关注。 RabbitMQ是使用AMQP的基于Erlang的开源消息代理。 会说AMQP的所有应用程序都可以连接并使用RabbitMQ。 因此,在本文中,我们将展示如何将基于Play2 / Scala / Akka的应用程序连接到RabbitMQ。
在本文中,我们将向您展示如何实现两种最常见的方案:

  • 发送/接收:我们将配置一个发件人每隔几秒钟发送一条消息,并使用两个侦听器以循环方式从队列中读取消息。
  • 发布/订阅:对于本示例,我们将创建几乎相同的场景,但是这次,侦听器将同时获得消息。

我假设您已经安装了RabbitMQ。 如果不是,请按照其网站上的说明进行操作。

设置基本的Play 2 / Scala项目

在此示例中,我创建了一个新的Play 2项目。 这样做很容易:

jos@Joss-MacBook-Pro.local:~/Dev/play-2.0-RC2$ ./play new Play2AndRabbitMQ_            _ _ __ | | __ _ _  _| |
| '_ \| |/ _' | || |_|
|  __/|_|\____|\__ (_)
|_|            |__/ play! 2.0-RC2, http://www.playframework.orgThe new application will be created in /Users/jos/Dev/play-2.0/PlayAndRabbitMQWhat is the application name?
> PlayAndRabbitMQWhich template do you want to use for this new application? 1 - Create a simple Scala application2 - Create a simple Java application3 - Create an empty project> 1OK, application PlayAndRabbitMQ is created.Have fun!

我曾经使用scala-ide插件在Eclipse上工作,所以我执行play eclipsify并将项目导入Eclipse。
我们需要做的下一步是建立正确的依赖关系。 Play为此使用sbt,并允许您从项目目录中的build.scala文件配置依赖项。 我们将添加的唯一依赖关系是RabbitMQ的Java客户端库。 即使Lift提供了一个基于Scala的AMQP库,但我发现直接使用RabbitMQ也是一样容易。 添加依赖项后,我的build.scala如下所示:

import sbt._
import Keys._
import PlayProject._object ApplicationBuild extends Build {val appName         = "PlayAndRabbitMQ"val appVersion      = "1.0-SNAPSHOT"val appDependencies = Seq("com.rabbitmq" % "amqp-client" % "2.8.1")val main = PlayProject(appName, appVersion, appDependencies, mainLang = SCALA).settings()
}

将RabbitMQ配置添加到配置文件

对于我们的示例,我们可以配置一些东西。 将消息发送到的队列,要使用的交换以及运行RabbitMQ的主机。 在实际情况下,我们将需要设置更多的配置选项,但是在这种情况下,我们只有这三个。 将以下内容添加到您的application.conf中,以便我们可以从我们的应用程序中引用它。

#rabbit-mq configuration
rabbitmq.host=localhost
rabbitmq.queue=queue1
rabbitmq.exchange=exchange1

现在,我们可以使用ConfigFactory访问这些配置文件。 为了便于访问,请创建以下对象:

object Config {val RABBITMQ_HOST = ConfigFactory.load().getString("rabbitmq.host");val RABBITMQ_QUEUE = ConfigFactory.load().getString("rabbitmq.queue");val RABBITMQ_EXCHANGEE = ConfigFactory.load().getString("rabbitmq.exchange");
}

初始化与RabbitMQ的连接

在查看如何使用RabbitMQ发送和接收消息之前,我们还有一个要定义的对象。 要使用RabbitMQ,我们需要一个连接。 我们可以使用ConnectionFactory获得与服务器的连接。 查看javadocs以获取有关如何配置连接的更多信息。

object RabbitMQConnection {private val connection: Connection = null;/*** Return a connection if one doesn't exist. Else create* a new one*/def getConnection(): Connection = {connection match {case null => {val factory = new ConnectionFactory();factory.setHost(Config.RABBITMQ_HOST);factory.newConnection();}case _ => connection}}
}

应用程序启动时启动监听器

在查看RabbitMQ代码之前,我们还需要做一件事。 我们需要确保在应用程序启动时注册了消息侦听器,并且发件人开始发送。 播放2提供了
为此的GlobalSettings对象,您可以在应用程序启动时扩展该对象以执行代码。 对于我们的示例,我们将使用以下对象(请记住,该对象需要存储在默认名称空间中:

import play.api.mvc._
import play.api._
import rabbitmq.Senderobject Global extends GlobalSettings {override def onStart(app: Application) {Sender.startSending}
}

我们将在下面的部分中查看此Sender.startSending操作,该操作将初始化所有发送者和接收者。

设置发送和接收方案

让我们看一下Sender.startSending代码,该代码将设置一个将msg发送到特定队列的发送方。 为此,我们使用以下代码:

object Sender {def startSending = {// create the connectionval connection = RabbitMQConnection.getConnection();// create the channel we use to sendval sendingChannel = connection.createChannel();// make sure the queue exists we want to send tosendingChannel.queueDeclare(Config.RABBITMQ_QUEUE, false, false, false, null);Akka.system.scheduler.schedule(2 seconds, 1 seconds, Akka.system.actorOf(Props(new SendingActor(channel = sendingChannel, queue = Config.RABBITMQ_QUEUE))), "MSG to Queue");}
}class SendingActor(channel: Channel, queue: String) extends Actor {def receive = {case some: String => {val msg = (some + " : " + System.currentTimeMillis());channel.basicPublish("", queue, null, msg.getBytes());Logger.info(msg);}case _ => {}}
}

在此代码中,我们采取以下步骤:

  1. 使用工厂检索到RabbitMQ的连接
  2. 在此连接上创建一个通道,用于与RabbitMQ通信
  3. 使用通道创建队列(如果尚不存在)
  4. 安排Akka每秒向演员发送一条消息。

所有这些都应该非常简单。 唯一(有点)复杂的部分是调度部分。 此调度操作的作用是这样的。 我们告诉Akka安排要发送给演员的消息。 我们需要2秒钟的延迟才能被触发,并且我们想每秒重复一次这项工作。 应该用于此的actor是SendingActor,您也可以在此清单中看到。 该参与者需要访问通道以发送消息,并且该参与者还需要知道将接收到的消息发送到哪里。 这是队列。
因此,此Actor每秒将收到一条消息,附加一个时间戳,并使用提供的通道将此消息发送到队列:channel.basicPublish(“”,queue,null,msg.getBytes());。 现在我们每秒发送一条消息,在此队列上有可以接收消息的侦听器将是很好的。 为了接收消息,我们还创建了一个Actor,可以在特定队列上无限期地进行监听。

class ListeningActor(channel: Channel, queue: String, f: (String) => Any) extends Actor {// called on the initial rundef receive = {case _ => startReceving}def startReceving = {val consumer = new QueueingConsumer(channel);channel.basicConsume(queue, true, consumer);while (true) {// wait for the messageval delivery = consumer.nextDelivery();val msg = new String(delivery.getBody());// send the message to the provided callback function// and execute this in a subactorcontext.actorOf(Props(new Actor {def receive = {case some: String => f(some);}})) ! msg}}
}

这个actor比我们以前发送的actor要复杂一些。 当该参与者收到消息(消息的种类无关紧要)时,它将开始侦听创建该消息的队列。 它通过使用提供的通道创建使用者来实现此目的,并告诉使用者开始在指定队列上侦听。 Consumer.nextDelivery()方法将阻塞,直到在配置的队列中等待消息为止。 收到消息后,将创建一个新的Actor,将消息发送到该Actor。 这个新角色将消息传递到提供的方法,您可以在其中放置业务逻辑。
要使用此侦听器,我们需要提供以下参数:

  • 频道:允许访问RabbitMQ
  • 队列:监听消息的队列
  • f:收到消息后将执行的功能。

第一个示例的最后一步是将所有内容粘合在一起。 为此,我们向Sender.startSending方法添加了几个方法调用。

def startSending = {...val callback1 = (x: String) => Logger.info("Recieved on queue callback 1: " + x);setupListener(connection.createChannel(),Config.RABBITMQ_QUEUE, callback1);// create an actor that starts listening on the specified queue and passes the// received message to the provided callbackval callback2 = (x: String) => Logger.info("Recieved on queue callback 2: " + x);// setup the listener that sends to a specific queue using the SendingActorsetupListener(connection.createChannel(),Config.RABBITMQ_QUEUE, callback2);...}private def setupListener(receivingChannel: Channel, queue: String, f: (String) => Any) {Akka.system.scheduler.scheduleOnce(2 seconds, Akka.system.actorOf(Props(new ListeningActor(receivingChannel, queue, f))), "");}

在此代码中,您可以看到我们定义了一个回调函数,并使用此回调函数以及队列和通道来创建ListeningActor。 我们使用scheduleOnce方法在单独的线程中启动此侦听器。 现在,使用此代码,我们可以运行应用程序(播放运行),打开localhost:9000来启动应用程序,我们应该看到类似以下输出的内容。

[info] play - Starting application default Akka system.
[info] play - Application started (Dev)
[info] application - MSG to Exchange : 1334324531424
[info] application - MSG to Queue : 1334324531424
[info] application - Recieved on queue callback 2: MSG to Queue : 1334324531424
[info] application - MSG to Exchange : 1334324532522
[info] application - MSG to Queue : 1334324532522
[info] application - Recieved on queue callback 1: MSG to Queue : 1334324532522
[info] application - MSG to Exchange : 1334324533622
[info] application - MSG to Queue : 1334324533622
[info] application - Recieved on queue callback 2: MSG to Queue : 1334324533622
[info] application - MSG to Exchange : 1334324534722
[info] application - MSG to Queue : 1334324534722
[info] application - Recieved on queue callback 1: MSG to Queue : 1334324534722
[info] application - MSG to Exchange : 1334324535822
[info] application - MSG to Queue : 1334324535822
[info] application - Recieved on queue callback 2: MSG to Queue : 1334324535822

在这里,您可以清楚地看到循环处理消息的方式。

设置发布和订阅方案

一旦运行了上述代码,添加发布/订阅功能就变得非常简单。 现在我们使用PublishingActor代替SendingActor:

class PublishingActor(channel: Channel, exchange: String) extends Actor {/*** When we receive a message we sent it using the configured channel*/def receive = {case some: String => {val msg = (some + " : " + System.currentTimeMillis());channel.basicPublish(exchange, "", null, msg.getBytes());Logger.info(msg);}case _ => {}}
}

RabbitMQ使用交换来允许多个收件人接收相同的消息(以及许多其他高级功能)。 来自其他参与者的代码唯一的变化是,这次我们将消息发送到交换机而不是队列。 侦听器代码完全相同,我们唯一需要做的就是将队列连接到特定的交换机。 这样,该队列上的侦听器就可以接收发送到交换机的消息。 我们再次根据之前使用的设置方法执行此操作。

...// create a new sending channel on which we declare the exchangeval sendingChannel2 = connection.createChannel();sendingChannel2.exchangeDeclare(Config.RABBITMQ_EXCHANGEE, "fanout");// define the two callbacks for our listenersval callback3 = (x: String) => Logger.info("Recieved on exchange callback 3: " + x);val callback4 = (x: String) => Logger.info("Recieved on exchange callback 4: " + x);// create a channel for the listener and setup the first listenerval listenChannel1 = connection.createChannel();setupListener(listenChannel1,listenChannel1.queueDeclare().getQueue(), Config.RABBITMQ_EXCHANGEE, callback3);// create another channel for a listener and setup the second listenerval listenChannel2 = connection.createChannel();setupListener(listenChannel2,listenChannel2.queueDeclare().getQueue(), Config.RABBITMQ_EXCHANGEE, callback4);// create an actor that is invoked every two seconds after a delay of// two seconds with the message "msg"Akka.system.scheduler.schedule(2 seconds, 1 seconds, Akka.system.actorOf(Props(new PublishingActor(channel = sendingChannel2, exchange = Config.RABBITMQ_EXCHANGEE))), "MSG to Exchange");...

我们还为setupListener创建了一个重载方法,该方法作为一个附加参数,也接受要使用的交换的名称。

private def setupListener(channel: Channel, queueName : String, exchange: String, f: (String) => Any) {channel.queueBind(queueName, exchange, "");Akka.system.scheduler.scheduleOnce(2 seconds, Akka.system.actorOf(Props(new ListeningActor(channel, queueName, f))), "");}

在这小段代码中,您可以看到我们将提供的队列(在我们的示例中是一个随机名称)绑定到指定的交易所。 之后,我们将创建一个新的监听器,如我们之前所见。
现在运行此代码将产生以下输出:

[info] play - Application started (Dev)
[info] application - MSG to Exchange : 1334325448907
[info] application - MSG to Queue : 1334325448907
[info] application - Recieved on exchange callback 3: MSG to Exchange : 1334325448907
[info] application - Recieved on exchange callback 4: MSG to Exchange : 1334325448907
[info] application - MSG to Exchange : 1334325450006
[info] application - MSG to Queue : 1334325450006
[info] application - Recieved on exchange callback 4: MSG to Exchange : 1334325450006
[info] application - Recieved on exchange callback 3: MSG to Exchange : 1334325450006

如您所见,在这种情况下,两个侦听器都收到相同的消息。 这几乎涵盖了本文的全部内容。 如您所见,为RabbitMQ使用基于Java的客户端api绰绰有余,并且可以从Scala轻松使用。 请注意,尽管该示例尚未准备好投入生产,但您应注意关闭连接,并很好地关闭侦听器和参与者。 这里没有显示所有关闭代码。

参考:从Smart Java博客的JCG合作伙伴 Jos Dirksen 使用Scala,Play和Akka连接到RabbitMQ(AMQP) 。

翻译自: https://www.javacodegeeks.com/2012/04/connect-to-rabbitmq-amqp-using-scala.html

scala akka

scala akka_使用Scala,Play和Akka连接到RabbitMQ(AMQP)相关推荐

  1. 使用Scala,Play和Akka连接到RabbitMQ(AMQP)

    在本文中,我们将研究如何从Scala连接到RabbitMQ,以便可以从应用程序中支持AMQP协议. 在此示例中,我将使用Play Framework 2.0作为容器(有关更多信息,请参阅我在该主题上的 ...

  2. Scala 简介 [摘自 Scala程序设计 ]

    Scala 简介 1.1 为什么选择Scala Scala 是一门满足现代软件工程师需求的语言:它是一门静态类型语言,支持混合范式:它也是一门运行在 JVM 之上的语言,语法简洁.优雅.灵活.Scal ...

  3. 熟悉scala命令,scala语言运行超级素数和猴子大王

    实验目的 在Linux操作系统中安装Scala 输入"scala"命令,熟悉地运行Scala解释器 scala语言运行超级素数和猴子大王 实验仪器 Virtualbox管理器 实验 ...

  4. 【scala初学】scala symbol 符号 -3

    前面接触了scala符号,这会整体性的说说. scala符号主要分为四类:  1. 关键字,保留字 (Keywords/reserved symbols) 2. 自动导入 (Automatically ...

  5. Scala教程之:Scala基础

    文章目录 常量 变量 代码块 函数 方法 类 case类 对象 trait main方法 这篇文章我们大概过一下Scala的基础概念,后面的文章我们会有更详细的讲解Scala的具体内容. 常量 在Sc ...

  6. Scala学习(一)--Scala基础学习

    Scala基础学习 摘要: 在篇主要内容:如何把Scala当做工业级的便携计算器使用,如何用Scala处理数字以及其他算术操作.在这个过程中,我们将介绍一系列重要的Scala概念和惯用法.同时你还将学 ...

  7. Scala具体解释---------Scala是什么?可伸展的语言!

    Scala是什么 Scala语言的名称来自于"可伸展的语言". 之所以这样命名,是由于他被设计成随着使用者的需求而成长.你能够把Scala应用在非常大范围的编程任务上.从写个小脚本 ...

  8. A Scala Tutorial for Java programmers之(一)Scala入门:Scala例子,以及如何与Java交互

    本文为初学Scala的Java开发者提供了一个Scala例子(Hello world),并对Scala与Java交互的情况作了一些大致的介绍. AD: 本文源自Michel Schinz和Philip ...

  9. Scala学习--《Scala编程》

    2019独角兽企业重金招聘Python工程师标准>>> Scala学习手册--可伸缩的语言(随着使用者的需求而成长)  第一章:基本概念 Scala=FP+OO.静态语言 兼容性.简 ...

最新文章

  1. Kinesis、Streams and Firehose
  2. kali 终端真透明
  3. matlab 图片批量读取
  4. 关于Debug和Release之本质区别的讨论
  5. js(Dom+Bom)第二天(1)
  6. SG函数(hdu1847)
  7. 开发中常用的工具类(一)
  8. 英文pdf翻译为中文(word+google浏览器即可)
  9. Echarts 修改地图的标示
  10. 单片微型计算机频率测量实验,毕业论文:频率计系统设计
  11. 有趣的23000----整理(07)A词根
  12. part1:企业微信发送消息API调试
  13. 短信猫接收与发送短信整理
  14. FCN学习:Semantic Segmentation(摘自知乎)
  15. linux centos用smartctl 打开硬盘写缓存
  16. CDN网站加速的原理和流程
  17. 删除github上的一个仓库或者仓库里面的某个文件
  18. 剑灵系统推荐加点_《剑灵》各职业练级推荐修炼加点攻略
  19. FAS流控一键安装脚本
  20. 第二章 操作系统的硬件环境

热门文章

  1. Java高级进阶:自定义ClassLoader
  2. ajax面试技术回答模板
  3. publiccms实现首页菜单栏下拉的方法
  4. 人脸检测解析json的工具类face_test
  5. Redis(案例五:Set数据)
  6. Servlet 流程控制
  7. count does not exist. Check the 'Function Name Parsing and Resolution' section in the Reference Manu
  8. 打开数据库_数据库客户端navicat遇到问题怎么办?
  9. 严重: Error configuring application listener of class org.springframework.web.context.ContextLoaderLis
  10. 如何查阅相关工作所用到的文献资料