2018-8-14
RxJSメモ
分岐パターン
n 対 n
単一の Observable に複数 subscribe した場合、subscribe したタイミングで Stream の複製が割り当てられる。
で、何が起こるかというと、複製された Stream ではまた値が最初からになる。
RxJS を学ぼう #4 – COLD と HOT について学ぶ / ConnectableObservable – PSYENCE:MEDIA
import * as Rx from 'rxjs';
import * as Operators from 'rxjs/operators';
const clock$ = Rx.interval(1000);
clock$.subscribe(value => console.log(value));
setTimeout(() => {
    clock$.pipe(
        Operators.map(value => value * 10)
    )
    .subscribe(value => console.log(value));
}, 2500);
実行結果
0
1
2
0
3
10
4
20
5
30
1 対 n(HOT な Observable)
前述の通り COLD な状態では、複数subscribeするとストリームの複製がそれぞれ割り当てられる。
publishを行う事で、ConnectableObservableに変化。その後、connectすることでデータを流しはじめる。この状態は HOT な状態で、複数 subscribe された場合でも同じストリームが利用されるので同じデータが流される。
※ConnectableObservable へ明示的にダウンキャストしないと現状エラーが発生する。これに関しては issue として登録されている。
import * as Rx from 'rxjs';
import * as Operators from 'rxjs/operators';
const clock$ = Rx.interval(1000).pipe(
    Operators.publish()
) as Rx.ConnectableObservable<number>;
clock$.subscribe(value => console.log(value));
setTimeout(() => {
    clock$.pipe(
        Operators.map(value => value * 10)
    )
    .subscribe(value => console.log(value));
}, 2500);
clock$.connect();
実行結果
0
1
2
20
3
30
4
40
5
50
1 対 n(Subject)
1 対 n はSubjectを利用することでも可能。
RxJS を学ぼう #5 – Subject について学ぶ / Observable × Observer – PSYENCE:MEDIA
import * as Rx from 'rxjs';
import * as Operators from 'rxjs/operators';
const clock$ = Rx.interval(1000);
const subject = new Rx.Subject<number>();
clock$.subscribe(subject);
subject.subscribe(value => console.log(value));
subject
    .pipe(
        Operators.map(value => value * 10)
    )
    .subscribe(value => console.log(value));
実行結果
0
0
1
10
2
20
3
30
4
40
5
50
任意タイミングでデータを流す
Subjectでnextを利用することで可能。
Angular ではこれ別コンポーネントへメッセージを渡す場合に使う。
import * as Rx from 'rxjs';
class ToastService {
    _subject: Rx.Subject<string>;
    get observable() {
        return this._subject.asObservable();
    }
    constructor() {
        this._subject = new Rx.Subject<string>();
    }
    launchToast(message: string) {
        this._subject.next(message);
    }
}
class FirstComponent {
    constructor(private toastService: ToastService) {}
    showMessage(message: string) {
        this.toastService.launchToast(message);
    }
}
class SecondComponent {
    constructor(private toastService: ToastService) {
        this.toastService.observable.subscribe(message => console.log(message));
    }
}
const toastService = new ToastService();
const first = new FirstComponent(toastService);
new SecondComponent(toastService);
first.showMessage('toast message');
実行結果
toast message