プログラミング/C#/Reactive Extentions

(1177d) 更新

公開メモ

概要

Reactive Extentions ライブラリを用いると、push 式の非同期通知を LINQ を用いて受け取れるそうです。

テストプログラム

便利そうなので、よく分からないながらテストプログラムを作って試してみました。

LANG:csharp(linenumber)
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading.Tasks;
using System.Windows;
using System.Windows.Controls;
using System.Windows.Data;
using System.Windows.Documents;
using System.Windows.Input;
using System.Windows.Media;
using System.Windows.Media.Imaging;
using System.Windows.Navigation;
using System.Windows.Shapes;

using System.Reactive.Linq;

namespace WpfApplication1
{
    /// <summary>
    /// コンストラクターに与えられた Action を Dispose 時に実行するクラス
    /// </summary>
    class ActionOnDispose : IDisposable
    {
        Action action;
        public ActionOnDispose(Action action)
        {
            this.action = action;
        }

        public void Dispose()
        {
            action();
        }
    }

    /// <summary>
    /// オブザーバーリストを管理する Observable クラス
    /// 
    /// OnAfterSubscribe / OnBeforeSubscribe が利用可能
    /// </summary>
    /// <typeparam name="T">監視されるデータ</typeparam>
    public class Observable<T> : IObservable<T>
    {
        /// <summary>
        /// オブザーバーのリスト
        /// </summary>
        private List<IObserver<T>> observers = new List<IObserver<T>>();

        /// <summary>
        /// オブザーバーを登録する
        /// </summary>
        /// <param name="observer">登録するオブザーバー</param>
        /// <returns>登録解除のための IDisposable</returns>
        public IDisposable Subscribe(IObserver<T> observer)
        {
            // observer を登録して
            lock (observers) {
                observers.Add(observer);
            }
            OnAfterSubscription(observer);

            // Dispose 時に Unsubscribe する IDisposable を返す
            return new ActionOnDispose(() => Unsubscribe(observer));
        }

        /// <summary>
        /// オブザーバーの登録を解除する
        /// </summary>
        /// <param name="observer">登録を解除するオブザーバー</param>
        private void Unsubscribe(IObserver<T> observer)
        {
            OnBeforeUnsubscription(observer);
            lock (observers) {
                observers.Remove(observer);
            }
        }

        /// <summary>
        /// オブザーバーが登録された直後に実行する処理
        /// </summary>
        /// <param name="observer">登録されたオブザーバー</param>
        protected virtual void OnAfterSubscription(IObserver<T> observer) { }

        /// <summary>
        /// オブザーバーが登録解除される直前に実行する処理
        /// </summary>
        /// <param name="observer">登録解除されるオブザーバー</param>
        protected virtual void OnBeforeUnsubscription(IObserver<T> observer) { }

        /// <summary>
        /// observer に新しい値を送る
        /// </summary>
        /// <param name="t">observer に送る値</param>
        public void OnNext(T t)
        {
            foreach (var observer in observers.ToArray())
                if (observers.Contains(observer))
                    observer.OnNext(t);
        }

        /// <summary>
        /// 処理の終了を通知する
        /// </summary>
        public void OnCompleted()
        {
            foreach (var observer in observers.ToArray())
                if (observers.Contains(observer))
                    observer.OnCompleted();
        }

        /// <summary>
        /// エラーを通知する
        /// </summary>
        public void OnError(Exception error)
        {
            foreach (var observer in observers.ToArray())
                if (observers.Contains(observer))
                    observer.OnError(error);
        }
    }

    /// <summary>
    /// MainWindow.xaml の相互作用ロジック
    /// </summary>
    public partial class MainWindow : Window
    {
        /// <summary>
        /// 文字列値を通知する observable
        /// </summary>
        Observable<string> observable = new Observable<string>();

        public MainWindow()
        {
            InitializeComponent();

            // 同期コンテキスト
            var syncContext = System.Threading.SynchronizationContext.Current;

            // UI スレッドからの通知を別スレッドで待ち受ける
            Task.Run(() => {

                // 4回受け取るまで処理する
                observable.Take(4).Do(s => {
                    textBlock1.Text += s + "\n";
                }).Wait();

                // 以下は4回受け取った後に実行する内容

                // UI を操作するため同期コンテキストで実行する
                // http://ufcpp.wordpress.com/2012/04/26/%E9%9D%9E%E5%90%8C%E6%9C%9F%E5%87%A6%E7%90%86%E3%81%A8%E3%83%87%E3%82%A3%E3%82%B9%E3%83%91%E3%83%83%E3%83%81%E3%83%A3%E3%83%BC/
                syncContext.Post(s => {
                    // s には Post の第2パラメータの "Finished!" が渡される
                    textBlock1.Text += (s as string) + "\n";
                }, "Finished!");
            });
        }

        private void Button_Click(object sender, RoutedEventArgs e)
        {
            // 文字列を通知する
            observable.OnNext("ok");
        }
    }
}

ボタンを4回押すと、それぞれ textBlock1 に ok の文字が追加され、 それが終わるとさらに Finished! の文字が追加されます。

ミソとなるのは 144行目からの

LANG:csharp
    // 4回受け取るまで処理する
    observable.Take(4).Do(s => {
        textBlock1.Text += s + "\n";
    }).Wait();

の部分で、UI スレッドから

LANG:csharp
    // 文字列を通知する
    observable.OnNext("ok");

のようにして IObservable<string> observable に送られる文字列を、 LINQ を使って順に処理した上で、処理完了を待ってから次の処理に移ることができます。

上の例では observable.OnNext が UI スレッドから呼ばれ、 Do に渡されたデリゲートも同じスレッドで実行されるため、 Do の中からは UI コントロールに自由にアクセスできますが、 Wait() 後の処理は別スレッドで実行されるため UI コントロールにアクセスするには 同期処理が必要になっています。

別解1

Wait() で同期的に終了を待つ代わりに GetAwaiter() により非同期的にクエリを開始すれば、 明示的に別スレッドを開始する必要はありません。内部で自動的にスレッドが起動されます。

この場合、クエリの終了後に行いたい処理があれば Finally() に書けますね。

LANG:csharp
   public MainWindow()
   {
       InitializeComponent();

       // 4回受け取るまで処理する
       var awaiter = observable.Take(4).Do(s => {
           textBlock1.Text += s + "\n";
       }).Finally(() => textBlock1.Text += "Finished!\n").GetAwaiter();

       // 必要なところで awaiter.Wait(); として終了まで待機できる
   }

別解2

明示的に Subscript を呼ぶこともできますが、その場合呼び出し側から終了を検知できないし、 Finally と Subscribe の順番も気にくわないしで別解1の方がエレガントに思えます。

Subscribe の戻り値を使って好きなときに unsubscribe できるというメリットも 無いわけではないですが・・・代わりに TakeWhile などで判別すれば良い場合が多いような。

ただ実は下で見るように Do を挟むと単純に Subscribe したのに比べてかなりパフォーマンスが落ちるため、 実行速度が問題になる場合には素直に Subscribe で書いた方が良いようです。

LANG:csharp
   public MainWindow()
   {
       InitializeComponent();

       // 4回受け取るまで処理する
       var subscription = observable
               .Take(4)
               .Finally(() => textBlock1.Text += "Finished!\n")
               .Subscribe(s => {
                   textBlock1.Text += s + "\n";
               });

       // 以降、好きなときに subscription.Dispose() を呼んで unsubscribe できる
   }

別解3

考えてみると Subscribe するなら Finally ではなく OnCompleted を使うべきですね。

LANG:csharp
   public MainWindow()
   {
       InitializeComponent();

       // 4回受け取るまで処理する
       var subscription = observable
               .Take(4)
               .Subscribe(
                 s => textBlock1.Text += s + "\n", 
                 ()=> textBlock1.Text += "Finished!\n"
               );

       // 以降、好きなときに subscription.Dispose() を呼んで unsubscribe できる
   }

パフォーマンス

非同期クエリの開始・終了は重い

非同期クエリの開始・終了処理はかなり重い処理のようです?

下記のような10万回の実行で 0.5 秒もかかりました。

1回あたり 5μ秒となります。

LANG:csharp
   var start = DateTime.Now.Ticks;

   var observable = new Observable<int>();
   for (var i = 0; i < 100000; i++) {
       // 1回あたり 5μ秒
       observable.Take(1).Do(n => { }).GetAwaiter();
       observable.OnNext(0);
   }

   textBlock1.Text += "\n" + (DateTime.Now.Ticks - start).ToString();

通知自体もそこそこ重い?

1000万回の実行で1.5秒ほどでした。

1回あたり 150 ns にあたります。

LANG:csharp
   var start = DateTime.Now.Ticks;

   var sum = 0L;
   var observable = new Observable<int>();
   var awaiter = observable.Do(i => sum += i }).GetAwaiter();

   Task.Run(() => {
       for (var i = 0L; i < 1000000; i++) {
           observable.OnNext(1);
       }
       observable.OnCompleted();
   });

   awaiter.Wait();

   textBlock1.Text += "\nAnswer=" + sum.ToString() + "\n" + (DateTime.Now.Ticks - start).ToString();

普通に実行した場合との差

下記のように普通に for を回すだけだと 50ms で終了しますので、 1回転あたり 5ns しかかからず、 上記の 150ns というのがほぼすべて Rx のディスパッチ処理にかかっていることが分かります。

LANG:csharp
   var start = DateTime.Now.Ticks;

   var sum = 0L;
   Task.Run(() => {
       for (var i = 0L; i < 10000000; i++) {
           (new Action(() => sum += 1)).Invoke();  // 5ns
       }
   }).Wait();

   textBlock1.Text += "\n" + (DateTime.Now.Ticks - start).ToString();

ちなみにラムダ式の Invoke() で書いている部分を単に sum += 1 と書き直すと 3 ns で済みます。

どこが遅いのか

処理の流れとしては、

LANG:csharp
observable.OnNext(t);

により、

LANG:csharp
foreach (var observer in observers.ToArray())
    if (observers.Contains(observer))
        observer.OnNext(t);

が実行され、observer.OnNext(t) によりラムダ式が実行されます。

あちこち削って試してみると、

  • observers.ToArray() に 20ns
  • if (observers.Contains(observer)) に 30ns
  • foreach に 40ns
  • observers[i] の参照に 5ns

かかります。

さらに、

LANG:csharp
var awaiter = observable.Do(i => sum += i).GetAwaiter();

の代わりに

LANG:csharp
observable.Subscribe(i => sum += i);

とすると、40ns くらい早くなるので、Do がそれだけ時間がかかっていることになります。

全部除くと 15ns となって、元の 5ns に近づきます。

残りの 10ns はラムダ式が Observer<T> にラップされていることによるのではないかと思いますが、調べ切れていません。

System.Reactive.Subjects.Subject<> を使う

Rx は遅いという記事はネット上でもたくさん見付かるようですね。

その中で見つけたのが Subject<> を使った push 配信はそこそこ早いという記事。

http://okazuki.hatenablog.com/entry/20111214/1323828027

確かに、自前の Observable<T> を定義することをやめて、 System.Reactive.Subjects.Subject<> を用いると上記のオーバーヘッドを劇的に減らせます。

LANG:csharp
var observable = new Observable<int>();

としていたところを

LANG:csharp
var observable = new System.Reactive.Subjects.Subject<int>();

として、Do ではなく Subscribe を使ったところ 17ns 程度となって、 非常に高速に動作します。自分で実装したコードに比べてここまで早い理由はよく分かりませんが、 迷わず Subject<> を使うべきであることが分かります。

スケジューラーの指定

さすがにかかりすぎなので、スケジューラの指定などでもっと軽い処理を行うことができないか 試してみました。

上記のように普通に実行して 3 秒で終わった処理は、

observable.Do を observable.ObserveOn(System.Reactive.Concurrency.DefaultScheduler.Instance).Do としたところ、6.2 秒かかるようになりました。

observable.Do を observable.ObserveOn(System.Reactive.Concurrency.ImmediateScheduler.Instance).Do としたところ、18 秒かかるようになりました。

observable.Do を observable.ObserveOn(System.Reactive.Concurrency.CurrentThreadScheduler.Instance).Do としたところ、40 秒かかるようになりました。

軽くなるような指定の仕方、まだ見つけられずにいます。

コメント・質問





Counter: 1821 (from 2010/06/03), today: 1, yesterday: 0