계발자 블로그
[RxJava] Observable 본문
http://reactivex.io/documentation/observable.html
ReactiveX - Observable
Observable In ReactiveX an observer subscribes to an Observable. Then that observer reacts to whatever item or sequence of items the Observable emits. This pattern facilitates concurrent operations because it does not need to block while waiting for the Ob
reactivex.io
Observable
RxJava에서는 Observable을 구독(subscribe)하는 Obsever가 존재하고, Observable이 순차적으로 발행하는
데이터에 대해서 반응합니다. Observable은 밑에 3가지 이벤트를 사용하여 동작합니다.
1. OnNext() : 하나의 소스 Observable에서 Observer까지 한번에 하나씩 순차적으로 데이터를 발행합니다
2. onComplete() : 데이터 발행이 끝났음을 알리는 함수, onNext() 호출이 발생하지 않음을 나타냅니다.
3. onError() : 오류가 발생했음을 Observer에 전달합니다.
이 세가지 함수들은 Emitter라는 인터페이스에 선언됩니다. 데이터 및 오류 내용을 발행 할 때 null을 발행할 수 없습니다
Observable 생성하기
RxJava에서는 연산자(Operator)라고 부르는 여러 정적 메소드를 통해 기존 데이터를 참조하거나 변형하여
Observable을 생성할 수 있습니다.

위와 같이 상당히 많이 있습니다,,,
여기서 Observable을 생성하는 연산자 중 하나인 just() 연산자를 사용해 보겠습니다.
just() 연산자는 해당 item을 그대로 발행하는 Observable을 생성해 줍니다.
연산자의 인자로 넣은 item을 차례로 발행하며, 한개 또는 타입이 같은 여러개의 item을 넣을 수도 있습니다.
Observable.<String> source = Observable.just("Hello", "World");
source.subscribe(System.out::println);
실행결과
Hello
World
just()라는 creating Observable 연산자를 사용하여 Observable을 생성한 후,
subscribe() 함수를 이용하여 생성한 Observable을 observer가 구독한 것입니다.
subscribe() 연산자의 인자로 들어간 println 이라는 함수는 onNext 함수로 사용된 겁니다
즉, observer는 just() 연산자로 생성된 Observable을 관찰하고 있고, Observable은 item을 방출하면서
자신의 observer의 함수인 onNext를 호출하는 것입니다.
onNext 함수가 호출된 결과로 println 함수가 호출되어 실행 결과가 위와 같이 나왔습니다.
Cold Observable과 Hot Observable
Observable을 구현하는 방식에서 Observable과 Observer 사이에 동작 차이가 있습니다
이를 Cold Observable, Hot Observable이라고 부릅니다.
Cold Observable은 just() 연산자처럼 Observable에 구독(subscribe)을 요청하면 item을 발행하기 시작합니다.
Cold Observable 예제코드
Observable scr = Observable.interval(1, TimeUnit.SECONDS);
src.subscribe(value -> System.out.println("a : " + value));
Thread.sleep(3000);
src.subscribe(value -> System.out.println("b : " + value));
Thread.sleep(3000);
실행결과
a : 0
a : 1
a : 2
a : 3
b : 0
a : 4
b : 1
a : 5
b : 2
Hot Observable은 item 발행이 시작된 이후로 모든 구독자에게 동시에 같은 item을 발행합니다.
ConnectableObservable은 HotObservable을 구현할 수 있도록 도와주는 타입입니다.
아무 Observable 타입이나 publish 연산자를 이용하여 간단히 ConnectableObservable로 변환할 수 있습니다.
ConnectableObservable은 구독을 요청해도 Observable은 데이터를 발행하지 않습니다.
connect() 연산자를 호출할 때 item을 발행하기 시작합니다.
Hot Observable 예제코드
ConnectableObservable src = Observable.interval(1, TimeUnit.SECONDS).publish();
src.connect();
src.subscribe(value -> System.out.println("a : " + value));
Thread.sleep(3000);
src.subscribe(value -> System.out.println("b : " + value));
Thread.sleep(3000);
실행결과
a : 0
a : 1
a : 2
a : 3
b : 3
a : 4
b : 4
a : 5
b : 5
이렇게 Observable과 간단한 예제에 대해 알아봤습니다
'Java > RxJava' 카테고리의 다른 글
| [RxJava] Subject (0) | 2021.11.29 |
|---|---|
| [RxJava] Scheduler (0) | 2021.11.29 |
| [RxJava] 다양한 연산자들 (0) | 2021.11.26 |
| [RxJava] RxJava란? (0) | 2021.11.26 |