# RxKotlin

RxKotlin (opens new window)은 Kotlin에서 ReactiveX를 구현한 라이브러리입니다.

RxKotlin을 사용하기 위해서는 의존성을 추가해야합니다. 모듈 수준의 build.gradle에 의존성을 추가합니다.

// 모듈 수준의 build.gradle

dependencies {
    // RxKotlin 3
    implementation("io.reactivex.rxjava3:rxkotlin:3.x.y")
}

# Observable

Observable은 이벤트를 발생시키는 주체입니다. 이벤트를 발생시키는 과정을 방출(Emit)이라고 하며, Observable은 다음과 같이 생성합니다.

val observable = Observable.create<T> { emitter: ObservableEmitter<T!> ->
    // ..
}

제너릭의 타입 파라미터 T에는 이벤트와 함께 전달할 값의 자료형을 명시합니다. 예를 들어 문자열 타입의 데이터를 함께 전달하는 경우 다음과 같이 작성합니다.

val observable = Observable.create<String> { emitter: ObservableEmitter<String!> ->
    // ..
}

ObservableEmitter클래스에는 이벤트를 방출하기 위한 메소드가 정의되어있습니다. onNext() 메소드를 사용하면 정상적인 이벤트를 방출할 수 있습니다. 또한 onNext() 메소드의 인자로 값을 함께 전달할 수 있습니다.

val observable = Observable.create<String> { emitter: ObservableEmitter<String!> ->
    emitter.onNext("Value 1")
}

위 코드는 아래처럼 단축할 수 있습니다.

val observable = Observable.create<String> { 
    it.onNext("Value 1")
}

값 없이 이벤트만 전달하는 경우 Unit을 사용합니다.

val observable = Observable.create<Unit> { 
    it.onNext(Unit)
}

이벤트를 여러 번 방출할 수도 있습니다.

val observable = Observable.create<String> { 
    it.onNext("Value 1")
    it.onNext("Value 2")
    it.onNext("Value 3")
}

이벤트 방출을 완료할 때는 onComplete()를 호출합니다.

val observable = Observable.create<String> { 
    it.onNext("Value 1")
    it.onNext("Value 2")
    it.onNext("Value 3")
    it.onComplete()
}

onComplete()를 호출한 후 방출하는 이벤트는 실제로 방출되지 않습니다.

val observable = Observable.create<String> { 
    it.onNext("Value 1")
    it.onNext("Value 2")
    it.onNext("Value 3")
    it.onComplete()
    it.onNext("Value 4")    // 방출되지 않음
    it.onNext("Value 5")    // 방출되지 않음
}

비정상적인 이벤트는 onError() 메소드를 사용하여 방출할 수 있습니다.

val observable = Observable.create<String> { 
    it.onError(Exception("This is error"))
}

# subscribeBy() / subscribe()

Observable은 생성만 한다고 이벤트를 방출하지 않습니다. 누군가 Observable구독(Subscribe)해야만 이벤트를 방출하기 시작합니다.

subscribeBy()메소드는 Observable이 방출하는 이벤트를 수신할 때 사용합니다. subscribeBy()onNext, onError, onComplete라는 세 개의 매개변수에 람다식을 전달받습니다.

val observable = Observable.create<String> { 
    // ...
}

observable
    .subscribeBy(onNext = { value ->
        
    }, onError = { error ->

    }, onComplete = {
        
    })

각 람다식의 역할은 다음과 같습니다.

  • onNext로 전달된 람다식: onNext()이벤트가 방출되면 호출
  • onError로 전달된 람다식: onError()이벤트가 방출되면 호출
  • onComplete로 전달된 람다식: onComplete() 이벤트가 방출
observable
    .subscribeBy(onNext = { value ->
        // onNext() 이벤트가 방출되면 호출
    }, onError = { error ->
        // onError() 이벤트가 방출되면 호출
    }, onComplete = {
        // onComplete() 이벤트가 방출되면 호출
    })

subscribe()메소드를 사용하면 매개변수의 이름(onNext, onError, onComplete)을 생략할 수도 있습니다.

observable
    .subscribe({ value ->
        
    }, { error ->

    }, {
        
    })

하나의 Observable은 여러 곳에서 구독할 수도 있습니다.

val observable = Observable.create<String> { 
    // ...
}

// 첫 번째 구독
observable
    .subscribe({ value ->
        
    }, { error ->

    }, {
        
    })

// 두 번째 구독
observable
    .subscribe({ value ->
        
    }, { error ->

    }, {
        
    })

이제 예제를 살펴봅시다.

val observable = Observable.create<String> {
    it.onNext("Value 1")
    it.onNext("Value 2")
    it.onNext("Value 3")
}

observable
    .subscribeBy( onNext = { value ->
        println("onNext: ${value}")
    }, onError ={ error ->
        println("onError: ${error}")
    }, onComplete = {
        println("onComplete")
    })

다음과 같이 출력됩니다.

onNext: Value 1
onNext: Value 2
onNext: Value 3

# Disposable

subscribeBy()subscribe()Disposable클래스의 인스턴스를 반환합니다.

val disposable: Disposable = observable.subscribe {
    // ...
}

이제 아래와 같이 Observable이 영구적으로 데이터를 발행한다고 가정합시다.

val disposable: Disposable = observable.subscribe {
    while(true) {
        it.onNext("Value")
    }
}

이렇게 되면 Observable은 메모리에서 사라지지 않고 영구적으로 데이터를 발행하며, 메모리 누수를 초래합니다. 따라서 반드시 적절한 시점에 자원을 해제해야합니다.

Disposable클래스의 dispose()메소드를 호출하면 구독이 종료되고 자원이 해제됩니다.

val disposable: Disposable = observable.subscribe {
    while(true) {
        it.onNext("Value")
    }
}
// ...

// 적절한 시점에 호출
disposable.dispose()

Observable클래스의 doOnDispose()메소드를 사용하면 dispose()가 호출될 때 실행될 구문을 등록할 수 있습니다.

val observable = Observable.create<String> {
    it.onNext("Value 1")
    it.onNext("Value 2")
    it.onNext("Value 3")
}

observable
    .doOnDispose {
        println("onDispose")
    }
    .subscribeBy( onNext = { value ->
        println("onNext: ${value}")
    }, onError ={ error ->
        println("onError: ${error}")
    }, onComplete = {
        println("onComplete")
    })

observable.dispose()

결과물은 다음과 같이 출력됩니다.

onNext: Value 1
onNext: Value 2
onNext: Value 3
doOnDispose

# CompositeDisposable

안드로이드 예제를 살펴봅시다. 예제에서는 Observable을 여러 번 구독하고 있습니다.

class MainActivity : AppCompatActivity() {

    lateinit var disposable1: Disposable
    lateinit var disposable2: Disposable
    lateinit var disposable3: Disposable
    lateinit var disposable4: Disposable

    override fun onCreate(savedInstanceState: Bundle?) {

        val observable = Observable.create<String> {
            // ...
        }

        disposable1 = observable.subscribe {
            // ..
        }

        disposable2 = observable.subscribe {
            // ..
        }

        disposable3 = observable.subscribe {
            // ..
        }

        disposable4 = observable.subscribe {
            // ..
        }

        // ...
    }

    override fun onDestroy() {
        super.onDestroy()
        disposable1.dispose()
        disposable2.dispose()
        disposable3.dispose()
        disposable4.dispose()
    }
}

이때 각각의 Disposable 인스턴스에서 dispose()를 호출하는 번거로움을 없애기 위해 CompositeDisposable을 사용할 수 있습니다. Disposable클래스의 addto()메소드를 사용하여 CompositeDisposable 인스턴스에 Disposable을 추가하면 됩니다.

class MainActivity : AppCompatActivity() {

    // 추가된 부분
    val disposables = CompositeDisposable()

    override fun onCreate(savedInstanceState: Bundle?) {

        val observable = Observable.create<String> {
            // ..
        }

        observable.subscribe {
            // ..
        }.addTo(disposables)

        observable.subscribe {
            // ..
        }.addTo(disposables)

        observable.subscribe {
            // ..
        }.addTo(disposables)

        observable.subscribe {
            // ..
        }.addTo(disposables)

        // ...
    }
}

이제 Activity.onDestory()처럼 적절한 위치에서 자원을 해제하면 됩니다.CompositeDisposableclear()를 호출하면 추가된 모든 Disposable들이 해제됩니다.

class MainActivity : AppCompatActivity() {

    // 추가된 부분
    val disposables = CompositeDisposable()

    override fun onDestroy() {
        super.onDestroy()
        // CompositeDisposables에 추가된 모든 Disposable을 해제
        disposables.clear()
    }
}

# 예제

이제 이벤트에 따른 결과를 확인하기 위해 몇 가지 예제를 살펴봅시다.

# 예제 1

val observable = Observable.create<String> {
    it.onNext("Value 1")
    it.onNext("Value 2")
    it.onNext("Value 3")
    // it.onComplete()
}

val disposable = observable
    .doOnDispose {
        println("doOnDispose")
    }
    .subscribeBy( onNext = { value ->
        println("onNext: ${value}")
    }, onError ={ error ->
        println("onError: ${error}")
    }, onComplete = {
        println("onComplete")
    })

결과는 다음과 같습니다.

onNext: Value 1
onNext: Value 2
onNext: Value 3

# 예제 2

val observable = Observable.create<String> {
    it.onNext("Value 1")
    it.onNext("Value 2")
    it.onNext("Value 3")
    it.onComplete()
}

val disposable = observable
    .doOnDispose {
        println("doOnDispose")
    }
    .subscribeBy( onNext = { value ->
        println("onNext: ${value}")
    }, onError ={ error ->
        println("onError: ${error}")
    }, onComplete = {
        println("onComplete")
    })

결과는 다음과 같습니다.

onNext: Value 1
onNext: Value 2
onNext: Value 3
onComplete

# 예제 3

val observable = Observable.create<String> {
    it.onNext("Value 1")
    it.onNext("Value 2")
    it.onNext("Value 3")
    it.onComplete()
    it.onNext("Value 4")
    it.onNext("Value 5")
    it.onNext("Value 6")
}

val disposable = observable
    .doOnDispose {
        println("doOnDispose")
    }
    .subscribeBy( onNext = { value ->
        println("onNext: ${value}")
    }, onError ={ error ->
        println("onError: ${error}")
    }, onComplete = {
        println("onComplete")
    })

결과는 다음과 같습니다.

onNext: Value 1
onNext: Value 2
onNext: Value 3
onComplete

# 예제 4

val observable = Observable.create<String> {
    it.onNext("Value 1")
    it.onNext("Value 2")
    it.onNext("Value 3")
    it.onError(Exception("This is error"))
}

val disposable = observable
    .doOnDispose {
        println("doOnDispose")
    }
    .subscribeBy( onNext = { value ->
        println("onNext: ${value}")
    }, onError ={ error ->
        println("onError: ${error}")
    }, onComplete = {
        println("onComplete")
    })

결과는 다음과 같습니다.

onNext: Value 1
onNext: Value 2
onNext: Value 3
onError: java.lang.Exception: This is error

# 예제 5

val observable = Observable.create<String> {
    it.onNext("Value 1")
    it.onNext("Value 2")
    it.onNext("Value 3")
}

val disposable = observable
    .doOnDispose {
        println("doOnDispose")
    }
    .subscribeBy( onNext = { value ->
        println("onNext: ${value}")
    }, onError ={ error ->
        println("onError: ${error}")
    }, onComplete = {
        println("onComplete")
    })

// disposable.dispose()

결과는 다음과 같습니다.

onNext: Value 1
onNext: Value 2
onNext: Value 3

# 예제 6

val observable = Observable.create<String> {
    it.onNext("Value 1")
    it.onNext("Value 2")
    it.onNext("Value 3")
}

val disposable = observable
    .doOnDispose {
        println("doOnDispose")
    }
    .subscribeBy( onNext = { value ->
        println("onNext: ${value}")
    }, onError ={ error ->
        println("onError: ${error}")
    }, onComplete = {
        println("onComplete")
    })

disposable.dispose()

결과는 다음과 같습니다.

onNext: Value 1
onNext: Value 2
onNext: Value 3
doOnDispose

# 예제 7

val observable = Observable.create<String> {
    it.onNext("Value 1")
    it.onNext("Value 2")
    it.onNext("Value 3")
    it.onComplete()
}

val disposable = observable
    .doOnDispose {
        println("doOnDispose")
    }
    .subscribeBy( onNext = { value ->
        println("onNext: ${value}")
    }, onError ={ error ->
        println("onError: ${error}")
    }, onComplete = {
        println("onComplete")
    })

disposable.dispose()

결과는 다음과 같습니다.

onNext: Value 1
onNext: Value 2
onNext: Value 3
onComplete

# 스트림

위 예제들에서 살펴본 것 처럼 Observable은 구독하고 있는 대상에게 데이터를 연속적, 순차적으로 흘려줍니다. 이러한 동작을 프로그래밍 언어에서 데이터 흐름(Flow of data) 또는 스트림(Stream)이라고 합니다.

# 비동기 프로그래밍

사용자와 지속적으로 상호작용하는 시스템은 시간이 오래 걸리는 작업을 별도의 스레드에서 비동기적으로 처리해야합니다.

RxKotlin은 비동기 작업을 처리하는데도 효과적입니다. 간단한 예제를 살펴봅시다. 서버에 데이터를 요청하고 있습니다.

val observable = Observable.create<Response> {
    // 별도의 스레드에서 실행되어야 함.
    
    // 서버에 데이터 요청
    val result = requestData()

    if (result.isSuccess) {
        it.onNext(response)
    } else  {
        it.onError(ServerException("Network error."))
    }
}

observable
    .subscribeBy(onNext = { response: String ->
        // 매인 스레드에서 실행되어야 함.
        handleResponse(response)
    }, onError = { error ->
        // 매인 스레드에서 실행되어야 함.
        handleError(error)
    }, onComplete = {
        // ..
    })

RxKotlinRxAndroid를 함께 사용하면 멀티스레드를 통한 비동기 처리를 쉽게 구현할 수 있습니다. RxAndroid가 제공하는 다양한 Schedulers를 사용하면 코드가 실행되는 스레드를 지정할 수 있습니다.

val observable = Observable.create<Response> {
    // 별도의 스레드에서 수행됨.
    
    val result = requestData()

    if (result.isSuccess) {
        it.onNext(response)
    } else  {
        it.onError(ServerException("Network error."))
    }
}

observable
    .subscribeOn(Schedulers.io())
    .observeOn(AndroidSchedulers.mainThread())
    .subscribeBy(onNext = { response: String ->
        // 매인 스레드에서 실행됨.
        handleResponse(response)
    }, onError = { error ->
        // 매인 스레드에서 실행됨.
        handleError(error)
    }, onComplete = {
        ..
    })

RxAndroid는 다른 포스트에서 자세히 다루겠습니다.