プログラミング/C#/Reactive Extentions のバックアップソース(No.1)

更新

[[公開メモ]]

* 概要 [#gdb4348f]

Reactive Extentions ライブラリを用いると、push 式の非同期通知を LINQ を用いて受け取れる。

* テストプログラム [#xbe40156]

 LANG:c_sharp
 using System;
 using System.Collections.Generic;
 using System.Linq;
 using System.Text;
 using System.Threading.Tasks;
 using System.Windows;
 using System.Windows.Controls;
 using System.Windows.Data;
 using System.Windows.Documents;
 using System.Windows.Input;
 using System.Windows.Media;
 using System.Windows.Media.Imaging;
 using System.Windows.Navigation;
 using System.Windows.Shapes;
 
 using System.Reactive.Linq;
 
 namespace WpfApplication1
 {
     /// <summary>
     /// コンストラクターに与えられた Action を Dispose 時に実行するクラス
     /// </summary>
     class ActionOnDispose : IDisposable
     {
         Action action;
         public ActionOnDispose(Action action)
         {
             this.action = action;
         }
 
         public void Dispose()
         {
             action();
         }
     }
 
     /// <summary>
     /// オブザーバーリストを管理する Observable クラス
     /// 
     /// OnAfterSubscribe / OnBeforeSubscribe が利用可能
     /// </summary>
     /// <typeparam name="T">監視されるデータ</typeparam>
     public class Observable<T> : IObservable<T>
     {
         /// <summary>
         /// オブザーバーのリスト
         /// </summary>
         private Dictionary<IObserver<T>, bool> observers
              = new Dictionary<IObserver<T>, bool>();
 
         /// <summary>
         /// オブザーバーを登録する
         /// </summary>
         /// <param name="observer">登録するオブザーバー</param>
         /// <returns>登録解除のための IDisposable</returns>
         public IDisposable Subscribe(IObserver<T> observer)
         {
             // observer を登録して
             lock (observers) {
                 observers[observer] = true;
             }
             OnAfterSubscription(observer);
 
             // Dispose 時に Unsubscribe する IDisposable を返す
             return new ActionOnDispose(() => Unsubscribe(observer));
         }
 
         /// <summary>
         /// オブザーバーの登録を解除する
         /// </summary>
         /// <param name="observer">登録を解除するオブザーバー</param>
         private void Unsubscribe(IObserver<T> observer)
         {
             OnBeforeUnsubscription(observer);
             lock (observers) {
                 observers.Remove(observer);
             }
         }
 
         /// <summary>
         /// オブザーバーが登録された直後に実行する処理
         /// </summary>
         /// <param name="observer">登録されたオブザーバー</param>
         protected virtual void OnAfterSubscription(IObserver<T> observer) { }
 
         /// <summary>
         /// オブザーバーが登録解除される直前に実行する処理
         /// </summary>
         /// <param name="observer">登録解除されるオブザーバー</param>
         protected virtual void OnBeforeUnsubscription(IObserver<T> observer) { }
 
         /// <summary>
         /// observer に新しい値を送る
         /// </summary>
         /// <param name="t">observer に送る値</param>
         public void OnNext(T t)
         {
             foreach (var observer in observers.Keys.ToArray())
                 if (observers.ContainsKey(observer))
                     observer.OnNext(t);
         }
     }
 
     /// <summary>
     /// MainWindow.xaml の相互作用ロジック
     /// </summary>
     public partial class MainWindow : Window
     {
         /// <summary>
         /// 文字列値を通知する observable
         /// </summary>
         Observable<string> observable = new Observable<string>();
 
         public MainWindow()
         {
             InitializeComponent();
 
             // 同期コンテキスト
             var syncContext = System.Threading.SynchronizationContext.Current;
 
             // UI スレッドからの通知を別スレッドで待ち受ける
             Task.Run(() => {
 
                 // 4回受け取るまで処理する
                 observable.Take(4).Do(s => {
                     textBlock1.Text += s + "\n";
                 }).Wait();
 
                 // 以下は4回受け取った後に実行する内容
 
                 // UI を操作するため同期コンテキストで実行する
                 // http://ufcpp.wordpress.com/2012/04/26/%E9%9D%9E%E5%90%8C%E6%9C%9F%E5%87%A6%E7%90%86%E3%81%A8%E3%83%87%E3%82%A3%E3%82%B9%E3%83%91%E3%83%83%E3%83%81%E3%83%A3%E3%83%BC/
                 syncContext.Post(s => {
                     // s には Post の第2パラメータ "Finished!" が渡される
                     textBlock1.Text += (s as string) + "\n";
                 }, "Finished!");
             });
         }
 
         private void Button_Click(object sender, RoutedEventArgs e)
         {
             // 文字列を通知する
             observable.OnNext("ok");
         }
     }
 }

Counter: 5308 (from 2010/06/03), today: 1, yesterday: 2