鍍金池/ 教程/ Android/ Subject
調度器 Scheduler
Empty/Never/Throw
Replay
這個(gè)頁(yè)面展示了創(chuàng )建Observable的各種方法。
ObserveOn
ReactiveX
TimeInterval
Window
本頁(yè)展示的操作符用于對整個(gè)序列執行算法操作或其它操作,由于這些操作必須等待數據發(fā)射完成(通常也必須緩存這些數據),它們對于非常長(cháng)
IgnoreElements
Distinct
Last
Start
And/Then/When
Switch
創(chuàng )建操作
Materialize/Dematerialize
CombineLatest
Catch
實(shí)現自己的操作符
StringObservable
Map
ConnectableObservable
Using
Take
BlockingObservable
TakeLast
Defer
RxJavaSchedulersHook
First
FlatMap
這個(gè)頁(yè)面的操作符可用于根據條件發(fā)射或變換Observables,或者對它們做布爾運算:
Do
Repeat
Serialize
這個(gè)頁(yè)面展示的操作符可用于過(guò)濾和選擇Observable發(fā)射的數據序列。
這個(gè)頁(yè)面列出了很多用于Observable的輔助操作符
Single
Retry
從錯誤中恢復的技術(shù)
Sample
Merge
算術(shù)和聚合操作
Range
Timestamp
RxJava Issues
From
Subscribe
Subject
Delay
Skip
SubscribeOn
Filter
按字母順序排列的全部操作符列表
Timeout
Scan
onError
Zip
RxJava文檔和教程
Publish
ElementAt
第一個(gè)例子
SkipLast
Just
Timer
Debounce
GroupBy
條件和布爾操作
這個(gè)頁(yè)面展示了可用于對Observable發(fā)射的數據執行變換操作的各種操作符。
Introduction
rxjava-async
介紹響應式編程
這個(gè)頁(yè)面展示的操作符可用于組合多個(gè)Observables。
ReactiveX
Connect
操作符分類(lèi)
StartWith
Interval
Join
To
Buffer
RefCount
介紹
Observable

Subject

Subject可以看成是一個(gè)橋梁或者代理,在某些ReactiveX實(shí)現中(如RxJava),它同時(shí)充當了Observer和Observable的角色。因為它是一個(gè)Observer,它可以訂閱一個(gè)或多個(gè)Observable;又因為它是一個(gè)Observable,它可以轉發(fā)它收到(Observe)的數據,也可以發(fā)射新的數據。

由于一個(gè)Subject訂閱一個(gè)Observable,它可以觸發(fā)這個(gè)Observable開(kāi)始發(fā)射數據(如果那個(gè)Observable是"冷"的--就是說(shuō),它等待有訂閱才開(kāi)始發(fā)射數據)。因此有這樣的效果,Subject可以把原來(lái)那個(gè)"冷"的Observable變成"熱"的。

Subject的種類(lèi)

針對不同的場(chǎng)景一共有四種類(lèi)型的Subject。他們并不是在所有的實(shí)現中全部都存在,而且一些實(shí)現使用其它的命名約定(例如,在RxScala中Subject被稱(chēng)作PublishSubject)。

AsyncSubject

一個(gè)AsyncSubject只在原始Observable完成后,發(fā)射來(lái)自原始Observable的最后一個(gè)值。(如果原始Observable沒(méi)有發(fā)射任何值,AsyncObject也不發(fā)射任何值)它會(huì )把這最后一個(gè)值發(fā)射給任何后續的觀(guān)察者。 http://wiki.jikexueyuan.com/project/rx-docs/images/S.AsyncSubject.png" alt="" />

然而,如果原始的Observable因為發(fā)生了錯誤而終止,AsyncSubject將不會(huì )發(fā)射任何數據,只是簡(jiǎn)單的向前傳遞這個(gè)錯誤通知。 http://wiki.jikexueyuan.com/project/rx-docs/images/S.AsyncSubject.e.png" alt="" />

BehaviorSubject

當觀(guān)察者訂閱BehaviorSubject時(shí),它開(kāi)始發(fā)射原始Observable最近發(fā)射的數據(如果此時(shí)還沒(méi)有收到任何數據,它會(huì )發(fā)射一個(gè)默認值),然后繼續發(fā)射其它任何來(lái)自原始Observable的數據。 http://wiki.jikexueyuan.com/project/rx-docs/images/S.BehaviorSubject.png" alt="" />

然而,如果原始的Observable因為發(fā)生了一個(gè)錯誤而終止,BehaviorSubject將不會(huì )發(fā)射任何數據,只是簡(jiǎn)單的向前傳遞這個(gè)錯誤通知。 http://wiki.jikexueyuan.com/project/rx-docs/images/S.BehaviorSubject.e.png" alt="" />

PublishSubject

PublishSubject只會(huì )把在訂閱發(fā)生的時(shí)間點(diǎn)之后來(lái)自原始Observable的數據發(fā)射給觀(guān)察者。需要注意的是,PublishSubject可能會(huì )一創(chuàng )建完成就立刻開(kāi)始發(fā)射數據(除非你可以阻止它發(fā)生),因此這里有一個(gè)風(fēng)險:在Subject被創(chuàng )建后到有觀(guān)察者訂閱它之前這個(gè)時(shí)間段內,一個(gè)或多個(gè)數據可能會(huì )丟失。如果要確保來(lái)自原始Observable的所有數據都被分發(fā),你需要這樣做:或者使用Create創(chuàng )建那個(gè)Observable以便手動(dòng)給它引入"冷"Observable的行為(當所有觀(guān)察者都已經(jīng)訂閱時(shí)才開(kāi)始發(fā)射數據),或者改用ReplaySubject。 http://wiki.jikexueyuan.com/project/rx-docs/images/S.PublishSubject.png" alt="" />

如果原始的Observable因為發(fā)生了一個(gè)錯誤而終止,PublishSubject將不會(huì )發(fā)射任何數據,只是簡(jiǎn)單的向前傳遞這個(gè)錯誤通知。 http://wiki.jikexueyuan.com/project/rx-docs/images/S.PublishSubject.e.png" alt="" />

ReplaySubject

ReplaySubject會(huì )發(fā)射所有來(lái)自原始Observable的數據給觀(guān)察者,無(wú)論它們是何時(shí)訂閱的。也有其它版本的ReplaySubject,在重放緩存增長(cháng)到一定大小的時(shí)候或過(guò)了一段時(shí)間后會(huì )丟棄舊的數據(原始Observable發(fā)射的)。

如果你把ReplaySubject當作一個(gè)觀(guān)察者使用,注意不要從多個(gè)線(xiàn)程中調用它的onNext方法(包括其它的on系列方法),這可能導致同時(shí)(非順序)調用,這會(huì )違反Observable協(xié)議,給Subject的結果增加了不確定性。

http://wiki.jikexueyuan.com/project/rx-docs/images/S.ReplaySubject.png" alt="" />

RxJava的對應類(lèi)

假設你有一個(gè)Subject,你想把它傳遞給其它的代理或者暴露它的Subscriber接口,你可以調用它的asObservable方法,這個(gè)方法返回一個(gè)Observable。具體使用方法可以參考Javadoc文檔。

串行化

如果你把 Subject 當作一個(gè) Subscriber 使用,注意不要從多個(gè)線(xiàn)程中調用它的onNext方法(包括其它的on系列方法),這可能導致同時(shí)(非順序)調用,這會(huì )違反Observable協(xié)議,給Subject的結果增加了不確定性。

要避免此類(lèi)問(wèn)題,你可以將 Subject 轉換為一個(gè) SerializedSubject ,類(lèi)似于這樣:

mySafeSubject = new SerializedSubject( myUnsafeSubject );
上一篇:And/Then/When下一篇:Distinct