2017년 1월 6일 금요일

RxSwift 소스 분석 - Subject에 대한 이해

RxSwift 소스 분석해보기

1. 기본구조이해
2. 기본구조이해 - 2
3. Scheduler에 대한 이해
4. Subject에 대한 이해

https://github.com/ReactiveX/RxSwift


PublishSubject

PublishSubject는 Observable, SubjectType, Cancelable, ObserverType, SynchronizedUnsubscribeType을 상속한다. 후아, 동작이 복잡하구나. Observable을 통해 observer들을 등록해 주는 기능이 들어가고, ObserverType을 통해 노티를 날려줄 수 있게 되고, Cancelable을 통해 등록된 모든 observer들을 취소할 수 있게 되고, SynchronizedUnsubscribeType을 통해 특정 observer만 취소할 수 있게 된다. PublishSubject가 더이상 동작하지 않게 하는 방법은 completed 또는 error 이벤트를 받거나 dispose에 의해 명시적으로 취소하거나 이다.
내부에서 사용하는 변수들은 다음과 같다.
  • _observers: Bag<AnyObserver<Element>> -> 등록된 observer들을 가지고 있는다. Bag은 간단하게 설명하면 내부에 dictionary를 통해 element들을 가지고 있는다.
  • _stoppedEvent: completed 또는 error가 불리면 이 이벤트를 저장하고 있는다.
  • _isDisposed: 더이상 observer에게 노티를 보내지 않는 경우 true로 설정한다.
  • _stopped: completed 또는 error시 true로 설정하여 이후 next가 발생해도 observer들에게 노티가 가지 않도록 한다. 앞에서 말한 PublishSubject가 더이상 동작하지 않게 하는 두가지 방법에 대한 상태를 위해 _isDisposed와 _stopped가 사용된다.
  • _lock: NSRecursiveLock. 내부 변수를 건드릴 때 사용한다.
observer 등록은 subscribe를 통해 다음과 같이 한다.
public override func subscribe<O : ObserverType>(_ observer: O) -> Disposable where O.E == Element {
  _lock.lock(); defer { _lock.unlock() }
  return _synchronized_subscribe(observer)
}

func _synchronized_subscribe<O : ObserverType>(_ observer: O) -> Disposable where O.E == E {
    // 이미 completed나 error가 불렸는지 확인한다.
    if let stoppedEvent = _stoppedEvent {
      observer.on(stoppedEvent) // 이 observer에게 끝난 subject임을 알린다.
      return Disposables.create()
    }

    // dispose에 의해 취소된 PublishSubject인지 확인한다.
    if _isDisposed {
      observer.on(.error(RxError.disposed(object: self))) // 이 observer에게 끝난 subject임을 알린다.
      return Disposables.create()
    }

    // 노티를 받기위한 observer 리스트에 추가한다.
    let key = _observers.insert(observer.asObserver())
    return SubscriptionDisposable(owner: self, key: key)
}
subscribe는 _synchronized_subscribe에 대해 lock으로 보호하여 호출한다. _synchronized_subscribe는 이미 completed나 error가 불렸으면 이 이전의 이벤트를(_stoppedEvent에 저장되어 있다.) observer에 노티한다. PublishObject가 이미 이전에 dispose에 의해 취소되었으면 error 이벤트를 observer에 노티한다. 이외의 경우라면 observer가 노티를 받을 수 있도록 _observers에 추가한다. 이와 같은 정상적인 경우라면 나중에 자기 자신을 취소할 수 있게 해주는(_observers에서 제거) SubscriptionDisposable을 리턴하고 다른 경우라면 취소 함수(dispose)를 호출해도 아무런 동작이 일어나지 않도록 Disposables.create를 리턴해준다.
모든 observer가 노티를 받는 것의 취소는 dispose를 쓴다.(개별 observer의 취소는 subscribe의 리턴 값에 dispose를 한다.)
public func dispose() {
  _lock.lock(); defer { _lock.unlock() }
  _synchronized_dispose()
}

final func _synchronized_dispose() {
  _isDisposed = true
  _observers.removeAll()
  _stoppedEvent = nil
}
dispose는 _synchronized_dispose에 대해 lock으로 보호하여 호출한다. _synchronized_dispose는 observer에 대한 모든 노티가 취소되었음을 설정한다.
개별 observer의 취소를 위해서는 subscribe에서 리턴된 값에 dispose를 사용한다고 앞에서 말했다. SubscriptionDisposable을 살펴보자.
struct SubscriptionDisposable<T: SynchronizedUnsubscribeType> : Disposable {
  private let _key: T.DisposeKey
  private weak var _owner: T?

  init(owner: T, key: T.DisposeKey) {
    _owner = owner
    _key = key
  }

  func dispose() {
    _owner?.synchronizedUnsubscribe(_key)
  }
}
owner로 PublishSubject를 받고 key로 Bag의 KeyType(_observers에 저장된 observer에 대한 unique identifier이다.)을 받는다. dispose를 호출하면 PublishSubject의 synchronizedUnsubscribe를 호출해준다. PublishSubject의 synchronizedUnsubscribe 구현을 보자.
func synchronizedUnsubscribe(_ disposeKey: DisposeKey) {
  _lock.lock(); defer { _lock.unlock() }
  _synchronized_unsubscribe(disposeKey)
}

func _synchronized_unsubscribe(_ disposeKey: DisposeKey) {
  _ = _observers.removeKey(disposeKey)
}
synchronizedUnsubscribe는 _synchronized_unsubscribe를 lock으로 보호한다. _synchronized_unsubscribe는 _observers에서 obsesver를 빼준다.
이제 이벤트를 발생시켜 보자. 이것은 ObserverType을 상속함으로서 기능(onNext, onCompleted, onError)이 들어가게 되고 내부적으로 on이 호출되게 된다.
public func on(_ event: Event<Element>) {
  _synchronized_on(event).on(event)
}

func _synchronized_on(_ event: Event<E>) -> Bag<AnyObserver<Element>> {
  _lock.lock(); defer { _lock.unlock() }

  switch event {
  case .next(_):
    // PublishSubject가 종료되었는지를 확인한다.
    if _isDisposed || _stopped {
      return Bag()
    }

    return _observers
  case .completed, .error:
    if _stoppedEvent == nil {
      _stoppedEvent = event
      _stopped = true
      let observers = _observers // Bag이 struct이므로 카피가 일어난다.
      _observers.removeAll()
      return observers
    }

    return Bag()
  }
}
_synchronized_on을 lock으로 보호한 후 이벤트를 처리한다.(근데 왜 이전의 코드와 lock의 사용법이 다를까? 리턴값에 바로 on을 적용하기 위해서이다.) 그런데, 리턴값의 타입이 Bag인데 어떻게 on을 호출할 수 있는 것일까? Bag+Rx.swift를 보면 Bag의 element의 타입이 ObserverType인 경우에 대해 extension으로 내부의 element들(observer)에 on을 호출해주는 on이 정의되어 있다.

BehaviorSubject

PublishSubject와 거의 유사하기 때문에 다른 점 기준으로 보자. BehaviorSubject는 마지막으로 발생한 이벤트를 새로 subscribe하는 observer에 전달해준다. 따라서 마지막 이벤트를 _value에 가지고 있는다. 생성하고 바로 subscribe하는 경우에도 이벤트가 발생하도록 하기 위해 init에 초기 값을 설정해준다.
public init(value: Element) {
  _value = value
}
그리고 value의 현재 값을 제공하는 get 함수를 제공한다. _value에 대한 접근을 lock으로 보호하고 에러의 경우 throw를 던진다.
public func value() throws -> Element {
  _lock.lock(); defer { _lock.unlock() } // {
    // subject가 취소되었는지 확인한다.
    if _isDisposed {
      throw RxError.disposed(object: self)
    }

    // .error 이벤트가 발생했는지 확인한다.
    if let error = _stoppedEvent?.error {
      // intentionally throw exception
      throw error
    }
    else {
      return _value
    }
  //}
}
_value의 업데이트는 on(.next) 이벤트 발생시 해주고 subscribe시 _value를 새로 등록되는 observer에게 발생시킨다.
func _synchronized_on(_ event: Event<E>) -> Bag<AnyObserver<Element>> {
  _lock.lock(); defer { _lock.unlock() }
  if _stoppedEvent != nil || _isDisposed {
    return Bag()
  }

  switch event {
  case .next(let value):
    _value = value // _value를 최신 이벤트로 유지한다.
  case .error, .completed:
    _stoppedEvent = event
  }

  return _observers
}

func _synchronized_subscribe<O : ObserverType>(_ observer: O) -> Disposable where O.E == E {
  if _isDisposed {
    observer.on(.error(RxError.disposed(object: self)))
    return Disposables.create()
  }

  if let stoppedEvent = _stoppedEvent {
    observer.on(stoppedEvent)
    return Disposables.create()
  }

  let key = _observers.insert(observer.asObserver())
  observer.on(.next(_value)) // 마지막에 발생했던 이벤트를 이 observer에 발생시킨다.

  return SubscriptionDisposable(owner: self, key: key)
}

ReplaySubject

ReplaySubject에 Replay를 생성하기 위한 create/createUnbounded 함수가 있다. bufferSize에 따라 ReplayOne 또는 ReplayMany를 생성하거나 bound없이 생성하려면 ReplayAll을 생성한다.
public static func create(bufferSize: Int) -> ReplaySubject<Element> {
  if bufferSize == 1 {
    return ReplayOne()
  }
  else {
    return ReplayMany(bufferSize: bufferSize)
  }
}

public static func createUnbounded() -> ReplaySubject<Element> {
  return ReplayAll()
}
Replay는 세 종류가 있는데 이들의 상속관계는 다음과 같다.
  • ReplayOne -> ReplayBufferBase -> ReplaySubject
  • ReplayMany -> ReplayManyBase -> ReplayBufferBase -> ReplaySubject
  • ReplayAll -> ReplayManyBase -> ReplayBufferBase -> ReplaySubject
ReplayBufferBase에서 Replay의 동작을 위한 trim, addValueToBuffer, replayBuffer라는 세 인터페이스를 정의한다. ReplayBufferBase를 상속하는 클래스는 이 세 함수를 구현해야 한다. 그리고 subscribe시 replayBuffer를 호출해주고, 이벤트 발생시는 on(.next)시는 addValueToBuffer와 trim을 on(.error)나 on(.completed)에는 trim만들 호출해준다.
func _synchronized_subscribe<O : ObserverType>(_ observer: O) -> Disposable where O.E == E {
  if _isDisposed {
    observer.on(.error(RxError.disposed(object: self)))
    return Disposables.create()
  }

  let AnyObserver = observer.asObserver()

  replayBuffer(AnyObserver)
  if let stoppedEvent = _stoppedEvent {
    observer.on(stoppedEvent)
    return Disposables.create()
  }
  else {
    let key = _observers.insert(AnyObserver)
    return SubscriptionDisposable(owner: self, key: key)
  }
}

func _synchronized_on(_ event: Event<E>) -> Bag<AnyObserver<Element>> {
  _lock.lock(); defer { _lock.unlock() }
  if _isDisposed {
    return Bag()
  }

  if _stoppedEvent != nil {
    return Bag()
  }

  switch event {
  case .next(let value):
    addValueToBuffer(value)
    trim()
    return _observers
  case .error, .completed:
    _stoppedEvent = event
    trim()
    let observers = _observers
    _observers.removeAll()
    return observers
  }
}
ReplayOne은 addValueToBuffer가 호출되면 이때의 값을 _value에 저장해 놓는다. 그리고, 나중에 subscribe에 의해 replayBuffer가 호출되면 이때의 observer에 저장해둔 _value를 보낸다. trim에서는 아무짓도 하지 않는다.
override func trim() {

}

override func addValueToBuffer(_ value: Element) {
  _value = value
}

override func replayBuffer(_ observer: AnyObserver<Element>) {
  if let value = _value {
    observer.on(.next(value))
  }
}
ReplayManyBase는 queue를 가지고 addValueToBuffer의 경우 value를 queue에 넣고 replayBuffer의 경우 queue에 있는 값들을 observer에게 보낸다. 그리고 queue를 초기화 할 때 queue가 지정한 크기보다 1개 더 클 수 있으므로 사용자가 지정한 크기에서 (+1)을 하여 초기 queue의 크기를 정한다.(이벤트 발생시 addValueToBuffer후 trim을 하기 때문)
init(queueSize: Int) {
  _queue = Queue(capacity: queueSize + 1)
}

override func addValueToBuffer(_ value: Element) {
  _queue.enqueue(value)
}

override func replayBuffer(_ observer: AnyObserver<E>) {
  for item in _queue {
    observer.on(.next(item))
  }
}
ReplayMany는 ReplayManyBase를 상속하므로 trim만을 구현한다. queue의 크기가 지정한 크기보다 클경우 오래된 값을 삭제한다.
override func trim() {
  while _queue.count > _bufferSize {
    _ = _queue.dequeue()
  }
}
ReplayAll도 ReplayManyBase를 상속하므로 trim만을 구현한다. trim에서 아무짓도 하지 않음으로서 queue가 무제한 늘어날 수 있도록 한다.
override func trim() {

}

Variable

초기화시 BehaviorSubject를 가지고 _value의 값을 변경할 때 BehaviorSubject를 통해 이벤트를 발생시킨다.
public init(_ value: Element) {
  _value = value
  _subject = BehaviorSubject(value: value)
}

public var value: E {
 get {
    _lock.lock(); defer { _lock.unlock() }
    return _value
  }
  set(newValue) {
    _lock.lock()
    _value = newValue
    _lock.unlock()

    _subject.on(.next(newValue)) // 이벤트를 발생시킨다.
  }
}
Variable에 observer를 등록할 수 있도록 하기 위해 실제로는 BehaviorSubject를 리턴하는 asObservable 함수를 제공한다.
public func asObservable() -> Observable<E> {
  return _subject
}

댓글 없음:

댓글 쓰기

Building asynchronous views in SwiftUI 정리

Handling loading states within SwiftUI views self loading views View model 사용하기 Combine을 사용한 AnyPublisher Making SwiftUI views refreshable r...