« Linq Query Expression Syntax with RIA Domain Context | Main | Taking Cues from Azure »
Wednesday
Jul132011

RIA Domain Context and Reactive Extensions

One of the nice things about Rx is it’s Composability. It is really convenient to be able to compose different types of events. The Observable class has FromAsyncPattern and FromEvent methods which make it easy to create Observables from those patterns. However Rx does not provide any method to create Observables for RIA Domain Context calls. Phillip Freeman showed me some Extension Methods he created to make Observables from the Load and Submit operations on a Domain Context. I will show how they work and can be used.

 

Load With Rx

   1: public static IObservable<LoadOperation<T>> LoadWithRx<T>(this DomainContext context, EntityQuery<T> query) where T : Entity
   2: {
   3:     return Observable.Create<LoadOperation<T>>(ob =>
   4:     {
   5:         context.Load(query, ob.Return, null);
   6:         return () =>
   7:         {
   8:             // On unsubscription, do nothing
   9:         };
  10:     });
  11: }

This part is fairly self-explanatory, we call the Load method on the Domain Context and pass in the Entity Query. The interesting stuff happened in the Return method. Because this is not creating a stream of events, once the Load Operation is handled there is nothing to do on unsubscription so an empty action is returned.

Return

   1: private static void Return<T>(this IObserver<LoadOperation<T>> ob, LoadOperation<T> loadOp) where T : Entity
   2: {
   3:     if (loadOp.HasError)
   4:     {
   5:         loadOp.MarkErrorAsHandled();
   6:         ob.OnError(loadOp.Error);
   7:     }
   8:     else
   9:     {
  10:         ob.OnNext(loadOp);
  11:         ob.OnCompleted();
  12:     }
  13: }

 

This is where we handle the Load Operation. If there was an error in the Load Operation, we marked it as handled.  We return the Exception to the Observer so the subscriber can handle it.

If there was no error, we return the Load Operation object with the OnNext call, then signal the end of the event stream by calling OnCompleted.

Submit With Rx

   1: public static IObservable<SubmitOperation> SubmitWithRx(this DomainContext context)
   2: {
   3:     return Observable.Create<SubmitOperation>(ob =>
   4:     {
   5:         if (context.HasChanges)
   6:         {
   7:             context.SubmitChanges(ob.ReturnSubmit, null);
   8:         }
   9:         else
  10:         {
  11:             ob.OnCompleted();
  12:         }
  13:  
  14:         return () =>
  15:         {
  16:             // On unsubscription, do nothing
  17:         };
  18:     });
  19: }

This Extension Method on the Domain Context works in a similar was as the Load Extension Method. If there are no changes in the Domain Context, we signal the Observer that this is the end of the event stream. As there is nothing to do when unsubscribing an empty Action is returned. The interesting bits happen on the ReturnSubmit method.

Return Submit

   1: private static void ReturnSubmit(this IObserver<SubmitOperation> ob, SubmitOperation op)
   2: {
   3:     if (op.HasError)
   4:     {
   5:         op.MarkErrorAsHandled();
   6:         ob.OnError(op.Error);
   7:     }
   8:     else
   9:     {
  10:         ob.OnNext(op);
  11:         ob.OnCompleted();
  12:     }
  13: }

If there was an error on the Submit Operation, it is marked as handled and the Exception is sent to the Subscriber through the OnError method. If the Domain Context Submit Operation worked, then the OnNext method is called giving the subscriber the Submit Operation object. The OnCompleted method is called to signal the Observer that this is the end of the event stream.

Example

One of the reasons one would want to wrap Domain Context calls in Observables is you can use them with other Rx Extension Methods. For example; if you wanted the Domain Context to load several Entities but you wanted to wait for all of them to be done before you proceeded. The Rx Extension Method Fork Join works great for this.

   1: ctx = new WidgetDomainContext();
   2: EntityQuery<Widget> query = ctx.GetWidgetsQuery();
   3:             
   4: var load1 = ctx.LoadWithRx<Widget>(query);
   5: var load2 = ctx.LoadWithRx<Widget>(query);      
   6:       
   7: Observable.ForkJoin(load1, load2).Subscribe();

You can imagine the extra state you would have to keep around to accomplished the same functionality without Rx.

PrintView Printer Friendly Version

EmailEmail Article to Friend

References (17)

References allow you to track sources for this article, as well as articles that were written in response to this article.
  • Response
    Response: indexing service
    Excellent Web-site, Keep up the great job. thnx!
  • Response
    Tony Abell Blog - Blog - RIA Domain Context and Reactive Extensions
  • Response
    Tony Abell Blog - Blog - RIA Domain Context and Reactive Extensions
  • Response
    All programming languages some primitive building blocks for that description associated with information along with the processes as well as transformations put on all of them.
  • Response
    Tony Abell Blog - Blog - RIA Domain Context and Reactive Extensions
  • Response
    Response: XOVILICHTER
    Tony Abell Blog - Blog - RIA Domain Context and Reactive Extensions
  • Response
    Tony Abell Blog - Blog - RIA Domain Context and Reactive Extensions
  • Response
    Response: RH10 piano lessons
    Tony Abell Blog - Blog - RIA Domain Context and Reactive Extensions
  • Response
    Tony Abell Blog - Blog - RIA Domain Context and Reactive Extensions
  • Response
    Response: farmer game
    Tony Abell Blog - Blog - RIA Domain Context and Reactive Extensions
  • Response
    Response: Boton in a Box
    Tony Abell Blog - Blog - RIA Domain Context and Reactive Extensions
  • Response
    Response: qwe2
  • Response
    Response: ebook barato
    Tony Abell Blog - Blog - RIA Domain Context and Reactive Extensions
  • Response
    Tony Abell Blog - Blog - RIA Domain Context and Reactive Extensions
  • Response
    Января
  • Response
    Response: essay service
    The whole concept was new to me and really wanna make a try with the code and would like to know what was actually rx. Hope i can make it successful from my side. And it's been everlasting to be here and know everything.
  • Response
    Response: Showbox App

Reader Comments (2)

This post appears to explain exactly what I am trying to do. However, with the latest Reactive Extensions from NuGet and the latest RIA Services implementation, I can never get the Return method to fire. The only thing that returns is the empty method on "unsubscribe" in the LoadWithRx extension method. I'd like to do something like this in a synchronous fashion using RX.


Widget firstWidget = null;

var query = Context.GetWidgetsByType(1);

var load = Context.LoadWithRx<Widget>(query);

load.Subscribe( r =>
{
if ( r.HasError)
{
//log error
}

firstWidget = r.Entities.First();

});

In this scenario, firstWidget is always null and the Lamba inside the load.Subscribe never gets called. Maybe I don't understand RX very well?

February 16, 2012 | Unregistered CommenterDave Baumann

Dave,
Notice that by the definition of the Return method, the OnNext method will never be called in the case of an error, so your HasError check is redundant. Error checking should be performed by providing a lambda to asynchronously handle exceptions:

load.Subscribe((LoadOperation<Widget> loadOp) => { /* Success */ },
(Exception ex) => { /* Failure */ });

Perhaps there is an exception that you are not seeing? Maybe try adding a breakpoint in the Return method.

May 9, 2012 | Unregistered CommenterPhil Freeman

PostPost a New Comment

Enter your information below to add a new comment.

My response is on my own website »
Author Email (optional):
Author URL (optional):
Post:
 
Some HTML allowed: <a href="" title=""> <abbr title=""> <acronym title=""> <b> <blockquote cite=""> <code> <em> <i> <strike> <strong>