プログラミング/C#/Reactive Extentions の変更点
更新- 追加された行はこの色です。
- 削除された行はこの色です。
- プログラミング/C#/Reactive Extentions へ行く。
- プログラミング/C#/Reactive Extentions の差分を削除
[[公開メモ]]
* 概要 [#gdb4348f]
Reactive Extentions ライブラリを用いると、push 式の非同期通知を LINQ を用いて受け取れるそうです。
* テストプログラム [#xbe40156]
便利そうなので、よく分からないながらテストプログラムを作って試してみました。
LANG:csharp(linenumber)
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading.Tasks;
using System.Windows;
using System.Windows.Controls;
using System.Windows.Data;
using System.Windows.Documents;
using System.Windows.Input;
using System.Windows.Media;
using System.Windows.Media.Imaging;
using System.Windows.Navigation;
using System.Windows.Shapes;
using System.Reactive.Linq;
namespace WpfApplication1
{
/// <summary>
/// コンストラクターに与えられた Action を Dispose 時に実行するクラス
/// </summary>
class ActionOnDispose : IDisposable
{
Action action;
public ActionOnDispose(Action action)
{
this.action = action;
}
public void Dispose()
{
action();
}
}
/// <summary>
/// オブザーバーリストを管理する Observable クラス
///
/// OnAfterSubscribe / OnBeforeSubscribe が利用可能
/// </summary>
/// <typeparam name="T">監視されるデータ</typeparam>
public class Observable<T> : IObservable<T>
{
/// <summary>
/// オブザーバーのリスト
/// </summary>
private List<IObserver<T>> observers = new List<IObserver<T>>();
/// <summary>
/// オブザーバーを登録する
/// </summary>
/// <param name="observer">登録するオブザーバー</param>
/// <returns>登録解除のための IDisposable</returns>
public IDisposable Subscribe(IObserver<T> observer)
{
// observer を登録して
lock (observers) {
observers.Add(observer);
}
OnAfterSubscription(observer);
// Dispose 時に Unsubscribe する IDisposable を返す
return new ActionOnDispose(() => Unsubscribe(observer));
}
/// <summary>
/// オブザーバーの登録を解除する
/// </summary>
/// <param name="observer">登録を解除するオブザーバー</param>
private void Unsubscribe(IObserver<T> observer)
{
OnBeforeUnsubscription(observer);
lock (observers) {
observers.Remove(observer);
}
}
/// <summary>
/// オブザーバーが登録された直後に実行する処理
/// </summary>
/// <param name="observer">登録されたオブザーバー</param>
protected virtual void OnAfterSubscription(IObserver<T> observer) { }
/// <summary>
/// オブザーバーが登録解除される直前に実行する処理
/// </summary>
/// <param name="observer">登録解除されるオブザーバー</param>
protected virtual void OnBeforeUnsubscription(IObserver<T> observer) { }
/// <summary>
/// observer に新しい値を送る
/// </summary>
/// <param name="t">observer に送る値</param>
public void OnNext(T t)
{
foreach (var observer in observers.ToArray())
if (observers.Contains(observer))
observer.OnNext(t);
}
/// <summary>
/// 処理の終了を通知する
/// </summary>
public void OnCompleted()
{
foreach (var observer in observers.ToArray())
if (observers.Contains(observer))
observer.OnCompleted();
}
/// <summary>
/// エラーを通知する
/// </summary>
public void OnError(Exception error)
{
foreach (var observer in observers.ToArray())
if (observers.Contains(observer))
observer.OnError(error);
}
}
/// <summary>
/// MainWindow.xaml の相互作用ロジック
/// </summary>
public partial class MainWindow : Window
{
/// <summary>
/// 文字列値を通知する observable
/// </summary>
Observable<string> observable = new Observable<string>();
public MainWindow()
{
InitializeComponent();
// 同期コンテキスト
var syncContext = System.Threading.SynchronizationContext.Current;
// UI スレッドからの通知を別スレッドで待ち受ける
Task.Run(() => {
// 4回受け取るまで処理する
observable.Take(4).Do(s => {
textBlock1.Text += s + "\n";
}).Wait();
// 以下は4回受け取った後に実行する内容
// UI を操作するため同期コンテキストで実行する
// http://ufcpp.wordpress.com/2012/04/26/%E9%9D%9E%E5%90%8C%E6%9C%9F%E5%87%A6%E7%90%86%E3%81%A8%E3%83%87%E3%82%A3%E3%82%B9%E3%83%91%E3%83%83%E3%83%81%E3%83%A3%E3%83%BC/
syncContext.Post(s => {
// s には Post の第2パラメータの "Finished!" が渡される
textBlock1.Text += (s as string) + "\n";
}, "Finished!");
});
}
private void Button_Click(object sender, RoutedEventArgs e)
{
// 文字列を通知する
observable.OnNext("ok");
}
}
}
ボタンを4回押すと、それぞれ textBlock1 に ok の文字が追加され、
それが終わるとさらに Finished! の文字が追加されます。
ミソとなるのは 144行目からの
LANG:csharp
// 4回受け取るまで処理する
observable.Take(4).Do(s => {
textBlock1.Text += s + "\n";
}).Wait();
の部分で、UI スレッドから
LANG:csharp
// 文字列を通知する
observable.OnNext("ok");
のようにして IObservable<string> observable に送られる文字列を、
LINQ を使って順に処理した上で、処理完了を待ってから次の処理に移ることができます。
上の例では observable.OnNext が UI スレッドから呼ばれ、
Do に渡されたデリゲートも同じスレッドで実行されるため、
Do の中からは UI コントロールに自由にアクセスできますが、
Wait() 後の処理は別スレッドで実行されるため UI コントロールにアクセスするには
同期処理が必要になっています。
** 別解1 [#zf3dd8e5]
Wait() で同期的に終了を待つ代わりに GetAwaiter() により非同期的にクエリを開始すれば、
明示的に別スレッドを開始する必要はありません。内部で自動的にスレッドが起動されます。
この場合、クエリの終了後に行いたい処理があれば Finally() に書けますね。
LANG:csharp
public MainWindow()
{
InitializeComponent();
// 4回受け取るまで処理する
var awaiter = observable.Take(4).Do(s => {
textBlock1.Text += s + "\n";
}).Finally(() => textBlock1.Text += "Finished!\n").GetAwaiter();
// 必要なところで awaiter.Wait(); として終了まで待機できる
}
** 別解2 [#h0dd6b03]
明示的に Subscript を呼ぶこともできますが、その場合呼び出し側から終了を検知できないし、
Finally と Subscribe の順番も気にくわないしで別解1の方がエレガントに思えます。
Subscribe の戻り値を使って好きなときに unsubscribe できるというメリットも
無いわけではないですが・・・代わりに TakeWhile などで判別すれば良い場合が多いような。
ただ実は下で見るように Do を挟むと単純に Subscribe したのに比べてかなりパフォーマンスが落ちるため、
実行速度が問題になる場合には素直に Subscribe で書いた方が良いようです。
LANG:csharp
public MainWindow()
{
InitializeComponent();
// 4回受け取るまで処理する
var subscription = observable
.Take(4)
.Finally(() => textBlock1.Text += "Finished!\n")
.Subscribe(s => {
textBlock1.Text += s + "\n";
});
// 以降、好きなときに subscription.Dispose() を呼んで unsubscribe できる
}
** 別解3 [#h0dd6b03]
考えてみると Subscribe するなら Finally ではなく OnCompleted を使うべきですね。
LANG:csharp
public MainWindow()
{
InitializeComponent();
// 4回受け取るまで処理する
var subscription = observable
.Take(4)
.Subscribe(
s => textBlock1.Text += s + "\n",
()=> textBlock1.Text += "Finished!\n"
);
// 以降、好きなときに subscription.Dispose() を呼んで unsubscribe できる
}
* パフォーマンス [#k2d1907e]
** 非同期クエリの開始・終了は重い [#y109d9b5]
非同期クエリの開始・終了処理はかなり重い処理のようです?
下記のような10万回の実行で 0.5 秒もかかりました。
1回あたり 5μ秒となります。
LANG:csharp
var start = DateTime.Now.Ticks;
var observable = new Observable<int>();
for (var i = 0; i < 100000; i++) {
// 1回あたり 5μ秒
observable.Take(1).Do(n => { }).GetAwaiter();
observable.OnNext(0);
}
textBlock1.Text += "\n" + (DateTime.Now.Ticks - start).ToString();
** 通知自体もそこそこ重い? [#l7e348d3]
1000万回の実行で1.5秒ほどでした。
1回あたり 150 ns にあたります。
LANG:csharp
var start = DateTime.Now.Ticks;
var sum = 0L;
var observable = new Observable<int>();
var awaiter = observable.Do(i => sum += i }).GetAwaiter();
Task.Run(() => {
for (var i = 0L; i < 1000000; i++) {
observable.OnNext(1);
}
observable.OnCompleted();
});
awaiter.Wait();
textBlock1.Text += "\nAnswer=" + sum.ToString() + "\n" + (DateTime.Now.Ticks - start).ToString();
*** 普通に実行した場合との差 [#j59761d3]
下記のように普通に for を回すだけだと 50ms で終了しますので、
1回転あたり 5ns しかかからず、
上記の 150ns というのがほぼすべて Rx のディスパッチ処理にかかっていることが分かります。
LANG:csharp
var start = DateTime.Now.Ticks;
var sum = 0L;
Task.Run(() => {
for (var i = 0L; i < 10000000; i++) {
(new Action(() => sum += 1)).Invoke(); // 5ns
}
}).Wait();
textBlock1.Text += "\n" + (DateTime.Now.Ticks - start).ToString();
ちなみにラムダ式の Invoke() で書いている部分を単に sum += 1 と書き直すと 3 ns で済みます。
** どこが遅いのか [#qdaf0f1a]
処理の流れとしては、
LANG:csharp
observable.OnNext(t);
により、
LANG:csharp
foreach (var observer in observers.ToArray())
if (observers.Contains(observer))
observer.OnNext(t);
が実行され、observer.OnNext(t) によりラムダ式が実行されます。
あちこち削って試してみると、
- observers.ToArray() に 20ns
- if (observers.Contains(observer)) に 30ns
- foreach に 40ns
- observers[i] の参照に 5ns
かかります。
さらに、
LANG:csharp
var awaiter = observable.Do(i => sum += i).GetAwaiter();
の代わりに
LANG:csharp
observable.Subscribe(i => sum += i);
とすると、40ns くらい早くなるので、Do がそれだけ時間がかかっていることになります。
全部除くと 15ns となって、元の 5ns に近づきます。
残りの 10ns はラムダ式が Observer<T> にラップされていることによるのではないかと思いますが、調べ切れていません。
** System.Reactive.Subjects.Subject<> を使う [#t3f25d73]
自前の Observable<T> を定義することをやめて、
Rx は遅いという記事はネット上でもたくさん見付かるようですね。
その中で見つけたのが Subject<> を使った push 配信はそこそこ早いという記事。
http://okazuki.hatenablog.com/entry/20111214/1323828027
確かに、自前の Observable<T> を定義することをやめて、
System.Reactive.Subjects.Subject<> を用いると上記のオーバーヘッドを劇的に減らせます。
LANG:csharp
var observable = new Observable<int>();
としていたところを
LANG:csharp
var observable = new System.Reactive.Subjects.Subject<int>();
として、Do ではなく Subscribe を使ったところ 17ns 程度となって、
非常に高速に動作します。自分で実装したコードに比べてここまで早い理由はよく分かりませんが、
迷わず Subject<> を使うべきであることが分かります。
*** スケジューラーの指定 [#u944483c]
さすがにかかりすぎなので、スケジューラの指定などでもっと軽い処理を行うことができないか
試してみました。
上記のように普通に実行して 3 秒で終わった処理は、
observable.Do を observable.ObserveOn(System.Reactive.Concurrency.DefaultScheduler.Instance).Do
としたところ、6.2 秒かかるようになりました。
observable.Do を observable.ObserveOn(System.Reactive.Concurrency.ImmediateScheduler.Instance).Do
としたところ、18 秒かかるようになりました。
observable.Do を observable.ObserveOn(System.Reactive.Concurrency.CurrentThreadScheduler.Instance).Do
としたところ、40 秒かかるようになりました。
軽くなるような指定の仕方、まだ見つけられずにいます。
// ** Subject を使う [#bad6086f]
//
// Rx は遅いという記事はネット上でもたくさん見付かるようですね。
//
// その中で見つけたのが Subject<> を使った push 配信はそこそこ早いという記事。
//
// http://okazuki.hatenablog.com/entry/20111214/1323828027
//
// 確かに上記を次のように直すだけで
//
// LANG:csharp
// // var observable = new Observable<int>();
// var observable = new System.Reactive.Subjects.Subject<int>();
//
* コメント・質問 [#m53a045c]
#article_kcaptcha
Counter: 6422 (from 2010/06/03),
today: 1,
yesterday: 1