【Programming】RxJava リアクティブプログラミング vol.2 / RxJavaの概要 by promari

View this thread on steempeak.com
· @promari · (edited)
$20.27
【Programming】RxJava リアクティブプログラミング vol.2 / RxJavaの概要
前回はリアクティブプログラムミングの概要についてまとめました(記事は[こちら](https://steemit.com/promari/@promari/programming-rxjava-vol-1))。今回は[RxJava](https://github.com/ReactiveX/RxJava)の概要をまとめてみます。

### 1.3 RxJava

#### 1.3.1 概要

![eqkfqyeubf.png](https://img.esteem.ws/eqkfqyeubf.png)

- Javaでリアクティブプログラミングを行うためのライブラリ
- [RxJava](https://github.com/ReactiveX/RxJava)は、もともとは2009年にMicrosoftで .NET Frameworkの実験的なライブラリ「Reactive Extensions」(略して「Rx」)として公開され、2012年にオープンソース化されたものを、後にNetflixがJavaに移植しオープンソースとして公開した。
- Reactive Extensionsを扱うライブラリは[ReactiveX](http://reactivex.io/)としてオープンソースプロジェクト化し、Javaや.NETだけでなくJavaScriptやSwiftなど様々なプログラミング言語に対応したライブラリを提供している
- バージョン2.0よりReactive Streamsの仕様を実装している。2.0よりReactive StreamsのAPIに依存。
- デザインパターンの1つであるObserverパターンをうまく拡張している。Observerパターンは、監視対象のオブジェクトの状態変化に対するデザインパターンで、状態が変化するとそれを観察しているオブジェクトに対し変化があったことを知らせる構成。このパターンの特徴を生かし、特にデータを生産する側とデータを消費する側に分けることで、無理なくデータストリームを扱うことができる作りになっている。

#### 1.3.2 バージョンの違い

- バージョン1からバージョン2に移行する際に単にパッケージ名やクラス名を変えるだけではなく、それらのAPI周りの変更も必要に
なる

|バージョン|パッケージ|
|:--|:--|
|1.x|rx|
|2.x|io.reactivex|

- 参考サイト
   - [RxJava 1.x → 2.x 移行ガイド](https://hydrakecat.hatenablog.jp/entry/2018/06/30/RxJava_1.x_%E2%86%92_2.x_%E7%A7%BB%E8%A1%8C%E3%82%AC%E3%82%A4%E3%83%89)
   - [What's different in 2.0](https://github.com/ReactiveX/RxJava/wiki/What's-different-in-2.0#maven-address-and-base-package)

#### 1.3.3 RxJavaの仕組み

- RxJavaの仕組みはデータを生産し通知する生産者と通知されたデータを受け取り処理を行う消費者の構成で成り立つ。
- RxJavaではこの生産者と消費者の関係が大きく分けて2つあり、1つはReactive Streamsを対応しているFlowableとSubscriber、もう1つはReactive Streamsの対応をしておらずバックプレッシャー(過去のメッセージも取得できること)の機能がないObservableとObserverの構成で成り立つ
- FlowableはObservableの進化系。BackPressureを調整して、onNestででてくるデータのスピード調整などができる。

|Reactive Streams|生産者(-able; 購読される)|消費者(-er; 購読する)|生産者&消費者(-erであり、-ableでもある)|
|:--|:--|:--|:--|
|Reactive Streams対応あり(バックプレッシャー機能あり)|Flowable|Subscriber|Processor|
|Reactive Streams対応なし(バックプレッシャー機能なし)|Observable|Observer|Subject|

- Flowable/Subscriber
   - 生産者であるFlowableによる購読開始(onSubscribe)、データ通知(onNext)、エラー(onError)、完了(onComplete)の4つの通知を行い、消費者であるSubscriberによって各通知を受け取った際の処理を行う。
   - Subscriptionを通してデータ数のリクエストや購読の解除を行う。
- Observable/Observer
   - FlowableとSubscriberの構成とほぼ同じで、生産者であるObservableからの購読開始(onSubscribe)、データ通知(onNext)、エラー(onError)、完了(onComplete)の4つの通知をObserverで受け取る
   - 通知するデータ数の制御を行うバックプレッシャーの機能がないため、データ数のリクエストを行わない。そのため、Subscriptionは使わず、Disposableという購読解除のメソッドを持つインターフェースを用いる。
   - Disposableは購読解除を行うため、購読を解除するdispose()と、購読を解除している場合はtrueを、解除していない場合はfalseを返すisDisposed()メソッドを持つ
   - ObservableとObserver間でデータをやり取りをする場合は、FlowableとSubscriber 間のようなデータ数のリクエストは行わず、データが生成されるとすぐにObserverに通知される

![kt30zjz878.png](https://img.esteem.ws/kt30zjz878.png)
#### Source:[Reactive Streams And Microservices - A Case Study](http://blog.avenuecode.com/reactive-streams-and-microservices-a-case-study)

#### 1.3.4 オペレータ

- RxJavaでは、Publisher(Flowable/Observable)からSubscriber(Subscriber/Observerにデータが通知される間にSubscriberがデータを利用しやすい形に整形、変換することができる。
- 整形されたデータは再びFlowable/Observableなデータとして返されるため、メソッドチェインする形で段階的にデータを整形することが可能。
- こうしたデータを生成したり、変換したり、フィルターをかけたりできるメソッドのことをRxJavaではオペレータと呼ぶ。

![ocyxiwg9tj.png](https://img.esteem.ws/ocyxiwg9tj.png)
#### Source:[RxJavaリアクティブプログラミング](https://www.amazon.co.jp/dp/B06XGYSHCN/)

#### 1.3.5 RxJava Examples

- RxJavaの大まかな流れ
   - 1.Observableを作る
   - 2.filterやmapなどのOperatorを使って値を加工する
   - 3.Observerを使ってObservableをsubscribeする

- Flowable(Reactive Streams 対応)を使ったサンプル

   - Sample1 文字列を配信するObservableプログラム

```
public void basicExample() {

    Observable<String> observable = Observable.create(new ObservableOnSubscribe<String>() {

        @Override
        public void subscribe(ObservableEmitter<String> e) throws Exception {
            e.onNext("Hello");
            e.onNext("Welcome");
            e.onNext("This is your first RxJava example");
            e.onComplete();
        }
    });

    Observer<String> observer = new Observer<String>() {

        @Override
        public void onSubscribe(Disposable d) {
            LOGGER.info("observer subscribed to observable - on subscribe");
        }

        @Override
        public void onNext(String value) {
            LOGGER.info("observer - onNext " + value);
        }

        @Override
        public void onError(Throwable e) {
            LOGGER.info("observer - onError " + e.toString());
        }

        @Override
        public void onComplete() {
            LOGGER.info("observer - on complete");
        }
    };

    observable.subscribe(observer);
}
```

   - Sample2 Operatorを使って値を加工するプログラム

```
Observable.from(new Integer[] {1, 2, 3, 4, 5, 6, 7, 8, 9, 10})
    .filter(new Func1<Integer, Boolean>() {
        @Override
        public Boolean call(Integer i) {
            return (i % 2) == 0;
        }
    })
    .map(new Func1<Integer, Integer>() {
        @Override
        public Integer call(Integer i) {
            return i * 10;
        }
    })
    .subscribe(new Observer<Integer>() {
        @Override
        public void onNext(Integer integer) {
            Log.d("Hoge", integer.toString());
        }

        @Override
        public void onCompleted() {
            Log.d("Hoge", "completed");
        }

        @Override
        public void onError(Throwable e) {}
    });
```

   - Sample3 あいさつの言葉を通知するFlowableプログラム

```
public static void main(String[] args) throws Exception {
    // あいさつの言葉を通知するFlowableの生成
    Flowable < String > flowable =
        Flowable.create(new FlowableOnSubscribe<String>() {
            @Override
            public void subscribe(FlowableEmitter<String> emitter)
            throws Exception {
                String[] datas = {
                    "Hello, World!",
                    "こんにちは、世界!"
                };
                for (String data: datas) {
                    // 購読解除されている場合は処理をやめる
                    if (emitter.isCancelled()) {
                        return;
                    }
                    // データを通知する
                    emitter.onNext(data);
                }
                // 完了したことを通知する
                emitter.onComplete();
            }
        }, BackpressureStrategy.BUFFER); // 超過したデータはバッファする
    flowable
        // Subscriberの処理を別スレッドで行うようにする
        .observeOn(Schedulers.computation())
        // 購読する
        .subscribe(new Subscriber<String>() {
            // データ数のリクエストおよび購読の解除を行うオブジェクト
            private Subscription subscription;
            // 購読が開始された際の処理
            @Override
            public void onSubscribe(Subscription subscription) {
                // SubscriptionをSubscriber内で保持する
                this.subscription = subscription;
                // 受け取るデータ数をリクエストする
                this.subscription.request(1 L);
            }
            // データを受け取った際の処理
            @Override
            public void onNext(String data) {
                // 実行しているスレッド名の取得
                String threadName = Thread.currentThread().getName();
                // 受け取ったデータを出力する
                System.out.println(threadName + ": " + data);
                // 次に受け取るデータ数をリクエストする
                this.subscription.request(1 L);
            }
            // 完了を通知された際の処理
            @Override
            public void onComplete() {
                // 実行しているスレッド名の取得
                String threadName = Thread.currentThread().getName();
                System.out.println(threadName + ": 完了しました");
            }
            // エラーを通知された際の処理
            @Override
            public void onError(Throwable error) {
                error.printStackTrace();
            }
        });
    // しばらく待つ
    Thread.sleep(500 L);
}
```

#### 1.3.6 副作用を発生させる処理

- 副作用を発生させる処理とは、オブジェクトの状態を変更するなどして、処理の外部からでも参照可能なオブジェクトに対して何らかの変化を加えることや、ファイルやデータベースの中身を変えるようなことを指す。
- 副作用を発生させないことは、複数スレッドから共有されるオブジェクトがないことになり、スレッドセーフを担保することができる
- データを通知してから消費者に受け取られるまでの間は副作用の発生を避ける作りとする。
- RxJavaでは基本的に副作用を発生させるような処理を行うのは、メソッドチェインの途中ではなく、最終的にデータを受け取り処理を行う消費者側で行うこと。

次回は「[【Programming】RxJava リアクティブプログラミング vol.3 / RxJavaの構成~前編~](https://steemit.com/promari/@promari/programming-rxjava-vol-3-rxjava)」についてまとめてみます。

![t0r78hqbeu.png](https://img.esteem.ws/t0r78hqbeu.png)
written by [tamito0201](https://steemit.com/@tamito0201/)

プログラミングとのご縁結びなら[プロマリ](https://www.programming-mariage.jp/)へ。

オンラインプログラミング学習スクールの[プロマリ](https://www.programming-mariage.jp/)は、プログラミングの初学者の皆様を応援しています。プログラミング講師と一緒に面白いアプリを作りませんか。

<a href="https://www.programming-mariage.jp">![btpb5hmlur.png](https://img.esteem.ws/btpb5hmlur.png)</a>

The programming school "[Promari](https://www.programming-mariage.jp/)" will help you learn programming. "[Promari](https://www.programming-mariage.jp/)" is supporting the first scholars of programming. Let's develop an application with our programming instructor.
👍  , , , , , , , , , , , , , , , , , , , , , , , , , , , , , , , , , , , , , , , , , , , , , , , , , , , , , , , , , , , , , , , , and 1636 others
properties (23)
post_id72,374,309
authorpromari
permlinkprogramming-rxjava-vol-2-rxjava
categorypromari
json_metadata{"links":["https:\/\/steemit.com\/promari\/@promari\/programming-rxjava-vol-1","https:\/\/github.com\/ReactiveX\/RxJava","http:\/\/reactivex.io\/","https:\/\/hydrakecat.hatenablog.jp\/entry\/2018\/06\/30\/RxJava_1.x_%E2%86%92_2.x_%E7%A7%BB%E8%A1%8C%E3%82%AC%E3%82%A4%E3%83%89","https:\/\/github.com\/ReactiveX\/RxJava\/wiki\/What's-different-in-2.0#maven-address-and-base-package","http:\/\/blog.avenuecode.com\/reactive-streams-and-microservices-a-case-study","https:\/\/www.amazon.co.jp\/dp\/B06XGYSHCN\/","https:\/\/steemit.com\/promari\/@promari\/programming-rxjava-vol-3-rxjava","https:\/\/steemit.com\/@tamito0201\/","https:\/\/www.programming-mariage.jp\/","https:\/\/www.programming-mariage.jp"],"image":["https:\/\/img.esteem.ws\/eqkfqyeubf.png","https:\/\/img.esteem.ws\/kt30zjz878.png","https:\/\/img.esteem.ws\/ocyxiwg9tj.png","https:\/\/img.esteem.ws\/t0r78hqbeu.png","https:\/\/img.esteem.ws\/btpb5hmlur.png"],"tags":["promari","programming","japanese","blog","java"],"app":"steemit\/0.1","format":"markdown","community":"esteem.app"}
created2019-04-01 11:53:21
last_update2019-04-05 21:48:24
depth0
children2
net_rshares30,342,080,268,650
last_payout2019-04-08 11:53:21
cashout_time1969-12-31 23:59:59
total_payout_value15.421 SBD
curator_payout_value4.850 SBD
pending_payout_value0.000 SBD
promoted0.000 SBD
body_length8,613
author_reputation6,025,595,860,743
root_title"【Programming】RxJava リアクティブプログラミング vol.2 / RxJavaの概要"
beneficiaries
0.
accountesteemapp
weight1,000
max_accepted_payout1,000,000.000 SBD
percent_steem_dollars10,000
author_curate_reward""
vote details (1700)
@steemitboard ·
Congratulations @promari! You have completed the following achievement on the Steem blockchain and have been rewarded with new badge(s) :

<table><tr><td>https://steemitimages.com/60x70/http://steemitboard.com/@promari/voted.png?201904011928</td><td>You received more than 8000 upvotes. Your next target is to reach 9000 upvotes.</td></tr>
</table>

<sub>_You can view [your badges on your Steem Board](https://steemitboard.com/@promari) and compare to others on the [Steem Ranking](http://steemitboard.com/ranking/index.php?name=promari)_</sub>
<sub>_If you no longer want to receive notifications, reply to this comment with the word_ `STOP`</sub>



###### [Vote for @Steemitboard as a witness](https://v2.steemconnect.com/sign/account-witness-vote?witness=steemitboard&approve=1) to get one more award and increased upvotes!
properties (22)
post_id72,397,170
authorsteemitboard
permlinksteemitboard-notify-promari-20190401t205340000z
categorypromari
json_metadata{"image":["https:\/\/steemitboard.com\/img\/notify.png"]}
created2019-04-01 20:53:39
last_update2019-04-01 20:53:39
depth1
children0
net_rshares0
last_payout2019-04-08 20:53:39
cashout_time1969-12-31 23:59:59
total_payout_value0.000 SBD
curator_payout_value0.000 SBD
pending_payout_value0.000 SBD
promoted0.000 SBD
body_length828
author_reputation38,705,954,145,809
root_title"【Programming】RxJava リアクティブプログラミング vol.2 / RxJavaの概要"
beneficiaries[]
max_accepted_payout1,000,000.000 SBD
percent_steem_dollars10,000
@arcange ·
Congratulations @promari!
Your post was mentioned in the [Steem Hit Parade](https://steemit.com/hit-parade/@arcange/daily-hit-parade-20190401) in the following category:

* Upvotes - Ranked 10 with 1690 upvotes
properties (22)
post_id72,436,968
authorarcange
permlinkre-programming-rxjava-vol-2-rxjava-20190401t175028000z
categorypromari
json_metadata{}
created2019-04-02 15:52:42
last_update2019-04-02 15:52:42
depth1
children0
net_rshares0
last_payout2019-04-09 15:52:42
cashout_time1969-12-31 23:59:59
total_payout_value0.000 SBD
curator_payout_value0.000 SBD
pending_payout_value0.000 SBD
promoted0.000 SBD
body_length211
author_reputation231,443,210,169,699
root_title"【Programming】RxJava リアクティブプログラミング vol.2 / RxJavaの概要"
beneficiaries[]
max_accepted_payout1,000,000.000 SBD
percent_steem_dollars10,000