ZIO Streams and JMS

In this article we are going to explore a bit of the ZIO streams API and how it can be used to talk to a JMS broker. The ZIO web site and awesome-zio have a lot of very good articles and talks on ZIO streams covering the basics, so I won't repeat those here.

For this article I have used an embedded ActiveMQ messaging broker, but the code makes no assumptions about the underlying JMS provider. Usually the JMS broker would run externally and we would use a provider-specific connection factory to connect to it.

Note: The complete source code used in this article can be found on GitHub.

What we want to achieve

Let's start by considering what a simple program sending and receiving messages from an ActiveMQ broker would look like. That program should be runnable as a normal ZIO app:

private val program =
  for {
    _      <- putStrLn("Starting JMS Broker") *> ZIO.service[BrokerService]
    conMgr <- ZIO.service[ZIOJmsConnectionManager.Service]
    _ <- (for {
           con <- conMgr.connect(cf, "sample")
           _   <- conMgr.reconnect(con, Some(new Exception("Boom"))).schedule(Schedule.duration(10.seconds)).fork
           _ <- for {
                  c <- consumer(con).fork
                  p <- producer(con).fork
                  _ <- c.join
                  _ <- p.join
                } yield ()
         } yield ())
  } yield ()

From the inferred type of program we can see that, besides the ZIO environment ZEnv, we need an ActiveMQ message broker, the JMS connection manager and the ZIO logging API to execute it. With those requirements in place the program is fairly straightforward:

  1. acquire the message broker from the environment
  2. create a connection factory to talk to the broker just started
  3. create a fiber that simply sits in the background for 10 seconds
  4. use the connection factory to establish a JMS connection
  5. use the connection to kick off a sender and receiver
  6. join with the timed fiber to interrupt the sender and receiver
  7. clean up the connection
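
The fork, wait and interrupt pattern behind these steps can be sketched in isolation. This is a minimal sketch against the ZIO 1.x API, not the article's code: the counter-based `worker` is a hypothetical stand-in for the sender and receiver, and the timer is shortened to 200 milliseconds.

```scala
import zio._
import zio.clock.Clock
import zio.duration._

object ForkJoinSketch {
  // Hypothetical stand-in for the JMS sender/receiver:
  // bumps a counter every 10 ms until it is interrupted.
  def worker(counter: Ref[Int]): URIO[Clock, Long] =
    counter.update(_ + 1).repeat(Schedule.spaced(10.millis))

  val program: URIO[Clock, Int] =
    for {
      counter <- Ref.make(0)
      timer   <- ZIO.sleep(200.millis).fork // fiber that just sits in the background
      w       <- worker(counter).fork       // the actual work runs on its own fiber
      _       <- timer.join                 // wait for the timer to elapse
      _       <- w.interrupt                // then stop the worker
      n       <- counter.get
    } yield n
}
```

Running `ForkJoinSketch.program` with `Runtime.default.unsafeRun` yields the number of increments the worker managed before it was interrupted.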

Note: The sample program has a scheduled reconnect after 10 seconds. This will cause the execution to fail with an exception because the underlying streams terminate with a JMSException. In one of the next articles I will get into stream recovery.

We will dive into the various steps throughout the remainder of this article to see how ZIO helps to provide smooth access to JMS. First, let's see how we can run the program above:

override def run(args: List[String]): ZIO[ZEnv, Nothing, ExitCode] =
  program
    .provideCustomLayer(combinedEnv)
    .catchAllCause(c => putStrLn(c.prettyPrint))
    .exitCode

As we can see, we need to provide the environment with all required services, so that the program can actually execute.

Constructing the Environment

First, we create a layer which consists of the standard ZIO environment enriched with the Slf4j implementation of the Logging service. This layer is required by the ActiveMQ service implementation and also by the program itself.

Next we create the ActiveMQ service using vertical composition with the logEnv and also an instance of a ZIOJmsConnectionManager which we stick into the environment as well.

Finally we can create the combinedEnv using horizontal composition of the logging, the broker layer and the connection manager layer.

The resulting environment contains everything needed to run our program.

private val logEnv: ZLayer[Any, Nothing, ZEnv with Logging] =
  ZEnv.live ++ Slf4jLogger.make((_, message) => message)

private val brokerEnv: ZLayer[Any, Throwable, AMQBroker.AMQBroker] =
  logEnv >>> AMQBroker.simple("simple")

private val mgrEnv = ZIOJmsConnectionManager.Service.make

private val combinedEnv =
  logEnv ++ brokerEnv ++ mgrEnv
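
To make the two kinds of composition concrete outside of JMS, here is a minimal sketch using the ZIO 1.x layer API. `Config` and `Greeter` are hypothetical services invented for this illustration; they merely play the roles of the logging layer and the broker layer.

```scala
import zio._

object LayerSketch {
  // Hypothetical services, standing in for Logging and the broker
  final case class Config(name: String)
  final case class Greeter(greeting: String)

  // A layer without requirements, comparable to logEnv
  val configLayer: ULayer[Has[Config]] =
    ZLayer.succeed(Config("simple"))

  // A layer that needs Config to be built, comparable to the broker needing logging
  val greeterLayer: URLayer[Has[Config], Has[Greeter]] =
    ZLayer.fromService[Config, Greeter](cfg => Greeter(s"hello from ${cfg.name}"))

  // >>> feeds one layer into another (vertical), ++ puts layers side by side (horizontal)
  val combined: ULayer[Has[Config] with Has[Greeter]] =
    configLayer ++ (configLayer >>> greeterLayer)

  val program: URIO[Has[Config] with Has[Greeter], String] =
    for {
      c <- ZIO.service[Config]
      g <- ZIO.service[Greeter]
    } yield s"${c.name}: ${g.greeting}"

  def result: String =
    Runtime.default.unsafeRun(program.provideLayer(combined))
}
```

Leaving out either side of the `++` makes the compiler complain about the missing `Has[...]` in the environment, which is the kind of error message mentioned in the conclusion.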

For reference, the ActiveMQ broker service is here.

Sending messages

To have some data travelling through the message broker, let's start by creating a plain ZIO stream, which emits a String element every half second or so. Each element is simply the current time formatted using a SimpleDateFormat.

private val stream: ZStream[ZEnv, Nothing, String] = ZStream
  .fromSchedule(Schedule.spaced(500.millis).jittered)
  .mapM(_ =>
    currentTime(TimeUnit.MILLISECONDS)
      .map(sdf.format)
  )

Now we want to send all these Strings to JMS and later on receive them.

private def producer(con: JmsConnection) =
  createSession(con).use { session =>
    createProducer(session).use(prod => stream.run(jmsSink(prod, testDest)))
  }

The send is broken down into these steps:

  1. create the JMS session
  2. create the JMS MessageProducer
  3. use the producer to create a ZIO Sink
  4. run the stream with the sink, so that the generated messages are sent to JMS

Note: Within the JMS API we use a number of case classes that are simple wrappers around the underlying JMS classes. Essentially these case classes contain some additional information besides the original object. For one, we keep a reference to the instance that was used as a factory. For example, the JmsSession has a reference to the connection it belongs to and a JmsConsumer has a reference to the JmsSession it was created for.

Besides that, all classes contain a human-readable identifier, which is mainly used to produce a more readable log.

Having this in mind, we create a named JMS session as a ZManaged, so that ZIO takes care of closing the session after it has been used. Within the session we create a JmsProducer, which is again a ZManaged.
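
The acquire and release ordering that ZManaged gives us can be sketched without any JMS classes. The following is only an illustration against the ZIO 1.x API: `resource` is a hypothetical helper that records what happens in a shared log instead of opening a real session or producer.

```scala
import zio._

object ManagedSketch {
  // Hypothetical resource: records its acquisition and release in a shared log
  def resource(name: String, log: Ref[Vector[String]]): UManaged[String] =
    ZManaged.make(log.update(_ :+ s"open $name").as(name))(_ => log.update(_ :+ s"close $name"))

  val program: UIO[Vector[String]] =
    for {
      log <- Ref.make(Vector.empty[String])
      _ <- resource("session", log).use { s =>
             resource("producer", log).use { p =>
               log.update(_ :+ s"using $p in $s")
             }
           }
      events <- log.get
    } yield events
}
```

The recorded events show that the inner resource is released before the outer one: open session, open producer, using producer in session, close producer, close session.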

Note that the producer does not actually produce the messages in the sense of JMS; it just has all the information to do so.

So, let's look at how we can define an effect using the producer to actually send a message:

def send(
  text: String,
  prod: JmsProducer,
  dest: JmsDestination
) = (for {
  msg <- effectBlocking(prod.session.session.createTextMessage(text))
  d   <- dest.create(prod.session)
  _   <- effectBlocking(prod.producer.send(d, msg))
  _   <- log.debug(s"Message [$text] sent successfully with [$prod] to [${dest.asString}]")
} yield ()).flatMapError { t =>
  log.warn(s"Error sending message with [$prod] to [$dest]: [${t.getMessage()}]") *> ZIO.succeed(t)
}.refineOrDie { case t: JMSException => t }

  1. use the producer's session to create the JMS Message object
  2. use the producer's session to create the JMS Destination object
  3. perform the JMS send
  4. record the send in the log
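
The error handling at the end of send combines two operators: flatMapError runs an effect on the failure (here, logging) while keeping the error, and refineOrDie narrows the error type and turns everything else into a fiber-killing defect. A minimal sketch of that combination, assuming ZIO 1.x, with a hypothetical `parse` function in place of the JMS call and a `Ref` in place of the logger:

```scala
import zio._

object ErrorSketch {
  // Hypothetical stand-in for the JMS call: s.toInt may throw NumberFormatException
  def parse(s: String, seen: Ref[List[String]]): IO[NumberFormatException, Int] =
    ZIO
      .effect(s.toInt)
      // record the failure (the article logs a warning here) and keep the original error
      .flatMapError(t => seen.update(t.getMessage :: _).as(t))
      // keep only the expected error type; any other Throwable becomes a defect
      .refineOrDie { case e: NumberFormatException => e }

  val program: UIO[(Either[NumberFormatException, Int], Either[NumberFormatException, Int], Int)] =
    for {
      seen <- Ref.make(List.empty[String])
      ok   <- parse("42", seen).either
      bad  <- parse("boom", seen).either
      n    <- seen.get.map(_.size)
    } yield (ok, bad, n)
}
```

Only the failing call leaves a trace in `seen`, and the typed error channel contains nothing but NumberFormatException, just as send exposes nothing but JMSException.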

Now that we have the effect sending a single message, we can easily create a sink. The ZSink object in the ZIO streams API has a foreach method, which allows us to create a sink from an effect:

// in ZSink:
def foreach[R, E, I](f: I => ZIO[R, E, Any]): ZSink[R, E, I, I, Unit]

Let's take a moment to digest the signature: R and E are the usual ZIO type parameters describing the environment the sink requires and the errors it may produce. I is the type of the elements the sink consumes, and the second I is the type of any leftover elements. The result type Unit means that running a stream with this sink yields no value. In other words, the stream is run only for the effect passed in as a parameter.

The function provided needs to create an effect for each element of type I.

In our case we already have the effect, which is the send method above, so we can define our sink as

def jmsSink(
  prod: JmsProducer,
  dest: JmsDestination
) =
  ZSink.foreach[ZEnv with Logging with ZIOJmsConnectionManager, JMSException, String](s => send(s, prod, dest))
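
The same construction can be tried without a broker. In this minimal sketch, assuming ZIO 1.x, the hypothetical `collectingSink` appends each element to a `Ref` where jmsSink would call send:

```scala
import zio._
import zio.stream._

object SinkSketch {
  // Hypothetical sink: runs one effect per element, like jmsSink does with send
  def collectingSink(ref: Ref[List[String]]): ZSink[Any, Nothing, String, String, Unit] =
    ZSink.foreach[Any, Nothing, String](s => ref.update(s :: _))

  val program: UIO[List[String]] =
    for {
      ref  <- Ref.make(List.empty[String])
      _    <- ZStream("a", "b", "c").run(collectingSink(ref)) // result of the run is Unit
      sent <- ref.get
    } yield sent.reverse
}
```

Running the stream returns Unit; the only observable outcome is the side effect performed for each element, which here is the list of collected strings.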

Receiving messages

Now let's look at the consumer of our program:

private def consumer(con: JmsConnection) =
  createSession(con).use { session =>
    createConsumer(session, testDest).use { cons =>
      jmsStream(cons)
        .collect { case m: TextMessage => m.getText() }
        .foreach(s => putStrLn(s))
    }
  }

Again, we need to create a JmsSession, but this time we use it to create a JmsConsumer for the given JmsDestination. We then use the created consumer to create a ZStream[R, E, Message], in other words a plain ZStream of JMS Message objects. From that stream we collect all TextMessage instances, get the String body of those and print that to the console.

Creating the stream is again amazingly simple with the ZIO streams API:

def jmsStream(cons: JmsConsumer) =
  ZStream.repeatEffect(receive(cons)).collect { case Some(s) => s }

We repeat an effect producing optional Message objects (optional because the underlying receive yields None if no message is available for the consumer). As we are only interested in the actual messages, we collect only the results actually having a Message.
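
The repeat-and-collect idea works for any effect that returns an Option. As a minimal sketch, assuming ZIO 1.x, the hypothetical `polled` stream below uses the non-blocking `poll` of a ZIO `Queue` in place of the JMS consumer:

```scala
import zio._
import zio.stream._

object PollSketch {
  // Hypothetical stand-in for jmsStream: poll yields None when the queue is empty
  def polled(queue: Queue[String]): ZStream[Any, Nothing, String] =
    ZStream.repeatEffect(queue.poll).collect { case Some(s) => s }

  val program: UIO[List[String]] =
    for {
      queue <- Queue.unbounded[String]
      _     <- queue.offerAll(List("a", "b", "c"))
      got   <- polled(queue).take(3).runCollect // stop after three real elements
    } yield got.toList
}
```

Without the `take(3)` the stream never ends: once the queue is empty, the poll keeps returning None and collect silently drops those results.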

The actual receive is simply a wrapper around the JMS API with some logging:

def receive(cons: JmsConsumer) = (for {
  msg <- effectBlocking(Option(cons.consumer.receiveNoWait()))
  _   <- if (msg.isDefined) log.debug(s"Received [$msg] with [$cons]") else ZIO.unit
} yield msg).flatMapError { t =>
  log.warn(s"Error receiving message with [$cons] : [${t.getMessage()}]") *> ZIO.succeed(t)
}.refineOrDie { case t: JMSException => t }

Next steps

I have not elaborated too much on the underlying ZIOJmsConnectionManager. Essentially this is a map to hold named connection factories. At the moment the connection manager guarantees that for a given id only one physical JMS connection is created. Also, the connection manager will provide automated reconnects and connection monitoring, which we will look at in later articles.

Next I will explore how to enhance the demo program with some resilience. The idea here is to create self-healing streams that automatically reconnect after a connection has been lost.

For now I have just used Strings as message objects; this will be enhanced to more flexible and useful messages.

Conclusion

With ZIO it is very straightforward to break down a given problem into smaller pieces and then use those as building blocks for the solution. The challenge for a developer like me is to get my head around the signatures and how all the types play together.

For example, building the environment took me a couple of hours to get right. One has to take the time to read and understand the compiler errors. In the end I found that these errors pretty much tell me what I need to put together in terms of horizontal and vertical composition.

The first steps in some areas of ZIO require quite a bit of code study. Given that ZIO is just beyond its first official release, the documentation and other resources are plentiful, but scattered across talks and blogs. The upcoming book by John De Goes and Adam Fraser addresses a lot of that and already has a lot of content in its alpha version.

Even if it means stating the obvious: even when you work with a more sophisticated ZIO API, in the inner layers you will find plain ZIO effects which you then combine into something else. In our case we create a ZStream and a ZSink from plain ZIO effects and then have the entire ZStream magic at our fingertips to manipulate JMS streams.