2016년 12월 28일 수요일

RxSwift 소스 분석 - Scheduler에 대한 이해

RxSwift 소스 분석해보기

1. 기본구조이해
2. 기본구조이해 - 2
3. Scheduler에 대한 이해
4. Subject에 대한 이해

https://github.com/ReactiveX/RxSwift


Scheduler

observer callback이 실행되는 쓰레드는 observerOn으로 subscription과 unsubscription logic이 실행되는 쓰레드는 subscribeOn으로 지정할 수 있다.
'Observable+Concurrency.swift' 파일에서 extension으로 ObservableType에 지정한다.
extension ObservableType {
  public func observeOn(_ scheduler: ImmediateSchedulerType)

  public func subscribeOn(_ scheduler: ImmediateSchedulerType)
}
두 함수 모두 ImmediateSchedulerType을 파라메타로 받는다.
public protocol ImmediateSchedulerType {
  func schedule<StateType>(_ state: StateType, action: @escaping (StateType) -> Disposable) -> Disposable
}
observeOn의 경우를 먼저 살펴보자.
scheduler가 SerialDispatchQueueScheduler인 경우에는 ObserveOnSerialDispatchQueue를 아닌 경우에는 ObserveOn을 사용한다
public func observeOn(_ scheduler: ImmediateSchedulerType) -> Observable<E> {
  if let scheduler = scheduler as? SerialDispatchQueueScheduler {
      return ObserveOnSerialDispatchQueue(source: self.asObservable(), scheduler: scheduler)
  }
  else {
      return ObserveOn(source: self.asObservable(), scheduler: scheduler)
    }
  }
ObserveOnSerialDispatchQueue를 보면 앞에서 살펴본 AnonymousObservable과 거의 유사한 구조를 가지고 있음을 알 수 있다. 차이점은 원래의 observable에다가 subscribe를 실행시키면서 ObserveOnSerialDispatchQueueSink를 파라메타로 넘겨주는 것이다. 이렇게 함으로서 원래의 observable의 subscribe handler가 실행될 때 observeer로 ObserveOnSerialDispatchQueueSink의 onCore함수가 호출되게 된다. 아래에서 보다시피 onCore에서 scheduler.schedule를 실행함으로서 scheduler에서 지정한 쓰레드에서 action이 실행되게 된다. ObserveOnSerialDispatchQueueSink의 init을 보면 cachedScheduleLambda는 사용자가 지정해준 observer를 실행하도록 되어 있는 closure이다.
init(scheduler: SerialDispatchQueueScheduler, observer: O, cancel: Cancelable) {
  self.scheduler = scheduler
  self.observer = observer
  self.cancel = cancel
  super.init()

  cachedScheduleLambda = { sink, event in
    sink.observer.on(event)

    // 코드 생략...

    return Disposables.create()
  }
}

override func onCore(_ event: Event<E>) {
    let _ = self.scheduler.schedule((self, event), action: cachedScheduleLambda)
  }
MainScheduler를 통해 schedule가 호출될 때 어떻게 동작하는지를 보자.
초기화시 _mainQueue를 DispatchQueue.main로 설정한다. schedule 호출시 호출되는 scheduleInternal를 override하여 _mainQueue.async안에서 action을 실행한다.
override func scheduleInternal<StateType>(_ state: StateType, action: @escaping (StateType) -> Disposable) -> Disposable {
  // 자세한 코드 생략...

  _mainQueue.async {
    if !cancel.isDisposed {
      _ = action(state)
    }

    _ = AtomicDecrement(&self.numberEnqueued)
  }
}

RxSwift 소스 분석 - 기본구조이해 - 2

RxSwift 소스 분석해보기

1. 기본구조이해
2. 기본구조이해 - 2
3. Scheduler에 대한 이해
4. Subject에 대한 이해

https://github.com/ReactiveX/RxSwift

가장 기본이 되는 구조는 이벤트를 발생시키는 Observable이 있고 이것을 받아보는 Observer, 그리고 Observer가 이벤트를 받는 것을 취소할 수 있게 해주는 Disposable이 있다. 이를 코드로 구현하면 아래와 같이 된다.
public protocol ObservableType {
  associatedtype E

  func subscribe<O: ObserverType>(_ observer: O) -> Disposable where O.E == E
}

public protocol ObserverType {
  associatedtype E

  func on(_ event: Event<E>)
}

public enum Event<Element> {
  case next(Element)
  case error(Swift.Error)
  case completed
}

public protocol Disposable {
  func dispose()
}
위 protocol들의 구현체로 다음의 클래스들을 만든다. ObservableType <-- Observable <-- Producer / ObserverType <-- ObserverBase / Disposable <-- Cancelable
public class Observable<Element> : ObservableType {
  public typealias E = Element

  public func subscribe<O: ObserverType>(_ observer: O) -> Disposable where O.E == E {
    abstractMethod() // subclass에서 구현을 해서 사용하도록 한다.
  }
}

class Producer<Element> : Observable<Element> {
  override func subscribe<O: ObserverType>(_ observer: O) -> Disposable where O.E == E {
    // 자세한 구현 생략...
    let disposer = SinkDisposer()
    let sinkAndSubscription = run(observer, cancel: disposer)
    ...
    return disposer
  }
}

class ObserverBase<ElementType> : Disposable, ObserverType {
  func on(_ event: Event<E>) {
    switch event {
    case .next:
      if _isStopped == 0 {
        onCore(event)
      }
    case .error, .completed:
      if !AtomicCompareAndSwap(0, 1, &_isStopped) {
        return
      }

      onCore(event)
    }
  }

  // ObserverBase를 상속받는 클래스는 이 함수를 구현해서 event handler를 호출하도록 한다.
  func onCore(_ event: Event<E>) {
    abstractMethod()
  }
}

public protocol Cancelable : Disposable {
  // Was resource disposed.
  var isDisposed: Bool { get }
}
Producer의 subscribe에서 ObserverType을 받아서 이를 내부에 연결시킨다. 나중에 이벤트가 발생하면 ObserverBase.on이 호출된다.
이제 이 클래스들을 기반으로 하여 원하는 기능을 구현하고 있는 클래스를 만들면 된다. 이 구현들 중 가장 간단한 AnonymousObservable, AnonymousObserver, AnonymousDisposable를 살펴보자. AnonymousObservable<Element>부터 살펴보자.
// subscription시 호출되기 위한 SubscribeHandler을 생성자에서 받아서 가지고 있고 AnonymousObservableSink를 통해 Event를 발생시킨다.
class AnonymousObservable<Element> : Producer<Element> {
  typealias SubscribeHandler = (AnyObserver<Element>) -> Disposable

  let _subscribeHandler: SubscribeHandler

  init(_ subscribeHandler: @escaping SubscribeHandler) {
    _subscribeHandler = subscribeHandler
  }

  // subscribe에서 호출해줌. sink와 subscription을 리턴
  override func run<O : ObserverType>(_ observer: O, cancel: Cancelable) -> (sink: Disposable, subscription: Disposable) where O.E == Element {
        let sink = AnonymousObservableSink(observer: observer, cancel: cancel)
        let subscription = sink.run(self)
        return (sink: sink, subscription: subscription)
    }
}
Sink는 subscription을 위한 ObserverType이 있을 때 Event를 발생시키는 로직(on함수)이 들어가 있는 부분이다. forwardOn을 통해 observer.on을 호출하게 된다. AnonymousObservable의 경우는 AnonymousObservableSink를 통해 이 기능을 구현한다.
class Sink<O : ObserverType> : Disposable {
  final func forwardOn(_ event: Event<O.E>) {
    // 구현 생략...
  }

  func dispose() {
    // 구현 생략...
  }
}

class AnonymousObservableSink<O: ObserverType> : Sink<O>, ObserverType {
  typealias E = O.E
  typealias Parent = AnonymousObservable<E>

  // SubscribeHandler에서 observer.on을 호출하면 호출되는 함수.
  func on(_ event: Event<E>) {
    switch event {
    case .next:
      if _isStopped == 1 {
        return
      }
      forwardOn(event)
    case .error, .completed:
      if AtomicCompareAndSwap(0, 1, &_isStopped) {
        forwardOn(event)
        dispose()
      }
    }
  }

  // 여기서 Observable을 생성할 때의 SubscribeHandler를 실행한다. AnonymousObservable.run으로부터 호출된다.
  func run(_ parent: Parent) -> Disposable {
    return parent._subscribeHandler(AnyObserver(self))
  }
}

public struct AnyObserver<Element> : ObserverType {
  // observer.on을 실행하기 위해 observer.on을 내부에 저장해 놓는다.
  public init<O : ObserverType>(_ observer: O) where O.E == Element {
    self.observer = observer.on
  }

  public func on(_ event: Event<Element>) {
    // observer.on이 호출된다.
    return self.observer(event)
  }
}
보통 SubscribeHandler의 구현을 보면 필요한 이벤트 발생시 파라메터로 넘어온 observer의 on에 필요한 이벤트를 넣어서 호출해준다.
func myJust<E>(element: E) -> Observable<E> {
  return Observable.create { observer in
    observer.on(.next(element))
    observer.on(.completed)
    return Disposables.create()
  }
}

extension Observable {
  public static func create(_ subscribe: @escaping (AnyObserver<E>) -> Disposable) -> Observable<E> {
    return AnonymousObservable(subscribe)
  }
}
Observer의 전달은 다음과 같다. subscribe 함수는 파라메타로 ObserverType을 받는다. 하지만, 실제 사용자는 ObserverType을 직접 만들어서 넣어주지 않고 보통 (onNext, onCompleted, onError)를 넣어준다. 아래처럼 이것을 가지고 AnonymousObserver를 생성해서 AnonymousObservable에 이것을 넣어준다.
extension ObservableType {
  public func subscribe(onNext: ((E) -> Void)? = nil, onError: ((Swift.Error) -> Void)? = nil, onCompleted: (() -> Void)? = nil, onDisposed: (() -> Void)? = nil) -> Disposable {
    // 자세한 부분은 생략...
    ...

    // 사용자가 넣어준 (onNext, onCompleted, onError)를 가지고 AnonymousObserver를 생성한다.
    let observer = AnonymousObserver<E> { e in
      switch e {
      case .next(let value):
        onNext?(value)
      case .error(let e):
        onError?(e)
        disposable.dispose()
      case .completed:
        onCompleted?()
        disposable.dispose()
      }
    }
    return Disposables.create(self.subscribeSafe(observer), disposable)
  }
}

extension ObservableType {
  // All internal subscribe calls go through this method.
  func subscribeSafe<O: ObserverType>(_ observer: O) -> Disposable where O.E == E {
    return self.asObservable().subscribe(observer)
  }
}

class AnonymousObserver<ElementType> : ObserverBase<ElementType> {
  typealias Element = ElementType

  typealias EventHandler = (Event<Element>) -> Void

  private let _eventHandler : EventHandler

  init(_ eventHandler: @escaping EventHandler) {
    _eventHandler = eventHandler
  }

  override func onCore(_ event: Event<Element>) {
    return _eventHandler(event)
  }
}
정리하면 Observable의 subscribe에 핸들러를 등록하면 [Producer.subscribe -> AnonymousObservable.run -> AnonymousObservableSink.run -> AnonymousObservable.subscribeHandler -> ObserverType.on -> AnonymousObserver.onCore]를 통해 지정해준 핸들러가 호출된다. 결국 최종적으로 사용자가 subscribe에 넣어준 (onNext, onCompleted, onError)가 호출되게 되는 것이다.
마지막으로 Observer를 취소하고 싶으면 subscribe의 리턴값인 Disposable의 dispose 함수를 호출해 주면 된다.
fileprivate class SinkDisposer: Cancelable {
  // sink와 subscription을 중단시킨다. sink에 dispose를 호출하면 observer의 on이 호출되지 않는다.
  // subscription에 dispose를 호출하면 SubscribeHandler를 중단시킬 수 있다.
  func dispose() {
    // 자세한 구현은 생략
    sink.dispose() // 
    subscription.dispose()
  }
}
AnonymousObservableSink가 상속하는 Sink에 dispose가 구현되어 있다.
class Sink<O : ObserverType> : Disposable {
  fileprivate var _disposed: Bool

  // on 함수가 호출되어 observer를 호출하기 전에 dispose 여부를 확인하여 실행할지 안할지를 결정한다.
  final func forwardOn(_ event: Event<O.E>) {
    if _disposed {
      return
    }
    _observer.on(event)
  }

  func dispose() {
    _disposed = true
    _cancel.dispose()
  }
}

RxSwift 소스 분석 - 기본구조이해

RxSwift 소스 분석해보기

1. 기본구조이해
2. 기본구조이해 - 2
3. Scheduler에 대한 이해
4. Subject에 대한 이해

https://github.com/ReactiveX/RxSwift

기초 이해

시작하기 위한 초간단 아이템 호출 구조

  • Observable.create를 통해 Observable<E>를 생성한다. -> AnonymousObservable이 생성된다. ObservableType+Creation.swift
extension Observable {
  // element가 없는 AnonymousObservable을 생성한다.
  public static func create(_ subscribe: @escaping (AnyObserver<E>) -> Disposable) -> Observable<E> {
    return AnonymousObservable(subscribe)
  }
}
  • AnonymousObservable에 subscribeHandler를 넘겨준다. -> subscribeHandler는 (AnyObserver<E> -> Disposable)이다.
class AnonymousObservable<Element> : Producer<Element> {
  let _subscribeHandler:  SubscribeHandler

  // 내부에 SubscribeHandler를 저장
  init(_ subscribeHandler: @escaping SubscribeHandler) {
    _subscribeHandler = subscribeHandler
  }
}
  • 생성된 Observable에 subscribe를 하면 내부에서 Producer의 subscribe를 호출한다. -> AnonymousObservable은 Producer를 상속한다. ObservableType+Extensions.swift
extension ObservableType {
  public func subscribe(onNext: ((E) -> Void)? = nil, onError: ((Swift.Error) -> Void)? = nil, onCompleted: (() -> Void)? = nil, onDisposed: (() -> Void)? = nil) -> Disposable {
    ...
    // self.asObservable().subscribe(observer)를 호출한다.
    // observer는 사용자가 지정해준 onNext, onError등을 Event에 따라 실행해주는 AnonymousObserver이다.
  }
}
  • Producer의 subscribe에서 AnonymousObservable의 run을 호출한다.
class Producer<Element> : Observable<Element> {
  override func subscribe<O: ObserverType>(_ observer: O) -> Disposable where O.E == Element {
    // scheduler에서 AnonymousObservable의 run을 실행한다.
  }
}
  • AnonymousObservableSink의 run을 호출하여 subscribeHandler를 호출한다. subscribeHandler는 사용자가 넣어준 handler인데 조건에 따라 observer의 on을 호출한다.(AnyObserver를 통한 forwarding -> AnonymousObservableSink.on -> AnonymousObserver.on)
class AnonymousObservable<Element> : Producer<Element> {
  override func run<O : ObserverType>(_ observer: O, cancel: Cancelable) -> (sink: Disposable, subscription: Disposable) where O.E == Element {
    // AnonymousObservableSink의 run을 호출한다.
  }
}

class AnonymousObservableSink<O: ObserverType> : Sink<O>, ObserverType {
  func on(_ event: Event<E>) {
    // AnonymousObserver를 통해 사용자가 넣어준 onNext, onError등의 함수를 호출한다.
  }

  func run(_ parent: Parent) -> Disposable {
    // observable을 생성할 때 넣어준 SubscribeHandler를 호출한다.
    // SubscribeHandler 안에서 observer.on을 호출하면 위의 on 함수가 호출된다.
  }
}

Subscription에 대한 취소 구조

  • subscription을 호출하면 이에 대해 Disposable을 리턴한다. -> Disposable은 SinkDisposer이다.
class Producer<Element> : Observable<Element> {
  override func subscribe<O: ObserverType>(_ observer: O) -> Disposable where O.E == Element {
    let disposer = SinkDisposer()
    ...
    return disposer
  }
}
  • AnonymousObservable의 run을 실행할 때 파라메터로 SinkDisposer를 넘겨주어서 AnonymousObservableSink도 SinkDisposer를 같이 바라보게 한다.
  • SinkDisposer는 sink와 subscription을 가지고 있는다.
    • subscribe: _subscribeHandler의 리턴
    • sink: event를 실행하는 AnonymousObservableSink
  • 사용자가 subscription을 cancel하고 싶으면 Disposable.dispose를 호출한다.
    • sink와 subscription 각각에 dispose를 호출한다.
    • sink에 dispose가 불리게 되면 observer의 on이 불리지 않게 된다. -> observer.on의 호출 전 dispose 여부 확인
    • subscription에 dispose가 불리게 되면 observable을 만들때의 subscription handler의 구현에 따라 관련 내용이 취소되도록 한다: ex) 네트워크 요청이 취소되도록 한다. 타이머가 중단되도록 한다.

Scheduler를 사용하는 경우의 구조

  • observeOn을 사용해서 observer가 실행되는 scheduler를 지정할 수 있다.
  • observeOn의 리턴은 원래의 observable과 scheduler를 가지고 있는 ObserveOnSerialDispatchQueue이다.
  • ObserveOnSerialDispatchQueue는 sink로는 ObserveOnSerialDispatchQueueSink를, subscription으로는 원래의 source에 subscribe한것을 갖는다.
  • 결국 사용자가 subscribe를 실행하면 ObserveOnSerialDispatchQueueSink가 불리게 되는데 여기서 scheduler를 통해 원래의 observer를 event를 통해 실행한다.

Simple UITextField bindings

  • text property를 위한 reactive wrapper를 만든다.
public var text: ControlProperty<String?> {
  return UIControl.rx.value(...)
}
  • UIControl.rx.value에서 Observable.create를 써서 observable을 만들고 UIBindingObserver의 bindingObserver를 사용해서 최종적으로 ControlProperty를 만든다.
  • ControlProperty는 ObservableType이고 ObserverType이다.

2016년 12월 22일 목요일

Alamofire에서 사용하는 Queue에 대한 정리


  • SessionManager의 backgroundCompletionHandler
    • background transfer가 끝났을 때 'DispatchQueue.main.async'로 실행된다.
  • Upload
    • 정상적인 경우의 실행은 'DispatchQueue.global(qos: .utility).async' 로 이루어진다.
    • encodingCompletion은 정상적으로 upload가 완료되었을 때나 중간에 에러가 발생했을 때에나 항상 'DispatchQueue.main.async'에서 실행된다.
  • Retry
    • (request, download, upload)중 에러가 발생하면 allowRetrier를 호출하는데 이 함수의 내용은 'DispatchQueue.global(qos: .utility).async'를 통해 asynchronous로 실행된다. 그리고 should() 함수의 completionHandler의 구현에서 retry를 'DispatchQueue.global(qos: .utility).asyncAfter()'에서 실행한다.
    • URLSession으로부터의 응답이 에러를 포함하고 있어 retry를 확인하는 경우 retrier.should()의 completionHandler가 'DispatchQueue.global(qos: .utility).asyncAfter()' 에서 실행된다.
  • Request
    • 서버 요청을 위한 task(dataTask, downloadTask, uploadTask)를 만들 때 queue를 parameter로 받아서 'queue.syncResult' 로 실행하여 task를 생성한다. parameter로 받는 queue는 SessionManager에서 'DispatchQueue(label: "org.alamofire.session-manager." + UUID().uuidString)' 로 고정되어 있다.
  • Response
    • response handler가 기본으로 실행되는 queue는 'DispatchQueue.main'이다. 하지만 사용자가 지정할 수 있다. async로 호출한다.
  • progress handler
    • 기본은 'DispatchQueue.main'이다. 하지만, 사용자가 직접 queue를 지정해 줄 수 있다. async로 호출한다.

2016년 12월 19일 월요일

Reqwest 소스 분석

https://github.com/seanmonstar/reqwest


RequestBuilder는 HTTP 요청에 가장 기본이 되는 structure로서 Request에 관련된 값들을 가지고 있는다.
// Arc: An atomically reference counted wrapper for shared state.
pub struct RequestBuilder {
  // client를 항상 새로 만들지 않고 재활용을 하기 위해 Arc를 사용한다.
  client: Arc<ClientRef>,
  method: Method,
  url: Result<Url, ::UrlError>,
  _version: HttpVersion,
  headers: Headers,
  body: Option<::Result<Body>>,
}

// Result와 Option은 rust에서 기본으로 제공한다.
enum Result<T, E> {
   Ok(T),
   Err(E),
}

pub enum Option<T> {
    None,
    Some(T),
}
Client는 실제 네트워크상으로 HTTP 요청을 하는 것에 관련되어 hyper에 관련된 정보(client, redirect_policy)를 내부에 가지고 있는다. Method는 어떤 메소드(Get, Post같은)를 사용할 것인지를 의미하고 HttpVersion은 HTTP/1.1, HTTP/2.0등 어떤 버전을 사용할 것인지를 의미한다. 현재 Reqwest는 HTTP/1.1만을 사용한다. RequestBuilder가 생성되고 나면 header와 body를 관련 함수를 사용하여 설정할 수 있게 한다. RequestBuilder를 자세히 살펴보기 전에 Client를 먼저 살펴보자.
pub struct Client {
  // 재활용이 가능하도록 하기 위해 Arc로 보호
  inner: Arc<ClientRef> // ::hyper::Client,
}

// Mutex: A mutual exclusion primitive useful for protecting shared data
struct ClientRef {
  hyper: ::hyper::client,
  // 값의 변경이 가능하도록 하기 위해 Mutex를 사용한다.
  redirect_policy: Mutex<RedirectPolicy>,
}
Client의 생성은 Client::new()를 호출함으로써 이루어진다.
impl Client {
  pub fn new() -> ::Result<Client> {
    let mut client = try!(new_hyper_client());
    client.set_redirect_policy(::hyper::client::RedirectPolicy::FollowNone);
    Ok(Client {
      inner: Arc::new(ClientRef {
        hyper:client,
        redirect_policy: Mutex::new(RedirectPolicy::default()),
      }),
    })
  }

  pub fn redirect(&mut self, policy: RedirectPolicy) {
    *self.inner.redirect_policy.lock().unwrap() = policy;
  }
}
try!는 Result가 error이면 바로 error를 리턴하는 매크로이다. new_hyper_client()를 사용하여 hyper의 Client를 생성한 후 redirect policy를 FollowNone으로 설정한다. 그리고, Client를 만들어서 리턴한다. 기본으로 제공하는 Result를 간단하게 사용하기 위해 아래의 type alias를 지정하여 사용한다. Error도 여기서 정의한 enum으로서 request시 에러가 발생했을 때 사용된다.
pub type Result<T> = ::std::result::Result<T, Error>;
new_hyper_client()를 살펴보자.
fn new_hyper_client() -> ::Result<::hyper::Client> {
  use tls:TlsClient;
  Ok(::hyper::Client::with_connector(
    ::hyper::client::Pool::with_connector(
      Default::default(),
      ::hyper::net::HttpsConnector::new(try!(TlsClient::new()))
    )
  ))
}
hyper의 Client를 만들어주기 위해 관련 함수들을 사용한다. 현재는 HTTPS만을 지원하는 것으로 한다. HTTPS 지원을 위하여 tls.rs에 있는 관련 함수들을 사용한다.
간단하게 get을 요청하는 경우에 사용하기 위한 간단한 함수를 만들어 보자.
impl Client {
  pub fn get<U: IntoUrl>(&self, url: U) -> RequestBuilder {
    self.request(Method::Get, url)
  }

  pub fn request<U: IntoUrl>(&self, method: Method, url: U) -> RequestBuilder {
    let url = url.into_url();
    RequestBuilder {
      client: self.inner.clone(), // reference count가 증가한다.
      method: method,
      url: url,
      _version: HttpVersion::Http11,
      headers: Headers::new(),
      body: None
    }
  }
}
IntoUrl은 hyper에서 url에 사용하는 Url로의 변환을 해주는 trait이다. hyper에는 Url, str, String에 대해 정의되어 있다.
trait IntoUrl {
  fn into_url(self) -> Result<Url, UrlError>;
}
위와 같이 get을 만들면 마찬가지로 post와 head도 만들 수 있다.
impl Client {
  pub fn post<U: IntoUrl>(&self, url: U) -> RequestBuilder {
    self.request(Method::Post, url)
  }

  pub fn head<U: IntoUrl>(&self, url: U) -> RequestBuilder {
    self.request(Method::Head, url)
  }
}
RequestBuilder를 만들고 나면 send를 통해 서버에 요청을 할 수 있다.
impl RequestBuilder {
  pub fn send(mut self) -> ::Result<Response> {
    // user agent를 지정해주지 않았으면 기본값으로 지정한다.
    // Headers는 지정한 타입을 받을 수 있게 정의되어 있고 이 타입이 정의한 함수들로부터 필요한 값을 가져온다.
    if !self.headers.has::<UserAgent>() {
      self.headers.set(UserAgent(DEFAULT_USER_AGENT.to_owned()));
    }

    if !self.headers.has::<Accept>() {
      // 'Accept: */*'
      self.headers.set(Accept::star());
    }

    let client = self.client;
    let mut method = self.method;
    // Url로 변경
    let mut url = try!(self.url);
    let mut headers = self.headers;
    // Option<Result<Body>>를 Option<Body>로 변경
    let mut body = match self.body {
      Some(b) => Some(try!(b)),
      None => None,
    };

    // redirect되는 url들을 저장한다.
    let mut urls = Vec::new();

    loop {
      // 서버로부터 응답을 가져온다.
      let res = {
        let mut req = client.hyper.request(method.clone(), url.clone())
            .headers(headers.clone());

        if let Some(ref mut b) = body {
          // hyper에서 사용하는 Body로 변환
          let body = body::as_hyper_body(b);
          req = req.body(body);
        }

        try!(req.send())
      };

      // status를 확인하여 redirect가 필요한지를 본다.
      let should_redirect = match res.status {
        StatusCode::MovedPermanently |
        StatusCode::Found |
        StatusCode::SeeOther => {
          body = None;
          match Method {
            Method::Get | Method::Head => {},
            _ => {
              method = Method::Get;
            }
          }
          true
        },
        StatusCode::TemporaryRedirect |
        StatusCode::PermanentRedirect => {
          if let Some(mut body) = body {
            // redirect시 body를 다시 사용할 수 있는지 확인한다.
            body::can_reset(body)
          } else {
            true
          }
        },
        _ => false,
      };

      if should_redirect {
        // response로부터 Location 헤더를 가져온다.
        // Result가 된다.
        let loc = {
          // Option<Result>가 된다.
          let loc = res.headers.get::<Location>().map(|loc| url.join(loc));
          if let Some(loc) = loc {
            loc
          } else {
            // Location이 오지 않은 경우
            return Ok(Response {
              inner: res
            });
          }
        };

        url = match loc {
          Ok(loc) => {
            headers.set(Referer(url.to_string()));
            urls.push(url);
            // check_redirect 함수가 끝날때까지 redirect_policy의 lock을 잡고 있는다.
            if check_redirect(&client.redirect_policy.lock().unwrap(), &loc, &urls)? {
              loc
            } else {
              // redirect를 허락하지 않는다.
              return Ok(Response {
                inner: res
              });
            }
          },
          Err(e) => {
            // url.join이 실패한 경우
            return Ok(Response {
              inner: res
            });
          },
        };
        // 새 url로 다시 서버에 요청하도록 한다.
      } else {
        // redirect하지 않는 경우
        return Ok(Response {
          inner: res
        });
      }
    }
  }
}
기본 user agent를 사용할 때 to_owned를 통해 복사된 값을 사용한다(Cloning). DEFAULT_USER_AGENT는 아래와 같이 정의되어 있다.
static DEFAULT_USER_AGENT: &'static str = concat!(env!("CARGO_PKG_NAME"), "/", env!("CARGO_PKG_VERSION"));
send 함수가 리턴하는 Response는 다음과 같다.
pub struct Response {
  inner: ::hyper::client::Response,
}
Response로부터 사용할 수 있는 몇가지 함수들을 정의하여 쉽게 사용할 수 있도록 한다.
impl Response {
  // status code를 리턴한다.
  pub fn status(&self) -> &StatusCode {
    &self.inner.status
  }
  // headers를 리턴한다.
  pub fn headers(&self) -> &Headers {
    &self.inner.headers
  }
  // http version을 리턴한다.
  pub fn version(&self) -> &HttpVersion {
    &self.inner.version
  }
}
RequestBuilder에 header를 추가할 수 있게 하자.
imple RequestBuilder<'a> {
  // 헤더 하나를 추가한다.
  pub fn header<H: ::header::Header + ::header::HeaderFormat>(mut self, header: H) -> RequestBuilder<'a> {
    self.headers.set(header);
    self
  }

  // 헤더의 집합을 추가한다.
  pub fn header<H: >(mut self, headers: ::header::Headers) -> RequestBuilder<'a> {
    self.headers.extend(headers.iter());
    self
  }
}
RequestBuilder에 body를 추가할 수 있게 하자.
// 넣으려는 body는 Into<Body>를 구현하고 있어야 한다.
impl RequestBuilder {
  pub fn body<T: Into<Body>>(mut self, body: T) -> RequestBuilder {
    // Body로 변환한 후 Option<Result>로 저장한다.
    self.body = Some(Ok(body.into()));
    self
  }
}
현재까지 사용된 Body에 관련된 부분은 Into<Body>와 body::as_hyper_body이다. 이와 관련하여 Body의 구현을 살펴보자. Body의 구조는 아래와 같다.
pub struct Body {
  reader: Kind,
}

// Box: A pointer type for heap allocation.
// Read: The Read trait allows for reading bytes from a source.
// Send: Types that can be transferred across thread boundaries.
enum Kind {
  // Read가 구현된 것에 대한 처리. File을 읽는다던지 하는 것.
  Reader(Box<Read + Send>, Option<u64>),
  // byte에 대한 처리.
  Bytes(Vec<u8>),
}
Body를 위한 From구현은 Vec<u8>, String, [u8], &str, File의 5개의 타입에 대해 구현한다. From과 Into는 둘 중 어느 한쪽이 구현되면 나머지 한쪽도 자동으로 구현된다.
impl From<Vec<u8>> for Body {
  fn from(v: Vec<u8>) -> Body {
    Body {
      reader: Kind::Bytes(v),
    }
  }
}

impl From<String> for Body {
  fn from(s: String) -> Body {
    s.into_bytes().into() // Vec<u8>로 변환 후 이의 into를 사용한다.
  }
}

impl<'a> From<&'a [u8]> for Body {
  fn from(s: &'a [u8]) -> Body {
    s.to_vec().into() // Vec<u8>로 변환 후 이의 into를 사용한다.
  }
}

impl<'a> From<&'a str> for Body {
  fn from(s: &'a str) -> Body {
    s.as_bytes().into() // Vec<u8>로 변환 후 이의 into를 사용한다.
  }
}

impl From<File> for Body {
  fn from(f: File) -> Body {
    // 파일의 크기를 가져온다.
    let len = f.metadata().map(|m| m.len()).ok();
    Body {
      reader: Kind::Reader(Box::new(f), len),
    }
  }
}
reader로부터 Body를 생성해주는 헬퍼함수로 Body::new()를 정의한다.
impl Body {
  pub fn new<R: Read + Send + 'static>(reader: R) -> Body {
    Body {
      reader: Kind::Reader(Box::new(reader), None),
    }
  }
}
이제 as_hyper_body를 정의하자.
// Body를 hyper에서 사용하는 Body로 변환한 후 이것을 리턴한다.
pub fn as_hyper_body<'a>(body: &'a mut Body) -> ::hyper::client::Body<'a> {
  match body.reader {
    Kind::Bytes(ref bytes) => {
      let len = bytes.len();
      ::hyper::client::Body::BufBody(bytes, len)
    },
    Kind::Reader(ref mut reader, len_opt) => {
      match len_opt {
        Some(len) => ::hyper::client::Body::SizedBody(reader, len),
        None => ::hyper::client::Body::ChunkedBody(reader),
      }
    },
  }
}
재활용이 가능한 Body인지 아닌지를 판별해주는 함수를 정의한다.
// File같은 경우 한번 읽으면 읽는 위치가 계속 증가하기 때문에 그대로 재활용할 수 없다.
pub fn can_reset(body: &Body) -> bool {
  match body.reader {
    Kind::Bytes(_) => true,
    Kind::Reader(..) => false,
  }
}
ClientRef에 redirect policy를 위한 redirect_policy가 있어서 설정을 변경할 수 있다. 이에 관련된 내용을 살펴보자.
pub struct RedirectPolicy {
  inner: Policy,
}

// Sync: Types for which it is safe to share references between threads.
enum Policy {
  // 사용자 지정 함수에 의해 redirect를 할지 말지를 정한다.
  Custom(Box<Fn(&Url, &[Url]) -> ::Result<bool> + Send + Sync + 'static>),
  Limit(usize), // redirect의 수에 제한을 둔다.
  None, // redirect를 하지 않는다.
}
위 enum의 값들을 생성하는 함수를 제공한다.
impl RedirectPolicy {
  pub fn limited(max: usize) -> RedirectPolicy {
    RedirectPolicy {
      inner: Policy::Limit(max),
    }
  }

  pub fn none() -> RedirectPolicy {
    RedirectPolicy {
      inner: Policy::None,
    }
  }

  pub fn custom<T>(policy: T) -> RedirectPolicy
  where T: Fn(&Url, &[Url]) -> ::Result<bool> + Send + Sync + 'static {
    RedirectPolicy {
      inner: Policy::Custom(Box::new(policy)),
    }
  }
}
Client 생성시 사용하는 default 함수는 다음과 같다.
// Default: A trait for giving a type a useful default value.
// default는 redirect를 10번까지 허용하는 것으로 지정한다.
impl Default for RedirectPolicy {
  fn default() -> RedirectPolicy {
    RedirectPolicy::limited(10)
  }
}
RequestBuilder의 send 함수에서는 redirect를 확인하기 위해 check_redirect 함수를 호출한다.
// Client에서 설정한 RedirectPolicy를 받아서 이의 redirect를 호출한다.
pub fn check_redirect(policy: &RedirectPolicy, next: &Url, previous: &[Url]) -> ::Result<bool> {
  policy.redirect(next, previous)
}
RedirectPolicy에 redirect를 구현한다.
impl RedirectPolicy {
  fn redirect(&self, next: &Url, previous: &[Url]) -> ::Result<bool> {
    match self.inner {
      Policy::Custom(ref custom) => custom(next, previous),
      Policy::Limit(max) => {
        if previous.len() == max { // 최대한 허용하는 redirect를 넘은 경우.
          Err(::Error::TooManyRedirects)
        } else if previous.contains(next) { // 이전에 이미 redirect한 url인 경우.
          Err(::Error::RedirectLoop)
        } else { // redirect를 허용하는 경우.
          Ok(true)
      },
      Policy::None => Ok(false),
    }
  }
}
앞에서 ::hyper::Client를 만들때 HttpsConnector를 만들기 위해 파라메터로 TlsClient::new()를 사용했다.
// 멤버 하나를 가지고 있는 tuple
pub struct TlsClient(TlsConnector);

impl TlsClient {
  pub fn new() -> ::Result<TlsClient> {
    // builder의 결과가 Ok이면 and_then의 내용을 실행하고 map을 통해 T를 U로 변경하고 map_err를 통해 E를 F로 변경한다.
    // and_then: Result가 Ok이면 실행.
    // map: Result가 Ok이면 Result<T, E>를 Result<U, E>로 변경.
    // map_err: Result가 Err이면 Result<T, E>를 Result<T, F>로 변경.
    TlsConnector::builder()
        .and_then(|c| c.build()) // c.build()의 리턴값은 Result<TlsConnector>이다.
        .map(TlsClient) // Ok(TlsClient(TlsConnector))가 된다.
        .map_err(|e| ::Error::Http(::hyper:Error:Ssl(Box::new(e))))
  }
}
TlsClient가 HttpsConnector에 사용되기 위해서는 SslClient를 구현해야 한다.
impl SslClient for TlsClient {
  type SslStream = TlsStream;

  // wrap a client stream with SSL.
  fn wrap_client(&self, stream: HttpStream, host: &str) -> ::hyper::Result<Self::Stream> {
    self.0.connect(host, stream).map(TlsStream).map_err(|e| {
      match e {
        HandshakeError::Failure(e) => ::hyper::Error::Ssl(Box::new(e)),
        HandshakeError::Interrupted(..) => {
          unreachable!("TlsClient::handshake interrupted")
        }
      }
    })
  }
}
TlsStream이 SslStream으로 사용되기 위해서 Read, Write, Clone, NetworkStream을 구현해야 한다. native_tls의 TlsStream이 이를 다 구현하고 있기 때문에 TlsStream은 이를 wrapping해서 필요한 구현을 delegate로 구현한다.
// 이름 중복을 피하기 위해 native_tls의 TlsStream을 NativeTlsStream로 이름을 바꿔서 사용한다.
pub struct TlsStream(NativeTlsStream<HttpStream>);

impl Read for TlsStream {
  fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
    self.0.read(buf)
  }
}

impl Write for TlsStream {
  fn write(&mut self, data: &[u8]) -> io::Result<usize> {
    self.0.write(data)
  }

  fn flush(&mut self) -> io::Result<()> {
    self.0.flush()
  }
}

impl Clone for TlsStream {
  fn clone(&self) -> TlsStream {
    unreachable!("TlsStream::clone is never used for the Client")
  }
}

impl NetworkStream for TlsStream {
  fn peer_addr(&mut self) -> io::Result<SocketAddr> {
    self.0.get_mut().peer_addr()
  }

  fn set_read_timeout(&self, dur: Option<Duration>) -> io::Result<()> {
    self.0.get_ref().set_read_timeout(dur)
  }

  fn set_write_timeout(&self, dur: Option<Duration>) -> io::Result<()> {
    self.0.get_ref().set_write_timeout(dur)
  }
}

Building asynchronous views in SwiftUI 정리

Handling loading states within SwiftUI views self loading views View model 사용하기 Combine을 사용한 AnyPublisher Making SwiftUI views refreshable r...