본문 바로가기

RxJava

[RxJava3] Reactive 연산자 - 2.필터 연산자, 결합 연산자

이번 포스팅에서는 이전 생성, 변환 연산자에 이어서

필터 연산자, 결합연산자, 그리고 그 외 자주 사용하게 될 연산자들에 대해서 알아보고자 합니다.

 

1. Filter(필터) 연산자

Observable에서 아이템을 선택적으로 발행하는 연사자입니다.

대표적인 연산자에는 filter(), first(), 그리고 debounce(), sample() 이 있습니다. 

 

  • filter() : 조건이 true가 되는 데이터만 발행.

직관적인 함수명대로 predicate 가 true가 되는 조건의 아이템에 대해서만 발행을 하는 함수입니다.

@SchedulerSupport(SchedulerSupport.NONE)
public final Observable<T> filter(@NonNull Predicate<? super T> predicate)

아래는 짝수만 걸러서 발행하는 코드로, it % 2 == 0 이 true가 되는 값 2와 4 만 발행되어 구독한 값을 확인할 수 있습니다.

Observable.fromArray(1,2,3,4,5)
    .filter{
        it % 2 == 0
    }
    .subscribe {
        println(it)
    }

 

  • first() : 조건이 true가 되는 데이터만 발행.

자동완성 결과에서 볼 수 있듯이, first() 는 입력으로 받은 데이터들 중에서 첫번 째 데이터를 Single 로 반환한다.

first

아래 코드를 실행시켜보면, [1,2,3,4,5] 중 첫번째 값인 1이 출력되는 것을 확인할 수 있다. 

val single = Observable.fromArray(1,2,3,4,5)
    .first(-1) // 데이터가 없다면, -1이 발행됨

single.subscribe { result ->
    println(result)
}

first()와 비슷한 함수에는

마지막 값만 갖는 last(), 최초 N개의 값만 갖는 take(N), 마지막 N개의 값만 갖는 takeLast(N), 최초 N개 값을 건너뛰는 skip(N), 마지막 N개 값을 건너뒤는 skipLast(N) 등이 있다.

 

필터 연산자에는 debounce() 와 sample() 등의 함수도 있는데, 이후 Backpresure 관련 연산자 포스팅에서 다뤄보겠습니다.

 


 

2. Combining(결합) 연산자

결합 연산자는 여러개의 Observable을 조합하여 하나의 Observable을 생성하는 연산자 입니다.

대표적인 함수에는 merge(), switch(), zip(), 그리고 combineLatest() 가 있습니다.

 

  • merge() : 여러 Observable을 하나의 Observable로 결합하는데, 이때 먼저 입력되는 데이터를 그대로 발행.

이때, 하나의 업스트림에서 error 발생 시, merge() 도 onError() 이벤트를 발생하며 종료가 됩니다.

따라서, onError() 이벤트를 미루고 싶다면, mergeDelayError() 함수를 사용하면 됩니다.

 

val arr1 = arrayListOf(1,2,3)
val arr2 = arrayListOf("A", "B", "C")

val dataSource1 = Observable.interval(0L, 100L, TimeUnit.MILLISECONDS)
    .map {
        arr1[it.toInt()]
    }
    .take(arr1.size.toLong())

val dataSource2 = Observable.interval(50L, 50L, TimeUnit.MILLISECONDS)
    .map {
        arr2[it.toInt()]
    }
    .take(arr2.size.toLong())

Observable.merge(dataSource1, dataSource2)
    .subscribe {
        print("$it ")
    }

Thread.sleep(500) // Computation 스레드에서 동작하는 interval()

 

interval 은 Computation 스레드에서 동작하는 함숟다. 따라서 코드 상 같은 시간에 발행 될 2와 "B"는 실행할 때마다

2 B 또는 B 2 로  그때 그때 다르게 출력되는 되는 것을 확인할 수 있다.

 

  • zip() : combiner 2개 이상의 Observable에서 데이터를 받아 결합을 수행하며,  zip() 이 onComplete될 때 까지 source Observable 들은 무한 대기를 합니다.

아래 코드로 구현한 zip 의 함수 정의는 다음과 같이 Observable을 2와 zipper BiFunction 함수를 인자로 받아서 동작합니다. 

public static <T1, T2, R> Observable<R> zip(
@NonNull ObservableSource<? extends T1> source1, 
@NonNull ObservableSource<? extends T2> source2,
@NonNull BiFunction<? super T1, ? super T2, ? extends R> zipper) 

1이  먼저 발행되고, 그 다음 "A"가 발행되면, zipper 정의에 따라 1A가 zip되어 발행됩니다.

2B도 동일하게 동작하며, 마지막으로 "C"가 발행되고 dataSource2는 대기를 하며, 3 발행 후 3C로 zip 되어 발행되고 onComplete()되면

dataSource1과 dataSource2도 onComplte() 됩니다.

 

만약,  arr2의 ["A", "B", "C", "D"] 로 arr1 과 개수가 맞지 않아 무시되며 "D" 가 발행된 후 다같이 onComplete() 됩니다.

val zipper = BiFunction { t1: Int, t2: String ->
    "$t1$t2"
}

val arr1 = arrayListOf(1,2,3)
val arr2 = arrayListOf("A", "B", "C")
val dataSource1 =  Observable.interval(0L, 100L, TimeUnit.MILLISECONDS)
    .map {
        arr1[it.toInt()]
    }
    .take(arr1.size.toLong())
    
val dataSource2 =  Observable.interval(50L, 50L, TimeUnit.MILLISECONDS)
     .map {
         arr2[it.toInt()]
     }
     .take(arr2.size.toLong())

Observable.zip(dataSource1, dataSource2, zipper)
    .subscribe {
        println(it)
    }

Thread.sleep(500)
1A
2B
3C

마블다이어그램으로 그려본다면, 아래와 같습니다.

zip

 

 

  • combineLatest() : zip() 과 비슷하지만, source Observable의 데이터가 변경되면 자동 갱신해주는 기능이 추가된 함수입니다.

zip() 에서 수행한 코드에서

1) zip()을 combineLatest() 로 변경하고,

2) val arr2 = arrayListOf("A", "B", "C", "D") 로 변경한 후 코드를 실행하면 어떤 결과가 나올까요? 상상해볼까요 :)

 

아래와 같이 source 데이터가 발행될 때마다 combineLatest() 가 갱신  후 발행하는 것을 확인할 수 있습니다.

1A
2A
2B
2C
2D
3D 

  또는

 


1A
1B
2B
2C
2D
3D

 

 

3. Aggregate(집계) 연산자

 item의 전체 sequence에서 동작하는 연산자

 

  • reduce() : 2개 이상의 Observable을 interleaving 없이 이어서 발행.

이전에 배운 중간 결과와 최종결과를 모두 발행을 하는 scan() 함수와 달리, reduce()는 최종 결과 하나를 집계하여 발행하는 함수입니다.

 Observable.fromArray(1,2,3,4,5)
     .reduce { t1: Int, t2: Int ->
         t1 + t2
     }
     .subscribe {
         println(it)
     }
15

 

  • concat() : 2개 이상의 Observable을 interleaving 없이 이어서 발행.

concat()은 직전 Observable이 onComlete() 이벤트 발생 후, 그 다음 Observable을 구독하는 특징이 있습니다.

concat 매개변수로 넣은 Observable 순서대로 발행을 수행합니다.

 

즉, dataSource2의 발행이 1000 ms 딜레이로 가장 늦지만, 언제 발행되었는지 여부와 관계없이 concat에 등록된 순서대로 발행이 되는 것을 결과 출력에서 확인할 수 있습니다.

val arr1 = arrayListOf(1,2,3)
val arr2 = arrayListOf("a", "b")
val arr3 = arrayListOf(true, true, false)

val dataSource1 =  Observable.interval(0L, 100L, TimeUnit.MILLISECONDS)
    .map {
        arr1[it.toInt()]
    }
    .take(arr1.size.toLong())
val dataSource2 =  Observable.interval(1000L, 50L, TimeUnit.MILLISECONDS)
    .map {
        arr2[it.toInt()]
    }
    .take(arr2.size.toLong())
val dataSource3 =  Observable.interval(50L, 30L, TimeUnit.MILLISECONDS)
    .map {
        arr3[it.toInt()]
    }
    .take(arr1.size.toLong())

Observable.concat(dataSource1, dataSource2, dataSource3)
    .subscribe {
        println(it)
    }

Thread.sleep(1500)
1
2
3
a
b
true
true
false

이번 포스팅에서는 필터 연산자와 결합 연산자에 대해서 배웠습니다.

다음 포스팅은 연산자의 마지막 글로  Backpressure와 관련된 연산자들에 대해서 알아보겠습니다 :)

 

 

 

참고

http://reactivex.io/documentation/operators.html

'RxJava' 카테고리의 다른 글

[RxJava3] ReactiveX 란?  (0) 2020.03.06
[RxJava3] 스케줄러  (0) 2020.03.06
[RxJava3] Reactive 연산자 - 1.생성 연산자, 변환 연산자  (0) 2020.02.27
[RxJava3] Observable  (0) 2020.02.26