非同期処理する際にRxJavaを使うと便利でした

近況

4月に新卒で入社したaです。
今回からブログを担当していきます。よろしくお願いします。

といっても今はまだ東京に住んでいます。
今週末に京都に引っ越す予定です。関西に住んだことがないので楽しみです。


Free photos from Anita Peeples

最近はRailsを使って、IoT的な社内業務で使うためのシンプルなWebページとスマホアプリを作成していました。
RailsのデプロイにAWS(Amazon Web Service)を使ったのですが、使うのが初めてだったので結構ハマりました。
手助けしていただき、今は無事動作しています。

AndroidアプリをKotlinで書いたのですが、非同期処理する際にストリームライブラリであるRxJavaを使うと便利だったので紹介します。

RxJavaとは

RxJavaは、Javaでリアクティブプログラミングを行うためのライブラリです。
リアクティブプログラミングがなんなのか、私も厳密に理解しているわけではないのですが、いろいろな操作をストリーム(データの流れ)として扱うことができるものと認識しています。モバイルアプリでストリームとして扱えるものの例としては、リスト操作やUIイベント(ボタンのクリック等)があります。また、ネットワークやハードウェアから受け取るイベントもストリームと見做せます。

使用例

今回のアプリでは、以下のような処理手順が必要でした。

  1. 1. ユーザーからの入力イベント
  2. 2. APIからデータを取得
  3. 3. 端末のセンサからデータを取得
  4. 4. センサデータとAPIから取得したデータをUIに反映

Androidでは、画面の描画を妨げないためネットワークアクセス等の時間のかかる処理はUIスレッド以外で非同期で行う必要があります。また、2と3の処理を同時ではなく、2のデータ取得を行なった後に3のデータ取得を行いたいというようなケースがよくあります。非同期処理をコールバックを使って書くとネストが深くなってしまう問題があります。

RxJavaを使うとこの問題が解消されます。以下は
非同期処理A->非同期処理B->非同期処理C
の順で処理する例です。
また、RxJavaはAndroidに対応しているため、Androidのスレッドの切り替えもきれいに書けます。


// 非同期処理A
private fun loadDataA(): Observable<Hoge> {
    return Observable
            .create<Hoge> { e ->
                // ネットワークやセンサからデータが取得されるたびにonNextでデータを送る
                e.onNext(Hoge())

                // データ取得が完了したら呼ぶ
                e.onComplete()
            }
            .subscribeOn(Schedulers.newThread()) // 新しいスレッド
            .observeOn(AndroidSchedulers.mainThread()) // UIスレッド
            .doOnNext { hoge ->
                // UIに反映する処理などを書く
            }
            .doOnError { t ->
                // エラー時の処理を書く
            }
            .doOnComplete { }
}

// 非同期処理B
private fun loadDataB(): Observable<Huga> {
    return Observable
            .create<Huga> { e ->
                // ネットワークやセンサからデータが取得されるたびにonNextでデータを送る
                e.onNext(Huga())

                // データ取得が完了したら呼ぶ
                e.onComplete()
            }
            .subscribeOn(Schedulers.newThread()) // 新しいスレッド
            .observeOn(AndroidSchedulers.mainThread()) // UIスレッド
            .doOnNext { huga ->
                // UIに反映する処理などを書く
            }
            .doOnError { t ->
                // エラー時の処理を書く
            }
            .doOnComplete { }
}

// 非同期処理C
private fun loadDataC(): Observable<Hugo> {
    return Observable
            .create<Hugo> { e ->
                // ネットワークやセンサからデータが取得されるたびにonNextでデータを送る
                e.onNext(Hugo())
            }
            .subscribeOn(Schedulers.newThread()) // 新しいスレッド
            .observeOn(AndroidSchedulers.mainThread()) // UIスレッド
            .doOnNext { hugo ->
                // UIに反映する処理などを書く
            }
            .doOnError { t ->
                // エラー時の処理を書く
            }
}

各非同期処理について、Observableを返すメソッドを作ります。
注意しなければならないのは、subscribe()メソッドを呼ばない限り、Observableの処理は実行されないことです。

コールバック地獄を回避するために、3つのストリームを直列に繋ぎます。
flatMapを使います。
flatMapでできたストリームをsubscribe()することで、順番に非同期処理が実行されます。


val loadDataAObservable = loadDataA()
val loadDataBObservable = loadDataB()
val loadDataCObservable = loadDataC()

// 直列化
loadDataAObservable
        .flatMap { loadDataBObservable }
        .flatMap { loadDataCObservable }
        .subscribe()

関連記事

最近の記事 おすすめ記事
  1. 新人さん向けの品質についての読書会

  1. 社内発表:チームワークって何?

  2. プログラマーの新入社員がMMJに入社して感じたこと

  3. Deep learning勉強筆記11

カテゴリー

アーカイブ

検索


TOP
TOP