In this article we are going to explore a bit of the ZIO streams API and how it can be used to talk to a JMS broker. The ZIO web site and awesome-zio have a lot of very good articles and talks on ZIO streams covering the basics, so I won't repeat those here.
For this article I have used an embedded ActiveMQ messaging broker, but the code makes no assumptions about the underlying JMS provider. Usually the JMS broker would run externally and we would use a provider-specific connection factory to connect to it.
The complete source code used in this article can be found on GitHub.
Let's start by considering what a simple program sending and receiving messages from an ActiveMQ broker would look like. That program should be runnable as a normal ZIO app:
We can see from the type of `program` that, besides the ZIO environment `ZEnv`, we will need an ActiveMQ message broker and also the ZIO logging API to execute the program. With those requirements the program is fairly straightforward:
- acquire the message broker from the environment
- create a connection factory to talk to the broker just started
- create a fiber that simply sits in the background for 10 seconds
- use the connection factory to establish a JMS connection
- use the connection to kick off a sender and receiver
- join with the timed fiber to interrupt the sender and receiver
- clean up the connection
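The timing part of these steps (fork a timer, start the workers, join the timer, interrupt the workers) can be sketched without any JMS at all. The following is only an illustration, assuming ZIO 1.0.x; `TimedRun`, `worker` and `runFor` are hypothetical names, and the workers merely count iterations in place of sending and receiving messages.

```scala
import zio._
import zio.clock.Clock
import zio.duration._

object TimedRun {
  // Hypothetical stand-in for the sender or receiver: loops until it is interrupted.
  def worker(counter: Ref[Int]): URIO[Clock, Nothing] =
    (counter.update(_ + 1) *> ZIO.sleep(50.millis)).forever

  // Run two workers for the given duration and report how many iterations they completed.
  def runFor(d: Duration): URIO[Clock, Int] =
    for {
      counter  <- Ref.make(0)
      timer    <- ZIO.sleep(d).fork          // fiber that just waits in the background
      sender   <- worker(counter).fork
      receiver <- worker(counter).fork
      _        <- timer.join                 // wait until the time is up
      _        <- sender.interrupt *> receiver.interrupt
      total    <- counter.get
    } yield total
}
```

In the real program the connection would additionally be acquired before the workers start and released after they have been interrupted.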
The sample program has a scheduled reconnect after 10 seconds. This will cause the execution to fail with an exception because the underlying streams terminate with a `JMSException`. In one of the next articles I will get into stream recovery.
We will dive into the various steps throughout the remainder of this article to see how ZIO helps to provide smooth access to JMS. First, let's see how we can run the program above:
As we can see, we need to provide the environment with all required services, so that the `program` can actually execute.
First, we create a layer which consists of the standard ZIO environment enriched with the Slf4j implementation of the `Logging` service. This layer is required by the ActiveMQ service implementation and also by the program itself.
Next, we create the ActiveMQ service using vertical composition with `logEnv`. We also create an instance of `ZIOJmsConnectionManager`, which we add to the environment as well.
Finally, we can create the `combinedEnv` using horizontal composition of the logging layer, the broker layer and the connection manager layer.
The resulting environment contains everything needed to run our program.
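To make the two kinds of composition concrete, here is a minimal sketch with toy services, assuming ZIO 1.0.x. `Logging`, `Broker` and `ConnMgr` below are hypothetical stand-ins defined only for this example; they are not the zio-logging API or the services used in the article's repository.

```scala
import zio._

object LayerSketch {
  type Logging = Has[Logging.Service]
  object Logging {
    trait Service { def info(msg: String): UIO[Unit] }
    val live: ULayer[Logging] =
      ZLayer.succeed[Service](new Service {
        def info(msg: String): UIO[Unit] = UIO(println(msg))
      })
  }

  type Broker = Has[Broker.Service]
  object Broker {
    trait Service { def name: String; def start: UIO[Unit] }
    // Needs the logging service as input, which vertical composition supplies.
    val live: URLayer[Logging, Broker] =
      ZLayer.fromService[Logging.Service, Service] { log =>
        new Service {
          val name: String     = "embedded"
          def start: UIO[Unit] = log.info(s"broker $name started")
        }
      }
  }

  type ConnMgr = Has[ConnMgr.Service]
  object ConnMgr {
    trait Service { def factoryIds: List[String] }
    val live: ULayer[ConnMgr] =
      ZLayer.succeed[Service](new Service { val factoryIds: List[String] = Nil })
  }

  val logEnv: ULayer[Logging]   = Logging.live
  // Vertical composition: the output of logEnv feeds the input of Broker.live.
  val brokerEnv: ULayer[Broker] = logEnv >>> Broker.live
  // Horizontal composition: the outputs are combined side by side.
  val combinedEnv: ULayer[Logging with Broker with ConnMgr] =
    logEnv ++ brokerEnv ++ ConnMgr.live

  val program: URIO[Logging with Broker, String] =
    for {
      broker <- ZIO.access[Broker](_.get[Broker.Service])
      _      <- broker.start
      _      <- ZIO.accessM[Logging](_.get.info(s"using broker ${broker.name}"))
    } yield broker.name
}
```

Running `LayerSketch.program.provideLayer(LayerSketch.combinedEnv)` eliminates all requirements of the program. If a layer is missing, the compiler error names the service that is still required.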
For reference, the ActiveMQ broker service is here.
To have some data travelling through the message broker, let's start by creating a plain ZIO stream which emits a String element every half second or so. Each element is simply the formatted current time.
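A stream like this could look roughly as follows. This is a sketch assuming ZIO 1.0.x; the object name and the choice of `DateTimeFormatter` with the pattern `HH:mm:ss.SSS` are assumptions made for the example and may differ from the formatter used in the original code.

```scala
import java.time.LocalTime
import java.time.format.DateTimeFormatter
import zio._
import zio.clock.Clock
import zio.duration._
import zio.stream._

object TimeStreamSketch {
  // Assumed format; any formatter producing a String would do.
  private val fmt = DateTimeFormatter.ofPattern("HH:mm:ss.SSS")

  // Effect producing the current time as a formatted String.
  val now: UIO[String] = UIO(LocalTime.now().format(fmt))

  // Repeat the effect forever, spacing the emitted elements 500 ms apart.
  val timestamps: ZStream[Clock, Nothing, String] =
    ZStream.repeatEffect(now).schedule(Schedule.spaced(500.millis))
}
```

Since the stream never ends on its own, something like `timestamps.take(3).runCollect` is a convenient way to try it out.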
Now we want to send all these Strings to JMS and later on receive them.
The send is broken down into these steps:
- create the JMS session
- create the JMS MessageProducer
- use the producer to create a ZIO Sink
- run the stream with the sink, so that the generated messages are sent to JMS
Within the JMS API we use a number of case classes that are simple wrappers around the underlying JMS classes. Essentially these case classes contain some additional information besides the original object. For one, we keep a reference to the instance that was used as a factory. For example, a `JmsSession` has a reference to the connection it belongs to and a `JmsConsumer` has a reference to the `JmsSession` it was created for.
Besides that, all classes contain a human-readable identifier, which is mainly used to produce a more readable log.
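The shape of such wrappers might look like the sketch below. The field names, and the `JmsConnection` wrapper itself, are assumptions for illustration; only the idea of keeping an identifier, the factory instance and the underlying JMS object comes from the description above.

```scala
import javax.jms.{Connection, MessageConsumer, Session}

// Hypothetical wrapper shapes: an id for logging, a reference to the
// wrapper that acted as factory, and the underlying JMS object.
final case class JmsConnection(id: String, underlying: Connection)

final case class JmsSession(id: String, connection: JmsConnection, underlying: Session)

final case class JmsConsumer(id: String, session: JmsSession, underlying: MessageConsumer)
```

With this shape a log statement can always walk back from a consumer to its session and connection, for example via `consumer.session.connection.id`.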
With this in mind, we create a named JMS session as a `ZManaged`, so that ZIO takes care of closing the session after it has been used. Within the session we create a `JmsProducer`, which is again a managed resource.
Note that the producer does not actually produce the messages in the sense of JMS; it just has all the information needed to do so.
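The managed-resource idea can be shown in isolation. The sketch below assumes ZIO 1.0.x and uses a hypothetical `FakeSession` class instead of a real `javax.jms.Session`, so that it runs without a broker; the real code would create and close a JMS session in the same two places.

```scala
import zio._

object ManagedSketch {
  // Hypothetical stand-in for a JMS session that records whether it was closed.
  final class FakeSession(val id: String) {
    @volatile var closed: Boolean = false
    def close(): Unit = closed = true
  }

  // Acquire the session and register the release action that ZIO runs after use.
  def session(id: String): UManaged[FakeSession] =
    ZManaged.make(UIO(new FakeSession(id)))(s => UIO(s.close()))
}
```

Calling `ManagedSketch.session("s1").use(s => ...)` runs the given effect with an open session and closes it afterwards, whether the effect succeeds, fails or is interrupted.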
So, let's look at how we can define an effect that uses the producer to actually send a message:
- use the producer's session to create the JMS
- use the producer's session to create the JMS
- perform the JMS send
- record the send in the log
Now that we have the effect sending a single message, we can easily create a sink. The `ZSink` object in the ZIO streams API has a `foreach` method, which allows us to create a sink from an effect:
Let's take a moment to digest the signature:
`R` and `E` are the usual type parameters within ZIO describing the environment required by the sink and the errors it may produce.
`Unit` in that case means that the final result after running a stream with this sink is `Unit`. In other words, the stream is just run for the effect passed in as a parameter.
The function provided needs to create an effect for each element of the stream.
In our case we already have the effect, which is the `send` method above, so we can define our sink as
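As an illustration of `ZSink.foreach`, the sketch below builds a sink from a per-element effect and runs a stream into it. It assumes ZIO 1.0.x; the `send` shown here is a hypothetical stand-in that offers to an in-memory `Queue` instead of performing a JMS send, and `sendAll` is a name invented for the example.

```scala
import zio._
import zio.stream._

object SinkSketch {
  // Hypothetical stand-in for the article's send: writes to a queue instead of JMS.
  def send(target: Queue[String])(text: String): UIO[Unit] =
    target.offer(text).unit

  // ZSink.foreach runs the effect once per element; the overall result is Unit.
  def sendAll(target: Queue[String], messages: List[String]): UIO[Unit] =
    ZStream
      .fromIterable(messages)
      .run(ZSink.foreach((text: String) => send(target)(text)))
}
```

Because the sink is just a value, the same sink could be reused with any stream of Strings, including one that emits timestamps on a schedule.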
Now let's understand the `consumer` of our program:
Again, we need to create a `JmsSession`, but this time we use it to create a `JmsConsumer` for the given destination. We then use the created consumer to create a `ZStream[R, E, Message]`, in other words a plain `ZStream` of JMS `Message` objects. From that stream we collect all `TextMessage` instances, get the `String` body of those and print that to the console.
Creating the stream is again amazingly simple with the ZIO streams API:
We repeat an effect producing optional `Message` objects (optional because the underlying receive yields `None` if no message is available for the consumer). As we are only interested in the actual messages, we collect only the results that actually contain a message.
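The same repeat-and-collect pattern can be tried out without a broker. The sketch below assumes ZIO 1.0.x and replaces the JMS receive with a hypothetical `receive` that polls an in-memory `Queue`, which also yields `None` when nothing is available.

```scala
import zio._
import zio.stream._

object ReceiveSketch {
  // Hypothetical stand-in for a JMS receive: None when no message is available.
  def receive(source: Queue[String]): UIO[Option[String]] = source.poll

  // Repeat the receive effect forever and keep only the results holding a message.
  def messages(source: Queue[String]): ZStream[Any, Nothing, String] =
    ZStream.repeatEffect(receive(source)).collect { case Some(text) => text }
}
```

Note that polling an empty in-memory queue in a tight loop keeps a thread busy. A real receive would typically block or use a timeout, so this sketch only shows how the stream is assembled.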
The actual consume is simply a wrapper around the JMS API with some logging:
I have not elaborated too much on the underlying `ZIOJmsConnectionManager`. Essentially this is a map to hold named connection factories. At the moment the connection manager guarantees that for a given id only one physical JMS connection is created. Also, the connection manager will provide automated reconnects and connection monitoring, which we will look at in later articles.
Next I will explore how to enhance the demo program with some resilience. The idea here is to create self-healing streams that automatically reconnect after a connection has been lost.
For now I have just used Strings as message objects; this will be enhanced to more flexible and useful messages.
With ZIO it is very straightforward to break down a given problem into smaller pieces and then use those as building blocks for the solution. The challenge for a developer like me is to get my head around the signatures and how all the types play together.
For example, building the environment took me a couple of hours to get it right. One has to take the time to read and understand the compiler errors. In the end I found that these errors pretty much tell me what I need to put together in terms of horizontal and vertical composition to get it right.
The first steps in some areas of ZIO require quite a bit of code study. Given that ZIO is just beyond its first official release, the documentation and other resources are plentiful, but scattered around talks and blogs. The upcoming book by John De Goes and Adam Fraser addresses a lot of that and already has a lot of content in its alpha version.
Even if it means stating the obvious: even when you work with a more sophisticated API in ZIO, in the inner layers you will find plain ZIO effects which you then combine into something else. In our case we create a `ZSink` from plain ZIO effects and then have the entire `ZStream` magic at our fingertips to manipulate JMS streams.