# RxKotlin
RxKotlin
(opens new window)은 Kotlin에서 ReactiveX를 구현한 라이브러리입니다.
RxKotlin
을 사용하기 위해서는 의존성을 추가해야합니다. 모듈 수준의 build.gradle
에 의존성을 추가합니다.
// 모듈 수준의 build.gradle
dependencies {
// RxKotlin 3
implementation("io.reactivex.rxjava3:rxkotlin:3.x.y")
}
# Observable
Observable
은 이벤트를 발생시키는 주체입니다. 이벤트를 발생시키는 과정을 방출(Emit)
이라고 하며, Observable
은 다음과 같이 생성합니다.
val observable = Observable.create<T> { emitter: ObservableEmitter<T!> ->
// ..
}
제너릭의 타입 파라미터 T
에는 이벤트와 함께 전달할 값의 자료형을 명시합니다. 예를 들어 문자열 타입의 데이터를 함께 전달하는 경우 다음과 같이 작성합니다.
val observable = Observable.create<String> { emitter: ObservableEmitter<String!> ->
// ..
}
ObservableEmitter
클래스에는 이벤트를 방출하기 위한 메소드가 정의되어있습니다. onNext()
메소드를 사용하면 정상적인 이벤트를 방출할 수 있습니다. 또한 onNext()
메소드의 인자로 값을 함께 전달할 수 있습니다.
val observable = Observable.create<String> { emitter: ObservableEmitter<String!> ->
emitter.onNext("Value 1")
}
위 코드는 아래처럼 단축할 수 있습니다.
val observable = Observable.create<String> {
it.onNext("Value 1")
}
값 없이 이벤트만 전달하는 경우 Unit
을 사용합니다.
val observable = Observable.create<Unit> {
it.onNext(Unit)
}
이벤트를 여러 번 방출할 수도 있습니다.
val observable = Observable.create<String> {
it.onNext("Value 1")
it.onNext("Value 2")
it.onNext("Value 3")
}
이벤트 방출을 완료할 때는 onComplete()
를 호출합니다.
val observable = Observable.create<String> {
it.onNext("Value 1")
it.onNext("Value 2")
it.onNext("Value 3")
it.onComplete()
}
onComplete()
를 호출한 후 방출하는 이벤트는 실제로 방출되지 않습니다.
val observable = Observable.create<String> {
it.onNext("Value 1")
it.onNext("Value 2")
it.onNext("Value 3")
it.onComplete()
it.onNext("Value 4") // 방출되지 않음
it.onNext("Value 5") // 방출되지 않음
}
비정상적인 이벤트는 onError()
메소드를 사용하여 방출할 수 있습니다.
val observable = Observable.create<String> {
it.onError(Exception("This is error"))
}
# subscribeBy() / subscribe()
Observable
은 생성만 한다고 이벤트를 방출하지 않습니다. 누군가 Observable
을 구독(Subscribe)
해야만 이벤트를 방출하기 시작합니다.
subscribeBy()
메소드는 Observable
이 방출하는 이벤트를 수신할 때 사용합니다. subscribeBy()
는 onNext
, onError
, onComplete
라는 세 개의 매개변수에 람다식을 전달받습니다.
val observable = Observable.create<String> {
// ...
}
observable
.subscribeBy(onNext = { value ->
}, onError = { error ->
}, onComplete = {
})
각 람다식의 역할은 다음과 같습니다.
onNext
로 전달된 람다식:onNext()
이벤트가 방출되면 호출onError
로 전달된 람다식:onError()
이벤트가 방출되면 호출onComplete
로 전달된 람다식:onComplete()
이벤트가 방출
observable
.subscribeBy(onNext = { value ->
// onNext() 이벤트가 방출되면 호출
}, onError = { error ->
// onError() 이벤트가 방출되면 호출
}, onComplete = {
// onComplete() 이벤트가 방출되면 호출
})
subscribe()
메소드를 사용하면 매개변수의 이름(onNext
, onError
, onComplete
)을 생략할 수도 있습니다.
observable
.subscribe({ value ->
}, { error ->
}, {
})
하나의 Observable
은 여러 곳에서 구독할 수도 있습니다.
val observable = Observable.create<String> {
// ...
}
// 첫 번째 구독
observable
.subscribe({ value ->
}, { error ->
}, {
})
// 두 번째 구독
observable
.subscribe({ value ->
}, { error ->
}, {
})
이제 예제를 살펴봅시다.
val observable = Observable.create<String> {
it.onNext("Value 1")
it.onNext("Value 2")
it.onNext("Value 3")
}
observable
.subscribeBy( onNext = { value ->
println("onNext: ${value}")
}, onError ={ error ->
println("onError: ${error}")
}, onComplete = {
println("onComplete")
})
다음과 같이 출력됩니다.
onNext: Value 1
onNext: Value 2
onNext: Value 3
# Disposable
subscribeBy()
나 subscribe()
는 Disposable
클래스의 인스턴스를 반환합니다.
val disposable: Disposable = observable.subscribe {
// ...
}
이제 아래와 같이 Observable
이 영구적으로 데이터를 발행한다고 가정합시다.
val disposable: Disposable = observable.subscribe {
while(true) {
it.onNext("Value")
}
}
이렇게 되면 Observable
은 메모리에서 사라지지 않고 영구적으로 데이터를 발행하며, 메모리 누수를 초래합니다. 따라서 반드시 적절한 시점에 자원을 해제해야합니다.
Disposable
클래스의 dispose()
메소드를 호출하면 구독이 종료되고 자원이 해제됩니다.
val disposable: Disposable = observable.subscribe {
while(true) {
it.onNext("Value")
}
}
// ...
// 적절한 시점에 호출
disposable.dispose()
Observable
클래스의 doOnDispose()
메소드를 사용하면 dispose()
가 호출될 때 실행될 구문을 등록할 수 있습니다.
val observable = Observable.create<String> {
it.onNext("Value 1")
it.onNext("Value 2")
it.onNext("Value 3")
}
observable
.doOnDispose {
println("onDispose")
}
.subscribeBy( onNext = { value ->
println("onNext: ${value}")
}, onError ={ error ->
println("onError: ${error}")
}, onComplete = {
println("onComplete")
})
observable.dispose()
결과물은 다음과 같이 출력됩니다.
onNext: Value 1
onNext: Value 2
onNext: Value 3
doOnDispose
# CompositeDisposable
안드로이드 예제를 살펴봅시다. 예제에서는 Observable
을 여러 번 구독하고 있습니다.
class MainActivity : AppCompatActivity() {
lateinit var disposable1: Disposable
lateinit var disposable2: Disposable
lateinit var disposable3: Disposable
lateinit var disposable4: Disposable
override fun onCreate(savedInstanceState: Bundle?) {
val observable = Observable.create<String> {
// ...
}
disposable1 = observable.subscribe {
// ..
}
disposable2 = observable.subscribe {
// ..
}
disposable3 = observable.subscribe {
// ..
}
disposable4 = observable.subscribe {
// ..
}
// ...
}
override fun onDestroy() {
super.onDestroy()
disposable1.dispose()
disposable2.dispose()
disposable3.dispose()
disposable4.dispose()
}
}
이때 각각의 Disposable
인스턴스에서 dispose()
를 호출하는 번거로움을 없애기 위해 CompositeDisposable
을 사용할 수 있습니다. Disposable
클래스의 addto()
메소드를 사용하여 CompositeDisposable
인스턴스에 Disposable
을 추가하면 됩니다.
class MainActivity : AppCompatActivity() {
// 추가된 부분
val disposables = CompositeDisposable()
override fun onCreate(savedInstanceState: Bundle?) {
val observable = Observable.create<String> {
// ..
}
observable.subscribe {
// ..
}.addTo(disposables)
observable.subscribe {
// ..
}.addTo(disposables)
observable.subscribe {
// ..
}.addTo(disposables)
observable.subscribe {
// ..
}.addTo(disposables)
// ...
}
}
이제 Activity.onDestory()
처럼 적절한 위치에서 자원을 해제하면 됩니다.CompositeDisposable
의 clear()
를 호출하면 추가된 모든 Disposable
들이 해제됩니다.
class MainActivity : AppCompatActivity() {
// 추가된 부분
val disposables = CompositeDisposable()
override fun onDestroy() {
super.onDestroy()
// CompositeDisposables에 추가된 모든 Disposable을 해제
disposables.clear()
}
}
# 예제
이제 이벤트에 따른 결과를 확인하기 위해 몇 가지 예제를 살펴봅시다.
# 예제 1
val observable = Observable.create<String> {
it.onNext("Value 1")
it.onNext("Value 2")
it.onNext("Value 3")
// it.onComplete()
}
val disposable = observable
.doOnDispose {
println("doOnDispose")
}
.subscribeBy( onNext = { value ->
println("onNext: ${value}")
}, onError ={ error ->
println("onError: ${error}")
}, onComplete = {
println("onComplete")
})
결과는 다음과 같습니다.
onNext: Value 1
onNext: Value 2
onNext: Value 3
# 예제 2
val observable = Observable.create<String> {
it.onNext("Value 1")
it.onNext("Value 2")
it.onNext("Value 3")
it.onComplete()
}
val disposable = observable
.doOnDispose {
println("doOnDispose")
}
.subscribeBy( onNext = { value ->
println("onNext: ${value}")
}, onError ={ error ->
println("onError: ${error}")
}, onComplete = {
println("onComplete")
})
결과는 다음과 같습니다.
onNext: Value 1
onNext: Value 2
onNext: Value 3
onComplete
# 예제 3
val observable = Observable.create<String> {
it.onNext("Value 1")
it.onNext("Value 2")
it.onNext("Value 3")
it.onComplete()
it.onNext("Value 4")
it.onNext("Value 5")
it.onNext("Value 6")
}
val disposable = observable
.doOnDispose {
println("doOnDispose")
}
.subscribeBy( onNext = { value ->
println("onNext: ${value}")
}, onError ={ error ->
println("onError: ${error}")
}, onComplete = {
println("onComplete")
})
결과는 다음과 같습니다.
onNext: Value 1
onNext: Value 2
onNext: Value 3
onComplete
# 예제 4
val observable = Observable.create<String> {
it.onNext("Value 1")
it.onNext("Value 2")
it.onNext("Value 3")
it.onError(Exception("This is error"))
}
val disposable = observable
.doOnDispose {
println("doOnDispose")
}
.subscribeBy( onNext = { value ->
println("onNext: ${value}")
}, onError ={ error ->
println("onError: ${error}")
}, onComplete = {
println("onComplete")
})
결과는 다음과 같습니다.
onNext: Value 1
onNext: Value 2
onNext: Value 3
onError: java.lang.Exception: This is error
# 예제 5
val observable = Observable.create<String> {
it.onNext("Value 1")
it.onNext("Value 2")
it.onNext("Value 3")
}
val disposable = observable
.doOnDispose {
println("doOnDispose")
}
.subscribeBy( onNext = { value ->
println("onNext: ${value}")
}, onError ={ error ->
println("onError: ${error}")
}, onComplete = {
println("onComplete")
})
// disposable.dispose()
결과는 다음과 같습니다.
onNext: Value 1
onNext: Value 2
onNext: Value 3
# 예제 6
val observable = Observable.create<String> {
it.onNext("Value 1")
it.onNext("Value 2")
it.onNext("Value 3")
}
val disposable = observable
.doOnDispose {
println("doOnDispose")
}
.subscribeBy( onNext = { value ->
println("onNext: ${value}")
}, onError ={ error ->
println("onError: ${error}")
}, onComplete = {
println("onComplete")
})
disposable.dispose()
결과는 다음과 같습니다.
onNext: Value 1
onNext: Value 2
onNext: Value 3
doOnDispose
# 예제 7
val observable = Observable.create<String> {
it.onNext("Value 1")
it.onNext("Value 2")
it.onNext("Value 3")
it.onComplete()
}
val disposable = observable
.doOnDispose {
println("doOnDispose")
}
.subscribeBy( onNext = { value ->
println("onNext: ${value}")
}, onError ={ error ->
println("onError: ${error}")
}, onComplete = {
println("onComplete")
})
disposable.dispose()
결과는 다음과 같습니다.
onNext: Value 1
onNext: Value 2
onNext: Value 3
onComplete
# 스트림
위 예제들에서 살펴본 것 처럼 Observable
은 구독하고 있는 대상에게 데이터를 연속적, 순차적으로 흘려줍니다. 이러한 동작을 프로그래밍 언어에서 데이터 흐름(Flow of data)
또는 스트림(Stream)
이라고 합니다.
# 비동기 프로그래밍
사용자와 지속적으로 상호작용하는 시스템은 시간이 오래 걸리는 작업을 별도의 스레드에서 비동기적으로 처리해야합니다.
RxKotlin
은 비동기 작업을 처리하는데도 효과적입니다. 간단한 예제를 살펴봅시다. 서버에 데이터를 요청하고 있습니다.
val observable = Observable.create<Response> {
// 별도의 스레드에서 실행되어야 함.
// 서버에 데이터 요청
val result = requestData()
if (result.isSuccess) {
it.onNext(response)
} else {
it.onError(ServerException("Network error."))
}
}
observable
.subscribeBy(onNext = { response: String ->
// 매인 스레드에서 실행되어야 함.
handleResponse(response)
}, onError = { error ->
// 매인 스레드에서 실행되어야 함.
handleError(error)
}, onComplete = {
// ..
})
RxKotlin
과 RxAndroid
를 함께 사용하면 멀티스레드를 통한 비동기 처리를 쉽게 구현할 수 있습니다. RxAndroid
가 제공하는 다양한 Schedulers
를 사용하면 코드가 실행되는 스레드를 지정할 수 있습니다.
val observable = Observable.create<Response> {
// 별도의 스레드에서 수행됨.
val result = requestData()
if (result.isSuccess) {
it.onNext(response)
} else {
it.onError(ServerException("Network error."))
}
}
observable
.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.subscribeBy(onNext = { response: String ->
// 매인 스레드에서 실행됨.
handleResponse(response)
}, onError = { error ->
// 매인 스레드에서 실행됨.
handleError(error)
}, onComplete = {
..
})
RxAndroid
는 다른 포스트에서 자세히 다루겠습니다.