2016년 12월 28일 수요일

RxSwift 소스 분석 - Scheduler에 대한 이해

RxSwift 소스 분석해보기

1. 기본구조이해
2. 기본구조이해 - 2
3. Scheduler에 대한 이해
4. Subject에 대한 이해

https://github.com/ReactiveX/RxSwift


Scheduler

observer callback이 실행되는 쓰레드는 observerOn으로 subscription과 unsubscription logic이 실행되는 쓰레드는 subscribeOn으로 지정할 수 있다.
'Observable+Concurrency.swift' 파일에서 extension으로 ObservableType에 지정한다.
extension ObservableType {
  public func observeOn(_ scheduler: ImmediateSchedulerType)

  public func subscribeOn(_ scheduler: ImmediateSchedulerType)
}
두 함수 모두 ImmediateSchedulerType을 파라메타로 받는다.
public protocol ImmediateSchedulerType {
  func schedule<StateType>(_ state: StateType, action: @escaping (StateType) -> Disposable) -> Disposable
}
observeOn의 경우를 먼저 살펴보자.
scheduler가 SerialDispatchQueueScheduler인 경우에는 ObserveOnSerialDispatchQueue를 아닌 경우에는 ObserveOn을 사용한다
public func observeOn(_ scheduler: ImmediateSchedulerType) -> Observable<E> {
  if let scheduler = scheduler as? SerialDispatchQueueScheduler {
      return ObserveOnSerialDispatchQueue(source: self.asObservable(), scheduler: scheduler)
  }
  else {
      return ObserveOn(source: self.asObservable(), scheduler: scheduler)
    }
  }
ObserveOnSerialDispatchQueue를 보면 앞에서 살펴본 AnonymousObservable과 거의 유사한 구조를 가지고 있음을 알 수 있다. 차이점은 원래의 observable에다가 subscribe를 실행시키면서 ObserveOnSerialDispatchQueueSink를 파라메타로 넘겨주는 것이다. 이렇게 함으로서 원래의 observable의 subscribe handler가 실행될 때 observeer로 ObserveOnSerialDispatchQueueSink의 onCore함수가 호출되게 된다. 아래에서 보다시피 onCore에서 scheduler.schedule를 실행함으로서 scheduler에서 지정한 쓰레드에서 action이 실행되게 된다. ObserveOnSerialDispatchQueueSink의 init을 보면 cachedScheduleLambda는 사용자가 지정해준 observer를 실행하도록 되어 있는 closure이다.
init(scheduler: SerialDispatchQueueScheduler, observer: O, cancel: Cancelable) {
  self.scheduler = scheduler
  self.observer = observer
  self.cancel = cancel
  super.init()

  cachedScheduleLambda = { sink, event in
    sink.observer.on(event)

    // 코드 생략...

    return Disposables.create()
  }
}

override func onCore(_ event: Event<E>) {
    let _ = self.scheduler.schedule((self, event), action: cachedScheduleLambda)
  }
MainScheduler를 통해 schedule가 호출될 때 어떻게 동작하는지를 보자.
초기화시 _mainQueue를 DispatchQueue.main로 설정한다. schedule 호출시 호출되는 scheduleInternal를 override하여 _mainQueue.async안에서 action을 실행한다.
override func scheduleInternal<StateType>(_ state: StateType, action: @escaping (StateType) -> Disposable) -> Disposable {
  // 자세한 코드 생략...

  _mainQueue.async {
    if !cancel.isDisposed {
      _ = action(state)
    }

    _ = AtomicDecrement(&self.numberEnqueued)
  }
}

댓글 없음:

댓글 쓰기

안드로이드 아키텍쳐 패턴을 알아보자

- Clean architecture에 대한 설명 Architecting Android...Reloaded 위 블로그 내용에 대한 Github 소스 https://github.com/android10/Android-CleanArchitect...