プログラミング/C#/Reactive Extentions のバックアップ(No.1)

更新


公開メモ

概要

Reactive Extentions ライブラリを用いると、push 式の非同期通知を LINQ を用いて受け取れる。

テストプログラム

LANG:c_sharp
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading.Tasks;
using System.Windows;
using System.Windows.Controls;
using System.Windows.Data;
using System.Windows.Documents;
using System.Windows.Input;
using System.Windows.Media;
using System.Windows.Media.Imaging;
using System.Windows.Navigation;
using System.Windows.Shapes;

using System.Reactive.Linq;

namespace WpfApplication1
{
    /// <summary>
    /// コンストラクターに与えられた Action を Dispose 時に実行するクラス
    /// </summary>
    class ActionOnDispose : IDisposable
    {
        Action action;
        public ActionOnDispose(Action action)
        {
            this.action = action;
        }

        public void Dispose()
        {
            action();
        }
    }

    /// <summary>
    /// オブザーバーリストを管理する Observable クラス
    /// 
    /// OnAfterSubscribe / OnBeforeSubscribe が利用可能
    /// </summary>
    /// <typeparam name="T">監視されるデータ</typeparam>
    public class Observable<T> : IObservable<T>
    {
        /// <summary>
        /// オブザーバーのリスト
        /// </summary>
        private Dictionary<IObserver<T>, bool> observers
             = new Dictionary<IObserver<T>, bool>();

        /// <summary>
        /// オブザーバーを登録する
        /// </summary>
        /// <param name="observer">登録するオブザーバー</param>
        /// <returns>登録解除のための IDisposable</returns>
        public IDisposable Subscribe(IObserver<T> observer)
        {
            // observer を登録して
            lock (observers) {
                observers[observer] = true;
            }
            OnAfterSubscription(observer);

            // Dispose 時に Unsubscribe する IDisposable を返す
            return new ActionOnDispose(() => Unsubscribe(observer));
        }

        /// <summary>
        /// オブザーバーの登録を解除する
        /// </summary>
        /// <param name="observer">登録を解除するオブザーバー</param>
        private void Unsubscribe(IObserver<T> observer)
        {
            OnBeforeUnsubscription(observer);
            lock (observers) {
                observers.Remove(observer);
            }
        }

        /// <summary>
        /// オブザーバーが登録された直後に実行する処理
        /// </summary>
        /// <param name="observer">登録されたオブザーバー</param>
        protected virtual void OnAfterSubscription(IObserver<T> observer) { }

        /// <summary>
        /// オブザーバーが登録解除される直前に実行する処理
        /// </summary>
        /// <param name="observer">登録解除されるオブザーバー</param>
        protected virtual void OnBeforeUnsubscription(IObserver<T> observer) { }

        /// <summary>
        /// observer に新しい値を送る
        /// </summary>
        /// <param name="t">observer に送る値</param>
        public void OnNext(T t)
        {
            foreach (var observer in observers.Keys.ToArray())
                if (observers.ContainsKey(observer))
                    observer.OnNext(t);
        }
    }

    /// <summary>
    /// MainWindow.xaml の相互作用ロジック
    /// </summary>
    public partial class MainWindow : Window
    {
        /// <summary>
        /// 文字列値を通知する observable
        /// </summary>
        Observable<string> observable = new Observable<string>();

        public MainWindow()
        {
            InitializeComponent();

            // 同期コンテキスト
            var syncContext = System.Threading.SynchronizationContext.Current;

            // UI スレッドからの通知を別スレッドで待ち受ける
            Task.Run(() => {

                // 4回受け取るまで処理する
                observable.Take(4).Do(s => {
                    textBlock1.Text += s + "\n";
                }).Wait();

                // 以下は4回受け取った後に実行する内容

                // UI を操作するため同期コンテキストで実行する
                // http://ufcpp.wordpress.com/2012/04/26/%E9%9D%9E%E5%90%8C%E6%9C%9F%E5%87%A6%E7%90%86%E3%81%A8%E3%83%87%E3%82%A3%E3%82%B9%E3%83%91%E3%83%83%E3%83%81%E3%83%A3%E3%83%BC/
                syncContext.Post(s => {
                    // s には Post の第2パラメータ "Finished!" が渡される
                    textBlock1.Text += (s as string) + "\n";
                }, "Finished!");
            });
        }

        private void Button_Click(object sender, RoutedEventArgs e)
        {
            // 文字列を通知する
            observable.OnNext("ok");
        }
    }
}

Counter: 5353 (from 2010/06/03), today: 4, yesterday: 3