1. 기본구조이해
2. 기본구조이해 - 2
3. Scheduler에 대한 이해
4. Subject에 대한 이해
https://github.com/ReactiveX/RxSwift
가장 기본이 되는 구조는 이벤트를 발생시키는 Observable이 있고 이것을 받아보는 Observer, 그리고 Observer가 이벤트를 받는 것을 취소할 수 있게 해주는 Disposable이 있다. 이를 코드로 구현하면 아래와 같이 된다.
public protocol ObservableType { associatedtype E func subscribe<O: ObserverType>(_ observer: O) -> Disposable where O.E == E } public protocol ObserverType { associatedtype E func on(_ event: Event<E>) } public enum Event<Element> { case next(Element) case error(Swift.Error) case completed } public protocol Disposable { func dispose() }
위 protocol들의 구현체로 다음의 클래스들을 만든다. ObservableType <-- Observable <-- Producer / ObserverType <-- ObserverBase / Disposable <-- Cancelable
public class Observable<Element> : ObservableType { public typealias E = Element public func subscribe<O: ObserverType>(_ observer: O) -> Disposable where O.E == E { abstractMethod() // subclass에서 구현을 해서 사용하도록 한다. } } class Producer<Element> : Observable<Element> { override func subscribe<O: ObserverType>(_ observer: O) -> Disposable where O.E == E { // 자세한 구현 생략... let disposer = SinkDisposer() let sinkAndSubscription = run(observer, cancel: disposer) ... return disposer } } class ObserverBase<ElementType> : Disposable, ObserverType { func on(_ event: Event<E>) { switch event { case .next: if _isStopped == 0 { onCore(event) } case .error, .completed: if !AtomicCompareAndSwap(0, 1, &_isStopped) { return } onCore(event) } } // ObserverBase를 상속받는 클래스는 이 함수를 구현해서 event handler를 호출하도록 한다. func onCore(_ event: Event<E>) { abstractMethod() } } public protocol Cancelable : Disposable { // Was resource disposed. var isDisposed: Bool { get } }
Producer의 subscribe에서 ObserverType을 받아서 이를 내부에 연결시킨다. 나중에 이벤트가 발생하면 ObserverBase.on이 호출된다.
이제 이 클래스들을 기반으로 하여 원하는 기능을 구현하고 있는 클래스를 만들면 된다. 이 구현들 중 가장 간단한 AnonymousObservable, AnonymousObserver, AnonymousDisposable를 살펴보자. AnonymousObservable<Element>부터 살펴보자.
// subscription시 호출되기 위한 SubscribeHandler을 생성자에서 받아서 가지고 있고 AnonymousObservableSink를 통해 Event를 발생시킨다. class AnonymousObservable<Element> : Producer<Element> { typealias SubscribeHandler = (AnyObserver<Element>) -> Disposable let _subscribeHandler: SubscribeHandler init(_ subscribeHandler: @escaping SubscribeHandler) { _subscribeHandler = subscribeHandler } // subscribe에서 호출해줌. sink와 subscription을 리턴 override func run<O : ObserverType>(_ observer: O, cancel: Cancelable) -> (sink: Disposable, subscription: Disposable) where O.E == Element { let sink = AnonymousObservableSink(observer: observer, cancel: cancel) let subscription = sink.run(self) return (sink: sink, subscription: subscription) } }
Sink는 subscription을 위한 ObserverType이 있을 때 Event를 발생시키는 로직(on함수)이 들어가 있는 부분이다. forwardOn을 통해 observer.on을 호출하게 된다. AnonymousObservable의 경우는 AnonymousObservableSink를 통해 이 기능을 구현한다.
class Sink<O : ObserverType> : Disposable { final func forwardOn(_ event: Event<O.E>) { // 구현 생략... } func dispose() { // 구현 생략... } } class AnonymousObservableSink<O: ObserverType> : Sink<O>, ObserverType { typealias E = O.E typealias Parent = AnonymousObservable<E> // SubscribeHandler에서 observer.on을 호출하면 호출되는 함수. func on(_ event: Event<E>) { switch event { case .next: if _isStopped == 1 { return } forwardOn(event) case .error, .completed: if AtomicCompareAndSwap(0, 1, &_isStopped) { forwardOn(event) dispose() } } } // 여기서 Observable을 생성할 때의 SubscribeHandler를 실행한다. AnonymousObservable.run으로부터 호출된다. func run(_ parent: Parent) -> Disposable { return parent._subscribeHandler(AnyObserver(self)) } } public struct AnyObserver<Element> : ObserverType { // observer.on을 실행하기 위해 observer.on을 내부에 저장해 놓는다. public init<O : ObserverType>(_ observer: O) where O.E == Element { self.observer = observer.on } public func on(_ event: Event<Element>) { // observer.on이 호출된다. return self.observer(event) } }
보통 SubscribeHandler의 구현을 보면 필요한 이벤트 발생시 파라메터로 넘어온 observer의 on에 필요한 이벤트를 넣어서 호출해준다.
func myJust<E>(element: E) -> Observable<E> { return Observable.create { observer in observer.on(.next(element)) observer.on(.completed) return Disposables.create() } } extension Observable { public static func create(_ subscribe: @escaping (AnyObserver<E>) -> Disposable) -> Observable<E> { return AnonymousObservable(subscribe) } }
Observer의 전달은 다음과 같다. subscribe 함수는 파라메타로 ObserverType을 받는다. 하지만, 실제 사용자는 ObserverType을 직접 만들어서 넣어주지 않고 보통 (onNext, onCompleted, onError)를 넣어준다. 아래처럼 이것을 가지고 AnonymousObserver를 생성해서 AnonymousObservable에 이것을 넣어준다.
extension ObservableType { public func subscribe(onNext: ((E) -> Void)? = nil, onError: ((Swift.Error) -> Void)? = nil, onCompleted: (() -> Void)? = nil, onDisposed: (() -> Void)? = nil) -> Disposable { // 자세한 부분은 생략... ... // 사용자가 넣어준 (onNext, onCompleted, onError)를 가지고 AnonymousObserver를 생성한다. let observer = AnonymousObserver<E> { e in switch e { case .next(let value): onNext?(value) case .error(let e): onError?(e) disposable.dispose() case .completed: onCompleted?() disposable.dispose() } } return Disposables.create(self.subscribeSafe(observer), disposable) } } extension ObservableType { // All internal subscribe calls go through this method. func subscribeSafe<O: ObserverType>(_ observer: O) -> Disposable where O.E == E { return self.asObservable().subscribe(observer) } } class AnonymousObserver<ElementType> : ObserverBase<ElementType> { typealias Element = ElementType typealias EventHandler = (Event<Element>) -> Void private let _eventHandler : EventHandler init(_ eventHandler: @escaping EventHandler) { _eventHandler = eventHandler } override func onCore(_ event: Event<Element>) { return _eventHandler(event) } }
정리하면 Observable의 subscribe에 핸들러를 등록하면 [Producer.subscribe -> AnonymousObservable.run -> AnonymousObservableSink.run -> AnonymousObservable.subscribeHandler -> ObserverType.on -> AnonymousObserver.onCore]를 통해 지정해준 핸들러가 호출된다. 결국 최종적으로 사용자가 subscribe에 넣어준 (onNext, onCompleted, onError)가 호출되게 되는 것이다.
마지막으로 Observer를 취소하고 싶으면 subscribe의 리턴값인 Disposable의 dispose 함수를 호출해 주면 된다.
fileprivate class SinkDisposer: Cancelable { // sink와 subscription을 중단시킨다. sink에 dispose를 호출하면 observer의 on이 호출되지 않는다. // subscription에 dispose를 호출하면 SubscribeHandler를 중단시킬 수 있다. func dispose() { // 자세한 구현은 생략 sink.dispose() // subscription.dispose() } }
AnonymousObservableSink가 상속하는 Sink에 dispose가 구현되어 있다.
class Sink<O : ObserverType> : Disposable { fileprivate var _disposed: Bool // on 함수가 호출되어 observer를 호출하기 전에 dispose 여부를 확인하여 실행할지 안할지를 결정한다. final func forwardOn(_ event: Event<O.E>) { if _disposed { return } _observer.on(event) } func dispose() { _disposed = true _cancel.dispose() } }
댓글 없음:
댓글 쓰기