-
[RxJava] RxJava (2) - ObservableANDROID/RXJAVA 2022. 4. 1. 20:02
출처 - https://12bme.tistory.com/570
[RxJava] RxJava 프로그래밍(1) - 리액티브 프로그래밍
서버 다수와 통신하게 되면 API 호출 각각에 콜백을 추가하게 된다. 콜백이 늘어나면 애플리케이션의 복잡성도 증가(callback hell)하게 된다. RxJava는 자바로 리액티브 프로그래밍을 할 수 있는 라이
12bme.tistory.com
마블 다이어그램 보는 법
마블 다이어그램은 RxJava를 이해하는 핵심 도구이다. map(), flatMap() 함수 등의 수많은 리액티브 연산자들을 이해하는데 큰 도움을 준다. 마블 다이어그램은 예를 보면서 어떻게 활용하는지 배우는 것이 가장 좋다.

1. 위에 있는 실선은 Observable의 시간 표시줄(timeline)이다. 시간순으로 데이터가 발행되는 것을 표현한다.

timteline 2. Observable에서 발행하는 데이터이다. 시간 순서대로 별 삼각형, 오각형, 원등의 동형을 발행한다. 데이터를 발행할 때는 OnNext알림이 발생한다.

data 3. 파이프는 Observable에서 데이터 발행을 완료했다는 의미이다. 한번 완료하면 이후에는 더이상 데이터를 발생할 수 없다. 완료하면 onComplete 알림이 발생한다.

pipe 4. 아래로 내려오는 점선 화살표는 가각 함수의 입력과 출력데이터이다. 가운데 박스는 함수를 의미한다. flip() 함수는 입력값을 뒤집는 함수이다. 따라서 입력값의 색상은 그대로 두고 모양을 위아래 180도 회전하여 뒤집는다.

input, output 5. 함수의 결과가 출력된 표시시간줄이다.

result timeline 6. 엑스(X)는 함수가 입력값을 처리할 때 발생하는 에러를 의미한다. 에러 발생 시에는 OnError 알림이 발생한다.
(정상적 종료는 파이프 에러는 엑스)

error 조금 더 복잡한 마블 다이어그램
아래는 RxJava의 combineLatest() 함수의 마블 다이어그램으로 2개 이상의 Observable을 처리할 수 있다. 이전 flip 함수 마블 다이어그램과 다른 점은 Observable의 시간 표시줄이 1개가 아니라 2개로 늘었다는 점이다.

combineLatest marble diagram 1. 첫번째 Observable은 같은 모양(원)이지만 색깔이 다른 도형을 발행한다.
2. 두번째 Observable은 모양은 다르지만 번호가 없는 도형을 발행한다.
3. combineLatest() 함수는 첫번째 Observable의 도형과 두번째 Observable의 동형이 모두 들어오면 둘을 합성한다.
4. 가장 아래 시간 표시줄은 combineLatest() 함수의 실행 결과로 자세히 살펴보면 두 Observable의 결과를 조합한 것임을 알 수 있다. 첫번째 Observable의 결과를 조합한 것임을 알 수 있다. 첫번째 Observable에서는 색상을 취하고 두번째 Observable에서는 도형의 모형을 취하고 있다.
RxJava는 리액티브 프로그래밍이라는 새로운 시각을 제공해주고 비동기 프로그래밍과 함수형 프로그래밍을 모두 활용해 문제를 해결할 수 있따. RxJava는 마블 다이어그램을 배운다고해도 과언이 아니다.
RxJava 요소 - Observable
Observable은 데이터 흐름에 맞게 알림을 보내 구독자가 데이터를 처리할 수 있도록 한다. RxJava 프로그래밍은 Observable에서 시작해 Observable로 끝난다고해도 과언이 아닐 정도로 중요한 개념이다. Observable 클래스와 그의 파생 클래스에 대해 알아보도록 한다.
1. Observable 클래스
Observable은 옵저버(Observer) 패턴을 구현한다. 옵저버 패턴은 객체의 상태변화를 관찰하는 관찰자(옵저버) 목록을 객체에 등록한다. 그리고 상태 변화가 잇을때마다 메서드를 호출하여 객체가 직접 목록의 각 옵저버에게 변화를 알려준다. 라이프 사이클은 존재하지 않으며 보통 단일 함수를 통해 변화만 알린다.
Observable은 간단한게 Observer가 관찰하는 대상이라고 할 수 있다. "Observed가 관찰을 통해서 얻은 결과를 의미한다면 Observable은 현재 관찰되지 않았지만 이론을 통해서 앞으로 관찰할 가능성을 의미한다" 사용자가 버튼을 누르면 버튼에 미리 등록해 둔 onClick() 메서드를 호출해 원하는 동작을 처리하는 것도 옵저버 패턴의 대표적인 예이다.
RxJava의 Observable은 세가지의 알림을 구독자에게 전달한다.
- onNext : Observable이 데이터의 발행을 알린다. 기존 옵저버 패턴과 같다.
- onComplete : 모든 데이터의 발행을 완료했음을 알린다. onComplete 이벤트는 단 한번만 발생하며, 발생한 후에는 더이상 onNext 이벤트가 발생해서는 안된다.
- onError : Observable에서 어떤 이유로 에러가 발생했음을 알린다. onError 이벤트가 발생하면 이후에 onNext 및 onComplete 이벤트가 발생하지 않는다. 즉, Observable의 실행을 종료한다.
Observable 클래스에는 Observable을 생성하는 팩토리 함수, 중간 결과를 처리하는 함수, 디버그 및 예외 처리 함수가 모두 포함되어 있다. 따라서 많은 수의 함수가 존재한다. Observable을 생성할때는 직접 인스턴스를 만들지 않고 정적 팩토리 함수를 호출한다. 다양한 함수가 있으며 다음 표처럼 구분할 수 있다.
팩토리 함수 함수 RxJava1 기본 팩토리 함수 create(), just(), from() RxJava2 추가 팩토리 함수
(from() 함수 세분화)fromArray(), fromIterable(), fromCallable(), fromFuture(), fromPublisher() 기타 팩토리 함수 interval(), range(), timer(), defer() 등 1) just() 함수
데이터를 발생하는 가장 쉬운 방법은 기존의 자료구조를 사용하는 것이다. just() 함수는 인자로 넣은 데이터를 차례로 발생하려고 Observable을 생성한다(실제 데이터의 발행은 subscribe() 함수를 호출해야 시작한다). 한 개의 값을 넣을 수도 있고 인자로 여러 개의 값(최대 10개)을 넣을 수도 있다. 단 타입은 모두 같아야 한다.

just 중앙 원은 Observable에서 발생하는 데이터로 just() 함수를 거치면 입력한 원을 그대로 발행한다. 파이프는 모든 데이터 발행이 완료되었음을 의미한다.

2개 이상의 인자를 받는 just just()는 인자의 데이터를 한개씩 발행한다. (이때 데이터 내용을 변경하지 않고 그대로 발행한다). 모두 발행한 이후에는 완료한다.
class Example { fun emit() { Observable.just(1, 2, 3, 4, 5, 6) .subscribe(System.out::println) } ... }2) subscribe() 함수와 Disposable 객체
RxJava는 내가 동작시키기 원하는 것을 사전에 정의해둔 다음 실제 그것이 실행되는 시점을 조절할 수 있다. 이때 사용하는 것이 subscribe() 함수이다. Observable은 just() 등의 팩토리 함수로 데이터 흐름을 정의한 후 subscribe() 함수를 호출해야 실제로 데이터를 발행한다.
RxJava는 선언형 프로그래밍을 지향한다. 선언형 프로그래밍은 명령형 프로그래밍의 반대말로 어떤 방법으로 동작하는지가 아니라 프로그래밍할 대상이 무엇인지 알려주는 것을 의미한다. 예를 들어 명령형 프로그래밍 언어에서는 실행할 알고리즘과 동작을 구체적으로 명시한다. 하지만 선언형 프로그래밍은 목표를 명시할 뿐 실행할 알고리즘을 명시하지 않는다.
subscribe() 함수의 주요 원형은 아래와 같다.
Disposable subscribe() Disposable subscribe(Consumer<? super T> onNext) Disposable subscribe(Consumer<? super T> onNext, Consumer<? super java.lang.Throwable> onError) Disposable subscribe(Consumer<? super T> onNext, Consumer<? super java.lang.Throwable> onError, Action onComplete)원형 각각은 다음과 같은 의미가 있다.
- 인자가 없는 subscribe() 함수는 onNext와 onCompletable 이벤트를 무시하고 onError 이벤트가 발생했을때만 onErrorNotImplementedException을 던진다(throw). 따라서 Observable로 작성한 코드를 테스트하거나 디버깅할 때 활용한다.
- 인자가 1개 있는 subscribe() 함수는 onNext 이벤트를 처리한다. 정상처리가 되면 onNext 이벤트만 실행하고, onError 이벤트가 발생하면 onErrorNotImplemented을 던진다.
- 인자가 2개인 subscribe() 함수는 onNext와 onError 이벤트를 처리
- 인자가 3개인 함수는 onNext, onError, onComplete 이벤트를 모두 처리한다.
앞 함수 원형은 모두 Disposable 인터페이스의 객체를 리턴한다. Disposable은 RxJava1의 subscribe 객체에 해당한다. 아래 2개 메소드만 있다.
void dispose() boolean isDisposed()Disposable 인터페이스 함수
dispose()는 Observable에게 더이상 데이터를 발행하지 않도록 구독을 해지하는 함수이다. Observable contract에 따르면 Observable이 onComplete 알림을 보냈을때 자동으로 dispose()를 호출해 Observable과 구독자의 관계를 끊는다.
따라서 onComplete 이벤트가 정상적으로 발생했다면 구독자가 별도로 dispose()를 호출할 필요가 없다. isDisposed() 함수는 이름에서 알 수 있는 것처럼 Observable이 데이터를 발행하지 않는지(구독을 해지했는지) 확인하는 함수이다.
val mObservable : Observable<String> = Observable.just("HARDY", "LIAM", "MOYA") val mDisposable = mObservable.subscribe({ name -> // onNext 처리 ... }, { error -> // onError ... }, { // onComplete ... })3) create() 함수
just() 함수는 데이터를 인자로 넣으면 자동으로 알림 이벤트가 발생하지만 create() 함수는 onNext, onComplete, onError와 같은 알림을 개발자가 직접 호출해야 한다. 그래서 create()는 라이브러리가 무언가를 해준다기보다 개발자가 무언가를 직접 하는 느낌이 강한 함수이다.

create 구독자에게 데이터를 발행하려면 onNext() 함수를 호출해야 하며 모든 데이터를 발행한 후에는 반드시 onComplete() 함수를 호출해야 한다. create() 함수의 원형은 아래와 같다.
Observabe<T> create(ObservabeOnSubscribe<T> source)그리고 ObservableOnSubscribe 인터페이스는 함수 1개만 포함하고 있다.
public interface ObservableOnSubscribe<T> { void subscribe(ObservableEmitter<T> e) throws Exception; }create() 함수를 활용해 데이터를 발행하는 방법이다.
val mObservable : Observable<Int> = Observable.create { emitter -> emitter.onNext(100) emitter.onNext(200) emitter.onNext(300) emiiter.onComplete() } mObservable.subsribe(System.out::println)람다 표현식 활용
참고로 create() 함수의 인자는 원래 ObservableOnSubscribe 인터페이스 타입이어야 한다. 위에서 ObservableEmitter 인터페이스 객체를 인자로 받는 람다 표현식을 처리했다. 람다 표현식을 활용하면 Observable.create()를 호출할때 불필요한 익명 객체나 멤버 변수를 기재하지 않고 꼭 필요한 변수만 소스 코드에 작성하면 되므로 소스코드의 가독성이 높아진다.
Observable.create()만 사용하고 subcribe()를 호출하지 않은 예제이다.
val mObservable : Observable<Int> = Observable.create { emitter -> emitter.onNext(100) emitter.onNext(200) emitter.onNext(300) emitter.onComplete() }실행해보면 아무것도 출력되지 않는다. 이유는 subscribe()를 호출하지 않았기 때문이다.
아래와 같이 subscribe를 사용하면 잘 출력이 된다.
val mObservable : Observable<Int> = Observable.create { emitter -> emitter.onNext(100) emitter.onNext(200) emitter.onNext(300) emitter.onComplete() } mObservable.subscribe { data -> println("Result : $data") }System.out::println과 같은 형태를 자바 8에서는 메서드 레퍼런스라고 한다. 리액티브 프로그래밍에서 앞서 설명한 람다 표현식과 메서드 레퍼런스를 적극적으로 사용하는 것이 좋다. 또한 람다 표현식과 메서드 레퍼런스를 사용할때 다음 우선순위를 고려해서 사용 여부를 판한하기를 권한다.
1. 메서드 레퍼런스로 축약할 수 있는지 확인.
2. 그 다음 람다 표현식을 활용할 수 있는지 확인
3. 1~2를 활용할 수 없으면 익명 객체나 멤버 변수로 표현
람다 표현식의 장점을 살펴본다. data -> System.out.println("Result : $data")라는 람다 표현식을 익명 객체로 변경하면 아래와 같다.
val source: Observable<Int> = Observable.create { emitter -> emitter.onNext(100) emitter.onNext(200) emitter.onNext(300) emitter.onComplete() } source.subscribe(object : Consumer<Int> { override fun accept(data: Int) { println("Result : $data") } })위 코드의 경우 subscirbe()의 원형을 알아야 하고 Consumber<T> 클래스의 메서드도 매번 입력을 해주어야 하므로 번거롭다. 하지만 람다 표현식은 익명 객체의 메서드 원형을 override로 기술하지 않아도 되므로 가독성이 높다. 프로그래머가 매번 메서드의 원형을 기억할 필요 없이 자바 컴파일러가 추론하도록 해준다.
Observable.create() 를 사용할때는 주의해야 한다.
RxJava 문서에 따르면 create()는 RxJava에 익숙한 사용자만 활용하도록 권고한다. create()를 사용하지 않고 다른 팩토리 함수를 사용하면 같은 효과를 낼 수 있기 때문이다. 만약 그래도 사용해야 한다면 아래 사항을 확인해야 한다.
1. Observable이 구독 해지(dispose)되었을때 등록된 콜백을 모두 해제해야 한다. 그렇지 않으면 잠재적으로 메모리 누수가 발생한다.
2. 구독자가 구독하는 동안에만 onNext와 onComplete 이벤트를 호출해야 한다.
3. 에러가 발생했을때는 오직 onError 이벤트로만 에러를 전달해야 한다.
4. 배압(back pressure)을 직접 처리해야 한다. (배압은 Observable에서 데이터를 발행하는 속도와 구독자가 처리하는 속도의 차이가 클 때 발생한다. RxJava2 부터는 Flowable이라는 특화 클래스로 배압을 처리한다)
4) fromArray() 함수
just()나 create()는 단일 데이터를 다룬다. 단일 데이터가 아닐떄는 fromXXX() 계열 함수를 사용한다. 원래 RxJava1에서는 from과 fromCallable 함수만 사용했다. 그런데 from() 함수를 배열, 반복자, 비동기 계산 등에 모두 사용하다 보니 모호함이 있었다. 따라서 RxJava2에서는 from() 함수를 세분화했고 그중 하나가 지금 소개하는 fromArray() 함수이다.
배열에 들어있는 데이터를 처리할 때는 fromArray() 함수를 사용한다.
val arr : Array<Int> = arrayOf(100, 200, 300) val source : Observalbe<Int> = Observable.fromArray(arr) source.subscribe(System.out::println)arr 배열에 원하는 값을 담고 Observable.fromArray()를 호출했다. 그 다음 subscribe() 함수를 호출하면 데이터를 차례로 발행하게 된다.
Map 객체에 관한 Observable 클래스의 from() 함수는 없을까?
RxJava에는 List나 Set 객체의 from() 함수는 존재하는데 Map에 관한 from() 함수는 없다. Map 인터페이스는 배열도 dkslrh Interable<E> 인터페이스를 구현하지 않았으므로 from() 계열 함수는 존재하지 않는다. Map 인터페이스는 키-값 쌍으로 구성되었으므로 keySet() 함수의 순서를 정하면 만들 수 있을 것이다.
5) fromCallable() 함수
RxJava는 비동기 프로그래밍을 하기 위한 라이브러리이다. 이전까지 기본적인 자료구조로 Observable을 생성하는 부분을 살펴봤다면 이번에는 기존 자바에서 제공하는 비동기 클래스나 인터페이스와의 연동을 살펴본다. 먼저 살펴보는 것은 자바5에서 추가된 동시성 API인 Callable 인터페이스이다. 비동기 실행 후 결과를 반환하는 call() 메서드를 정의한다.
callable은 run() 메서드가 있는 Runnable 인터페이스처럼 메서드가 하나고, 인자가 없다는 점에서 비슷하지만 실행결과를 return 한다는 점에서 차이가 있다. 또한 Executor 인터페이스의 인자로 활용되기 때문에 잠재적으로 다른 스레드에서 실행되는 것을 의미하기도 한다.
val callable: Callable<String> = Callable { Thread.sleep(1000) return@Callable "hardy" } val source = Observable.fromCallable(callable) source.subscribe(System.out::println)6) fromFuture() 함수
Future 인터페이스 역시 자바5에서 추가된 동시성 API로 비동기 계산의 결과를 구할때 사용한다. 보통 Executor 인터페이스를 구현한 클래스에서 Callable 객체를 인자로 넣어 Future를 반환한다. get() 메서드를 호출하면 Callable 객체에서 구현한 계산 결과가 나올때까지 블로킹된다.
val future: Future<String> = Executors.newSingleThreadExecutor().submit(Callable<String> { Thread.sleep(1000) return@Callable "hardy" }) val source = Observable.fromFuture(future) source.subscribe(System.out::println)Executors 클래스는 단일 스레드 실행자 ( SingleThreadExecutor) 뿐만 아니라 다양한 스레드풀 (FixedThreadPool, CachedThreadPool)을 지원한다. 하지만 RxJava는 위와 같은 실행자를 활용하기 보다 RxJava에서 제공하는 스케줄러를 활용하도록 권장한다.
'ANDROID > RXJAVA' 카테고리의 다른 글
[RxJava] RxJava에 대한 이해 (0) 2022.12.21 [RxJava] RxJava (3) - Single 과 Maybe (0) 2022.05.03 [RxJava] RxJava (1) - 시작 (0) 2022.03.30