Utilizando Reactive Extensions – Play .NET

En esta entrada seguiré con el tema desarrollado en la anterior entrada, Reactive Extensions, utilizando un proyecto de ejemplo para poner práctica los conceptos descritos en ésta. Para dicho ejemplo planteo un escenario en el que tenemos un único origen de datos que envía diferentes tipos de información que se mostrarán por pantalla y en el que también se lanzarán procesos de larga de duración.

Suscripción a origen de datos

El primer paso es definir un objeto de tipo ISubject, que representa un flujo, al que los diferentes módulos se suscribirán y que será en el encargado de notificar a éstos cuando se reciba información desde el origen de datos. Para este ejemplo, el origen de datos será un DispatcherTimer que lanzará datos aleatorios cada segundo.

private readonly ISubject _source = new Subject();

En el procedimiento de suscripción es donde utilizaremos varios de los aspectos vistos en las entrada anterior ya que la idea es que un módulo muestre solo cierto tipo de información, además de que los datos  sufran una transformación previa a su notificación a lo suscriptores.

Para este sencillo ejemplo se utiliza un par clave-valor, en el que la clave representa el tipo de dato que contiene el valor de ese par. Para suscribirnos a un tipo determinado de información incluiremos un método Subscribe como este:

public IDisposable Subscribe(int code, Action action)
{
     return _source.Where(val => val.Key == code).Select(Convert).Subscribe(action);
}

El método tendrá dos parámetros:

  • Code, que indica el tipo de datos al que nos queremos suscribir. En este caso es un int, pero podría ser un string, un enumerado etc.
  • Action<string>, el delegado que se ejecutará al recibir una notificación del tipo al que nos hemos suscrito.

Para llevar a cabo la suscripción a un tipo de datos concreto hemos de hacer uso de la clausula Where, en la que deberemos especificar las condiciones que se deben cumplir para que se lance el delegado que hemos seleccionado. En el caso del ejemplo, recibimos como parámetro de la función de suscripción el código del tipo de datos, que deberá coincidir con la clave del par que el origen de datos nos envía.

Incluyendo solo la clausula Where, nos encontramos con un error ya que el origen de datos envía pares clave-valor y el delegado espera solo un string. El Where solo filtra, para poder manejar los datos disponemos de la clausula Select que nos permite realizar transformaciones en la información recibida. Así, el método de grupo Convert tiene como parámetro de entrada un par clave-valor y devuelve un string.

Otro detalle que destaqué en la entrada anterior era cómo se gestionaban las anulaciones de las suscripciones. La función de suscripción que he detallado anteriormente devuelve un objeto de tipo IDisposable cuya referencia deberá ser guardada para que, en el momento oportuno, se llame al método Dispose y eliminar la suscripción.

Para poner en marcha este sistema simplemente hacemos que en el evento Tick del DispatcherTimer se llame al método OnNext, pasando valores aleatorios como parámetros.

Notificación de progreso

Otro aspectos muy interesante relacionado con Rx, es la posibilidad de instanciar objectos de tipo Subject en tiempo de ejecución, lo que nos aporta una gran flexibilidad y limpieza en el código.  De esta manera podemos pasar esta instancia como un objeto que implementa IObserver y lanzar notificaciones durante la ejecución de una tarea de larga duración.

            ISubject> subject = new Subject>();

            subject.Subscribe(pair =>
                                  {
                                      Percentage = pair.Key;
                                      Message = pair.Value;
                                  });

            bool result = _service.LoadData(subject);

En el trozo de código anterior se puede ver cómo se crea una instancia de Subject, se asigna un método anónimo que se encargará de recoger los datos recibidos y mostrarlos por pantalla, y una llamada a la función LoadData que tiene como único parámetro un IObserver. Ya dentro de esta función, se pueden hacer sucesivas llamadas al método OnNext para indicar el proceso y la tarea que está ejecutando.

Dejo disponible el código de un proyecto sencillo que he utilizado para el desarrollo de esta entrada. Eleminar extensión .doc y descomprimir el rar: RxSample.rar.doc.

De paso comentar que mañana, 4 de octubre, se celebra un webcast que impartirá Fernando Escolar sobre Rx. Os podéis apuntar aquí. Además también podemos encontrar unos cuantos tutoriales sobre Rx en su página.

Anuncios
Esta entrada fue publicada en .NET y etiquetada , , . Guarda el enlace permanente.

Responder

Introduce tus datos o haz clic en un icono para iniciar sesión:

Logo de WordPress.com

Estás comentando usando tu cuenta de WordPress.com. Cerrar sesión / Cambiar )

Imagen de Twitter

Estás comentando usando tu cuenta de Twitter. Cerrar sesión / Cambiar )

Foto de Facebook

Estás comentando usando tu cuenta de Facebook. Cerrar sesión / Cambiar )

Google+ photo

Estás comentando usando tu cuenta de Google+. Cerrar sesión / Cambiar )

Conectando a %s