
Friday, April 29, 2011

Taking Cues from Azure

In this blog post I will talk about how I took cues from Azure Queue Storage to solve a problem.

I was tasked with receiving messages (data) from an external partner, parsing the content, and sending it on to our internal services, in effect acting as a gateway. I also take our internal response and send it back to the partner.

A simple web service could have handled the job, but as each message represents a lot of potential money, any message loss would be a big problem.

Below is an overview of the architecture I implemented to solve this problem.

 

Windows Azure Architecture Guide

After reviewing the patterns & practices sample projects for Azure, one of the patterns I liked was how Azure Queues are used to handle work in the cloud. Azure Queue Storage is used to create a producer/consumer architecture: many servers can write to the queue, and one or more Compute Worker Roles de-queue messages and process them.

I wanted to see if I could implement a similar technique without Azure Queue Storage, using stored procedures on a local SQL Server instead.

 

Table Queue

The first step was to implement the Azure Queue interface from the Windows Azure Architecture Guide (WAAG), using stored procedures on SQL Server as the backing store. I will call my implementation the Table Queue. The trick was to mimic the behavior where only one consumer can de-queue a given message at a time, even with many consumers running. I also had to build in a timeout for messages that take too long to process: if a message is de-queued but not deleted within a set time period, it should become available to another consumer again. The Table Queue has the following columns.

  • Processing Date (datetime)
  • Processed Date (datetime)
  • DeQueue Count (int)
  • Created Date (datetime)
  • Version (timestamp)

I used a stored procedure with locking hints to keep two consumers from selecting the same message at the same time.

If the DeQueue Count reached a limit, the Queue Handler would treat the message as poisoned and remove it from the queue.

Each time a message is popped from the queue, its Processing Date is set. The stored procedure ignores messages whose Processing Date is within a set limit of GETUTCDATE(), so once the Processing Date is set, the message is invisible to other consumers for that period of time.
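The stored procedure itself isn't shown in this post, so here is a minimal sketch of what a de-queue procedure along these lines might look like. The procedure name and result columns match what GetMessages (shown later) reads; the table name and the remaining column names are illustrative. I'm sketching it with UPDLOCK and READPAST hints, which are what actually prevent two consumers from claiming the same row (a plain NOLOCK read would not guarantee that by itself).

-- Sketch only: table and non-result column names are assumed.
CREATE PROCEDURE dbo.spMSQ_Queue_POP
    @QueueType    varchar(50),
    @DelaySeconds int,
    @MaxMessages  int
AS
BEGIN
    SET NOCOUNT ON;

    DECLARE @claimed TABLE (ID uniqueidentifier);

    -- Claim the oldest unprocessed rows whose reservation has expired.
    -- READPAST skips rows already locked by another consumer, and
    -- UPDLOCK holds the rows we claim, so no two consumers get the
    -- same message.
    WITH next_messages AS
    (
        SELECT TOP (@MaxMessages) *
        FROM dbo.TableQueue WITH (UPDLOCK, READPAST, ROWLOCK)
        WHERE QueueType = @QueueType
          AND ProcessedDate IS NULL
          AND (ProcessingDate IS NULL
               OR DATEADD(second, @DelaySeconds, ProcessingDate) < GETUTCDATE())
        ORDER BY CreatedDate
    )
    UPDATE next_messages
    SET ProcessingDate = GETUTCDATE(),
        DeQueueCount   = DeQueueCount + 1
    OUTPUT inserted.ID INTO @claimed;

    -- Return the claimed messages, including the rowversion column.
    SELECT t.ID, t.MessageID, t.[Message], t.DeQueueCount, t.Row_version
    FROM dbo.TableQueue t
    JOIN @claimed c ON c.ID = t.ID;
END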

Here is the Table Queue interface.

public interface IQueue<T>
{
    Guid AddMessage(QueueMessage<T> message);
    QueueMessage<T> GetMessage();
    IEnumerable<QueueMessage<T>> GetMessages(int maxMessagesToReturn);
    void DeleteMessage(QueueMessage<T> message);
    int Size();
}

Besides making it generic, I had the AddMessage method return the Guid of the message added. This was to support updating metadata in the database, which I did not want the Table Queue itself to implement.
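The producer side isn't shown in this post, but putting a message on the queue looks roughly like this. The metadata repository and its SaveRouting call are illustrative; that part lives outside the Table Queue.

// Producer sketch: enqueue a message, then use the returned Guid to
// attach routing metadata in a separate table (not part of IQueue<T>).
TableQueue<DomainType> queue =
    new TableQueue<DomainType>(domainTypeMarshaler, TimeSpan.FromMinutes(2));

QueueMessage<DomainType> message = new QueueMessage<DomainType>
{
    Content = parsedPartnerMessage // the partner payload parsed into DomainType
};

Guid id = queue.AddMessage(message);

// Illustrative only: update routing metadata keyed by the new Guid.
metadataRepository.SaveRouting(id, partnerId);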

Here is the implementation of GetMessages.

public IEnumerable<QueueMessage<T>> GetMessages(int maxMessagesToReturn)
{
    using (IDataReader reader = db.ExecuteReader("spMSQ_Queue_POP",
        new object[] { marshaler.QueueType, delay.TotalSeconds, maxMessagesToReturn }))
    {
        while (reader.Read())
        {
            QueueMessage<T> message = new QueueMessage<T>();

            message.DeQueueCount = reader.GetInt32(reader.GetOrdinal("DeQueueCount"));
            message.Id = reader.GetGuid(reader.GetOrdinal("ID"));
            message.ContentId = reader.GetGuid(reader.GetOrdinal("MessageID"));

            // The 8-byte rowversion column round-trips as a Base64 string.
            int index = reader.GetOrdinal("Row_version");
            byte[] outByte = new byte[8];
            reader.GetBytes(index, 0, outByte, 0, 8);
            message.Version = Convert.ToBase64String(outByte);

            index = reader.GetOrdinal("Message");
            string xml = string.Empty;

            if (!reader.IsDBNull(index))
            {
                xml = reader.GetString(index);
            }

            // Hand the raw XML to the injected marshaler to build the typed content.
            message.Content = marshaler.MarshalContent(message.Id, xml);
            yield return message;
        }
    }
}

I am using a marshaler, passed in through the constructor, to deserialize the data from the database into a type. This separates the marshaling of objects from the Table Queue implementation.

 

Marshaling Types

Below is the interface for marshaling content.

public interface IMarshaler<T>
{
    T MarshalContent(Guid id, string content);
    string UnmarshalContent(Guid id, T content);
    string QueueType { get; }
}

I had to figure out how to save my types. With Azure Queue, types are serialized as JSON objects. I started off by serializing the message content to XML and storing it in the database. With my implementation I needed metadata saved with each type for routing, but the content types were sealed to me, so I could not add properties to them. When a worker processed each message, it needed that metadata to help route the message. I could have wrapped the types, but I ended up keeping some of the metadata in a separate table joined to the queue table. This was not ideal, as I wanted to keep the implementation free of domain information, but having the metadata in the table made debugging much quicker. So I sacrificed my pure implementation on the altar of 'getting things done'.

By passing the marshaler into the Table Queue, I split the difference: the Table Queue itself knows nothing about marshaling.
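For reference, a bare-bones XML marshaler along these lines might look like the following. My actual implementation isn't shown here, and keying QueueType off the type name is just one option.

using System;
using System.IO;
using System.Xml.Serialization;

// Sketch of an XML-backed IMarshaler<T>. The id parameter is available
// for metadata lookups; this minimal version ignores it.
public class XmlMarshaler<T> : IMarshaler<T>
{
    public string QueueType
    {
        get { return typeof(T).Name; } // assumption: queue type keyed by type name
    }

    public T MarshalContent(Guid id, string content)
    {
        if (string.IsNullOrEmpty(content))
            return default(T);

        XmlSerializer serializer = new XmlSerializer(typeof(T));
        using (StringReader reader = new StringReader(content))
        {
            return (T)serializer.Deserialize(reader);
        }
    }

    public string UnmarshalContent(Guid id, T content)
    {
        XmlSerializer serializer = new XmlSerializer(typeof(T));
        using (StringWriter writer = new StringWriter())
        {
            serializer.Serialize(writer, content);
            return writer.ToString();
        }
    }
}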

 

Queue Message

Below is the generic QueueMessage class.

public class QueueMessage<T>
{
    public Guid Id { get; set; }
    public string Version { get; set; }
    public int DeQueueCount { get; set; }
    public string QueueType { get; set; }
    public Guid ContentId { get; set; }
    public T Content { get; set; }
}

It is very similar to the Azure Queue message. The major addition is the ContentId, which makes it easier to update metadata about the content.

 

Queue Handlers

I started off using the GenericQueueHandler and the QueueHandler from WAAG, and modified the handlers to be generic.

I added a method to the Queue Handler called Take, which allows the caller to specify the number of messages to process at a time.

public QueueHandler<T> Take(int batchSize)
{
    this.batchSize = batchSize;
    return this;
}
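To give the shape of the whole handler, here is a sketch of a generic, fluent version. For, Every, and Do match the usage shown in the next section; the loop internals, the ICommand<T> contract, and the poison limit of 5 are assumptions on my part, not the WAAG code verbatim.

using System;
using System.Threading;

// Assumed command contract; DomainTypeCommand would implement this.
public interface ICommand<T>
{
    void Run(QueueMessage<T> message);
}

// Non-generic entry point so QueueHandler.For(queue) can infer T.
public static class QueueHandler
{
    public static QueueHandler<T> For<T>(IQueue<T> queue)
    {
        return new QueueHandler<T>(queue);
    }
}

// Sketch of the generic handler; a real version would run this loop
// on a background thread with proper shutdown handling.
public class QueueHandler<T>
{
    private readonly IQueue<T> queue;
    private int batchSize = 1;
    private TimeSpan interval = TimeSpan.FromSeconds(30);
    private const int PoisonLimit = 5; // assumed retry limit

    internal QueueHandler(IQueue<T> queue)
    {
        this.queue = queue;
    }

    public QueueHandler<T> Take(int batchSize)
    {
        this.batchSize = batchSize;
        return this;
    }

    public QueueHandler<T> Every(TimeSpan interval)
    {
        this.interval = interval;
        return this;
    }

    public void Do(ICommand<T> command)
    {
        while (true)
        {
            foreach (QueueMessage<T> message in queue.GetMessages(batchSize))
            {
                if (message.DeQueueCount > PoisonLimit)
                {
                    // Poisoned message: remove it instead of retrying forever.
                    queue.DeleteMessage(message);
                    continue;
                }

                command.Run(message);         // process the message
                queue.DeleteMessage(message); // delete only after success
            }

            Thread.Sleep(interval); // pause between processing passes
        }
    }
}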

 

Implementation (the consumer)

TableQueue<DomainType> queue = new TableQueue<DomainType>(domainTypeMarshaler, TimeSpan.FromMinutes(2));
DomainTypeCommand commandDomainType = new DomainTypeCommand();
QueueHandler.For(queue).Take(1).Every(TimeSpan.FromSeconds(30)).Do(commandDomainType);

 

Above is how everything is put together. The Table Queue is typed to DomainType, and the DomainTypeCommand will process each message. The QueueHandler takes the Table Queue, takes one message at a time, and calls the DomainTypeCommand instance with the message data, pausing 30 seconds between processing passes.

In the constructor of the Table Queue, the marshaler is passed in, along with how long each message will be reserved; in this example it is two minutes.

This code will be put into a Windows Service, and the parameters can be passed in via a configuration file.
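Wiring that up might look like the following; the appSettings key names are made up for the example.

using System;
using System.Configuration;

// Read the tuning knobs from app.config; key names here are illustrative.
int batchSize      = int.Parse(ConfigurationManager.AppSettings["BatchSize"]);
int pollSeconds    = int.Parse(ConfigurationManager.AppSettings["PollSeconds"]);
int reserveMinutes = int.Parse(ConfigurationManager.AppSettings["ReserveMinutes"]);

TableQueue<DomainType> queue =
    new TableQueue<DomainType>(domainTypeMarshaler, TimeSpan.FromMinutes(reserveMinutes));

// Same wiring as above, now driven by configuration.
QueueHandler.For(queue)
            .Take(batchSize)
            .Every(TimeSpan.FromSeconds(pollSeconds))
            .Do(new DomainTypeCommand());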

 

Thoughts on MSMQ

I also thought about using MSMQ to accomplish the same functionality, but some of my messages could be over 4 MB in size, and MSMQ's message size limit is 4 MB. In this architecture, each message is also written to the database before it is processed, which acts as a built-in log; with an MSMQ implementation I would have to build that functionality myself.

Poison Messages

Poison message handling is already built into the Table Queue: I maintain a de-queue count and retry a message while the count is within a limit. With an MSMQ implementation I would need to maintain a retry queue for messages that failed.

Here is a SO question on this topic.

 

Final Thoughts

I had a lot of fun building this architecture, though in the back of my mind I felt I had over-engineered the solution to message passing. Then, while the project was being QA'd, the service I pass messages to stopped responding. It is not clear why; it could have been too much load, or a Chaos Monkey. The message eventually went through, which validated the architecture for me: I did not have to add special handling for hardware failures.

The external partner can send hundreds of messages a minute at peak times, and each message can take 30 seconds to process internally. I need to be able to throttle the input to our internal servers while still responding to our external partner in a timely manner.

This producer/consumer architecture met the requirement of staying responsive to our external partner.

And because of the producer/consumer split, if the incoming data becomes too much for our internal systems to handle, we can simply add more consumers.

In the future I would like to add a deliberate Chaos Monkey into the system.

I would like to thank Phillip Freeman, who helped with some refactoring of my Queue Handlers.