ZIO Streams and JMS
In this article we are going to explore a bit of the ZIO streams API and how it can be used to talk to a JMS broker. The ZIO web site and awesome-zio have a lot of very good articles and talks on ZIO streams covering the basics, so I won't repeat those here.
For this article I have used an embedded ActiveMQ messaging broker, but the code makes no assumptions about the underlying JMS provider. Usually the JMS broker would run externally and we would use a provider specific connection factory to connect to it.
info
The complete source code used in this article can be found on github
#
What we want to achieveLet's start by considering what a simple program sending and receiving messages from an ActiveMQ broker would look like. That program should be runnable as normal ZIO app:
We can see from program
's type that besides the ZIO environment ZEnv
we will need an ActiveMQ message broker and also the ZIO logging API to execute the program. With those requirements the program is fairly straightforward:
- acquire the message broker from the environment
- create a connection factory to talk to the broker just started
- create a fiber that simply sits in the background for 10 seconds
- use the connection factory to establish a JMS connection
- use the connection to kick off a sender and receiver
- join with the timed fiber to interrupt the sender and receiver
- clean up the connection
info
The sample program has a scheduled reconnect
after 10 seconds. This will cause the execution to fail with an exception because the underlying streams terminate with a JMSException
. In one of the next articles I will get into stream recovery.
We will dive into the various steps throughout the remainder of this article to see how ZIO is helping to provide a smooth access to JMS. First, let's see how we can run the program above:
As we can see, we need to provide the environment with all required services, so that the program
can actually execute.
#
Constructing the EnvironmentFirst, we create a layer which consist of the standard ZIO environment enriched with the Slf4j implementation of the Logging
service. This layer is required by the ActiveMQ service implementation and also by the program itself.
Next we create the ActiveMQ service using vertical composition with the logEnv
and also an instance of a ZIOJmsConnectionManager
which we stick into the environment as well.
Finally we can create the combinedEnv
using horizontal composition of the logging, the broker layer and the connection manager layer.
The resulting environments contains everything to run our program.
For reference, the Active MQ broker service is here.
#
Sending messagesTo have some data travelling the message broker, let's start by creating a plain ZIO stream, which emits a String element every half second or so. Each element is simply the current time formatted using a SimpleDateFormat
.
Now we want to send all these Strings to JMS and later on receive them.
The send is broken down into these steps:
- create the JMS session
- create the JMS MessageProducer
- use the producer to create a ZIO Sink
- run the stream with the sink, so that the generated messages are sent to JMS
info
Within the JMS API we use a number of case classes that are simple wrappers around the underlying JMS classes. Essentially these case classes contain some additional information besides the original object. For one, we keep a reference of the instance that was used as a factory. I.e. the JmsSession
has a reference to the connection it belongs to and a JmsConsumer
a reference to the JmsSession
it was created for.
Besides that all classes contain a human readable identifier, which is mainly used to produce a more readable log.
Having this in mind, we create a named JMS session as a ZManaged
, so that ZIO takes care of closing the session after it has been used. Within the session we create a JmsProducer
, which is again a ZManaged
.
Note, that the producer does not actually produce the messages in the sense of JMS - it just has all the information to do so.
So, let's have a look how we can define an effect using the producer to actually send a message:
- use the producer's session to create the JMS
Message
object - use the producer's session to create the JMS
Destination
object - perform the JMS send
- record the send in the log
Now that we have the effect sending a single message, we can easily create a sink. The ZSink
object in the ZIO streams API has a foreach
method, which allows us to create a sink from an effect:
Let's take a moment to digest the signature: R
and E
is the usual type magic within ZIO to describe the environment required for the sink and the errors it may produce. Unit
in that case means that the final result after running a Stream with this Sink is Unit
. In other words, the Stream is just run for the effect passed in as a parameter.
The function provided needs to create an effect for each element of type I
.
In our case we already have the effect, which is the send
method above, so we can define our sink as
#
Receiving messagesNow let's understand the consumer
of our program:
Again, we need to create a JmsSession
, but this time we use it to create a JmsConsumer
for the given JmsDestination
.
We then use the created consumer to create a ZStream[R, E, Message]
, in other words a plain ZStream
of JMS Message
objects. From that stream we collect all TextMessage
instances, get the String
body of those and print that to the console.
Creating the stream is again amazingly simple with the ZIO streams API:
We repeat an effect producing optional Message
objects (optional because the underlying receive yields None
if no message is available for the consumer). As we are only interested in the actual messages, we collect only the results actually having a Message
.
The actual consume is simply a wrapper around the JMS API with some logging:
#
Next stepsI have not elaborated too much on the underlying ZIOJmsConnectionManager
. Essentially this is a map to hold named connection factories. At the moment the connection manager guarantees that for a given id only one physical JMS connection is created. Also, the connection manager will provide automated reconnects and connection monitoring, which we will look at in later articles.
Next I will explore how to enhance the demo program with some resilience. The idea here is to create self healing streams that would automatically reconnect after a connection has been lost.
For now I have just used Strings as message objects; this will be enhanced to more flexible and useful messages.
#
ConclusionWith ZIO it is very straight forward to break down a given problem into smaller pieces and then use those as building blocks for the solution. The challenge for developer like me is to get the head around the signatures and how all the types play together.
For example, building the environment took me a couple of hours to get it right. One has to take the time to read and understand the compiler errors. In the end I found that these errors pretty much tell me what I need to put together in terms of horizontal and vertical composition to get it right.
The first steps in some areas of ZIO require quite a bit of code study. Given that ZIO is just beyond it's first official release, the documentation and other resources are plenty, but scattered around talks and blogs. The upcoming book by John De Goes and Adam Fraser addresses a lot of that and already has a lot of content in it's alpha version.
Even if it means stating the obvious: Also when you work with a more sophisticated API in ZIO in the inner layers you are going to find ZIO effects which you then combine into something else - in our case create ZStream
and ZSink
from plain ZIO's and then have the entire ZStream magic at the tip of your fingers to manipulate JMS streams.