消息的可靠交付

Akka Actor 模型默认情况下是实现的最多一次交付,在本地环境发送消息还好,通常不会存在消息丢失的情况。但是在网络环境下,因为网络延迟等原因消息丢失的可能性大大增加。另外,发送端和接收端都有可能因为各种原因挂掉而造成消息丢失,无论本地还是跨网络环境。

通常,消息交付有三种方式: 至多一次至少一次恰好一次。对于要求消息不能丢失的应用场景,第一种 至多一次 的交付模式是肯定不可取的,那至少一次和恰好一次应该选择哪一种呢?对于 恰好一次,因为要保证一条消息不会被消费者消费多次, 那意味着生产者在向消费者发送消息之前需要先发送一条消息查询是否已被消费者使用过?而消费者在处理完消息向生产者发送确认(Confirmed) 消息后需要等待生产者返回 ConfirmedAck 消息,这无形中会显著增加系统消耗并降低吞吐量。最后来看一下 至少一次, 生产者在发送消息前不需要查询消费者是否已经处理过此条消息,消费者也不需要等待生产者返回 ConfirmedAck,这相对于 恰好一次 模式来说系统消耗将更小且吞吐量更高。 至少一次 造成的问题就是消费都可能会收到重复消息,但这在业务上可以简单地通过消息 ID 去重来解决。

总的来说, 至少一次 对于大部分要求消息不能丢失的应用场景都是合适的,而 Akka Typed 提供的消息可靠交付也实现了这一方式。

Akka Typed 从 2.6.4 开始提供了可靠交付的 预览 版实现(相对于经典 actor 的至少一次交付实现),提供了三种支持的模式应用于不同的业务场景:

  1. 点对点:两个 Actor 之间直接发送消息
  2. 工作拉取(Work Pulling):一个生产者 Actor 产生消息,多个消费者 Actor 消费消息(一个消息只被一个工作节点消费)
  3. 分片(Sharding):基于 Akka Cluster Sharding 机制在集群上自动分布 Actor,不需要手动管理 Actor 的创建、重启等工作

要使用可靠交付,需要添加下面依赖到项目:

libraryDependencies += "com.typesafe.akka" %% "akka-actor-typed" % "2.6.4"

点对点可靠交付

Akka Typed 为点对点可靠交付抽像了生产者和消费者概念,生产者将消息发送到 ProducerController ,消费者通过 ConsumerController 接收消息。点对点可靠交付实现了至少一次交付,消费者端可能会收到重复数据。

生产者需要向 ProducerController 发送 Start (1)消息来启动它,这时生产者将收到一个 RequestNext (2)消息, 并可通过它向 ProducerController (3)发送消息。生产者只应在收到 RequestNext 消息时才向 ProducerController 发送消息, 这意味着两个 actor 之间的消息交付是一种消费者来主动拉的模式。

消费者也需要向 ConsumerController 发送 Start (6)消息来启动它,之后当消息可用时消费者会收到 Delivery (7)消息, 消费者应在消息处理完成后向生产者发送 Confirmed (8)来确认这条消息已经处理完成。若当前消息未被确认(ProducerController 收到 Confirmed 消息),消费者将不会收到下一条消息。

上图中 (4)、(5) 两个步骤将由 Akka 自动维护,开发人员不需要关心。当 ConsumerController 没有消息可用时它将向 ProducerController 发送 Request 消息请求获得消息。ProducerController 除了在收到 Request 请求之外,当发现还有未确认消息时也会主动向 ConsumerController 推送消息,比如从错误中恢复或者某个消息长时间未收到 Confirmed 确认而触发超时重发。 因为点对点可靠交付的流量控制是消费者(ConsumerController)控制的,这意味着生产者将不能发送比消费者处理能力更多的消息。

注意:生产者和 ProducerController,以及消费者和 ConsumerController 必须在同一个 ActorSystem 中,不能跨网络。

点对点可靠交付示例

示例以一个斐波那契数列计算做为示例,它有一个生产者计算从1开始到1000的斐波那契数例,消费者每次请求一个数字对应的斐波那契数值。

生产者

import akka.actor.typed.ActorRef
import akka.actor.typed.Behavior
import akka.actor.typed.delivery.ProducerController
import akka.actor.typed.scaladsl.Behaviorsobject FibonacciProducer {sealed trait Commandprivate case class WrappedRequestNext(r: ProducerController.RequestNext[FibonacciConsumer.Command]) extends Commanddef apply(producerController: ActorRef[ProducerController.Command[FibonacciConsumer.Command]]): Behavior[Command] = {Behaviors.setup { context =>val requestNextAdapter =context.messageAdapter[ProducerController.RequestNext[FibonacciConsumer.Command]](WrappedRequestNext(_))producerController ! ProducerController.Start(requestNextAdapter)fibonacci(0, 1, 0)}}private def fibonacci(n: Long, b: BigInt, a: BigInt): Behavior[Command] = {Behaviors.receive {case (context, WrappedRequestNext(next)) =>context.log.info("Generated fibonacci {}: {}", n, a)next.sendNextTo ! FibonacciConsumer.FibonacciNumber(n, a)if (n == 1000)Behaviors.stoppedelsefibonacci(n + 1, a + b, b)}}
}

有关 Akka Typed 消息适配(context.messageAdapter)的更多内容可以参看 适配响应类型到 Actor 。

消费者

import akka.actor.typed.delivery.ConsumerControllerobject FibonacciConsumer {sealed trait Commandfinal case class FibonacciNumber(n: Long, value: BigInt) extends Commandprivate case class WrappedDelivery(d: ConsumerController.Delivery[Command]) extends Commanddef apply(consumerController: ActorRef[ConsumerController.Command[FibonacciConsumer.Command]]): Behavior[Command] = {Behaviors.setup { context =>val deliveryAdapter =context.messageAdapter[ConsumerController.Delivery[FibonacciConsumer.Command]](WrappedDelivery(_))consumerController ! ConsumerController.Start(deliveryAdapter)Behaviors.receiveMessagePartial {case WrappedDelivery(ConsumerController.Delivery(FibonacciNumber(n, value), confirmTo)) =>context.log.info("Processed fibonacci {}: {}", n, value)confirmTo ! ConsumerController.ConfirmedBehaviors.same}}}
}

通过向 confirmTo 发送 ConsumerController.Confirmed 消息,ConsumerController 将向生产者发送消息(处理)确认。 ConsumerController 内部隐藏了消息ID等技术细节,开发人员不需要关心应该发送哪条消息的确认消息,这简化了 API,同时可以让代码更专注于业务逻辑。

下面代码构造了生产者、消费者以及它们分别需要的 ProducerControllerConsumerController。将生产者和消费者连接起来是通过将 producerController 作为 RegisterToProducerController 消息的内容发送到 consumerController 来实现的,consumerController 内部会实现 ProducerControllerConsumerController 的连接。

val consumerController = context.spawn(ConsumerController[FibonacciConsumer.Command](), "consumerController")
context.spawn(FibonacciConsumer(consumerController), "consumer")val producerId = s"fibonacci-${UUID.randomUUID()}"
val producerController = context.spawn(ProducerController[FibonacciConsumer.Command](producerId, durableQueueBehavior = None),"producerController")
context.spawn(FibonacciProducer(producerController), "producer")consumerController ! ConsumerController.RegisterToProducerController(producerController)

点对点交付语义

只要生产者和消费者都没崩溃,消息就会按发送到 ProducerController 的顺序发送到消费者,不会丢失也不会重复。这意味着就是恰好一次处理, 消费者端不需要任何业务级的重复判断。

如果生产者崩溃,那么未确认的消费可能会丢失。为了防止消息丢失可以启用 ProducerControllerdurableQueueBehavior , 这将在生产者端启用持久化存储,通过 Akka Persistence 把未确认消息存储到外部物理存储上,这样在从崩溃中恢复以后可以继续处理未确认消息。 这意味着至少一次交付。

如果消费者崩溃,那么一个新的 ConsumerController 可以连接到消费者。生产者将重新传递所有未确认的消息,这样消费者可能需要根据消息 ID 来判断此消息是否被处理过。这意味着至少一次交付。

工作拉取(Work Pulling)

工作拉取模式在点对点交付的基础之上提供了一对多的消息通信方式。工作拉取指多个消费者按照自己的节奏从同一个生产者拉取消息, 而生产者并不在意有多次个消费者,也不会在不知道消费者处理能力的情况下盲目的向消费者推送消息。

这里一个重要的特性是消息处理的顺序是不确定的,消息将被随机的发送到各个消费者,甚至未确认的消息在重发时也可能发送到不同的消费者上。

消息从生产者发送到 WorkPullingProducerController ,并通过 ConsumerController 到达消费者 actor。 消费者的处理逻辑对点对点交付模式中消费者处理逻辑一样。下图是一个消费者时的工作拉取模式:

下面是有两个消费者时的工作拉取模式:

消费者的 ConsumerController 通过 ServiceKey 动态注册到 WorkPullingProducerController。通过这个方式,消费者可以动态注册或删除自己到/从相同 ServiceKey 的生产者。

与点对点交付模式类似,式作拉取模式也是客户端主动拉取的,生产者需要在收到 Request 请求以后才能向 WorkPullingProducerController 发送消息。生产者与 WorkPullingProducerController 也要在同一个 ActorSystem 中,不允许跨网络,消费者亦同。生产者和消费者都需要向 XxxController 发送 Start 消息来启动它;消费者同样需要向 confirmTo 发送 ConsumerController.Confirmed 消费来告知生产者此消息 已被消费。因为生产者与多个消费者之间的流量控制由消费者驱动,意味着生产者发送的速度不会快于消费者的请求速度。

不同的地方在当多个消费者向生产者请求消息,在消息被实际发送到消费者之前消费者已经停止。这时,消息将被缓冲在生产者端,当有新的消费者注册或请求时, 缓冲的消息将被发送到消费者。

工作拉取示例

图片转换消费者(Worker)

与点对点交付模式不同,工作拉取模式下 ConsumerController 不再是通过发送 ConsumerController.RegisterToProducerController(producerController) 注册消息的方式来注册到 WorkPullingProducerController 的,而是在构造 ConsumerController 时通过 ServiceKey 来查找生产者。

import akka.actor.typed.delivery.ConsumerController
import akka.actor.typed.receptionist.ServiceKeyobject ImageConverter {sealed trait Commandfinal case class ConversionJob(resultId: UUID, fromFormat: String, toFormat: String, image: Array[Byte])private case class WrappedDelivery(d: ConsumerController.Delivery[ConversionJob]) extends Commandval serviceKey = ServiceKey[ConsumerController.Command[ConversionJob]]("ImageConverter")def apply(): Behavior[Command] = {Behaviors.setup { context =>val deliveryAdapter =context.messageAdapter[ConsumerController.Delivery[ConversionJob]](WrappedDelivery(_))val consumerController =context.spawn(ConsumerController(serviceKey), "consumerController")consumerController ! ConsumerController.Start(deliveryAdapter)Behaviors.receiveMessage {case WrappedDelivery(delivery) =>val image = delivery.message.imageval fromFormat = delivery.message.fromFormatval toFormat = delivery.message.toFormat// convert image...// store result with resultId key for later retrieval// and when completed confirmdelivery.confirmTo ! ConsumerController.ConfirmedBehaviors.same}}}}

图片转换生产者

WorkPullingProducerController 在构造时需要指定 ServiceKey,消费者同过相同的 ServiceKey 注册到生产者。

import akka.actor.typed.delivery.WorkPullingProducerControllerobject ImageWorkManager {trait Commandfinal case class Convert(fromFormat: String, toFormat: String, image: Array[Byte]) extends Commandprivate case class WrappedRequestNext(r: WorkPullingProducerController.RequestNext[ImageConverter.ConversionJob])extends Commandfinal case class GetResult(resultId: UUID, replyTo: ActorRef[Option[Array[Byte]]]) extends Commanddef apply(): Behavior[Command] = {Behaviors.setup { context =>val requestNextAdapter =context.messageAdapter[WorkPullingProducerController.RequestNext[ImageConverter.ConversionJob]](WrappedRequestNext(_))val producerController = context.spawn(WorkPullingProducerController(producerId = "workManager",workerServiceKey = ImageConverter.serviceKey,durableQueueBehavior = None),"producerController")producerController ! WorkPullingProducerController.Start(requestNextAdapter)Behaviors.withStash(1000) { stashBuffer =>new ImageWorkManager(context, stashBuffer).waitForNext()}}}}final class ImageWorkManager(context: ActorContext[ImageWorkManager.Command],stashBuffer: StashBuffer[ImageWorkManager.Command]) {import ImageWorkManager._private def waitForNext(): Behavior[Command] = {Behaviors.receiveMessage {case WrappedRequestNext(next) =>stashBuffer.unstashAll(active(next))case c: Convert =>if (stashBuffer.isFull) {context.log.warn("Too many Convert requests.")Behaviors.same} else {stashBuffer.stash(c)Behaviors.same}case GetResult(resultId, replyTo) =>// TODO retrieve the stored result and replyBehaviors.same}}private def active(next: WorkPullingProducerController.RequestNext[ImageConverter.ConversionJob]): Behavior[Command] = {Behaviors.receiveMessage {case Convert(from, to, image) =>val resultId = UUID.randomUUID()next.sendNextTo ! ImageConverter.ConversionJob(resultId, from, to, image)waitForNext()case GetResult(resultId, replyTo) =>// TODO retrieve the stored result and replyBehaviors.samecase _: WrappedRequestNext =>throw new IllegalStateException("Unexpected RequestNext")}}
}

工作拉取语义

工作拉取模式不关心消息顺序,因为每个消息会被随机的以任一路由到任何一个消费者。

只要生产者和消费者不崩溃(或因为某种原因消费者被删除),消息就会被传递到消费者,不会丢失也不会重复,不需要任何业务层面的重复判断。 这意味着至少一次交付。

如果生产者崩溃,那么未确认的消费可能会丢失。为了防止消息丢失可以启用 WorkPullingProducerControllerdurableQueueBehavior , 这将在生产者端启用持久化存储,通过 Akka Persistence 把未确认消息存储到外部物理存储上,这样在从崩溃中恢复以后可以继续处理未确认消息。 这意味着至少一次交付。

如果消费者崩溃或被停止,未确认消息将被重新传递到其它消费者。这种情况下,消息可能已经被处理过了,收到消息的消费者可能需要根据消息 ID 去重。 这意味着至少一次交付。

分片(Sharding)

与 Akka Cluster Sharding 一起使用可靠交付,需要添加下面依赖到项目:

libraryDependencies += "com.typesafe.akka" %% "akka-cluster-sharding-typed" % "2.6.4"

通过分片方式的生产者和消费者之间的消息可靠交付:

发送消息到另一个消费者:

从另一个生产者发送消息到消费者:

ShardingProducerController 应与 ShardingConsumerController 一起使用。

生产者可以通过 ShardingProducerController 发送消息到指定 entityIdShardingConsumerController。每个 ActorSystem(节点) 的单个 ShardingProducerController 都可以发送消息到相同实体类型(EntityTypeKey)的所有消费者实体,不需要显示的注册 ShardingConsumerControllerShardingProducerController

ShardingProducerController.RequestNext 消息中,有关于哪些实体在请求数据的信息。但是,它也允许向未包含在 RequestNext.entitiesWithDemand 里的新 entityId 发送消息,这时候这些消息将被缓冲。这意味着,可以一次向 RequestNext 发送多条消息,但建议每次只发送一条,等待下一个 RequestNext 到来后再发送其它消息。

生产者与 ShardingProducerController 也必须在同一个 ActorSystem 系统,并且不能跨网络。

每个分片实体都有一个 ShardingConsumerController,它与 ShardingProducerController 之间可以有很多未确认的消息, 但受到由消费者驱动流量控制,这意味着 ShardingProducerController 的发送速度不会超过消费者需求的速度。

分片示例

分片实体是一个 todo list 应用,它使用异步数据库调用存储自己在每次变化时的整个状态,当存储完成时首先把 confirmTo 与存储结果一起发送给 actor 自己, 再根据存储结果类型向生产者发送 Confirmed 确认消息。

消费者 TodoList 实体

trait DB {def save(id: String, value: TodoList.State): Future[Done]def load(id: String): Future[TodoList.State]
}object TodoList {sealed trait Commandfinal case class AddTask(item: String) extends Commandfinal case class CompleteTask(item: String) extends Commandprivate final case class InitialState(state: State) extends Commandprivate final case class SaveSuccess(confirmTo: ActorRef[ConsumerController.Confirmed]) extends Commandprivate final case class DBError(cause: Throwable) extends Commandprivate final case class CommandDelivery(command: Command, confirmTo: ActorRef[ConsumerController.Confirmed])extends Commandfinal case class State(tasks: Vector[String])def apply(id: String,db: DB,consumerController: ActorRef[ConsumerController.Start[Command]]): Behavior[Command] = {Behaviors.setup[Command] { context =>new TodoList(context, id, db).start(consumerController)}}}class TodoList(context: ActorContext[TodoList.Command], id: String, db: DB) {import TodoList._private def start(consumerController: ActorRef[ConsumerController.Start[Command]]): Behavior[Command] = {context.pipeToSelf(db.load(id)) {case Success(value) => InitialState(value)case Failure(cause) => DBError(cause)}Behaviors.receiveMessagePartial {case InitialState(state) =>val deliveryAdapter: ActorRef[ConsumerController.Delivery[Command]] = context.messageAdapter { delivery =>CommandDelivery(delivery.message, delivery.confirmTo)}consumerController ! ConsumerController.Start(deliveryAdapter)active(state)case DBError(cause) =>throw cause}}private def active(state: State): Behavior[Command] = {Behaviors.receiveMessagePartial {case CommandDelivery(AddTask(item), confirmTo) =>val newState = state.copy(tasks = state.tasks :+ item)save(newState, confirmTo)active(newState)case CommandDelivery(CompleteTask(item), confirmTo) =>val newState = state.copy(tasks = state.tasks.filterNot(_ == item))save(newState, confirmTo)active(newState)case SaveSuccess(confirmTo) =>confirmTo ! ConsumerController.ConfirmedBehaviors.samecase DBError(cause) =>throw cause}}private def save(newState: State, confirmTo: ActorRef[ConsumerController.Confirmed]): Unit = {context.pipeToSelf(db.save(id, newState)) {case Success(_)     => SaveSuccess(confirmTo)case Failure(cause) => DBError(cause)}}
}

注意:通过 Cluster Sharding 启动的 TodoList 需要通过发送消息(consumerController ! ConsumerController.Start(deliveryAdapter)) 来手动启动 ConsumerController

TodoService 生产者:

object TodoService {sealed trait Commandfinal case class UpdateTodo(listId: String, item: String, completed: Boolean, replyTo: ActorRef[Response])extends Commandsealed trait Responsecase object Accepted extends Responsecase object Rejected extends Responsecase object MaybeAccepted extends Responseprivate final case class WrappedRequestNext(requestNext: ShardingProducerController.RequestNext[TodoList.Command])extends Commandprivate final case class Confirmed(originalReplyTo: ActorRef[Response]) extends Commandprivate final case class TimedOut(originalReplyTo: ActorRef[Response]) extends Commanddef apply(producerController: ActorRef[ShardingProducerController.Command[TodoList.Command]]): Behavior[Command] = {Behaviors.setup { context =>new TodoService(context).start(producerController)}}}class TodoService(context: ActorContext[TodoService.Command]) {import TodoService._private implicit val askTimeout: Timeout = 5.secondsprivate def start(producerController: ActorRef[ShardingProducerController.Start[TodoList.Command]]): Behavior[Command] = {val requestNextAdapter: ActorRef[ShardingProducerController.RequestNext[TodoList.Command]] =context.messageAdapter(WrappedRequestNext.apply)producerController ! ShardingProducerController.Start(requestNextAdapter)Behaviors.receiveMessagePartial {case WrappedRequestNext(next) =>active(next)case UpdateTodo(_, _, _, replyTo) =>// not hooked up with shardingProducerController yetreplyTo ! RejectedBehaviors.same}}private def active(requestNext: ShardingProducerController.RequestNext[TodoList.Command]): Behavior[Command] = {Behaviors.receiveMessage {case WrappedRequestNext(next) =>active(next)case UpdateTodo(listId, item, completed, replyTo) =>if (requestNext.bufferedForEntitiesWithoutDemand.getOrElse(listId, 0) >= 100)replyTo ! Rejectedelse {val requestMsg = if (completed) TodoList.CompleteTask(item) else TodoList.AddTask(item)context.ask[ShardingProducerController.MessageWithConfirmation[TodoList.Command], Done](requestNext.askNextTo,askReplyTo => ShardingProducerController.MessageWithConfirmation(listId, requestMsg, askReplyTo)) {case Success(Done) => Confirmed(replyTo)case Failure(_)    => TimedOut(replyTo)}}Behaviors.samecase Confirmed(originalReplyTo) =>originalReplyTo ! AcceptedBehaviors.samecase TimedOut(originalReplyTo) =>originalReplyTo ! MaybeAcceptedBehaviors.same}}}

通过 ActorSystem 的守卫 actor 实例化 TodoList 服务的分片生产者和分片消费者。ShardingConsumerController 内部包含了 ConsumerController,通过在构造分片实体的时候传递给 TodoList actor。

object ToDoApp {private val entityTypeKey = EntityTypeKey[ConsumerController.SequencedMessage[TodoList.Command]]("toto")private def behavior(db: DB): Behavior[TodoList.Command] = Behaviors.setup { context =>val todoEntity = Entity(entityTypeKey)(entityContext =>ShardingConsumerController(start => TodoList(entityContext.entityId, db, start)))val todoRegion = ClusterSharding(context.system).init(todoEntity)val selfAddress = Cluster(context.system).selfMember.addressval producerId = s"todo-producer-${selfAddress.host}:${selfAddress.port}"val producerController =context.spawn(ShardingProducerController(producerId, todoRegion, None), "producerController")context.spawn(TodoService(producerController), "producer")Behaviors.ignore}def main(args: Array[String]): Unit = {val db = new DB {override def save(id: String, value: TodoList.State): Future[Done] = ???override def load(id: String): Future[TodoList.State] = ???}ActorSystem(behavior(db), "todo-system")}
}

分片交付语义

只要生产者和消费者都没崩溃,消息就会按发送到 ShardingProducerController 的顺序发送,不会丢失也不会重复。这意味着就是恰好一次处理, 消费者端不需要任何业务级的重复判断。

如果生产者崩溃,那么未确认消息可能丢失。为了防止消息丢失可以启用 ShardingProducerControllerdurableQueueBehavior, 这将在生产者端启用持久化存储,通过 Akka Persistence 把未确认消息存储到外部物理存储上,这样在从崩溃中恢复以后可以继续处理未确认消息。 这意味着至少一次交付。

如果消费者崩溃或分片被重新平衡,那么将重新传送未确认消息。这种情况下也许会重复处理前一个消费者已经处理过的消息。

持久的生产者

生产者在收到确认消息之前,相应的消息会一直保存在内存里以用于可能的重发。但是,如果生产者端的 JVM 崩溃,那么未确认消息将会丢失。 DurableProducerQueue 可用于确保消息在这种场景(JVM 崩溃)下也可以交付。因为未确认消息以持久的方式被存储,所以当生产者重启以后可以重新传送它们。 EventSourcedProducerQueueakka-persistence-typed 提供的 DurableProducerQueue 实现。

注意,DurableProducerQueueue 将增加大量的性能开销。

要使用 EventSourcedProducerQueue,需要添加下面的依赖:

libraryDependencies += "com.typesafe.akka" %% "akka-persistence-typed" % "2.6.4"

要使用 EventSourcedProducerQueue 非常简单,比如前面的图片转换示例,可以按下面代码描述使用:

import akka.persistence.typed.delivery.EventSourcedProducerQueue
import akka.persistence.typed.PersistenceIdval durableQueue =EventSourcedProducerQueue[ImageConverter.ConversionJob](PersistenceId.ofUniqueId("ImageWorkManager"))
val durableProducerController = context.spawn(WorkPullingProducerController(producerId = "workManager",workerServiceKey = ImageConverter.serviceKey,durableQueueBehavior = Some(durableQueue)),"producerController")

可以看到,只需要在构建 WorkPullingProducerController 时传递一个 EventSourcedProducerQueue 的实例即可。 需要注意的是,务必确保 PersistenceId 的唯一性。

只有流量控制

可以不重发丢失的消息,但仍然使用流量控制。当生产者和消息者都在相同的本地 ActorSystem 上时,这可能有用。这样做会更高效, 因为在确认之前消息不必被保存在 ProducerController 的内存中,但缺点是丢失的消息将不被重传。可以参考 ConsumerControlleronly-flow-control 配置。

总结

Akka Typed 在版本 2.6.4 开始终于提供了可靠交付的实现(之前只有经典、非类型 actor 版本的实现):至少一次交付(At Least Once Delivery)。 可以看到,提供了可靠交付实现是基于 Akka 已有功能之上的,比如:

  • 通过 Receptionist 根据 ServiceKey 查找远程 actor;
  • 通过 Akka Cluster Sharding 实现基于分片的可靠交付,这样就不需要手动管理消费者的创建;
  • 通过 Akka Persistence 可现了未确认消息的持久存储,以保证在程序重启后仍然可以重发未确认消息。

对于 Akka Typed 现在提供的各种可靠交付模式,各有各的实现方式,下面列出各自的适用场景:

原文连接:https://www.yangbajing.me/akka-cookbook/cluster/reliable-delivery.html 。

delphi 发送网络消息_Actor 消息的可靠交付(Akka Typed)相关推荐

  1. delphi 发送网络消息_《新手学习ISO网络模型》(1)如何直观理解物理层?

    新手向,以入门为主,建立对物理层的直观理解 网络就是一组互相连接的通信设备.如何实现网络可以让两台计算机传达消息. 协议:决定两个人或两台设备交流信息都要遵守的一个规则. 我们可以通过制定自己的协议来 ...

  2. 消息队列消息丢失和消息重复发送的处理策略

    分布式事务 什么是分布式事务 我们的服务器从单机发展到拥有多台机器的分布式系统,各个系统之前需要借助于网络进行通信,原有单机中相对可靠的方法调用以及进程间通信方式已经没有办法使用,同时网络环境也是不稳 ...

  3. #rabbitMQ #重复消费 #可靠投递 #延时投递 #rabbitMQ交换机类型#重复消费#消息积压#消息丢失

    exchange类型: 1, direct 指定direct后, 消息会根据你设置的routeing key(路由键), 发送到对应的队列中 1,新建direct交换机 2,添加队列, 并且绑定路由键 ...

  4. post发送byte数组_KAFKA消息发送

    消息发送的整体架构 RecordAccumulator 主要用来缓存消息以便 Sender 线程可以批量发送,进而减少网络传输的资源消耗以提升性能.RecordAccumulator 缓存的大小可以通 ...

  5. Java Socket发送与接收HTTP消息简单实现

    在上次Java Socket现实简单的HTTP服务我 们实现了简单的HTTP服务,它可以用来模拟HTTP服务,用它可以截获HTTP请求的原始码流,让我们很清楚的了解到我们向服务发的HTTP消息的结 构 ...

  6. 技巧: 用 JAXM 发送和接收 SOAP 消息—Java API 使许多手工生成和发送消息方面必需的步骤自动化...

    简介: 在本篇技巧文章中,作者兼开发人员 Nicholas Chase 向您演示如何使用用于 XML 消息传递的 Java API(Java API for XML Messaging (JAXM)) ...

  7. 使用 rocketmq-spring-boot-starter 来配置、发送和消费 RocketMQ 消息

    作者 | 辽天 来源 | 阿里巴巴云原生公众号 导读:本文将 rocktmq-spring-boot 的设计实现做一个简单的介绍,读者可以通过本文了解将 RocketMQ Client 端集成为 sp ...

  8. vs如何实现tcp连续发送多条消息_消息队列之 RabbitMQ

    为什么要使用MQ消息中间件?它解决了什么问题?关于为什么要使用消息中间件?消息中间件是如何做到同步变异步.流量削锋.应用解耦的?网上已经有很多说明,我这里就不再说明.我在接下来的RabbitMq系列博 ...

  9. java向微信公众号---发送模板和图文消息

    微信公众号初次开发 其他操作 项目搭建 maven application.yml yml参数配置 微信客户端配置 搭建完成 实现业务 模板消息推送 准备工作 模板消息填写要求 图文消息填写要求 推送 ...

最新文章

  1. ELMo:最好用的词向量(Deep contextualized word representations)论文 pdf
  2. 基础知识《二》java的基本类型
  3. 坐在马桶上看算法:只有五行的Floyd最短路算法
  4. geojson在线生成工具_logofree详解:LOGO设计在线生成
  5. 如何为SAP Cloud for Customer Lead页面配置自定义的Source字段
  6. 【ArcGIS风暴】最牛逼空间数据批处理神器来了:用户自定义工具箱GeoStorm.tbx
  7. oppo的sd卡在哪里打开_oppo的sd卡在哪里打开
  8. robotFramework-ride使用1-关键字驱动与数据类型
  9. 十八般武艺玩转GaussDB(DWS)性能调优:路径干预
  10. Visual Studio Code 1.48 发布
  11. 程序员和注册会计师的地位_“注册会计师和律师哪个地位高?”这3张图给出了答案!...
  12. SQL数据表字段类型与属性总结(DDL)
  13. EOS project 中 的一个 jsp 文件中 调用 javascript函数的问题
  14. Vijos P1335 数独验证【谜题】
  15. 《深入理解Java虚拟机》读书笔记二
  16. php网站简繁切换,一个独立可用的中文简体繁体转换PHP程序
  17. 读书笔记_打开量化投资的黑箱08
  18. 华为交换机恢复出厂设置的三种方法
  19. 英语语音篇 - 元音自然拼读
  20. 深度学习需要多强的数学基础?

热门文章

  1. 【岗位详情】腾讯广告机制策略算法工程师(北京)
  2. python批量读取文件内容_Python之批量读取文件【面试必学】
  3. 华为P20云文档空间满了怎么清理_原来华为手机能这样清理垃圾,怪不得别人的手机再用两年不卡顿...
  4. Leetcode每日一题:690.employee-importance(员工的重要性)
  5. Python字符串函数总结
  6. 织梦5.7生成HTML很慢,Dedecms 生成静态网页速度特别慢的问题
  7. python自动化办公 51cto_利用python实现批量自动化运维脚本案例
  8. 怎么把网页源码家入hexo博客_从零开始搭建个人博客(超详细)
  9. android ctrl 左键鼠标左键直接打开xml文件夹,android – 在listview项目上单击打开xml文件...
  10. 利用hutool实现邮件发送功能