Wednesday
May 02, 2012

Using F# Agent For Concurrent Updates on Entities 

When using concurrent processing techniques, there will be times when multiple threads update an Entity at the same time.  This update could be at the database level, in memory, or over the network; when multiple threads update the same Entity, bad things can happen.  The typical response when multiple threads update a database record is to throw a concurrency exception to the user.  Bad UX notwithstanding, there are situations where this is not possible.

Imagine a Use Case where a server ‘process’ is processing large sets of data, and concurrency techniques were used to improve performance.  In this scenario there are two options when threads collide on an Entity: stop the process and go home, or catch the exception and retry.

At times I feel the first option of stopping the process and going home is the best…

The second option of catching the exception and retrying can introduce more complicated issues.  For example, if there were 5 threads updating an Entity, one thread will win and the other 4 will get the exception and be forced to retry.  In some rare cases two threads might even deadlock while trying to access the Entity.

Another solution to this problem is to have all ‘work’ on an Entity be done synchronously.  The trick for this solution is to have a Thread Safe Queue in which we can enqueue work for an Entity and have it processed in order.

MailboxProcessor

MSDN Doc

The MailboxProcessor, otherwise known as an 'Agent', is a nice abstraction over a queue. The Agent allows different threads to post items into the queue, and using F# Async Workflows each item is processed one at a time, in the order it arrives.
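To make the idea concrete, here is a minimal sketch of an Agent that is not part of the solution below: it simply keeps a running total. The `CounterMessage` type and the `counter` value are hypothetical names made up for this illustration. Many posts are issued in parallel, yet the total is only ever touched by the Agent's own loop, so no locking is needed.

```fsharp
// Hypothetical message type for this sketch: add to the total, or ask for it.
type CounterMessage =
    | Add of int
    | Get of AsyncReplyChannel<int>

let counter =
    MailboxProcessor<CounterMessage>.Start(fun inbox ->
        // The running total is loop state; only this loop ever reads or writes it.
        let rec loop total =
            async {
                let! msg = inbox.Receive()
                match msg with
                | Add n -> return! loop (total + n)
                | Get reply ->
                    reply.Reply total
                    return! loop total
            }
        loop 0)

// Post 1..100 from parallel workflows; Post only enqueues the message.
[ 1 .. 100 ]
|> List.map (fun n -> async { counter.Post(Add n) })
|> Async.Parallel
|> Async.RunSynchronously
|> ignore

// All posts have been enqueued by now, so Get is queued behind every Add.
let total = counter.PostAndReply(fun reply -> Get reply)
printfn "Total: %d" total // prints "Total: 5050"
```

The messages may arrive in any order, but because they are handled one at a time the sum comes out the same every run.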

With ‘Agents' the construction of a solution is fairly straightforward.  In the code block below I have constructed a class which will accept an Id for an Entity and an ‘Action’.  For each Entity it will process each ‘Action’ in sequence. 

Below is a review of the code block.

Line 11

An ‘Agent’ controls access to the Dictionary of Agents.   This ensures that only one thread will be adding to or reading from the dictionary at a time.

Line 19

For each Entity there will be an Agent controlling the synchronous execution of each ‘Action’.  We can do this for a decent number of Entities, as the Agents do not require a lot of resources.

F# Code Block 

 1: open System
 2: open System.Collections.Generic
 3: 
 4: type internal AggregateAction = 
 5:     {
 6:         Id: Guid
 7:         Action: Action
 8:     }
 9: 
10: type AggregateAgent() =
11:     let agent = MailboxProcessor<AggregateAction>.Start(fun inbox ->
12:         let dic = new Dictionary<Guid,MailboxProcessor<Action>>()
13:         async{                                                                                                  
14:                 while true do
15:                     let! msg = inbox.Receive()
16:                     if dic.ContainsKey(msg.Id) then 
17:                         dic.Item(msg.Id).Post msg.Action
18:                     else
19:                         let aggAgent = MailboxProcessor<Action>.Start( fun inbox ->
20:                             async{
21:                                 while true do
22:                                     let! action = inbox.Receive()
23:                                     action.Invoke() |> ignore     
24:                             })
25:                         aggAgent.Post msg.Action
26:                         dic.Add(msg.Id,aggAgent)
27:         })
28:         
29:     member this.Process id action = 
30:         agent.Post { Id = id; Action = action}
31: 

One thing to note: each ‘Action’ being processed should handle any errors it encounters, as the Aggregate Agent will not know how to proceed.
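One possible way to follow that advice is to wrap the work before it is handed to the Agent. The sketch below is an assumption about how this could look, not part of the original class: `safeAction` and `report` are hypothetical helpers, and `AggregateAgent` is restated in a compact form (a tuple message instead of the record) only so the example runs on its own. An unhandled exception would otherwise end the per-Entity Agent's loop, and later Actions for that Entity would sit in its queue unprocessed.

```fsharp
open System
open System.Collections.Generic
open System.Threading

// Compact restatement of AggregateAgent so this sketch is self-contained.
type AggregateAgent() =
    let agent = MailboxProcessor<Guid * Action>.Start(fun inbox ->
        let dic = Dictionary<Guid, MailboxProcessor<Action>>()
        async {
            while true do
                let! (id, action) = inbox.Receive()
                match dic.TryGetValue(id) with
                | true, entityAgent -> entityAgent.Post action
                | false, _ ->
                    // First Action for this Entity: start its own Agent.
                    let entityAgent = MailboxProcessor<Action>.Start(fun inbox ->
                        async {
                            while true do
                                let! action = inbox.Receive()
                                action.Invoke()
                        })
                    entityAgent.Post action
                    dic.Add(id, entityAgent)
        })
    member this.Process (id: Guid) (action: Action) = agent.Post((id, action))

// Hypothetical helper: the Action catches its own exception and reports it,
// so nothing escapes into the Agent's loop.
let safeAction (onError: exn -> unit) (work: unit -> unit) =
    Action(fun () ->
        try work ()
        with ex -> onError ex)

let aggregates = AggregateAgent()
let entityId = Guid.NewGuid()
let log = List<string>()          // only written by the single per-Entity Agent
let finished = new ManualResetEventSlim(false)
let report (ex: exn) = log.Add("failed: " + ex.Message)

aggregates.Process entityId (safeAction report (fun () -> log.Add("first")))
aggregates.Process entityId (safeAction report (fun () -> failwith "boom"))
aggregates.Process entityId (safeAction report (fun () -> log.Add("third")))
aggregates.Process entityId (safeAction report (fun () -> finished.Set()))

finished.Wait()                   // block until the last Action has run
printfn "%A" (List.ofSeq log)
```

The second Action fails, but the failure is recorded and the third Action still runs, so the log should read first, failed: boom, third.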

Thursday
Jul 14, 2011

Linq Query Expression Syntax with RIA Domain Context

After discussing my previous post with Philip Freeman, he suggested another good example would be to show how to use Linq Query Expression Syntax with RIA Domain Context calls. To help facilitate this example I created two new Extension Methods: LoadEntityWithRx and LoadEntitiesWithRx.

Load Entity With Rx

   1: public static IObservable<T> LoadEntityWithRx<T>(this DomainContext context, EntityQuery<T> query) where T : Entity
   2: {
   3:     return Observable.Create<T>(ob =>
   4:     {
   5:         context.Load(query, ob.ReturnEntity, null);
   6:         return () =>
   7:         {
   8:           // On unsubscription, do nothing
   9:         };
  10:     });
  11: }

Here we are only interested in a single Entity returned by the Load operation of the domain context. This Extension Method follows the same pattern as the ones on the previous blog post. The Load operation is called (line 5) on the Domain Context, and the work is done by the ReturnEntity method. Because nothing needs to be done on unsubscription, an empty Action is returned.

Return Entity

   1: private static void ReturnEntity<T>(this IObserver<T> ob, LoadOperation<T> loadOp) where T : Entity
   2: {
   3:     if (loadOp.HasError)
   4:     {
   5:         loadOp.MarkErrorAsHandled();
   6:         ob.OnError(loadOp.Error);
   7:     }
   8:     else
   9:     {
  10:         if (loadOp.Entities.Count() != 1)
  11:         {
  12:             // Zero or several entities came back; report it and stop.
  13:             ob.OnError(new Exception("Expected exactly one entity to be returned"));
  14:         }
  15:         else
  16:         {
  17:             ob.OnNext(loadOp.Entities.Single());
  18:             // Complete only on success; OnError has already ended the sequence.
  19:             ob.OnCompleted();
  20:         }
  21:     }
  22: }

This is where the work is done; we either pass the exception along to the Observer or pass the Entity along. More work should be done to handle missing Entities (line 10); here I assume there will be exactly one Entity returned, which is not always the case.

Another good side effect of using the Extension Methods is that you can consolidate your Error Handling for Domain Service calls.

Load Entities With Rx

   1: public static IObservable<IEnumerable<T>> LoadEntitiesWithRx<T>(this DomainContext context, EntityQuery<T> query) where T : Entity
   2: {
   3:     return Observable.Create<IEnumerable<T>>(ob =>
   4:     {
   5:         context.Load(query, ob.ReturnEntities , null);      
   6:         return () =>
   7:         {
   8:             // On unsubscription, do nothing
   9:         };
  10:     });
  11: }

This follows the same pattern as the LoadEntityWithRx Extension Method. The work is done by the ReturnEntities method.

Return Entities

   1: private static void ReturnEntities<T>(this IObserver<IEnumerable<T>> ob, LoadOperation<T> loadOp) where T : Entity
   2: {
   3:     if (loadOp.HasError)
   4:     {
   5:         loadOp.MarkErrorAsHandled();
   6:         ob.OnError(loadOp.Error);
   7:     }
   8:     else
   9:     {                
  10:         ob.OnNext(loadOp.Entities);
  11:         ob.OnCompleted();
  12:     }
  13: }

A similar pattern is followed here as well, where Exceptions are passed to the Observer, and the OnNext method is called to pass along the Entities.

Query Expression Syntax Example

 

In this example I have created a really bad model for Customers and Orders.

   1: public class Customer
   2: {
   3:     [Key]
   4:     public Guid Id { get; set; }
   5:  
   6:     public string Name { get; set; }
   7: }
   8:  
   9: public class Order
  10: {
  11:     [Key]
  12:     public Guid Id { get; set; }
  13:  
  14:     public Guid CustomerId { get; set; }
  15:  
  16:     public string Desc { get; set; }
  17: }

Good Example:

If you want to load an Entity (e.g. a Customer), take data from that Entity (e.g. the Customer Id), and then load another Entity, the nested callbacks can get really ugly (see the bad example below). I will first show how it works with Linq Query Expression Syntax, and then how it would be done normally.

   1: private void LoadCustomer()
   2: {
   3:     CustomerDomainContext context = new CustomerDomainContext();
   4:  
   5:     EntityQuery<Customer> customerQuery = context.GetCustomersQuery().Where(c=>c.Name == "Fred");
   6:  
   7:     var customerOrders = from customer in context.LoadEntityWithRx(customerQuery)
   8:                          let orderQuery = context.GetOrdersQuery().Where(o => o.CustomerId == customer.Id)
   9:                          from orders in context.LoadEntitiesWithRx(orderQuery)
  10:                          select new { Customer = customer, Orders = orders };
  11:  
  12:     customerOrders.Subscribe((custOrders) => 
  13:     { 
  14:         // do cool stuff
  15:     });                                 
  16: }

In the example above you can reason about what the code is doing. First it queries for the Customer named “Fred” (Line 5), then it builds the Orders query from that Customer's Id (Line 8), then it loads the Orders matching that Customer Id (Line 9). Finally it returns an Anonymous Type containing the Customer object and its Orders to the subscriber (Line 10).

Bad Example:

    private void LoadCustomersBad()
    {
        CustomerDomainContext context = new CustomerDomainContext();
        EntityQuery<Customer> customerQuery = context.GetCustomersQuery().Where(c => c.Name == "Fred");

        context.Load(customerQuery, (loadOp) =>
        {
            if (!loadOp.HasError)
            {
                // Single() throws on an empty sequence, so require exactly one match
                if (loadOp.Entities.Count() == 1)
                {
                    Customer customer = loadOp.Entities.Single();
                    EntityQuery<Order> orderQuery = context.GetOrdersQuery().Where(o => o.CustomerId == customer.Id);
                    context.Load(orderQuery, (loadOpOrder) =>
                    {
                        if (!loadOpOrder.HasError)
                        {
                            IEnumerable<Order> orders = loadOpOrder.Entities;
                            // do cool stuff
                        }
                    }, null);
                }
            }
        }, null);
    }

Here the same result is accomplished, but it is so much harder to reason about the code. Imagine if you had to maintain this code. I have seen a lot of production code where the developer used 5+ nested callbacks; not fun. Maintaining the first code sample will be so much easier.

The same code is being called, but with some well-placed Extension Methods we can use the power and ease of Linq Query Expression Syntax to make code more maintainable.

Wednesday
Jul132011

RIA Domain Context and Reactive Extensions

One of the nice things about Rx is its composability. It is really convenient to be able to compose different types of events. The Observable class has FromAsyncPattern and FromEvent methods which make it easy to create Observables from those patterns. However, Rx does not provide any method to create Observables for RIA Domain Context calls. Phillip Freeman showed me some Extension Methods he created to make Observables from the Load and Submit operations on a Domain Context. I will show how they work and how they can be used.

 

Load With Rx

    public static IObservable<LoadOperation<T>> LoadWithRx<T>(this DomainContext context, EntityQuery<T> query) where T : Entity
    {
        return Observable.Create<LoadOperation<T>>(ob =>
        {
            context.Load(query, ob.Return, null);
            return () =>
            {
                // On unsubscription, do nothing
            };
        });
    }

This part is fairly self-explanatory: we call the Load method on the Domain Context and pass in the Entity Query. The interesting stuff happens in the Return method. Because this is not creating a stream of events, there is nothing to do on unsubscription once the Load Operation is handled, so an empty action is returned.

Return

    private static void Return<T>(this IObserver<LoadOperation<T>> ob, LoadOperation<T> loadOp) where T : Entity
    {
        if (loadOp.HasError)
        {
            loadOp.MarkErrorAsHandled();
            ob.OnError(loadOp.Error);
        }
        else
        {
            ob.OnNext(loadOp);
            ob.OnCompleted();
        }
    }

 

This is where we handle the Load Operation. If there was an error in the Load Operation, we mark it as handled and pass the Exception to the Observer through OnError so the subscriber can handle it.

If there was no error, we return the Load Operation object with the OnNext call, then signal the end of the event stream by calling OnCompleted.

Submit With Rx

    public static IObservable<SubmitOperation> SubmitWithRx(this DomainContext context)
    {
        return Observable.Create<SubmitOperation>(ob =>
        {
            if (context.HasChanges)
            {
                context.SubmitChanges(ob.ReturnSubmit, null);
            }
            else
            {
                ob.OnCompleted();
            }

            return () =>
            {
                // On unsubscription, do nothing
            };
        });
    }

This Extension Method on the Domain Context works in a similar way to the Load Extension Method. If there are no changes in the Domain Context, we signal the Observer that this is the end of the event stream. As there is nothing to do when unsubscribing, an empty Action is returned. The interesting bits happen in the ReturnSubmit method.

Return Submit

    private static void ReturnSubmit(this IObserver<SubmitOperation> ob, SubmitOperation op)
    {
        if (op.HasError)
        {
            op.MarkErrorAsHandled();
            ob.OnError(op.Error);
        }
        else
        {
            ob.OnNext(op);
            ob.OnCompleted();
        }
    }

If there was an error on the Submit Operation, it is marked as handled and the Exception is sent to the Subscriber through the OnError method. If the Domain Context Submit Operation worked, then the OnNext method is called giving the subscriber the Submit Operation object. The OnCompleted method is called to signal the Observer that this is the end of the event stream.

Example

One of the reasons to wrap Domain Context calls in Observables is that you can use them with other Rx Extension Methods. For example, you might want the Domain Context to load several Entities and wait for all of them to finish before you proceed. The Rx Extension Method ForkJoin works great for this.

    ctx = new WidgetDomainContext();
    EntityQuery<Widget> query = ctx.GetWidgetsQuery();

    var load1 = ctx.LoadWithRx<Widget>(query);
    var load2 = ctx.LoadWithRx<Widget>(query);

    Observable.ForkJoin(load1, load2).Subscribe();

You can imagine the extra state you would have to keep around to accomplish the same functionality without Rx.

Friday
Apr292011

Taking Cues from Azure

In this blog post I will talk about how I took cues from Azure Queue Storage to solve a problem.

I was tasked to support receiving messages (data) from an external partner, parsing the content and sending it to our internal services, acting like a gateway.  I also am taking our response and sending it back to our partners. 

A simple web service could have handled the job, but as each message represents a lot of potential money, any message loss would be a big problem.

Below is an overview of the architecture I implemented to solve this problem.

 

Windows Azure Architecture Guide

After reviewing the patterns and practices sample projects on Azure, one of the patterns I like is how Azure Queues are used to handle work in the cloud. Azure Queue Storage is used to create a producer / consumer architecture, such that many servers can write to the Queue and one or many Compute Worker Roles de-queue messages and process them.

I wanted to see if I could implement a similar technique without using Azure Queue Storage, but using Stored Procedures on a local SQL Server.

 

Table Queue

The first step was to implement the Azure Queue interface from WAAG. For my implementation of that interface I used stored procedures on a SQL Server as the backing store. I will call my implementation Table Queue. The trick for the Table Queue was to mimic the behavior where a given message is de-queued by only one consumer at a time, even with many consumers. I also had to allow for a built-in timeout for messages which took too long to process. For example, if a message was de-queued but not deleted after a set time period, the message should become available to another consumer. The Table Queue has the following columns.

  • Processing Date (date time)
  • Processed Date (date time)
  • DeQueue Count (int)
  • Created Date (date time)
  • Version (time stamp)

I used a Stored Procedure and the NoLock attribute to keep other consumers from selecting the same message at the same time.

If the DeQueue count reached a limit, the Queue Handler would treat the message as a poison message and remove it from the Queue.

Each time a message is popped from the queue, its Processing Date is set. The SP ignores messages whose Processing Date is within a set limit of GETUTCDATE(), so once the Processing Date is set, the message is hidden from other consumers for that period of time.
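The reservation and poison rules above can be sketched in memory. This is only an illustration of the rules, not the stored procedure itself: QueueRow, PopSketch, Pop and IsPoisoned are hypothetical names, the rows live in a plain list, and nothing here is safe for concurrent callers (in the real system the database does the selecting and marking atomically).

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

// Hypothetical in-memory stand-in for one row of the queue table.
public class QueueRow
{
    public Guid Id { get; set; }
    public DateTime CreatedDate { get; set; }
    public DateTime? ProcessingDate { get; set; }
    public DateTime? ProcessedDate { get; set; }
    public int DeQueueCount { get; set; }
}

public static class PopSketch
{
    // Returns up to max rows that are visible at utcNow and reserves them.
    // A row is visible when it has not been processed and is either unreserved
    // or its reservation (ProcessingDate + delay) has expired.
    public static List<QueueRow> Pop(List<QueueRow> table, DateTime utcNow, TimeSpan delay, int max)
    {
        var visible = table
            .Where(r => r.ProcessedDate == null)
            .Where(r => r.ProcessingDate == null || r.ProcessingDate.Value + delay <= utcNow)
            .OrderBy(r => r.CreatedDate)
            .Take(max)
            .ToList();

        foreach (var row in visible)
        {
            row.ProcessingDate = utcNow; // start a new reservation window
            row.DeQueueCount++;          // track retries for poison detection
        }

        return visible;
    }

    // A message is treated as poison once its de-queue count reaches the limit.
    public static bool IsPoisoned(QueueRow row, int maxDeQueueCount)
    {
        return row.DeQueueCount >= maxDeQueueCount;
    }
}
```

With a two minute delay, a row popped at 12:00 is skipped by a Pop at 12:01 and handed out again by a Pop at 12:02, with its DeQueueCount now at 2.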

Here is the Table Queue Interface

    public interface IQueue<T>
    {
        Guid AddMessage(QueueMessage<T> message);
        QueueMessage<T> GetMessage();
        IEnumerable<QueueMessage<T>> GetMessages(int maxMessagesToReturn);
        void DeleteMessage(QueueMessage<T> message);
        int Size();
    }

Besides making it generic, I had the Add Message method return the Guid of the message added. This was to support updating meta data in the database, which I did not want the Table Queue itself to implement.

Here is the implementation of the GetMessages

    public IEnumerable<QueueMessage<T>> GetMessages(int maxMessagesToReturn)
    {
        using (IDataReader reader = db.ExecuteReader("spMSQ_Queue_POP", new object[] { marshaler.QueueType, delay.TotalSeconds, maxMessagesToReturn }))
        {
            while (reader.Read())
            {
                QueueMessage<T> message = new QueueMessage<T>();

                message.DeQueueCount = reader.GetInt32(reader.GetOrdinal("DeQueueCount"));
                message.Id = reader.GetGuid(reader.GetOrdinal("ID"));
                message.ContentId = reader.GetGuid(reader.GetOrdinal("MessageID"));

                // The row version is an 8 byte timestamp; keep it as a Base64 string
                int index = reader.GetOrdinal("Row_version");
                byte[] outByte = new byte[8];
                reader.GetBytes(index, 0, outByte, 0, 8);
                message.Version = Convert.ToBase64String(outByte);

                index = reader.GetOrdinal("Message");
                string xml = string.Empty;

                if (!reader.IsDBNull(index))
                {
                    xml = reader.GetString(index);
                }
                message.Content = marshaler.MarshalContent(message.Id, xml);
                yield return message;
            }
        }
    }

I am using a marshaler, passed in through the constructor, to deserialize the data from the database into a type. This separates the Marshaling of objects from the Table Queue implementation.

 

Marshaling Types

Below is the interface for Marshaling content.

    public interface IMarshaler<T>
    {
        T MarshalContent(Guid id, string content);
        string UnmarshalContent(Guid id, T content);
        string QueueType { get; }
    }

I had to figure out how to save my types. With Azure Queue, types are serialized as JSON objects. I started off by serializing the message content to XML and storing it as a string in the db. With my implementation I needed meta data to be saved with each type for routing: when a worker processed a message, it needed that meta data to help route it. The problem was the content types were sealed to me, so I could not add properties to them. I could have wrapped the types, but I ended up keeping some of the meta data in a table joined to the queue table. This was not ideal, as I wanted to keep the implementation free of domain information, but having the meta data in the table made debugging much quicker. So I sacrificed my pure implementation on the altar of ‘getting things done’.

By passing in the marshaling methods to the Table Queue, I split the difference.  The Table Queue does not know about Marshaling.
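As a rough illustration of what a marshaler could look like, here is a minimal sketch that round-trips a type through XML with the framework's XmlSerializer. XmlMarshaler and SampleContent are hypothetical names, not the classes from my project, and the interface is repeated only so the sketch compiles on its own. It ignores the id parameter and does not handle the empty string that GetMessages passes in when the Message column is null.

```csharp
using System;
using System.IO;
using System.Xml.Serialization;

// Same shape as the IMarshaler<T> interface above, repeated so the sketch stands alone.
public interface IMarshaler<T>
{
    T MarshalContent(Guid id, string content);
    string UnmarshalContent(Guid id, T content);
    string QueueType { get; }
}

// Hypothetical marshaler that stores content as an XML string.
public class XmlMarshaler<T> : IMarshaler<T>
{
    private readonly XmlSerializer serializer = new XmlSerializer(typeof(T));
    private readonly string queueType;

    public XmlMarshaler(string queueType)
    {
        this.queueType = queueType;
    }

    public string QueueType
    {
        get { return queueType; }
    }

    // XML string from the database -> typed content.
    // Assumes content is a non-empty, well-formed document.
    public T MarshalContent(Guid id, string content)
    {
        using (var reader = new StringReader(content))
        {
            return (T)serializer.Deserialize(reader);
        }
    }

    // Typed content -> XML string to store in the database.
    public string UnmarshalContent(Guid id, T content)
    {
        using (var writer = new StringWriter())
        {
            serializer.Serialize(writer, content);
            return writer.ToString();
        }
    }
}

// Hypothetical content type; XmlSerializer needs a public type with a parameterless constructor.
public class SampleContent
{
    public string Text { get; set; }
}
```

Because the Table Queue only sees IMarshaler<T>, swapping XML for another format would mean writing another marshaler and leaving the queue code alone.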

 

Queue Message

Below is the generic Queue Message

    public class QueueMessage<T>
    {
        public Guid Id { get; set; }
        public string Version { get; set; }
        public int DeQueueCount { get; set; }
        public string QueueType { get; set; }
        public Guid ContentId { get; set; }
        public T Content { get; set; }
    }

It is very similar to the Azure Queue message.  The major addition is of the ContentId.  This was to make it easier to update meta data about the content.

 

Queue Handlers

I started off using the Generic Queue Handler and the Queue Handler from WAAG.  I modified the handlers to be generic. 

I added a method to the Queue Handler called Take, which allowed the caller to specify the number of messages to process at a time.

    public QueueHandler<T> Take(int batchSize)
    {
        this.batchSize = batchSize;
        return this;
    }

 

Implementation (the consumer)

    TableQueue<DomainType> queue = new TableQueue<DomainType>(domainTypeMarshaler, TimeSpan.FromMinutes(2));
    DomainTypeCommand commandDomainType = new DomainTypeCommand();
    QueueHandler.For(queue).Take(1).Every(TimeSpan.FromSeconds(30)).Do(commandDomainType);

 

Above is how everything is put together. The Table Queue is typed by the DomainType. The DomainTypeCommand will process each message. The QueueHandler will take a Table Queue, Take 1 message and call the DomainTypeCommand instance with the message data. It will pause 30 seconds between processing each message.

The constructor of the Table Queue is passed the Marshaler and how long each message will be reserved for. In this example it is two minutes.

This code will be put into a Windows Service, and the parameters can be passed in a configuration file.

 

Thoughts on MSMQ

I also thought about using MSMQ to accomplish the same functionality. Some of my messages could be over 4 MB in size, which would cause problems as the size limit for an MSMQ message is 4 MB. With the Table Queue, each message is written to the database before it is processed, so the table acts like a built-in log. With an MSMQ implementation I would have to add this functionality, which is already built into this architecture.

Poison Messages

Poison message handling is already built into the Table Queue. I maintain a de-queue count and retry a message if the de-queue count is within a limit. With an MSMQ implementation I would need to maintain a retry queue for messages which failed.

Here is a SO question on this topic.

 

Final Thoughts

I had a lot of fun building this architecture, though in the back of my mind I felt I had over-engineered the solution to message passing. Then, while this project was being QA’d, the service I was passing messages to stopped responding. It is not clear why; it could have been too much load, or a Chaos Monkey. The message eventually went through. This validated the architecture for me, as I did not have to add special handling for hardware failures.

The external partner has the potential to send hundreds of messages a minute at peak times, and each message could take 30 seconds to process internally. I need to be able to throttle the input to our internal servers while also responding to our external partner in a timely manner.

This producer consumer architecture supported the requirements of being responsive to our external partner.

Because of the producer / consumer architecture, if our internal load becomes too much to support the incoming data, we can add more consumers.

In the future I would like to add a deliberate Chaos Monkey into the system.

I would like to thank Phillip Freeman, who helped with some refactoring of my Queue Handlers.