Recovery for (JMS) Streams
Today we are going to explore the ZIO streams API and see how we can create a stream that will enter a recovery phase in certain error scenarios. We will explore how we can deal with exceptions on the connection level while keeping the stream processing code untouched.
# Automatically recover (JMS) streams
In my last article I showed how the ZIO stream API allows us to easily create streams for sending or receiving messages via JMS. Within the sample program we have seen that the streams terminate with an exception whenever the underlying JMS API encounters an error.
One of the most common errors is that the connection is lost due to a network error. For long running applications we would like to initiate an automatic reconnect and either create a new stream or recover the existing stream. The advantage of recovering the existing stream is that we do not have to rewire the users of the streams. Any effect using the existing stream will be suspended until the reconnect has happened and then continue.
In this article I will explore how we can use the ZIO API to achieve such a transparent reconnect.
The complete source code used in this article can be found on GitHub.
# What we want to achieve
Like in the last article, let's start by looking at a sample program we would like to run:
There are 2 important differences in comparison to the sample application of the last article:
- We are issuing a reconnect to the underlying connection factory. This implies that there is some mechanism within the connection factory that controls an automated reconnect.
- Rather than creating the JMS stream / sink directly we use an effect that yields a factory linked to the connection factory which can create a stream or sink for a given destination.
When we run this program with the input stream below, we will notice that the output pauses for a couple of seconds when the reconnect is triggered and then continues sending and receiving messages.
Here is an excerpt from the console output:
# A reconnecting wrapper around the JMS Connection factory
Under the covers we use a connection manager which manages named JMS connections. At the moment it guarantees that for a given id only a single physical JMS connection will be established. The connection manager delegates all JMS API calls to the methods of `ConnectionFactory` within the JMS API.
To guarantee that only a single connection can be established, we wrap the actual connect with a Semaphore and return the connection if it already exists, otherwise we create a new connection and store it.
To recover a connection, we perform a JMS close on the existing connection and enter a recovery period. Within that period any execution of the `connect` effect will result in a `JMSException`.
Finally, the `reconnect` effect simply triggers the recovery if an underlying connection currently exists.
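The connect / recover / reconnect logic described above could look roughly like the following. This is a minimal sketch written against the ZIO 2.x API, not the code from the repository: `Conn` is a hypothetical stand-in for a physical JMS connection so that the example runs without a broker, `ConnectError` stands in for the `JMSException`, and the names `ConnectionManager`, `State` and `recoveryPeriod` are assumptions made for illustration.

```scala
import zio._

// Hypothetical stand-in for a physical JMS connection (no broker needed).
final case class Conn(id: String, serial: Int)

// Stand-in for the JMSException raised while a connection is recovering.
final case class ConnectError(msg: String) extends Exception(msg)

object ConnectionManager {
  // Open connections by id, ids currently in recovery, number of connections created.
  final case class State(conns: Map[String, Conn], recovering: Set[String], created: Int)

  def make(recoveryPeriod: Duration): UIO[ConnectionManager] =
    for {
      sem   <- Semaphore.make(1)
      state <- Ref.make(State(Map.empty, Set.empty, 0))
    } yield new ConnectionManager(sem, state, recoveryPeriod)
}

final class ConnectionManager private (
  sem: Semaphore,
  state: Ref[ConnectionManager.State],
  recoveryPeriod: Duration
) {

  // The single permit ensures only one fiber at a time checks for or creates a connection.
  def connect(id: String): IO[ConnectError, Conn] =
    sem.withPermit {
      state.get.flatMap { s =>
        if (s.recovering.contains(id))
          ZIO.fail(ConnectError(s"connection $id is recovering"))
        else
          s.conns.get(id) match {
            case Some(existing) => ZIO.succeed(existing) // reuse the stored connection
            case None =>
              // A real implementation would call the JMS ConnectionFactory here.
              val fresh = Conn(id, s.created + 1)
              state.set(s.copy(conns = s.conns + (id -> fresh), created = fresh.serial)).as(fresh)
          }
      }
    }

  // "Close" the connection (here: forget it) and reject connects until the period is over.
  def recover(c: Conn): UIO[Unit] = {
    val close = sem.withPermit(
      state.update(s => s.copy(conns = s.conns - c.id, recovering = s.recovering + c.id))
    )
    val reopen =
      ZIO.sleep(recoveryPeriod) *> state.update(s => s.copy(recovering = s.recovering - c.id))
    close *> reopen.forkDaemon.unit
  }

  // Only triggers a recovery if a connection for the id currently exists.
  def reconnect(id: String): UIO[Unit] =
    state.get.flatMap { s =>
      s.conns.get(id) match {
        case Some(c) => recover(c)
        case None    => ZIO.unit
      }
    }
}
```

In this sketch the recovery timer runs in a daemon fiber, so `reconnect` returns immediately while `connect` keeps failing until the recovery period has elapsed.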
# Creating a recoverable Stream (consume messages)
The idea behind the recovering stream is that we connect to the JMS broker with the given connection factory and then start consuming messages until we hit an exception. Whenever we hit an exception, we catch it and enter a recovery phase. After the recovery phase we will try to reconnect and continue to consume messages.
The idea manifests in `consumeUntilException` and `consumeForEver`. `consumeUntilException` uses the stream we have seen in the last article. It will stick all messages that have been received into a one-element buffer which we can use later on to create the final stream visible to the outside world.
`consumeForEver` simply creates an effect which will create the JMS connection and then delegate to `consumeUntilException`. Then we apply the `catchAll` operator to that effect, where we schedule the next iteration of `consumeForEver` after a recovery period.
The final stream is then created by repeating the `take` operation of the buffer while `consumeForEver` is executing in its own fiber.
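The pattern can be sketched as follows, again assuming the ZIO 2.x API. The object `RecoveringStream` and the parameters `connectAndConsume` and `retryInterval` are hypothetical: `connectAndConsume` stands for "connect, then run the plain JMS stream and offer every message to the buffer", and it is expected to fail when the connection breaks.

```scala
import zio._
import zio.stream._

object RecoveringStream {

  def recoveringStream[A](
    connectAndConsume: Queue[A] => Task[Unit], // hypothetical: fails when the connection is lost
    retryInterval: Duration
  ): ZStream[Any, Nothing, A] =
    ZStream.unwrap {
      // One-element buffer: the consumer suspends until the previous message was taken.
      Queue.bounded[A](1).flatMap { buffer =>
        val consumeUntilException: Task[Unit] = connectAndConsume(buffer)

        // On any error, wait for the retry interval and start the next iteration.
        def consumeForEver: UIO[Unit] =
          consumeUntilException.catchAll(_ => consumeForEver.delay(retryInterval))

        // The consumer runs in its own fiber; the visible stream just repeats `take`.
        // A daemon fiber keeps the sketch short; real code would tie it to the stream's lifetime.
        consumeForEver.forkDaemon.as(ZStream.repeatZIO(buffer.take))
      }
    }
}
```

A user of the resulting stream never sees the error: `take` simply suspends while the consumer fiber is waiting to reconnect.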
# Why a one-element buffer?
One might wonder why I am using a one-element buffer. We are operating on JMS and want to make sure that no messages are being lost. As a result, in a real application we have to acknowledge the message to the message broker once we are done with processing it. In case we encounter an exception while processing the message we have several options:
- We drop the message by acknowledging it even if we could not process it correctly
- We forward the message to another destination such as an error destination or a retry destination and acknowledge it
- We deny the message
The last option here is not really part of the JMS API which only has an acknowledge method on the JMS message. What we would do in a real application is use a session with `CLIENT_ACKNOWLEDGE` and to deny the message we would close the receiving session. This would automatically mark the message as undelivered in the JMS broker - effectively denying the message. As this would apply to all messages that have been received within the same session and that have not been acknowledged yet, we consume the messages one by one.
We will explore the error handling further in another post.
# Creating a recoverable Sink (send messages)
The idea behind the recovering sink is pretty much the same as for the recovering stream. The subtle difference is that we do not use the sink we have seen in the last article, but a method to send a single message.
Apart from that, the pattern to create the recoverable sink is the same as for creating the stream.
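A minimal sketch of that idea, assuming the ZIO 2.x API: `sendOne` is a hypothetical effect meaning "connect if necessary and send a single message", and `RecoveringSink`, `recoveringSink` and `retryInterval` are names chosen for this illustration only.

```scala
import zio._
import zio.stream._

object RecoveringSink {

  def recoveringSink[A](
    sendOne: A => Task[Unit], // hypothetical: fails when the connection is lost
    retryInterval: Duration
  ) = {
    // Retry the same message after the retry interval until sending succeeds.
    def sendForEver(msg: A): UIO[Unit] =
      sendOne(msg).catchAll(_ => sendForEver(msg).delay(retryInterval))

    // The sink visible to the outside world never fails; it only suspends while retrying.
    ZSink.foreach((msg: A) => sendForEver(msg))
  }
}
```

Because the retry happens per message, a message that could not be sent is attempted again after the reconnect rather than being dropped.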
# Sample log
Below is a log excerpt of the sample app execution. Note the disconnect starting at time `10770`, the recovery period of the connection and the recurring recovery attempts of the consumer and producer streams until the reconnect has finished and everything resumes from `16550` onwards.
# Conclusion
With very little code and simple patterns we could create ZIO streams on top of JMS with automatic recovery baked in. Towards the users of the created stream or sink the recovery is completely transparent and from their perspective they are working with normal `ZStream`s or `ZSink`s.
# Next steps
The next step is to add a keep-alive monitor to an established connection, which will trigger a reconnect if a maximum number of keep-alive messages have been missed.