본문 바로가기

RxJava

[RxJava3] Reactive 연산자 - 1.생성 연산자, 변환 연산자

ReactiveX 연산자는 Observable을 생성도 하고, 변환도 하며, 흐름제어를 하는 기능도 있는 함수 입니다.

 

Rx 연산자를 사용할 줄 안다는 것은 Observable 데이터 흐름을 다룰 줄 안다는 것이고 즉, Reactive 프로그래밍을 하고 있다는 것이라고도 볼 수 있겠네요. (다음 포스팅에서 스케줄러 등 중요한 개념들을 더 다루어야하기는 하지만)

 

다양한 연산자들이 있지만, 공식문서에서 그룹핑한 내용을 보면

Creating Observables, Transforming Observables, Filtering Observables, Combining Observables 등이 있습니다.

 

하나씩 천천히 알아봅시다.

 

1. Creating (생성) 연산자

이전에 다루었던 create(), just(), fromXX() 등의 함수도 여기 생성 연산자에 포함됩니다.

대표적인 연산자 interval(), range(), repeat(), differ() 순서로 더 알아봅시다.

 

  • interval()  : 일정한 시간 간격 (period, 단위: unit)  으로 0 부터 1씩 증가하는 Long 객체를 발행

interval

 Observable.interval(1L, TimeUnit.SECONDS)
 	.subscribe {
    	println("$it : ${System.currentTimeMillis()}")
    }
Thread.sleep(3000) 

코드에서 Thread 에 sleep을 준 이유는 아래와 같이 interval 함수가 COMPUTATION 스레드에서 실행되기 때문에 메인에서 스레드의 계산 완료를 기다려줘야하기 때문입니다. sleep 코드를 없애면 값이 출력되기 전에 종료되는 것을 확인할 수 있습니다.

 

@SchedulerSupport(SchedulerSupport.COMPUTATION)
public static Observable<Long> interval(long period, @NonNull TimeUnit unit)

실제로 위의 코드 결과도 기다려준 시간만큼만 출력됨을 알 수 있습니다. 별다른 로직이 었다면 interval 함수는 무한히 수행이 됩니다.

0 : 1582805953403
1 : 1582805954404
2 : 1582805955404

 

  • range() : start 값부터 count 개의 Int 객체를 발행합니다.

range

Observable.range(3,5)
    .subscribe {
        println("$it : ${System.currentTimeMillis()}")
    }

이번 코드에서 Thread에 sleep이 없어도 정상동작이 되는 이유는 아래와 같이 range 함수가 메인스레드에서 동작을하기 때문입니다.

@SchedulerSupport(SchedulerSupport.NONE)
public static Observable<Integer> range(int start, int count)

실행 결과도 시간 지체 없이 발행되는 것을 확인할 수 있습니다.

3 : 1582806541921
4 : 1582806541921
5 : 1582806541921
6 : 1582806541921
7 : 1582806541921

 

  • repeat() : 반복 발행 합니다

repeat

 

 

  • defer() : Observer가 subscribe() 하기 전까지 Observable을 생성하지 않으며, 각 Observer에게 새로운 Observable을 생성해줌.

이런 함수는 왜? 언제 필요할까?

Observable의 생성이 구독전까지 미루어지기 때문에, 묵은 데이터가 아닌 "최신의 데이터"를 얻어야 할 때에 사용할 수 있을 것입니다. 무슨 말인지 알아봅시다.

 

참고로, defer 함수의 경우 Rxjava2 에서 일부 변경 된것을 확인할 수 있다. 다음과 같이 RxJava3에서 앞으로도 여러 변화들을 확인할 수 있을 것 같다.

defer_RxJava3
defer_Rxjava2

 

// RxJava2 의 defer 함수가 사용: java 의 Callable을 직접 , Exception (extends Throwable) 을 던짐
package java.util.concurrent.Callable;
public interface Callable<V> {
    V call() throws Exception;
}


// RxJava3 의 defer 함수가 사용: reactivex패키지의 인터페이스를 사용, Throwable 을 던짐
package io.reactivex.rxjava3.functions;
public interface Supplier<@NonNull T> {
    T get() throws Throwable;
}

 

다시, 본론으로 돌아와서

 

 


 

 

2. Transforming(변환) 연산자

map 계열의 함수가 여기에 해당합니다.  ex. map(), flatMap(), concatMap(), switchMap() 

combineLatest(), buffer(), scan(), 그리고 groupBy() 도 사용하면서 알아봅시다.

 

  • map : Observable에서 발생시킨 데이터에 함수를 적용하여 변환한다.

ReactiveX 공식 사이트의 마블 다이어그램을 보면서 이해해봅시다. (http://reactivex.io/documentation/operators/map.html)

 

map

 

map 함수에 어떤 값을 넣었더니 * 10 된 결과로 반환을 해줍니다.  이때 눈여겨 보아야 할 부분은 "함수를 적용한다" 는 것입니다.

Observable에서 발생된 값은 map() 정의되어 있는 함수 ( X => 10 * X  ) 에 따라 값이 변환되어 반환됩니다.

 

    ( * 함수형 프로그래밍의 특징을 map 함수에서도 확인을 할 수 있는데요, 함수가 입력도 되고 출력도 될 수 있습니다.)

 

아래와 같이 mapper 함수를 정의하고, map 함수에  mapper 를 넣어줍니다. 실행해보면, 입력으로 들어온 Int 값에 " A"를 붙인 String 값이 반환됨을 확인할 수 있습니다.

val mapper = Function<Int, String> {
    "$it A"
}

Observable.fromArray(1, 2, 3)
    .map(mapper)
    .subscribe {
        println(it)
    }
1 A
2 A
3 A

 

참고로, null 을 반환하는 mapper 함수를 넣으면 런타임 에러가 발생합니다. 

( io.reactivex.rxjava3.exceptions.OnErrorNotImplementedException , map 함수가 @NonNull  이기 때문입니다.)

     

 

  • flatMap() : map 계열의 함수로, 데이터를 입력 받아 Observable을 발행합니다. 즉, 하나의 데이터가 입력되었을 때 2개 이상의 데이터를 발행할 수 있다는 말입니다.

flatMap도 마블 다이어그램을 보면서 이해해봅시다. (http://reactivex.io/documentation/operators/flatmap.html)

flatMap

마블 다이어그램의 주황색 공과 연두색 공을 처리하는 단계에서 볼 수 있듯이 순서를 보장하지는 않는 특징이 있습니다.

 

 val mapper = Function<Int, Observable<String>> {
     Observable.just("$it A", "$it B")
 }

Observable.fromArray(1, 2, 3)
    .flatMap(mapper)
    .subscribe {
        println($it)
    }
1 A
1 B
2 A
2 B
3 A
3 B

동일한 결과를 아래와 같이도 구현할 수 있다. 코드를 잘 음미해보면 좋을 것 같다.

val mapper = Function<Int, Observable<String>> {
    Observable.just("A", "B")
}

val combiner = BiFunction<Int, String, String> { t: Int, u: String ->
    "$t $u"
}

Observable.fromArray(1,2,3)
    .flatMap(mapper, combiner)
    .subscribe {
        println(it)
    }

 

 

  • concatMap() : flatMap과 유사하지만, 들어온 순서대로 데이터 처리가 됨. (순서 보장)

concatMap

 

  • switchMap : concatMap과 유사하지만, 순서를 보장하기 위해 기존 작업을 바로 중단하는 특징이 있습니다. (마지막 입력으로 들어온 데이터에 대해서는 확실한 처리를 보장하겠네요. )

 

  • buffer() : N 개씩 번들로 묶어서 방출함.

buffer

 

아래 코드를 실행시켜보면, 3개씩 리스트에 묶여서 출력되는 것을 확인해볼 수 있다.

Observable.fromArray(1,2,3,4,5,6,7)
    .buffer(3)
    .subscribe {
        println(it)
    }
[1, 2, 3]
[4, 5, 6]
[7]

 

  • scan() : Observable이 방출하는 데이터를 변환한 중간 결과와 최종결과를 발행합니다.

scan

마블 다이어그램 (http://reactivex.io/documentation/operators/scan.html 참고하여 그림) 을 코드로 옮겨보았습니다.

실행 결과와 풀이를 통해서 2개의 입력을 받아서 함수를 수행하는데, 그 중간결과와 최종결과가 발행되는 것을 확인할 수 있습니다.

Observable.fromArray(1,2,3,4,5)
    .scan { t1,t2 ->
        t1 + t2}
    .subscribe {
        println(it)
    }
실행 결과 풀이

3
6
10
15
1
1+2
(1+2) + 3
(1+2+3) + 4
(1+2+3+4) + 5

 

  • groupBy : Observable을 기준 (keySelector)에 따라 Observable 그룹으로 방행합니다.
Observable.fromArray(1,2,3,4,5)
    .groupBy {
        if(it % 2 == 0)  {
            "짝수"
        } else {
            "홀수"
        }
    }
    .subscribe { group ->
        group.subscribe { item ->
            println("${group.key}: $item")
        }
    }
홀수: 1
짝수: 2
홀수: 3
짝수: 4
홀수: 5

 


이번 포스팅에서는 데이터 흐름을 변환하는 연산자(operator)  중에서도 생성/변환 연산자에 대해서 다루어 보았습니다.

추가로, 몇가지 마블 다이어그램을 보았는데요, Reactive 프로그래밍이 비동기를 지향 + 시간의 흐름이 중요한 요소이기 때문에 코드를 작성해 보실때에도 마블 다이어그램을 연상하면서 만들면 도움이 될 것입니다.

 

 

참고

http://reactivex.io/documentation/operators.html

'RxJava' 카테고리의 다른 글

[RxJava3] ReactiveX 란?  (0) 2020.03.06
[RxJava3] 스케줄러  (0) 2020.03.06
[RxJava3] Reactive 연산자 - 2.필터 연산자, 결합 연산자  (0) 2020.03.01
[RxJava3] Observable  (0) 2020.02.26