Looking at Windows Azure Service Bus Message Pump Programming Model

In the April release of the Windows Azure SDK, Microsoft introduced some new features for the Azure Service Bus. The new feature I want to discuss in this post is the Message Pump programming model.

The Message Pump model allows us to get away from the infinite loop you have to use to receive messages from a queue. The following code snippet shows such an approach.

QueueClient.Receive Sample

There are two things to note here. First, the Receive method has a default timeout of 60 seconds. On an empty queue this means that within one billing month you would access the queue approximately 45,000 times. If you have a worker role on Windows Azure with two instances, you would access the queue about 90,000 times. Keep in mind that these numbers assume no messages are received at all; once messages arrive, the numbers increase.
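The arithmetic behind those figures can be checked with a short sketch. It assumes a 31-day billing month and a queue that stays empty, so that every receive call runs to its full timeout and is reissued immediately; the function name `idle_polls_per_month` is made up for this illustration.

```python
def idle_polls_per_month(timeout_seconds, instances=1, days=31):
    """Count receive calls that time out on an empty queue in one month.

    Assumption: each call blocks for the full timeout and is reissued at once.
    """
    seconds_per_month = days * 24 * 60 * 60
    return instances * (seconds_per_month // timeout_seconds)


one_instance = idle_polls_per_month(60)                # 44640, roughly 45,000
two_instances = idle_polls_per_month(60, instances=2)  # 89280, roughly 90,000
```

Under the same assumptions, a one-hour timeout (3600 seconds) brings a single idle instance down to 744 calls per month.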

Why is this important, you ask? Well, your Windows Azure subscription includes 10,000 messages per billing cycle; after that you pay $0.10 per 10,000 messages. This doesn’t seem like a lot of money, but it adds up if you process a lot of messages and have multiple instances running.
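As a rough sketch of how the bill grows, the following uses only the figures quoted in this post (10,000 included, then $0.10 per 10,000). It assumes charges are pro-rated rather than rounded up per block, which may not match how billing actually works; `monthly_cost` is a hypothetical helper.

```python
def monthly_cost(operations, included=10_000, price_per_block=0.10, block=10_000):
    """Estimate the monthly charge in dollars for a number of billable operations.

    Assumption: usage beyond the included amount is charged pro rata.
    """
    billable = max(0, operations - included)
    return billable * price_per_block / block


# Two idle instances polling every 60 seconds for a 31-day month.
idle_cost = monthly_cost(89_280)
```

Here `idle_cost` comes out at a little under 80 cents for two instances that never process a single message; real traffic and more instances are added on top of that.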

In my little snippet above I have passed in a TimeSpan which instructs the Receive operation to time out after one hour. If a message comes in before the timeout occurs, the Receive method returns right away with the BrokeredMessage.
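The behaviour of a blocking receive with a timeout can be illustrated with an in-process model. This is not the Service Bus client: it uses Python's standard `queue.Queue`, and the names `receive` and `inbox` are invented for the sketch.

```python
import queue
import threading
import time


def receive(q, timeout_seconds):
    """Return the next message, or None if the timeout elapses first."""
    try:
        return q.get(timeout=timeout_seconds)
    except queue.Empty:
        return None


inbox = queue.Queue()

# A message that arrives shortly after the receiver starts waiting.
threading.Timer(0.05, inbox.put, args=("hello",)).start()

started = time.monotonic()
message = receive(inbox, timeout_seconds=5)  # returns as soon as the message arrives
waited = time.monotonic() - started

# Nothing is queued now, so this call runs to its timeout and yields None.
empty_result = receive(inbox, timeout_seconds=0.01)
```

A long timeout therefore does not delay processing; it only reduces how often an idle receiver has to ask again.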

The second thing to keep in mind is that when you run a process in a worker role on Windows Azure, you should never leave the while loop. If this is your main processing loop and you exit from it for any reason, you could end up with a role instance that is stuck recycling, which makes it difficult to RDP into the instance to diagnose the problem.

Message Pump

The new SDK gives us two new methods, OnMessage and OnMessageAsync, accessible from the QueueClient class. I’ll focus on OnMessage for now. It has two overloads: both take a callback function as the first parameter, and the second overload also accepts a parameter of type OnMessageOptions. The code snippet below shows a sample of the first overload.

QueueClient.OnMessage Sample

The OnMessage method registers the callback, which is executed every time a message arrives on the queue. You cannot call this method a second time on the same client, or you will get an InvalidOperationException with the message “OnMessage/OnMessageAsync has already been called.”

The use of OnMessageOptions

The OnMessageOptions class allows you to specify some options associated with message pump processing. The two properties, AutoComplete and MaxConcurrentCalls, are self-explanatory. The ExceptionReceived event allows you to register an event handler for when exceptions occur, or so you would think.

OnMessageOptions

What the ExceptionReceived event handler captures

You probably think that the ExceptionReceived event handler captures errors which occur during the processing of the message. You are right of course but it captures a little more than that. As far as I can tell it captures events from three sources:

  • Exceptions raised while the message pump is waiting for messages to arrive on the service bus
  • Exceptions raised while your code is processing the BrokeredMessage
  • The successful completion of the receive process, which also raises the event

This last one is weird to me, and I would like to see a separate event handler for it with event args specifying which message was successfully completed. Currently, when a message is successfully completed, neither the sender nor the event args object reveals any information about the processed message.

In the code snippet below you find a sample of the LogErrors event handler. I have added the most common exceptions you may want to deal with.

LogErrors Sample Implementation

Please note: when the AutoComplete option is set to true and an exception is raised, the message is placed back on the queue. However, if you catch the exception within your own message handling code, the message pump never sees it. The exception does not reach the ExceptionReceived event handler, and as soon as your message processing code block ends, the message is marked as completed.

If you want full control over when a message is marked as completed, set AutoComplete to false.
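The three cases can be laid out in a small model. It is a simplified stand-in for the pump, written only from the rules described in this section, and not the SDK's actual implementation; `pump_once` and the outcome strings are hypothetical.

```python
def pump_once(message, handler, auto_complete=True):
    """Run handler on one message and report what happens to the message.

    Assumed rules, taken from the text above: with auto-complete on, an
    exception that escapes the handler puts the message back on the queue,
    while a normal return marks it completed. With auto-complete off, the
    handler is responsible for settling the message itself.
    """
    try:
        handler(message)
        failed = False
    except Exception:
        failed = True
    if not auto_complete:
        return "left to handler"
    return "abandoned" if failed else "completed"


def failing(message):
    raise ValueError("cannot process " + message)


def swallowing(message):
    try:
        failing(message)
    except ValueError:
        pass  # the pump never sees this exception


outcome_failing = pump_once("order-1", failing)        # "abandoned"
outcome_swallowing = pump_once("order-1", swallowing)  # "completed"
outcome_manual = pump_once("order-1", swallowing, auto_complete=False)
```

The second case is the one to watch for: a handler that swallows its own exception looks like a success to the pump, so a message that was never really processed is still completed.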

When to expect Exceptions

This is not specific to the use of the Message Pump programming model, but I think it is worth mentioning. There are two moments when you should be aware that exceptions can occur. The first is when you are setting up your QueueClient and registering your OnMessage event handler. The exceptions listed in the LogErrors code snippet can also occur during this setup phase.

The second moment is when the OnMessage event handler has been registered and is waiting for messages to arrive. The good news is that the OnMessage event handler process is very fault tolerant and will recover from deleted queues, lost connections and locked queues.

Locked Queues?

Also added to the V2.0 SDK is the ability to suspend sending and receiving messages to and from queues and topics. In the code snippet below you can see the four possible settings. These status values allow you to put the queue on hold for receiving messages while you load a batch of messages into the queue. When done, you release the queue and all messages are picked up by the receiving instances.

Once you disable a queue, the OnMessage event handler receives a MessagingEntityDisabledException. The OnMessage event handler will continue to monitor when the queue is available all the while throwing those exceptions.

Queue Status

Sample Source Code

I created two small console applications which demonstrate the use of the new Message Pump programming model. The first console app posts 100 messages to your service bus queue while keeping the queue locked for receiving.

The second console app monitors the queue and, as soon as the queue is available and messages appear in it, simply displays each message’s sequence number and timestamp.

If you play a bit with the MaxConcurrentCalls property on the OnMessageOptions object you will see that you are able to squeeze a few more messages into one second of processing time.

Before you can use the two sample apps, you must first adjust the Service Bus connection string in both app.config files.

As always, the sample code is not meant to be used as a production-ready solution but just as a learning tool.

Update: As noted by Markus Oberacker in the comments, it does seem that the message pump model isn’t much help when trying to save costs on Service Bus requests.

7 Thoughts on “Looking at Windows Azure Service Bus Message Pump Programming Model”

  1. Oliver Tomlinson on May 22, 2013 at 7:36 am said:

    Hello Wouter, thank you for writing this article, it was very helpful.

    I understand that your reasoning for going from Receive() to Receive(TimeSpan.FromHours(1)) is to save on Azure costs, as this effectively reduces the frequency of queue polling on an empty queue.

    Is it necessary/possible to make a similar change to save on costs when using the Message Pump mechanism?

    Many thanks,

    Oliver

    • Hi Oliver,

      It wouldn’t be necessary as it only returns a message when there actually is one. Also, there is no option to set a timeout on the message pump and I haven’t been able to find any proof that the message pump internally works with some sort of polling mechanism which would require a timeout setting.

      So, cost wise, the message pump model seems to be the best way to go.

      • Markus Oberacker on October 18, 2013 at 5:23 am said:

        Hello Wouter,

        sorry but the message pump model doesn’t seem to be the best way to reduce the costs… because:

        “When calling OnMessage(), the client starts an internal message pump that constantly polls the queue or subscription. This message pump consists of an infinite loop that issues a Receive() call. If the call times out, it issues the next Receive() call. The timeout is the value of the OperationTimeout property of the MessagingFactory that is used. The default value of this timeout is 60 seconds. You can modify the timeout by changing the OperationTimeout property before creating the MessagingFactory. Note that every receive operation is a billable event.”

        => http://msdn.microsoft.com/en-us/library/microsoft.servicebus.messaging.queueclient.onmessage.aspx

  2. Oliver Tomlinson on May 23, 2013 at 11:47 am said:

    Great, I was hoping it would work as you have explained it! Thanks for that.

    How does moving to the message pump affect the main processing loop?

    I take it that you are supposed to move the message pump outside of the processing loop, but maintain the process loop, even if it doesn’t do anything inside it?

    Wouldn’t spinning the while(true){} degrade the performance of processing the messages?

    • Oliver Tomlinson on May 23, 2013 at 11:51 am said:

      Sorry, I should have specified that I’m talking about the processing loop in the context of a Worker Role?

    • Hi Oliver,

      Your main processing loop is always required; it more or less prevents your role from recycling.
      The message pump should be set up and maintained outside your main processing loop but verified inside the loop. In the loop you can check whether the message pump is in a faulty state and, if it is, reestablish the connection or do whatever is needed to continue processing.
