こちらはセクション2です。このトピックは3パートで構成されています:
- Understanding the changes
- Disposing subscriptions
- Miscellaneous changes
です。
この部分はAndroid開発者として大変重要な部分ですが、すごくわかりずらい部分です。ジェダイ・マスターのKarnokはwikiでこれをわかりやすく説明しています:
RxJava 1.xでは、インタフェースrx.Subscriptionがストリームとリソースのライフサイクル管理を担当しました。つまり、シーケンスの登録を解除し、スケジュールされたタスクなどの一般的なリソースを解放します。Reactive-Streams仕様では、ソースとコンシューマの間の相互作用点を指定するためにこの名前が使用されました。org.reactivestreams.Subscriptionは、上流からの正値をリクエストし、シーケンスを取り消すことができます。
Publisher.subscribe(Subscriber) => Subscription
この定義だけでは、何も変わっていないように見えますが、こちらをご覧ください:
Publisher.subscribe(Subscriber) => Subscription
私は最初から指摘していました。
The use of => vs = was intentional。もう一度source code for Publisher’s subscribe method を読んでください、return タイプのvoid vizに気付いてるはずです。CompositeSubscription (which you can then conveniently dispose of onStop/onDestroy).
interface Publisher<T> {
// return type void (not Subscription like before)
void subscribe(Subscriber<? super T> s);
}
ここでは再びKarnokの出番です:
Reactive-Streamsの基本インターフェースであるため、org.reactivestreams.Publisherはsubscribe()メソッドをvoidとして定義し、Flowable.subscribe(Subscriber)はSubscription(またはDisposable)をReturnしなくなりました。他のbase reactive typesも、それぞれのrespective subscriber typesとともにこのシグネチャに従います。
もう一度declarationsを見てください、
// RxJava specific constructs
// Observable implements “ObservableSource”
interface ObservableSource<T> {
void subscribe(Observer<? super T> observer);
}
// Single implements SingleSource
interface SingleSource<T> {
void subscribe(SingleObserver<? super T> observer);
}
interface CompletableSource {
void subscribe(CompletableObserver observer);
}
interface MaybeSource<T> {
void subscribe(MaybeObserver<? super T> observer);
}
それら全てにreturn type voidがあることに注意してください。だから、どうすればそのサブスクリプションを延期することができますか(メタファー:あなたが責任ある市民のようにそれを正しく取り消すか、処分するかもしれません)?
the Subscriber’s onSubscribeの方法を見てみましょう。
public interface Subscriber<T> {
public void onSubscribe(Subscription s);
// Subscriptions are additionally cool cause they have:
// s.request(n) -> request data
// s.cancel() -> cancel this connection
public void onNext(T t);
public void onError(Throwable t);
public void onComplete();
}
これで、onSubscribeコールバックのパラメータとしてSubscriptionクラスが指定されました。したがって、OnSubscribeメソッドでは、サブスクリプションを保持しているため、onSubscribeコールバック内でサブスクリプションを簡単に破棄することができます。
これは、実際にはサブスクライバのインタフェースを軽量化するため、実際にはかなり変更を余儀なくされました。 RxJava 1 landで、Subscribersは、多くの内部状態の処理に対処しなければならないため、より重いものでした。
ちょっと待てください! これは全く便利ではないです。私は以前のように全てのデータをCompositeSubscriptionに押し出すでしょう。その方が楽です。しかし、それはReactive Streams spec仕様の判断です。
嬉しいことの一つはRxJavaの管理者たちがこの状況(trade-off)に気づいてくれました。そして改善もしてくれました。でもその前に定義について少し話したいと思います。昔RxJava 1 の中でSubscriptionという名前をつけていましたが、今はDisposableに変わります。
どうしてSubscriptionという名前をそのまま使わないのですか?
- Reactive Streams仕様にはすでにこの名前が使われています、RxJava 2の管理者は仕様を遵守していることを覚えておく必要があります。 私たちは、Rxサブスクリプションと他のリアクティブ・ストリーム仕様のライブラリを併用する機能が混乱しないようにしたいです。
- 2点目はRxJava 1が持っている幾つかの機能や便利さをキープしたいです。
今からはCompositeDisposableを使って、Disposablesをそこに付け加えた方がいいと思います。この手段は実際に以前使ったCompositeSubscription に似ています。ここでもう一度最初の質問に戻りましょう:一体私はどやってDisposableを把握できますか。
Disposablesを把握する
コールバックをlambdaの形で直接追加する場合、これは問題にはなりません。ほとんどの可視的なソースはsubscriber objectで提供されていないときにsubscribeメソッド呼び出しでDisposableを返します。
Flowable.subscribe(Subscriber)
// void return type
Flowable.subscribe(nextEvent -> {}, error -> {}, () -> {})
// return Disposable so we’re good
この理由で下記サンプルコードは全然問題なく普通に動きます:
disposable =
myAwesomeFlowable
.observeOn(AndroidSchedulers.mainThread())
.subscribe(event -> {
// onNext
}, throwable -> {
// onError
}, () -> {
// onComplete
});
compositeDisposable.add(disposable);
でももし私がコードを少しだけ書き変えれば、下記となります:
disposable =
myAwesomeFlowable
.observeOn(AndroidSchedulers.mainThread())
.subscribe(new FlowableSubscriber<TextViewTextChangeEvent>() {
@Override
public void onSubscribe(Subscription subscription) {
}
@Override
public void onNext(TextViewTextChangeEvent textViewTextChangeEvent) {
}
@Override
public void onError(Throwable t) {
}
@Override
public void onComplete() {
}
});
// ^ THIS IS WRONG. Won’t work
// compositeDisposable.add(disposable);
上記のコードはコンパイルされません。もしSubscriber object を通したいなら (FlowableSubscriber、ObservableSource 、 Observerなど) 、この手段ではできません。
この方法を使うRxJava 1 コードユーザーはたくさんいますから、管理者たちがwikiにsubscribeWithという“コツ”を追加しました。
Reactive-Streams仕様のため、Publisher.subscribe returns void とパターン自体は2.0で作業できなくなります。 これを解決するために、method E subscribeWith(Eサブスクライバ)が、入力されたsubscriber/observerをそのまま返す各基本リアクティブクラスに追加されました。
E subscribeWith(E subscriber)
もしあなたが親密にわたしのやり方をフォローしているなら、疑問に思うかもしれません。待って、それはreturn a Disposableをしていない! それはなぜremotelyで便利なの?
さて、あなたがpassしたSubscriberはsubscribeWithであなたに送り返されていると言われています。しかし、Subscriber自身がDisposable interfaceを「実装」している場合はどうでしょうか? あなたがDisposableSubscriberを持っていれば、それをDisposableとして扱い、CompositeDisposableに引き続きSubscriberとして使うことができます。それは典型的にあなたが採用したいパターンです。これらの技術を明確にするためのコードは次のとおりす。
disposable =
myAwesomeFlowable
.observeOn(AndroidSchedulers.mainThread())
.subscribeWith(new getDisposableSubscriber<TextViewTextChangeEvent>() {
@Override
public void onNext(TextViewTextChangeEvent event) {}
@Override
public void onError(Throwable e) {}
@Override
public void onComplete() { }
});
compositeDisposable.add(disposable);
DisposableSubscriber以外に、Disposableを実装するResourceSubscriberもあります。 Disposableインターフェイスを実装していないDefaultSubscriberもありますので、subscribeWithで使用することはできません。(使ってもdisposableできません)
DisposableSubscriberとResourceSubscriberの両方が同じことをするようです。 なぜ両方必要ですか。
元の1.xサブスクライバは、サブスクリプションを取ることができ、ライフサイクルが終了したとき、またはサブスクライバがサブスクライブされなかったときに特定のサブスクライバが必要とした追加のリソースを「処分」できました。
2.x Subscriberは外部から宣言されたインタフェースなので、古い機能は別の抽象クラス「ResourceSubscriber」を介して実装する必要がありました。主な違いは、使い捨てリソースを作成して関連付けることができ、実装するonError()メソッドとonComplete()メソッドの中からそれらを一緒に処分できることです。ドキュメント内の例を見てください。
こちらに例をご覧ください:within the docs
to .clear あるいは to .dispose
CompositeDisposableには、もはやsubscribe解除の要求はありません。それは、disposeのために名前が変更されました。とにかくそれらのどちらかを使用したくないのです。clearメソッドが残っており、使用する可能性の高いメソッドです。
どこが違いますか?
次のセクションはさまざまな変化についてご説明いただきたいと思います。最後はDonn Felker さんと David Karnokさんに感謝の気持ちを伝えたいです。この二人はこの記事をレビューしてくれました、ありがとうございます。特に David さん、幾つかの思い違いを訂正して頂き、ありがとうございます。
最初はblog.kaush.co で出版されました(2017/6/21)
タイトル:RxJava 1 -> RxJava 2 (Disposing Subscriptions)
作者:Kaushik Gopal
原文URL:https://tech.instacart.com/rxjava-1-rxjava-2-disposing-subscriptions-48988798eccf