プログラミング/C#/Reactive Extentions の変更点
更新- 追加された行はこの色です。
- 削除された行はこの色です。
- プログラミング/C#/Reactive Extentions へ行く。
- プログラミング/C#/Reactive Extentions の差分を削除
[[公開メモ]] * 概要 [#gdb4348f] Reactive Extentions ライブラリを用いると、push 式の非同期通知を LINQ を用いて受け取れるそうです。 * テストプログラム [#xbe40156] 便利そうなので、よく分からないながらテストプログラムを作って試してみました。 LANG:csharp(linenumber) using System; using System.Collections.Generic; using System.Linq; using System.Text; using System.Threading.Tasks; using System.Windows; using System.Windows.Controls; using System.Windows.Data; using System.Windows.Documents; using System.Windows.Input; using System.Windows.Media; using System.Windows.Media.Imaging; using System.Windows.Navigation; using System.Windows.Shapes; using System.Reactive.Linq; namespace WpfApplication1 { /// <summary> /// コンストラクターに与えられた Action を Dispose 時に実行するクラス /// </summary> class ActionOnDispose : IDisposable { Action action; public ActionOnDispose(Action action) { this.action = action; } public void Dispose() { action(); } } /// <summary> /// オブザーバーリストを管理する Observable クラス /// /// OnAfterSubscribe / OnBeforeSubscribe が利用可能 /// </summary> /// <typeparam name="T">監視されるデータ</typeparam> public class Observable<T> : IObservable<T> { /// <summary> /// オブザーバーのリスト /// </summary> private List<IObserver<T>> observers = new List<IObserver<T>>(); /// <summary> /// オブザーバーを登録する /// </summary> /// <param name="observer">登録するオブザーバー</param> /// <returns>登録解除のための IDisposable</returns> public IDisposable Subscribe(IObserver<T> observer) { // observer を登録して lock (observers) { observers.Add(observer); } OnAfterSubscription(observer); // Dispose 時に Unsubscribe する IDisposable を返す return new ActionOnDispose(() => Unsubscribe(observer)); } /// <summary> /// オブザーバーの登録を解除する /// </summary> /// <param name="observer">登録を解除するオブザーバー</param> private void Unsubscribe(IObserver<T> observer) { OnBeforeUnsubscription(observer); lock (observers) { observers.Remove(observer); } } /// <summary> /// オブザーバーが登録された直後に実行する処理 /// </summary> /// <param name="observer">登録されたオブザーバー</param> protected virtual void OnAfterSubscription(IObserver<T> observer) { } /// <summary> /// オブザーバーが登録解除される直前に実行する処理 /// </summary> /// <param name="observer">登録解除されるオブザーバー</param> protected virtual void OnBeforeUnsubscription(IObserver<T> observer) { } /// <summary> /// observer に新しい値を送る /// </summary> /// <param name="t">observer に送る値</param> public void OnNext(T t) { foreach (var observer in observers.ToArray()) if (observers.Contains(observer)) observer.OnNext(t); } /// <summary> /// 処理の終了を通知する /// </summary> public void OnCompleted() { foreach (var observer in observers.ToArray()) if (observers.Contains(observer)) observer.OnCompleted(); } /// <summary> /// エラーを通知する /// </summary> public void OnError(Exception error) { foreach (var observer in observers.ToArray()) if (observers.Contains(observer)) observer.OnError(error); } } /// <summary> /// MainWindow.xaml の相互作用ロジック /// </summary> public partial class MainWindow : Window { /// <summary> /// 文字列値を通知する observable /// </summary> Observable<string> observable = new Observable<string>(); public MainWindow() { InitializeComponent(); // 同期コンテキスト var syncContext = System.Threading.SynchronizationContext.Current; // UI スレッドからの通知を別スレッドで待ち受ける Task.Run(() => { // 4回受け取るまで処理する observable.Take(4).Do(s => { textBlock1.Text += s + "\n"; }).Wait(); // 以下は4回受け取った後に実行する内容 // UI を操作するため同期コンテキストで実行する // http://ufcpp.wordpress.com/2012/04/26/%E9%9D%9E%E5%90%8C%E6%9C%9F%E5%87%A6%E7%90%86%E3%81%A8%E3%83%87%E3%82%A3%E3%82%B9%E3%83%91%E3%83%83%E3%83%81%E3%83%A3%E3%83%BC/ syncContext.Post(s => { // s には Post の第2パラメータの "Finished!" が渡される textBlock1.Text += (s as string) + "\n"; }, "Finished!"); }); } private void Button_Click(object sender, RoutedEventArgs e) { // 文字列を通知する observable.OnNext("ok"); } } } ボタンを4回押すと、それぞれ textBlock1 に ok の文字が追加され、 それが終わるとさらに Finished! の文字が追加されます。 ミソとなるのは 144行目からの LANG:csharp // 4回受け取るまで処理する observable.Take(4).Do(s => { textBlock1.Text += s + "\n"; }).Wait(); の部分で、UI スレッドから LANG:csharp // 文字列を通知する observable.OnNext("ok"); のようにして IObservable<string> observable に送られる文字列を、 LINQ を使って順に処理した上で、処理完了を待ってから次の処理に移ることができます。 上の例では observable.OnNext が UI スレッドから呼ばれ、 Do に渡されたデリゲートも同じスレッドで実行されるため、 Do の中からは UI コントロールに自由にアクセスできますが、 Wait() 後の処理は別スレッドで実行されるため UI コントロールにアクセスするには 同期処理が必要になっています。 ** 別解1 [#zf3dd8e5] Wait() で同期的に終了を待つ代わりに GetAwaiter() により非同期的にクエリを開始すれば、 明示的に別スレッドを開始する必要はありません。内部で自動的にスレッドが起動されます。 この場合、クエリの終了後に行いたい処理があれば Finally() に書けますね。 LANG:csharp public MainWindow() { InitializeComponent(); // 4回受け取るまで処理する var awaiter = observable.Take(4).Do(s => { textBlock1.Text += s + "\n"; }).Finally(() => textBlock1.Text += "Finished!\n").GetAwaiter(); // 必要なところで awaiter.Wait(); として終了まで待機できる } ** 別解2 [#h0dd6b03] 明示的に Subscript を呼ぶこともできますが、その場合呼び出し側から終了を検知できないし、 Finally と Subscribe の順番も気にくわないしで別解1の方がエレガントに思えます。 Subscribe の戻り値を使って好きなときに unsubscribe できるというメリットも 無いわけではないですが・・・代わりに TakeWhile などで判別すれば良い場合が多いような。 ただ実は下で見るように Do を挟むと単純に Subscribe したのに比べてかなりパフォーマンスが落ちるため、 実行速度が問題になる場合には素直に Subscribe で書いた方が良いようです。 LANG:csharp public MainWindow() { InitializeComponent(); // 4回受け取るまで処理する var subscription = observable .Take(4) .Finally(() => textBlock1.Text += "Finished!\n") .Subscribe(s => { textBlock1.Text += s + "\n"; }); // 以降、好きなときに subscription.Dispose() を呼んで unsubscribe できる } ** 別解3 [#h0dd6b03] 考えてみると Subscribe するなら Finally ではなく OnCompleted を使うべきですね。 LANG:csharp public MainWindow() { InitializeComponent(); // 4回受け取るまで処理する var subscription = observable .Take(4) .Subscribe( s => textBlock1.Text += s + "\n", ()=> textBlock1.Text += "Finished!\n" ); // 以降、好きなときに subscription.Dispose() を呼んで unsubscribe できる } * パフォーマンス [#k2d1907e] ** 非同期クエリの開始・終了は重い [#y109d9b5] 非同期クエリの開始・終了処理はかなり重い処理のようです? 下記のような10万回の実行で 0.5 秒もかかりました。 1回あたり 5μ秒となります。 LANG:csharp var start = DateTime.Now.Ticks; var observable = new Observable<int>(); for (var i = 0; i < 100000; i++) { // 1回あたり 5μ秒 observable.Take(1).Do(n => { }).GetAwaiter(); observable.OnNext(0); } textBlock1.Text += "\n" + (DateTime.Now.Ticks - start).ToString(); ** 通知自体もそこそこ重い? [#l7e348d3] 1000万回の実行で1.5秒ほどでした。 1回あたり 150 ns にあたります。 LANG:csharp var start = DateTime.Now.Ticks; var sum = 0L; var observable = new Observable<int>(); var awaiter = observable.Do(i => sum += i }).GetAwaiter(); Task.Run(() => { for (var i = 0L; i < 1000000; i++) { observable.OnNext(1); } observable.OnCompleted(); }); awaiter.Wait(); textBlock1.Text += "\nAnswer=" + sum.ToString() + "\n" + (DateTime.Now.Ticks - start).ToString(); *** 普通に実行した場合との差 [#j59761d3] 下記のように普通に for を回すだけだと 50ms で終了しますので、 1回転あたり 5ns しかかからず、 上記の 150ns というのがほぼすべて Rx のディスパッチ処理にかかっていることが分かります。 LANG:csharp var start = DateTime.Now.Ticks; var sum = 0L; Task.Run(() => { for (var i = 0L; i < 10000000; i++) { (new Action(() => sum += 1)).Invoke(); // 5ns } }).Wait(); textBlock1.Text += "\n" + (DateTime.Now.Ticks - start).ToString(); ちなみにラムダ式の Invoke() で書いている部分を単に sum += 1 と書き直すと 3 ns で済みます。 ** どこが遅いのか [#qdaf0f1a] 処理の流れとしては、 LANG:csharp observable.OnNext(t); により、 LANG:csharp foreach (var observer in observers.ToArray()) if (observers.Contains(observer)) observer.OnNext(t); が実行され、observer.OnNext(t) によりラムダ式が実行されます。 あちこち削って試してみると、 - observers.ToArray() に 20ns - if (observers.Contains(observer)) に 30ns - foreach に 40ns - observers[i] の参照に 5ns かかります。 さらに、 LANG:csharp var awaiter = observable.Do(i => sum += i).GetAwaiter(); の代わりに LANG:csharp observable.Subscribe(i => sum += i); とすると、40ns くらい早くなるので、Do がそれだけ時間がかかっていることになります。 全部除くと 15ns となって、元の 5ns に近づきます。 残りの 10ns はラムダ式が Observer<T> にラップされていることによるのではないかと思いますが、調べ切れていません。 ** System.Reactive.Subjects.Subject<> を使う [#t3f25d73] 自前の Observable<T> を定義することをやめて、 Rx は遅いという記事はネット上でもたくさん見付かるようですね。 その中で見つけたのが Subject<> を使った push 配信はそこそこ早いという記事。 http://okazuki.hatenablog.com/entry/20111214/1323828027 確かに、自前の Observable<T> を定義することをやめて、 System.Reactive.Subjects.Subject<> を用いると上記のオーバーヘッドを劇的に減らせます。 LANG:csharp var observable = new Observable<int>(); としていたところを LANG:csharp var observable = new System.Reactive.Subjects.Subject<int>(); として、Do ではなく Subscribe を使ったところ 17ns 程度となって、 非常に高速に動作します。自分で実装したコードに比べてここまで早い理由はよく分かりませんが、 迷わず Subject<> を使うべきであることが分かります。 *** スケジューラーの指定 [#u944483c] さすがにかかりすぎなので、スケジューラの指定などでもっと軽い処理を行うことができないか 試してみました。 上記のように普通に実行して 3 秒で終わった処理は、 observable.Do を observable.ObserveOn(System.Reactive.Concurrency.DefaultScheduler.Instance).Do としたところ、6.2 秒かかるようになりました。 observable.Do を observable.ObserveOn(System.Reactive.Concurrency.ImmediateScheduler.Instance).Do としたところ、18 秒かかるようになりました。 observable.Do を observable.ObserveOn(System.Reactive.Concurrency.CurrentThreadScheduler.Instance).Do としたところ、40 秒かかるようになりました。 軽くなるような指定の仕方、まだ見つけられずにいます。 // ** Subject を使う [#bad6086f] // // Rx は遅いという記事はネット上でもたくさん見付かるようですね。 // // その中で見つけたのが Subject<> を使った push 配信はそこそこ早いという記事。 // // http://okazuki.hatenablog.com/entry/20111214/1323828027 // // 確かに上記を次のように直すだけで // // LANG:csharp // // var observable = new Observable<int>(); // var observable = new System.Reactive.Subjects.Subject<int>(); // * コメント・質問 [#m53a045c] #article_kcaptcha
Counter: 5975 (from 2010/06/03),
today: 1,
yesterday: 3