Combine(3) - Scheduler

Combine Scheduler

오늘은 Combine의 Scheduler를 정리해보려고 합니다. 운영체제에서 Scheduler의 개념은 실행의 순서를 정하는 역할로 많이 소개가 되어있습니다. Combine에서도 Scheduler의 역할은 비슷하다고 할 수 있습니다.

Scheduler란?

Scheduler는 Publisher의 작업들이 언제 그리고 어떻게 수행될지를 결정하기 위한 객체입니다. 작업들을 적절한 시간에 수행이 되어야 하고 또 어 떤 Thread에서 수행되어야 하는지를 제어할 수 있습니다.

비동기 처리에 있어서 적절한 시기에 스레드 사용을 하는 것은 필수적입니다. 그래서 Apple에서는 GCD(Grand Central Dispatch)를 제공하고 있습니다.

DispatchQueue.global().async {
    //Code
}

DispatchQueue.main.async {
    //Code
}

이 밖에도 GCD를 추상화 시켜서 wrapping 시켜서 제공하는 OpearionQueue 도 존재합니다.

Apple에서 제공하는 Scheduler 타입객체는 DispatchQueue, OperationQueue, RunLoop등으로 이것들을 Combine에서도 그대로 사용가능합니다.

Combine Scheduler 의 종류

Combine에는 Schduler를 switching 하여 명시적으로 지정할 수 있는 방법이 두가지 있습니다.

receive(on:)subscribe(on:) 두가지 방식으로 지정할 수 있습니다.

image

image

receive(on:) 의 정의를 잠깐 보면 매개변수로 Scheduler 프로토콜 타입을 받고 있습니다. 이 Scheduler 프로토콜을 iOS13이상 부터는 DispatchQueue, OperationQueue, RunLoop, ImmediateSchdeuler들이 채택하고 있기 때문에 매개변수로서 사용할 수 있습니다.

receive(on:) 으로 제어

publisher로 부터 element를 수신할 scheduler를 지정하는 역할을 합니다. downstream의 실행 컨텍스트를 변경하는 역할을 합니다.

아래의 예제를 우선 보겠습니다.

let publisher = ["Gaki"].publisher

publisher
    .map { _ in print(Thread.isMainThread) }    // true
    .receive(on: DispatchQueue.global())
    .map { print(Thread.isMainThread) }         // false
    .sink { print(Thread.isMainThread) }        // false

우선 Scheduler의 경우 element가 최초로 생성된 스레드와 동일한 스레드를 사용하게 됩니다. 처음 element가 생성된 스레드 가 MainThread에서 생성되었기 때문에 true가 나오게 됩니다. 그 후 receive 를 통해 DispatchQueue.global()로 변경한 후에는 백그라운드 스레드에서 동작하기 때문에 false가 나오는 것을 확인 할 수 있습니다.

subscribe(on:) 으로 제어

receive와 반대로 upstream의 실행 컨텍스트를 변경합니다. 즉 element의 소스를 구독하는 시점에서 실행 컨텍스를 변경 할 수 있습니다.

아래의 예제의 경우 MainThread에서 실행되기 때문에 True가 나오게 됩니다. 초기의 Upstream의 실행컨텍스트가 MainThread에서 아무런 변경사항을 주지 않았기 때문입니다.

let publisher = CurrentValueSubject<String, Never>("Gaki")

publisher
    .sink { _ in print("Sink \(Thread.isMainThread)") } // Sink true

위 예제를 아래와 같이 변경 시켜 보겠습니다.

let publisher = CurrentValueSubject<String, Never>("Gaki")

publisher
    .subscribe(on: DispatchQueue.global())
    .sink { _ in print("Sink \(Thread.isMainThread)") } // Sink false

위에서 추가된 구문은 DispatchQueue.global()subscribe(on:) 에 추가를 했습니다. 그러면 구독 자체가 DispatchQueue.global() 에서 일어나게 되고, 즉 upstream을 백그라운드에서 실행하게 됩니다. 그런 다음 output downstream이 동일한 queue에 전달이 되어 sink에서 false를 리턴하게 됩니다.

이 모든 전제는 위에서도 언급한 Scheduler는 element가 생성된 스레드와 동일한 스레드를 사용 하기 때문입니다.

다른 예제를 통해 조금더 알아 보겠습니다.

func printValue(_ value: String) {
    Just(value)
        .handleEvents(receiveOutput: { _ in print(Thread.isMainThread) }) // 1
        .receive(on: DispatchQueue.global())
        .handleEvents(receiveOutput: { _ in print(Thread.isMainThread) }) // 2
        .receive(on: DispatchQueue.main)
        .handleEvents(receiveOutput: { _ in print(Thread.isMainThread) }) // 3
        .receive(on: DispatchQueue.global())
        .handleEvents(receiveOutput: { _ in print(Thread.isMainThread) }) // 4
        .subscribe(on: DispatchQueue.main)
        .handleEvents(receiveOutput: { _ in print(Thread.isMainThread) }) // 5
        .sink { value in
            print(value) // 출력
    }
}
DispatchQueue.global().async {
    printValue("Gaki")
}
/*
true
false
true
false
false
Gaki
*/

handleEvent를 통해 이벤트를 제어하는 구간을 5가지로 나누었습니다.

우선 printValue 함수를 gloabal(), 즉 백그라운드 스레드에서 실행을 시켰는데 첫번째 출력이 MainThread에서 실행이 되었다고 true가 반환이 되었습니다.

이 경우 subscribe(on:) 에 의해 이렇게 출력이 되었다고 볼 수 있습니다. “왜? subscribe는 4번 이후에 호출이 되었는데?”라고 생각할 수 있습니다. 하지만 이는 호출 시점이 상관이 없이 호출이 되면 우선적으로 upstream에 영향을 주게 됩니다.

그렇게 때문에 main에서 실행이 되고 그다음 receive(on:) 에서 (1) global -> (2) main -> (3) global 이 되고 4번 이후 subscribe에서 main으로 컨텍스트를 변경 시켰지만 이는 다시 main queue로 돌아오지 않기 때문에 여전히 global 에 의해 false를 출력하게 됩니다. 그리고 최종적으로 5번에서도 main이 아니기 때문에 false를 출력합니다.

적절하게 사용하는 것이 중요

receive(on:) 의 경우 Publisher의 특정 작업의 스케쥴러를 변경할 수 있기 때문에 여러번 사용하여 downstream을 제어하게 됩니다. 하지만 subscribe(on:) 의 경우 upstream의 스케쥴러를 바꿔버리기 때문에 한번만 사용을 하여서 쓰레드를 관리하는 것이 좋습니다.



© 2020. by Gaki

Powered by gaki