RIA Domain Context and Reactive Extensions
Wednesday, July 13, 2011 at 3:05PM
Tony Abell in RIA, Rx

One of the nice things about Rx is it’s Composability. It is really convenient to be able to compose different types of events. The Observable class has FromAsyncPattern and FromEvent methods which make it easy to create Observables from those patterns. However Rx does not provide any method to create Observables for RIA Domain Context calls. Phillip Freeman showed me some Extension Methods he created to make Observables from the Load and Submit operations on a Domain Context. I will show how they work and can be used.


Load With Rx

   1: public static IObservable<LoadOperation<T>> LoadWithRx<T>(this DomainContext context, EntityQuery<T> query) where T : Entity
   2: {
   3:     return Observable.Create<LoadOperation<T>>(ob =>
   4:     {
   5:         context.Load(query, ob.Return, null);
   6:         return () =>
   7:         {
   8:             // On unsubscription, do nothing
   9:         };
  10:     });
  11: }

This part is fairly self-explanatory, we call the Load method on the Domain Context and pass in the Entity Query. The interesting stuff happened in the Return method. Because this is not creating a stream of events, once the Load Operation is handled there is nothing to do on unsubscription so an empty action is returned.


   1: private static void Return<T>(this IObserver<LoadOperation<T>> ob, LoadOperation<T> loadOp) where T : Entity
   2: {
   3:     if (loadOp.HasError)
   4:     {
   5:         loadOp.MarkErrorAsHandled();
   6:         ob.OnError(loadOp.Error);
   7:     }
   8:     else
   9:     {
  10:         ob.OnNext(loadOp);
  11:         ob.OnCompleted();
  12:     }
  13: }


This is where we handle the Load Operation. If there was an error in the Load Operation, we marked it as handled.  We return the Exception to the Observer so the subscriber can handle it.

If there was no error, we return the Load Operation object with the OnNext call, then signal the end of the event stream by calling OnCompleted.

Submit With Rx

   1: public static IObservable<SubmitOperation> SubmitWithRx(this DomainContext context)
   2: {
   3:     return Observable.Create<SubmitOperation>(ob =>
   4:     {
   5:         if (context.HasChanges)
   6:         {
   7:             context.SubmitChanges(ob.ReturnSubmit, null);
   8:         }
   9:         else
  10:         {
  11:             ob.OnCompleted();
  12:         }
  14:         return () =>
  15:         {
  16:             // On unsubscription, do nothing
  17:         };
  18:     });
  19: }

This Extension Method on the Domain Context works in a similar was as the Load Extension Method. If there are no changes in the Domain Context, we signal the Observer that this is the end of the event stream. As there is nothing to do when unsubscribing an empty Action is returned. The interesting bits happen on the ReturnSubmit method.

Return Submit

   1: private static void ReturnSubmit(this IObserver<SubmitOperation> ob, SubmitOperation op)
   2: {
   3:     if (op.HasError)
   4:     {
   5:         op.MarkErrorAsHandled();
   6:         ob.OnError(op.Error);
   7:     }
   8:     else
   9:     {
  10:         ob.OnNext(op);
  11:         ob.OnCompleted();
  12:     }
  13: }

If there was an error on the Submit Operation, it is marked as handled and the Exception is sent to the Subscriber through the OnError method. If the Domain Context Submit Operation worked, then the OnNext method is called giving the subscriber the Submit Operation object. The OnCompleted method is called to signal the Observer that this is the end of the event stream.


One of the reasons one would want to wrap Domain Context calls in Observables is you can use them with other Rx Extension Methods. For example; if you wanted the Domain Context to load several Entities but you wanted to wait for all of them to be done before you proceeded. The Rx Extension Method Fork Join works great for this.

   1: ctx = new WidgetDomainContext();
   2: EntityQuery<Widget> query = ctx.GetWidgetsQuery();
   4: var load1 = ctx.LoadWithRx<Widget>(query);
   5: var load2 = ctx.LoadWithRx<Widget>(query);      
   7: Observable.ForkJoin(load1, load2).Subscribe();

You can imagine the extra state you would have to keep around to accomplished the same functionality without Rx.

Article originally appeared on Tony Abell Blog (http://www.tonyabell.com/).
See website for complete article licensing information.