Theme
SD MILIEU

2018-8-14

RxJSメモ

分岐パターン

n 対 n

単一の Observable に複数 subscribe した場合、subscribe したタイミングで Stream の複製が割り当てられる。

で、何が起こるかというと、複製された Stream ではまた値が最初からになる。

RxJS を学ぼう #4 – COLD と HOT について学ぶ / ConnectableObservable – PSYENCE:MEDIA

import * as Rx from 'rxjs';
import * as Operators from 'rxjs/operators';

const clock$ = Rx.interval(1000);

clock$.subscribe(value => console.log(value));

setTimeout(() => {
    clock$.pipe(
        Operators.map(value => value * 10)
    )
    .subscribe(value => console.log(value));
}, 2500);

実行結果

0
1
2
0
3
10
4
20
5
30

1 対 n(HOT な Observable)

前述の通り COLD な状態では、複数subscribeするとストリームの複製がそれぞれ割り当てられる。

publishを行う事で、ConnectableObservableに変化。その後、connectすることでデータを流しはじめる。この状態は HOT な状態で、複数 subscribe された場合でも同じストリームが利用されるので同じデータが流される。

※ConnectableObservable へ明示的にダウンキャストしないと現状エラーが発生する。これに関しては issue として登録されている。

Pipe operator cannot infer return type as ConnectableObservable · Issue #2972 · ReactiveX/rxjs · GitHub

import * as Rx from 'rxjs';
import * as Operators from 'rxjs/operators';

const clock$ = Rx.interval(1000).pipe(
    Operators.publish()
) as Rx.ConnectableObservable<number>;

clock$.subscribe(value => console.log(value));

setTimeout(() => {
    clock$.pipe(
        Operators.map(value => value * 10)
    )
    .subscribe(value => console.log(value));
}, 2500);

clock$.connect();

実行結果

0
1
2
20
3
30
4
40
5
50

1 対 n(Subject)

1 対 n はSubjectを利用することでも可能。

RxJS を学ぼう #5 – Subject について学ぶ / Observable × Observer – PSYENCE:MEDIA

import * as Rx from 'rxjs';
import * as Operators from 'rxjs/operators';

const clock$ = Rx.interval(1000);
const subject = new Rx.Subject<number>();

clock$.subscribe(subject);
subject.subscribe(value => console.log(value));
subject
    .pipe(
        Operators.map(value => value * 10)
    )
    .subscribe(value => console.log(value));

実行結果

0
0
1
10
2
20
3
30
4
40
5
50

任意タイミングでデータを流す

Subjectnextを利用することで可能。

Angular ではこれ別コンポーネントへメッセージを渡す場合に使う。

import * as Rx from 'rxjs';

class ToastService {
    _subject: Rx.Subject<string>;

    get observable() {
        return this._subject.asObservable();
    }

    constructor() {
        this._subject = new Rx.Subject<string>();
    }

    launchToast(message: string) {
        this._subject.next(message);
    }
}

class FirstComponent {
    constructor(private toastService: ToastService) {}

    showMessage(message: string) {
        this.toastService.launchToast(message);
    }
}

class SecondComponent {
    constructor(private toastService: ToastService) {
        this.toastService.observable.subscribe(message => console.log(message));
    }
}

const toastService = new ToastService();
const first = new FirstComponent(toastService);
new SecondComponent(toastService);

first.showMessage('toast message');

実行結果

toast message