Message routers

The first step in using Arteria is to create the primary and secondary routers. Regardless of calling them with such names, both routers are actually equal and perform identically. The only difference is in the channel identity numbering scheme to avoid conflicts. Before a router can be instantiated, we need to provide a MessageRouterHandler for it. The router handler takes care of application specific activity such as instantiating new channels under the router.

The MessageRouterHandler[MaterializeChild] takes a type parameter describing the type to deliver channel materialization information to the counterpart router. Typically this is represented as a sealed hierarchy of case classes where each class corresponds to a type of channel that can be created. In this documentation we will be using two top level channels, log and UI, so our type hierarchy looks like this:

sealed abstract class RouterMessage

case object CreateLogChannel extends RouterMessage
case object CreateUIChannel extends RouterMessage

object RouterMessage {
  val defaultRouterPickler = compositePickler[RouterMessage]

We also define a pickler for the router messages by utilizing Boopickle's compositePickler.

Next we'll define the handler with a minimal implementation.

import arteria.core._

class TopChannelHandler extends MessageRouterHandler[RouterMessage] {
  override def materializeChildChannel(id: Int,
                                       globalId: Int,
                                       router: MessageRouterBase,
                                       materializeChild: RouterMessage,
                                       contextReader: ChannelReader): MessageChannelBase = {
    materializeChild match {
      case CreateLogChannel => ??? // todo
      case CreateUIChannel     => ??? // todo

Now we are ready to instantiate the router.

val handlerPri = new TopChannelHandler
val routerPri = new MessageRouter(handlerPri, isPrimary = true)

On the other side of the fence we'll do the same for the secondary router

val handlerSec = new TopChannelHandler
val routerSec = new MessageRouter(handlerSec, isPrimary = false)


As stated in the introduction, Arteria core doesn't provide any transport mechanism of its own, the routers simply produce and consume ByteBuffers. In this simplified example we just pass these buffers directly from one router to the other. To get a stream of pending messages, call the flush method and to pass this stream to the other router, use the receive method.

val dataPri = routerPri.flush()
val dataSec = routerSec.flush()

The first thing routers do is to send an EstablishRoute message to the other router, establishing the channel between them and making sure they agree on who's the primary router.

The transport implementation can check for pending messages by calling router's hasPending method before calling flush. Even when there are no pending messages, a call to flush will produce a valid ByteBuffer that can be sent to the other router (albeit containing no messages). Alternatively the application can listen to the messagesPending callback in the router handler, which is called after a message has been queued to be flushed. In typical usage scenarios it makes sense to periodically check for pending messages either by having a constant timer running or by starting a (short) timer after a callback to messagesPending is received. This improves performance as messages sent in a short period of time are kept together and flushed to the other router as a single stream.

results matching ""

    No results matching ""