
Thursday, Jul 14, 2011

Linq Query Expression Syntax with RIA Domain Context

After discussing my previous post with Philip Freeman, he suggested another good example would be to show how to use Linq Query Expression Syntax with RIA Domain Context calls. To facilitate this example I created two new Extension Methods: LoadEntityWithRx and LoadEntitiesWithRx.

Load Entity With Rx

   1: public static IObservable<T> LoadEntityWithRx<T>(this DomainContext context, EntityQuery<T> query) where T : Entity
   2: {
   3:     return Observable.Create<T>(ob =>
   4:     {
   5:         context.Load(query, ob.ReturnEntity, null);
   6:         return () =>
   7:         {
   8:           // On unsubscription, do nothing
   9:         };
  10:     });
  11: }

Here we are only interested in a single Entity returned by the Load operation of the Domain Context. This Extension Method follows the same pattern as the ones in the previous blog post. The Load operation is called on the Domain Context (line 5), and the work is done by the ReturnEntity method. Because nothing needs to be done on unsubscription, an empty Action is returned.

Return Entity

   1: private static void ReturnEntity<T>(this IObserver<T> ob, LoadOperation<T> loadOp) where T : Entity
   2: {
   3:     if (loadOp.HasError)
   4:     {
   5:         loadOp.MarkErrorAsHandled();
   6:         ob.OnError(loadOp.Error);
   7:     }
   8:     else
   9:     {
  10:         if (loadOp.Entities.Count() != 1)
  11:         {
  12:             ob.OnError(new Exception("Expected exactly one entity to be returned"));
  13:         }
  14:         else
  15:         {
  16:             ob.OnNext(loadOp.Entities.Single());
  17:             ob.OnCompleted(); // only complete after a successful OnNext, never after OnError
  18:         }
  19:     }
  20: }

This is where the work is done; we either pass the exception along to the Observer or pass the Entity along. More work should be done to handle missing Entities (line 10); here I assume there will be exactly one Entity returned, which is not always the case.
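One way to handle the missing-Entity case is to report "none" and "too many" as separate errors. The following is only a sketch: the class name DomainContextRxSketch, the method name ReturnSingleEntity and the choice of InvalidOperationException are my own assumptions, not part of the original Extension Methods.

```csharp
using System;
using System.Linq;
using System.ServiceModel.DomainServices.Client;

public static class DomainContextRxSketch
{
    // Hypothetical variant of ReturnEntity that tells "no entity" apart from "too many".
    public static void ReturnSingleEntity<T>(this IObserver<T> ob, LoadOperation<T> loadOp) where T : Entity
    {
        if (loadOp.HasError)
        {
            loadOp.MarkErrorAsHandled();
            ob.OnError(loadOp.Error);
            return;
        }

        // Two items are enough to decide between zero, one and many.
        var found = loadOp.Entities.Take(2).ToList();

        if (found.Count == 0)
        {
            ob.OnError(new InvalidOperationException("No entity was returned"));
        }
        else if (found.Count > 1)
        {
            ob.OnError(new InvalidOperationException("More than one entity was returned"));
        }
        else
        {
            ob.OnNext(found[0]);
            ob.OnCompleted();
        }
    }
}
```

Whether a missing Entity should be an error at all, or simply an empty sequence (OnCompleted without OnNext), depends on the caller; the sketch treats it as an error to stay close to the original behavior.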

Another good side effect of using the Extension Methods is you can consolidate your Error Handling for Domain Service calls.
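As a rough illustration of that consolidation, a subscriber can pass a single error callback to Subscribe and receive any Load failure there. This is a sketch only: the class ErrorHandlingSketch and the method LoadAndShow are hypothetical names, and it assumes the LoadEntityWithRx Extension Method above is in scope.

```csharp
using System;
using System.Diagnostics;
using System.ServiceModel.DomainServices.Client;

public static class ErrorHandlingSketch
{
    // Hypothetical helper: loads one entity and routes every failure to one handler.
    public static IDisposable LoadAndShow<T>(DomainContext context, EntityQuery<T> query, Action<T> show) where T : Entity
    {
        return context.LoadEntityWithRx(query).Subscribe(
            show,                                                        // called with the loaded entity
            error => Debug.WriteLine("Load failed: " + error.Message)); // single place for error handling
    }
}
```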

Load Entities With Rx

    public static IObservable<IEnumerable<T>> LoadEntitiesWithRx<T>(this DomainContext context, EntityQuery<T> query) where T : Entity
    {
        return Observable.Create<IEnumerable<T>>(ob =>
        {
            context.Load(query, ob.ReturnEntities, null);
            return () =>
            {
                // On unsubscription, do nothing
            };
        });
    }

This follows the same pattern as the LoadEntityWithRx Extension Method. The work is done by the ReturnEntities method.

Return Entities

    private static void ReturnEntities<T>(this IObserver<IEnumerable<T>> ob, LoadOperation<T> loadOp) where T : Entity
    {
        if (loadOp.HasError)
        {
            loadOp.MarkErrorAsHandled();
            ob.OnError(loadOp.Error);
        }
        else
        {
            ob.OnNext(loadOp.Entities);
            ob.OnCompleted();
        }
    }

A similar pattern is followed here as well, where Exceptions are passed to the Observer, and the OnNext method is called to pass along the Entities.

Query Expression Syntax Example

 

In this example I have created a really bad model for Customers and Orders.

    public class Customer
    {
        [Key]
        public Guid Id { get; set; }

        public string Name { get; set; }
    }

    public class Order
    {
        [Key]
        public Guid Id { get; set; }

        public Guid CustomerId { get; set; }

        public string Desc { get; set; }
    }

Good Example:

Suppose you want to load an Entity (e.g. a Customer), take data from that Entity (e.g. the Customer Id), and then load another Entity. With nested callbacks this can get really ugly (see the Bad Example below). I will first show how it works with Linq Query Expression Syntax, and then show how it would normally be done.

   1: private void LoadCustomer()
   2: {
   3:     CustomerDomainContext context = new CustomerDomainContext();
   4:  
   5:     EntityQuery<Customer> customerQuery = context.GetCustomersQuery().Where(c=>c.Name == "Fred");
   6:  
   7:     var customerOrders = from customer in context.LoadEntityWithRx(customerQuery)
   8:                          let orderQuery = context.GetOrdersQuery().Where(o => o.CustomerId == customer.Id)
   9:                          from orders in context.LoadEntitiesWithRx(orderQuery)
  10:                          select new { Customer = customer, Orders = orders };
  11:  
  12:     customerOrders.Subscribe((custOrders) => 
  13:     { 
  14:         // do cool stuff
  15:     });                                 
  16: }

In the example above you can reason about what the code is doing. First it queries for the Customer named “Fred” (line 5), then it builds an Orders query from that Customer's Id (line 8), then it loads the Orders matching that Customer Id (line 9). Finally, it returns an Anonymous Type containing the Customer object and its Orders to the subscriber (line 10).
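For readers who prefer method syntax, the query expression can also be written by hand with SelectMany, which is the operator the two from clauses rely on. This is a sketch rather than the compiler's exact translation (the let clause normally introduces an intermediate Select). It assumes the CustomerDomainContext and the Extension Methods from this post, and that the Rx operators live in System.Reactive.Linq, which may differ between Rx builds. The method name is hypothetical, and it is meant to sit in the same class as LoadCustomer.

```csharp
// Assumed usings: System, System.Reactive.Linq, System.ServiceModel.DomainServices.Client
private void LoadCustomerWithSelectMany()
{
    CustomerDomainContext context = new CustomerDomainContext();

    EntityQuery<Customer> customerQuery = context.GetCustomersQuery().Where(c => c.Name == "Fred");

    var customerOrders = context.LoadEntityWithRx(customerQuery)
        .SelectMany(
            // For the loaded customer, start the Orders load
            customer => context.LoadEntitiesWithRx(
                context.GetOrdersQuery().Where(o => o.CustomerId == customer.Id)),
            // Pair the customer with its orders, as the select clause does
            (customer, orders) => new { Customer = customer, Orders = orders });

    customerOrders.Subscribe(custOrders =>
    {
        // do cool stuff
    });
}
```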

Bad Example:

    private void LoadCustomersBad()
    {
        CustomerDomainContext context = new CustomerDomainContext();
        EntityQuery<Customer> customerQuery = context.GetCustomersQuery().Where(c => c.Name == "Fred");

        context.Load(customerQuery, (loadOp) =>
        {
            if (!loadOp.HasError)
            {
                // Single() throws on an empty result, so require exactly one entity
                if (loadOp.Entities.Count() == 1)
                {
                    Customer customer = loadOp.Entities.Single();
                    EntityQuery<Order> orderQuery = context.GetOrdersQuery().Where(o => o.CustomerId == customer.Id);
                    context.Load(orderQuery, (loadOpOrder) =>
                    {
                        if (!loadOpOrder.HasError)
                        {
                            IEnumerable<Order> orders = loadOpOrder.Entities;
                            // do cool stuff
                        }
                    }, null);
                }
            }
        }, null);
    }

Here the same result is accomplished, but it is so much harder to reason about the code. Imagine if you had to maintain this code. I have seen a lot of production code where the developer used 5+ nested callbacks; not fun. Maintaining the first code sample will be so much easier.

The same code is being called, but with some well-placed Extension Methods we can use the power and ease of Linq Query Expression Syntax to make code more maintainable.

Wednesday, Jul 13, 2011

RIA Domain Context and Reactive Extensions

One of the nice things about Rx is its Composability. It is really convenient to be able to compose different types of events. The Observable class has FromAsyncPattern and FromEvent methods, which make it easy to create Observables from those patterns. However, Rx does not provide any method to create Observables for RIA Domain Context calls. Phillip Freeman showed me some Extension Methods he created to make Observables from the Load and Submit operations on a Domain Context. I will show how they work and how they can be used.

 

Load With Rx

    public static IObservable<LoadOperation<T>> LoadWithRx<T>(this DomainContext context, EntityQuery<T> query) where T : Entity
    {
        return Observable.Create<LoadOperation<T>>(ob =>
        {
            context.Load(query, ob.Return, null);
            return () =>
            {
                // On unsubscription, do nothing
            };
        });
    }

This part is fairly self-explanatory: we call the Load method on the Domain Context and pass in the Entity Query. The interesting stuff happens in the Return method. Because this does not create a stream of events, there is nothing to do on unsubscription once the Load Operation is handled, so an empty Action is returned.

Return

    private static void Return<T>(this IObserver<LoadOperation<T>> ob, LoadOperation<T> loadOp) where T : Entity
    {
        if (loadOp.HasError)
        {
            loadOp.MarkErrorAsHandled();
            ob.OnError(loadOp.Error);
        }
        else
        {
            ob.OnNext(loadOp);
            ob.OnCompleted();
        }
    }

 

This is where we handle the Load Operation. If there was an error in the Load Operation, we mark it as handled and pass the Exception to the Observer so the subscriber can handle it.

If there was no error, we return the Load Operation object with the OnNext call, then signal the end of the event stream by calling OnCompleted.

Submit With Rx

    public static IObservable<SubmitOperation> SubmitWithRx(this DomainContext context)
    {
        return Observable.Create<SubmitOperation>(ob =>
        {
            if (context.HasChanges)
            {
                context.SubmitChanges(ob.ReturnSubmit, null);
            }
            else
            {
                ob.OnCompleted();
            }

            return () =>
            {
                // On unsubscription, do nothing
            };
        });
    }

This Extension Method on the Domain Context works in a similar way to the Load Extension Method. If there are no changes in the Domain Context, we signal the Observer that this is the end of the event stream without calling OnNext. As there is nothing to do when unsubscribing, an empty Action is returned. The interesting bits happen in the ReturnSubmit method.

Return Submit

    private static void ReturnSubmit(this IObserver<SubmitOperation> ob, SubmitOperation op)
    {
        if (op.HasError)
        {
            op.MarkErrorAsHandled();
            ob.OnError(op.Error);
        }
        else
        {
            ob.OnNext(op);
            ob.OnCompleted();
        }
    }

If there was an error on the Submit Operation, it is marked as handled and the Exception is sent to the Subscriber through the OnError method. If the Domain Context Submit Operation worked, then the OnNext method is called giving the subscriber the Submit Operation object. The OnCompleted method is called to signal the Observer that this is the end of the event stream.
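A subscriber can observe all three outcomes by passing three callbacks to Subscribe. The following is a minimal sketch; the class SubmitSketch and the method SaveChanges are hypothetical names, and it assumes the SubmitWithRx Extension Method above is in scope.

```csharp
using System;
using System.Diagnostics;
using System.ServiceModel.DomainServices.Client;

public static class SubmitSketch
{
    // Hypothetical helper showing the three notifications SubmitWithRx can produce.
    public static IDisposable SaveChanges(DomainContext context)
    {
        return context.SubmitWithRx().Subscribe(
            op => Debug.WriteLine("Changes submitted"),                  // OnNext: the submit succeeded
            error => Debug.WriteLine("Submit failed: " + error.Message), // OnError: the submit failed
            () => Debug.WriteLine("Submit sequence completed"));         // OnCompleted: end of the stream
    }
}
```

Note that when the Domain Context has no pending changes, only the completion callback runs, so code that must react to "nothing to save" belongs there rather than in the OnNext callback.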

Example

One reason to wrap Domain Context calls in Observables is that you can use them with other Rx Extension Methods. For example, suppose you want the Domain Context to load several Entities and wait for all of them to finish before you proceed. The Rx Extension Method ForkJoin works great for this.

    ctx = new WidgetDomainContext();
    EntityQuery<Widget> query = ctx.GetWidgetsQuery();

    var load1 = ctx.LoadWithRx<Widget>(query);
    var load2 = ctx.LoadWithRx<Widget>(query);

    Observable.ForkJoin(load1, load2).Subscribe();

You can imagine the extra state you would have to keep around to accomplish the same functionality without Rx.
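To make that concrete, here is a rough sketch of the callback-only version. The field pendingLoads and both method names are hypothetical, and the sketch assumes the callbacks all run on the same thread (as they do on the Silverlight UI thread), so no locking is shown.

```csharp
// Assumed usings: System, System.ServiceModel.DomainServices.Client
private int pendingLoads; // extra state: how many loads are still outstanding

private void LoadWidgetsWithoutRx()
{
    WidgetDomainContext ctx = new WidgetDomainContext();
    EntityQuery<Widget> query = ctx.GetWidgetsQuery();

    pendingLoads = 2;
    ctx.Load(query, OnWidgetsLoaded, null);
    ctx.Load(query, OnWidgetsLoaded, null);
}

private void OnWidgetsLoaded(LoadOperation<Widget> loadOp)
{
    if (loadOp.HasError)
    {
        loadOp.MarkErrorAsHandled();
        return; // a fuller version would also have to record and report the failure
    }

    pendingLoads--;
    if (pendingLoads == 0)
    {
        // both loads have finished; proceed
    }
}
```

Every additional load means another counter adjustment and more error bookkeeping, which is the state that ForkJoin keeps for you.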