プログラミング/C#/Reactive Extentions のバックアップソース(No.3)

更新

[[公開メモ]]

* 概要 [#gdb4348f]

Reactive Extentions ライブラリを用いると、push 式の非同期通知を LINQ を用いて受け取れるそうです。

* テストプログラム [#xbe40156]

便利そうなので、よく分からないながらテストプログラムを作って試してみました。

 LANG:csharp(linenumber)
 using System;
 using System.Collections.Generic;
 using System.Linq;
 using System.Text;
 using System.Threading.Tasks;
 using System.Windows;
 using System.Windows.Controls;
 using System.Windows.Data;
 using System.Windows.Documents;
 using System.Windows.Input;
 using System.Windows.Media;
 using System.Windows.Media.Imaging;
 using System.Windows.Navigation;
 using System.Windows.Shapes;
 
 using System.Reactive.Linq;
 
 namespace WpfApplication1
 {
     /// <summary>
     /// コンストラクターに与えられた Action を Dispose 時に実行するクラス
     /// </summary>
     class ActionOnDispose : IDisposable
     {
         Action action;
         public ActionOnDispose(Action action)
         {
             this.action = action;
         }
 
         public void Dispose()
         {
             action();
         }
     }
 
     /// <summary>
     /// オブザーバーリストを管理する Observable クラス
     /// 
     /// OnAfterSubscribe / OnBeforeSubscribe が利用可能
     /// </summary>
     /// <typeparam name="T">通知されるデータ型</typeparam>
     public class Observable<T> : IObservable<T>
     {
         /// <summary>
         /// オブザーバーのリスト
         /// </summary>
         private Dictionary<IObserver<T>, bool> observers
              = new Dictionary<IObserver<T>, bool>();
 
         /// <summary>
         /// オブザーバーを登録する
         /// </summary>
         /// <param name="observer">登録するオブザーバー</param>
         /// <returns>登録解除のための IDisposable</returns>
         public IDisposable Subscribe(IObserver<T> observer)
         {
             // observer を登録して
             lock (observers) {
                 observers.Add(observer, true);
             }
             OnAfterSubscription(observer);
 
             // Dispose 時に Unsubscribe する IDisposable を返す
             return new ActionOnDispose(() => Unsubscribe(observer));
         }
 
         /// <summary>
         /// オブザーバーの登録を解除する
         /// </summary>
         /// <param name="observer">登録を解除するオブザーバー</param>
         private void Unsubscribe(IObserver<T> observer)
         {
             OnBeforeUnsubscription(observer);
             lock (observers) {
                 observers.Remove(observer);
             }
         }
 
         /// <summary>
         /// オブザーバーが登録された直後に実行する処理
         /// </summary>
         /// <param name="observer">登録されたオブザーバー</param>
         protected virtual void OnAfterSubscription(IObserver<T> observer) { }
 
         /// <summary>
         /// オブザーバーが登録解除される直前に実行する処理
         /// </summary>
         /// <param name="observer">登録解除されるオブザーバー</param>
         protected virtual void OnBeforeUnsubscription(IObserver<T> observer) { }
 
         /// <summary>
         /// observer に新しい値を送る
         /// </summary>
         /// <param name="t">observer に送る値</param>
         public void OnNext(T t)
         {
             foreach (var observer in observers.Keys.ToArray())
                 if (observers.ContainsKey(observer))
                     observer.OnNext(t);
         }
 
        /// <summary>
        /// 処理の終了を通知する
        /// </summary>
        public void OnCompleted()
        {
            foreach (var observer in observers.Keys.ToArray())
                if (observers.ContainsKey(observer))
                    observer.OnCompleted();
        }
    }
 
     /// <summary>
     /// MainWindow.xaml の相互作用ロジック
     /// </summary>
     public partial class MainWindow : Window
     {
         /// <summary>
         /// 文字列値を通知する observable
         /// </summary>
         Observable<string> observable = new Observable<string>();
 
         public MainWindow()
         {
             InitializeComponent();
 
             // 同期コンテキスト
             var syncContext = System.Threading.SynchronizationContext.Current;
 
             // UI スレッドからの通知を別スレッドで待ち受ける
             Task.Run(() => {
 
                 // 4回受け取るまで処理する
                 observable.Take(4).Do(s => {
                     textBlock1.Text += s + "\n";
                 }).Wait();
 
                 // 以下は4回受け取った後に実行する内容
 
                 // UI を操作するため同期コンテキストで実行する
                 // http://ufcpp.wordpress.com/2012/04/26/%E9%9D%9E%E5%90%8C%E6%9C%9F%E5%87%A6%E7%90%86%E3%81%A8%E3%83%87%E3%82%A3%E3%82%B9%E3%83%91%E3%83%83%E3%83%81%E3%83%A3%E3%83%BC/
                 syncContext.Post(s => {
                     // s には Post の第2パラメータ "Finished!" が渡される
                     textBlock1.Text += (s as string) + "\n";
                 }, "Finished!");
             });
         }
 
         private void Button_Click(object sender, RoutedEventArgs e)
         {
             // 文字列を通知する
             observable.OnNext("ok");
         }
     }
 }

ボタンを4回押すと、それぞれ textBlock1 に ok の文字が追加され、
それが終わるとさらに Finished! の文字が追加されます。

ミソとなるのは 124行目からの

 LANG:csharp
     // 4回受け取るまで処理する
     observable.Take(4).Do(s => {
         textBlock1.Text += s + "\n";
     }).Wait();

の部分で、別スレッドから

 LANG:csharp
     // 文字列を通知する
     observable.OnNext("ok");

のようにして IObservable<string> observable に送られる文字列を、
LINQ を使って順に処理した上で、処理完了を待ってから次の処理に移ることができます。

Task.Run で実行される別スレッドに記述しているにも関わらず、
Do に記述したラムダ式は UI 同期コンテキストで実行されており、
自由に UI にアクセスできるのが興味深いです。

便利は便利だけれど、実行速度などに心配も残ります???

** 別解1 [#zf3dd8e5]

Wait() で同期的に終了を待つ代わりに GetAwaiter() により非同期的にクエリを開始すれば、
明示的に別スレッドを開始する必要はありません。内部で勝手に起動されます。

この場合、クエリの終了後に行いたい処理があれば Finally() に書けるようでした。

 LANG:csharp
    public MainWindow()
    {
        InitializeComponent();
 
        // 4回受け取るまで処理する
        var awaiter = observable.Take(4).Do(s => {
            textBlock1.Text += s + "\n";
        }).Finally(() => textBlock1.Text += "Finished!\n").GetAwaiter();
 
        // 必要なところで awaiter.Wait(); として終了まで待機できる
    }

** 別解2 [#h0dd6b03]

明示的に Subscript を呼ぶこともできますが、これだと呼び出し側から終了を検知できないし、
Finally と Subscribe の順番も気にくわないしで別解1の方がエレガントに思えます。

 LANG:csharp
    public MainWindow()
    {
        InitializeComponent();
 
        // 4回受け取るまで処理する
        observable.Take(4).Finally(() => textBlock1.Text += "Finished!\n").Subscribe(s => {
            textBlock1.Text += s + "\n";
        });
    }

* パフォーマンス [#k2d1907e]

** 非同期クエリの開始・終了は重い [#y109d9b5]

非同期クエリの開始・終了処理はかなり重い処理のようです。

裏でスレッドの開始・終了が行われるので仕方がないのかもしれませんが、
下記のような10万回の実行で 0.5 秒もかかりました。

1回あたり 5μ秒となります。

 LANG:csharp
    var start = DateTime.Now.Ticks;
 
    var observable = new Observable<int>();
    for (var i = 0; i < 100000; i++) {
        // 1回あたり 5μ秒
        observable.Take(1).Do(n => { }).GetAwaiter();
        observable.OnNext(0);
    }
 
    textBlock1.Text += "\n" + (DateTime.Now.Ticks - start).ToString();

*** Subscribe / Unsubscribe だけにしてみると [#aefb4ad6]

 LANG:csharp
    for (var i = 0; i < 100000; i++) {
        using (observable.Take(1).Subscribe(n => { })) { };
    }

だと、0.3 秒でした。これだけで 3μ秒。ん?そうすると OnNext だけで 2μ秒???

** 通知自体もそこそこ重い? [#l7e348d3]

1000万回の実行で3秒ほどでした。

1回あたり 0.3μ秒にあたります。

上で見た 2μ秒との違いはどこにある?

 LANG:csharp
    var start = DateTime.Now.Ticks;
 
    var sum = 0L;
    var observable = new Observable<int>();
    var awaiter = observable.Do(i => sum += i }).GetAwaiter();
 
    Task.Run(() => {
        for (var i = 0L; i < 1000000; i++) {
            observable.OnNext(1);
        }
        observable.OnCompleted();
    });
 
    awaiter.Wait();
 
    textBlock1.Text += "\nAnswer=" + sum.ToString() + "\n" + (DateTime.Now.Ticks - start).ToString();

*** 普通に実行した場合との差 [#j59761d3]

下記のように普通に for を回すだけだと 30ms で終了しますので、
1回転あたり 3ns しかかからず、
上記の 0.3マイクロ秒というのがほぼすべて Rx のディスパッチ処理にかかっていることが分かります。

 LANG:csharp
    var start = DateTime.Now.Ticks;
 
    var sum = 0L;
    Task.Run(() => {
        for (var i = 0L; i < 10000000; i++) {
            // 1回あたり 3ns
            sum += 1;
        }
    }).Wait();
 
    textBlock1.Text += "\n" + (DateTime.Now.Ticks - start).ToString();

*** スケジューラーの指定 [#u944483c]

さすがにかかりすぎなので、スケジューラの指定などでもっと軽い処理を行うことができないか
試してみました。

上記のように普通に実行して 3 秒で終わった処理は、

observable.Do を observable.ObserveOn(System.Reactive.Concurrency.DefaultScheduler.Instance).Do
としたところ、6.2 秒かかるようになりました。

observable.Do を observable.ObserveOn(System.Reactive.Concurrency.ImmediateScheduler.Instance).Do
としたところ、18 秒かかるようになりました。

observable.Do を observable.ObserveOn(System.Reactive.Concurrency.CurrentThreadScheduler.Instance).Do
としたところ、40 秒かかるようになりました。

軽くなるような指定の仕方、まだ見つけられずにいます。

// ** Subject を使う [#bad6086f]
// 
// Rx は遅いという記事はネット上でもたくさん見付かるようですね。
// 
// その中で見つけたのが Subject<> を使った push 配信はそこそこ早いという記事。
// 
// http://okazuki.hatenablog.com/entry/20111214/1323828027
// 
// 確かに上記を次のように直すだけで
// 
//  LANG:csharp
//  //    var observable = new Observable<int>();
//      var observable = new System.Reactive.Subjects.Subject<int>();
// 

* コメント・質問 [#m53a045c]

#article_kcaptcha


Counter: 4366 (from 2010/06/03), today: 1, yesterday: 0