Reactive streams for RabbitMQ with Monix
In Why you should know Monix I’ve taken a brief look at some of Monix’s abstractions and utilities, but I haven’t dived into implementing reactive streams elements. This time I’m going to build Consumer
and Observer
for RabbitMQ message broker.
I have to start with vocabulary, to make things understandable:
- reactive stream: stream with non-blocking back-pressure, not necessarily implementing the Reactive Streams specification
- channel: RabbitMQ channel, logical part of RabbitMQ connection, not a programming primitive for inter-process-communication
- queue: RabbitMQ queue, clients consume messages from queues, not a queue data-structure like BlockingQueue
- consume: read (get) a message from a queue
- exchange: RabbitMQ exchange, clients produce (send, publish) messages to exchanges
- produce: send (publish) a message to an exchange
During my effort I’ll stay on shoulders of giants, mainly on monix-kafka andreactive-rabbit, big “Thank You” to Alexandru Nedelcu and Michał Kiędyś!
Companion project’s source code for this post is located here.
Objective
What I aim for are: Monix Consumer
that takes messages from Observable
and produces messages to broker and Monix Observable
that consumesmessages from broker and gives them to Consumer
. For both elements I will provide tests for “reactiveness”.
What I’m not going to implement is a complete library of production quality or part of it. I’m trying to keep things very simple.
RabbitMQ
Some basic knowledge about RabbitMQ you ought to have to understand this post is:
- messages are produced (published) to
Exchanges
with somerouting key
- server routes messages to
Queues
(zero to many) accordingly to messagerouting key
and server bindings setup - messages are consumed from
Queues
and their consumption (delivery) is acknowledged by client, delivered messages will not be delivered to other consumer, unacknowledged messages re-appear inQueue
and will be delivered again to one of consumers - there are transactions and means to confirm published messages or use transactional channels, I abstract these things
- physical
Connection
is scarce resource but one can create multipleChannels
inside it - violating a protocol when using a
Channel
can cause closingConnection
- server blocks
Connection
if it produces too much, so it is good idea to have separateConnection
for consumption - documentation says that “
Channel
instances must not be shared between threads”, I understand this as “don’t accessChannel
concurrently from multiple threads”
Basic AMQP concepts page contains all necessary information, it is about 15 minutes read.
Short summary of Monix’s Observer[T], Subscriber[T] and Consumer[T, R]abstractions and their relations:
Observer[T]
has three callbacks:onNext(e: T): Future[Ack]
,onComplete(): Unit
andonError(ex: Throwable): Unit
, defines behaviour on stream events.Subscriber[T]
is anObserver[T]
withScheduler
attached.Consumer[T, R]
is a factory ofSubscribers[T, R]
- When
o: Observable[T]
is applied toc: Consumer[C, R]
,c
createss: Subscriber[C, R]
ando.subscribe(s)
happens. R
is type of element emitted whenSubscriber
completes – for example, a consumer that sums length of allString
s passed to it has typeConsumer[String, Long]
.
ExchangeConsumer
I’m implementing will expect an Observable[OutboundMessage]
and it will produce observed messages to RabbitMQ server. There is no value I want to return upon completion of upstream (data producer), thus I need to return Consumer[OutboundMessage, Unit]
. If I was going to count messages sent and signal this number when upstream completes, then type would be Consumer[OutboundMessage, Long]
.
There is more than one way to skin a cat, my way is to have one Connection
per Consumer
, this Connection
is used to create one Channel
per Subscriber
. This is cheap, but Connection
– thus all Subscriber
s using it’s Channel
s – can be blocked by server because of one Subscriber
that produces too much. Let’s live with this design decision and dive into the code:
Important points of createSubscriber
are:
- private
Channel
is created for newSubscriber
- unaltered
Scheduler
is used by newSubscriber
- I don’t care about extra AMQP properties, for demo purpose exchange, routing key and body are enough
publish
callsChannel.basicPublish
which has synchronous, blocking APIonNext
callspublish
then returnsContinue
becauseSubscriber
is ready for next element at that time- When
Observable
signalsonComplete
oronError
, givenCallback
is used after aborting aChannel
. AssignableCancelable.dummy
is returned alongSubscriber
, I’ll get back to it in canceling subscription.
That’s it. I can’t tell it was hard but Monix can make it even easier, this time I’ll create equivalent Consumer
using Consumer.fromObserver
Consumer.fromObserver
takes care of Callback[Unit]
seen in previous example, by calling it’s onSuccess
and onError
after calling Observer
’s onComplete
and onError
methods, thus Callback
is absent in this code.
It’s time for short example of how to start streaming from observable to Rabbit, please just note types that are used:
Canceling subscription
Finally I’m ready to explain AssignableCancelable.dummy
present in the first example.
Subscriber can cancel this AssignableCancelable
to cancel subscription to data source. Monix needs Assignable
part of it, because it assigns subscription to Observer
to it. In other words it allows canceling subscription to data source from data sink.
Let’s consider scenario in which RabbitMQ will close ExchangeConsumer
s Connection
, and therefore all Subscriber
s Channel
s. onNext
subsequent to channel shutdown has to fail (Stop
should be returned and onError
of Callback
should be invoked). But what if Observable
uses some scarce resource and supports canceling subscriptions properly? Then we can save this resource, by canceling subscription, when Channel
s shutdown is observed.
Thanks to Channel.addShutdownListener
improvement is very simple.
Final version of ExchangeConsumer
in repo: ExchangeConsumer.scala
There is also Consumer.create function, where Cancelable
(subscription) is injected as a parameter. I use it in ExchangeConsumer companion object.
Canceling subscriber
Implemented ExchangeSubscriber
is now polite to Observable
, because it reacts to connection closed with canceling it’s subscription. Unfortunately, in following scenario no one is so kind to ExchangeSubscriber
, it will await for subsequent onNext
calls keeping Channel
open. Forever.
Observer contract states that “The data-source can get canceled without the observer receiving any notification about it”. But there is one weird trick that can help us, it is: Observable.onCancelTriggerError
.
After little change produce
ends with java.util.concurrent.CancellationException
but I don’t care, I just told it to cancel and my resource is free!
Testing ExchangeConsumer
Ok, it compiles, so it works. I call it a day.
Just kidding, tests are missing! I’m conservative about tests but I’m also motivated to end this section with little effort.
I’m going to omit round trip tests where messages are produced by ExchangeConsumer
, because this post is more about reactive streams than about AMQP.
For tests about being reactive streams, things look good for me, because someone has written pretty nice Technology Compatibility Kit for Reactive Streams. I’ll use Monix goodness to obtain org.reactivestreams.Subscriber
from my Consumer
and TCK to perform tests!
ExchangeConsumerReactiveSubscriberSpec
Code above is missing starting RabbitMQ server (not in Repo also), creating exchange and connection management.
Relevant parts are:
createElement
– required bySubscriberBlackboxVerification
to create elements thatSubscriber
will receivecreateSubscriber
– providing an instance ofSubscriber
fromConsumer
toReactive(requestCount = 10)
– converts toorg.reactivestreams.Subscriber
which demands 10 elements in advance, before processing them;requestCount
can be considered as a size of internal buffer
That was my free lunch! At least 40% of it because only 10 of 25 specs of TCK are run by default without extra custom code. It is good beginning though!
Combination of Channel
API, Monix goodness and ReactiveStreams TCK made my first objective quite easy.
QueueObservable
It’s turn to implement Observable
that will consume from RabbitMQ queue. API gives me a choice between callback and pull. By callback API I understand using Channel.basicConsume(queue, autoAck, consumer)
, where consumer
has handful of callback methods. reactive-rabbit uses this approach.
Pulling is using Channel.basicGet(queue, autoAck)
which returns GetResponse
. basicGet
is synchronous and blocking, but if there is no message ready for consumption, it returns null
instantly – it doesn’t perform blocking await for next message. I’ll use pull.
The second design decision to make is to choose how to acknowledge messages in case of basicGet(queue, false)
is used – that is when client side decides when messages processing is done and it is OK to remove it from queue.
I’m going to make an QueueObservable
that is an Observable[AckableGetResponse]
, which closes on Channel
reference and allows to ack
message to the broker. Similar approach is used by Akka Streams connector for Apache Kafka, which returns CommittableMessage
to commit offsets.
Same type is returned regardless auto-ack is used, that is only to keep code blog-post-example short.
To implement Observable
I need to provide a function from Subscriber
to Cancelable
. Very side-effecting function in fact. It should start process of feeding given Subscriber
with elements that are observable (messages from queue in our case). Returned Cancelable
ought to allow aborting that process.
What I’m going to do with given Subscriber
is:
- Start dedicated
Channel
for it, if that step fails, callonError
- Pull next message and signal
onNext
oronError
(queue got deleted, channel got closed, etc.) - Repeat (2) unless process was canceled
- Never call it’s
onComplete
, because I treat queues as infinite observables (design decision)
Following code is more involving than previous examples, I tried to document my intentions with comments.
feeding
is a process that feeds subscriber with messages consumed from queue, with added cancel handler (sets continue
flag to false) and error handler that signals to subscriber
. I justify so trivial cancellation with synchronous nature of basicGet
and quick spinning in case of no messages ready for being consumed.
feedSubscriber
calls oneGet
if processing should continue, otherwise aborts channel and terminates processing with abort
task.
oneGet
performs one Channel.basicGet
and if there was some message consumed subscriber.onNext
is called. If subscriber wants to Continue
, feedSubscriber
is trampolined. In opposite case channel is aborted and processing is terminated by abort
task. When exception occurs, perhaps during basicGet
, Task.raiseError
is returned. It will cause onErrorRecover
of feeding
– thus subscriber.onError
.
Testing QueueObservable
No surprise for testing Observable
. I’ll use reactive streams TCK again.
Observable
has method toReactivePublisher
by which I obtain org.reactivestreams.Publisher
to run test against it.
QueueObservableReactivePublisherSpec
I ought to justify myself for skipping test for Specification Rule 1.09 Textual specification states:
Original test fails because it expects onSubscribe
and given implementation doesn’t call it. Although it doesn’t break specification because it doesn’t signal anything at all in this test case – doesn’t violate “MUST call onSubscribe (…) prior to any other signals (…)”.
Of course, more rules of specification could be checked with extra effort although I’m going to stop here.
Thank you for your attention, I hope this post is helpful!
Conclusion
In my opinion it went pretty easy. For sure there are features missing, like Publisher Confirms (for ExchangeConsumer
) or using Channel auto-recovery (for both Consumer
and Observable
), also my take on acknowledgments is very simple, so don’t I dare comparing implementation with Monix to reactive-rabbit
or any other production grade library.
I hope I demonstrated potential of creating one with Monix and provided nice example of creating Consumers
and Observers
.
Links
- Monix homepage
- RabbitMQ homepage
- Reactive Streams
- reactive-rabbit
- monix-kafka
- Akka Streams connector for Apache Kafka
- Examples for this article
Do you like this post? Want to stay updated? Follow us on Twitter or subscribe to our Feed.
Read also