본문 바로가기

iOS/Concurrency

Concurrency (4) Custom Asynchronous Sequences With AsyncStream

 
이전글 - Concurrency(3) AsnycSequence & Intermediate Task
 
Modern Concurrency in Swift 를 읽고 간단 정리


 

Digging into AsyncSequence, AsyncInteratorProtocol and AsyncStream

AsyncSequence

AsyncSeqeunce

@available(macOS 10.15, iOS 13.0, watchOS 6.0, tvOS 13.0, *)
@rethrows public protocol AsyncSequence {

    /// The type of asynchronous iterator that produces elements of this
    /// asynchronous sequence.
    associatedtype AsyncIterator : AsyncIteratorProtocol

    /// The type of element produced by this asynchronous sequence.
    associatedtype Element where Self.Element == Self.AsyncIterator.Element

    /// Creates the asynchronous iterator that produces elements of this
    /// asynchronous sequence.
    ///
    /// - Returns: An instance of the `AsyncIterator` type used to produce
    /// elements of the asynchronous sequence.
    func makeAsyncIterator() -> Self.AsyncIterator
}

 
 
Element 타입 정의

  • associatedtype Element를 통해 시퀀스의 요소 타입을 정의
  • 이 요소는 시퀀스가 비동기적으로 생성할 항목의 타입

Iterator 제공

  • makeAsyncInterator() 메서드를 구현하여 시퀀스의 비동기 이터레이터를 반환
  • 이 Iterator는 AsyncIteratorProtocol을 준수
  • next() 메서드 호출 시 비동기적으로 요소 반환

default implementations

  • 단일 값 반환 메서드 : prefix(while:), contains(), min(), max() etc
  • 다른 AsyncSequence를 반환 메서드 : map(_:) etc
  • 이 메서드들을 사용할 때도 당연히 await 키워드가 필요함

Seqeunce와 비교

  • AsyncSequence에는 비동기성이 추가됨
  • Sequence와 달리 처음 사용할 때 모든 값이 준비되었을 수도, 일부만 준비 됐을 수도, 없을 수도 있음
  • 따라서 await를 사용하여 값을 받아와야 함

 

AsyncIteratorProtocol

AsnycIteratorProtocol

@available(macOS 10.15, iOS 13.0, watchOS 6.0, tvOS 13.0, *)
@rethrows public protocol AsyncIteratorProtocol {

    associatedtype Element

    /// Asynchronously advances to the next element and returns it, or ends the
    /// sequence if there is no next element.
    /// 
    /// - Returns: The next element, if it exists, or `nil` to signal the end of
    ///   the sequence.
    mutating func next() async throws -> Self.Element?
}

 
Overview

  • 비동기 시퀀스의 값을 생성하는 역할을 한다.
  • next() 를 정의하며, 이 메서드는 시퀀스의 다음 요소를 생성하거나 시퀀스의 끝을 알리기 위해 nil을 반환

 
End of Iteration

  • 반복의 종료를 구현할 때 시퀀스의 끝을 나타내기 위해 ni을 반환
  • next() 메서드에서 nil을 반환하거나 오류를 던지면 Itearator는 종료 상태가 됨
  • 종료된 이후 next() 메서드를 호출해도 항상 nil을 반환

Cancellation
Task Cancel을 처리하기 위해서는 아래 방법을 사용할 수 있다.

  • next() 메서드 내에서 현재 Task의 isCancelled 값을 확인하고 nil을 반환하여 시퀀스를 종료
func next() async -> Int? {
  if Task.isCancelled {
    return nil
  }
  ...
}
  • Task에서 checkCancellation을 호출하여 CancellationError를 던짐
func next() async -> Int? {
  try Task.checkCancellation()
  ...
}
  • withTaskCancellationHandler(operation:onCancel:)을 사용하여 next()를 구현하고 취소에 즉시 반응
func next() async -> Int? {
  return try await withTaskCancellationHandler {
      // operation
    } onCancel: {
      // 취소 시 처리할 작업    
    }
}

 
Simple Asynchronous Sequences
 
텍스트를 하나씩 타이핑하듯 출력하는 기능을 제공하는 예제

  • TypeWriter는 phrase를 글자 단위로 순차적으로 반환
  • 각 글자 사이에 일정한 지연 시간(1초)를 두는 방식으로 동작
struct TypeWriter: AsyncSequence {
  typealias Element = String
  
  let phrase: String
  
  func makeAsyncIterator() -> TypeWriterIterator {
    return TypeWriterIterator(phrase)
  }
}

struct TypeWriterIterator: AsyncIteratorProtocol {
    typealias Element = String
    
    let phrase: String
    var index: String.Index
    
    init(_ phrase: String) {
        self.phrase = phrase
        self.index = phrase.startIndex
    }
    
    mutating func next() async throws -> String? {
        guard self.index < self.phrase.endIndex else {
            return nil
        }
        
        try await Task.sleep(nanoseconds: 1_000_000_000)
        
        let result = String(self.phrase[self.phrase.startIndex...self.index])
        self.index = phrase.index(after: index)
        return result
    }
}

for try await item in TypeWriter(phrase: "Hello World!") {
  print(item)
}

 

Simplifying Asynchronous Sequences with AsyncStream

비동기 시퀀스를 더 간단하게 생성할 수 있도록, Apple에서는 AsyncStream 타입을 제공한다.
(AsyncIterator를 구현하지 않아도 된다)
 
Overview

  • AsyncSequnce 프로토콜을 채택한 구조체
  • 비동기 데이터 스트림을 생성하는 데 필요한 작업을 추상화
  • 클로저 기반의 간단한 인터페이스 제공
  • delegate, callback 클로저 기반 API를 async-await과 함께 사용하도록 하는 것에 적합

Contiunation

  • AsyncStream 초기화 시에 클로저는 AsyncStream.Continutation으로 부터 새로운 Element를 제공
  • continuation.yeild(_:) 메서드를 호출하여 스트림에 Element 제공
  • 더이상 생성할 요소가 없다면 continuation.finish() 호출
  • finish()가 호출되면 내부적으로 시퀀스 이터레이터가 nil을 반환
  • continuation은 Sendable을 준수하므로 안전하게 호출할 수 있음

Buffering

  • Element 생성 속도가 소비 속도보다 빠를  수 있기에 버퍼링 기능을 제공한다.
  • 지정된 개수의 가장 오래된 또는 가장 최근의 요소를 버퍼로 사용할 수 있다.
  • 버퍼의 기본 크기는 Int.max

Initializer
 
init(_:bufferingPolicy:_:)

let stream = AsyncStream<Int>(Int.self,
                              bufferingPolicy: .bufferingNewest(5)) { continuation in
    Task.detached {
        for _ in 0..<100 {
            await Task.sleep(1 * 1_000_000_000)
            continuation.yield(Int.random(in: 1...10))
        }
        continuation.finish()
    }
}


// Call point:
for await random in stream {
    print(random)
}

 
init(unfolding:onCacnel:) 
클로저에서 continuation을 직접 사용하지 않도록 해주는 convenience init

let stream = AsyncStream<Int> {
    await Task.sleep(1 * 1_000_000_000)
    return Int.random(in: 1...10)
} onCancel: { @Sendable () in print("Canceled.") }


// Call point:
for await random in stream {
    print(random)
}

 
AsnycStream 사용 예시

var pharse = "Hello, world!"
var index = pharse.startIndex

let stream = AsyncStream<String> {
  guard index < pharse.endIndex else { return nil }
  
  do {
    try await Task.sleep(nanoseconds: 1_000_000_000)
  } catch {
    return nil
  }
  
  let result = String(pharse[pharse.startIndex...index])
  index = pharse.index(after: index)
  return result
}

for try await item in stream {
  print(item)
}

 

Creating an Asynchronous Timer With AsyncStream

 
간단하게 3에서 1까지 세는 기능을 갖는 카운트다운 구현을 고민해보자.
 
가장 먼저 떠오르는 방법은 Apple에서 지원하는 Timer 타입을 사용하는 것이다.
 
다만 Timer는 비동기 이전 시대의 많은 불편함을 갖고 있어 다루기가 어렵다.
 
메인 쓰레드 제한

  • Timer는 메인쓰레드에서 실행되어야 함.
  • 비동기 작업이 메인 쓰레드에서 이뤄지지 않으면 추가적인 쓰레드 관리가 필요

 
비동기 클로저 지원 부족

  • Timer의 API는 비동기 클로저를 직접 받지 않음
  • 비동기 작업을 수행하려면 Timer와 비동기 코드 간의 연결을 직접 작성 필요

 
변수 안정성 보장

  • Timer가 동작하는 동안 변수에 접근하거나 변경할 때, 쓰레드 안정성을 보장하기 위해 추가적인 코드를 작성해야 함.
  • race condition을 예방하기 위해 필수

 
[타이머와 백그라운드 작업이 동시에 counter 에 접근하여 값을 변경]

var counter = 0

let timer = Timer.scheduledTimer(withTimeInterval: 1.0, repeats: true) { _ in
    counter += 1
    print("Timer fired: \(counter)")
}

DispatchQueue.global().async {
    counter += 10
    print("Background work: \(counter)")
}

 
AsyncStream을 이용하여 카운트 다운 기능을 갖는 코드를 작성

struct Counter {
    func countdown(count: Int, message: String) async throws {
        var count = count
        
        let counter = AsyncStream<String> {
            guard count >= 0 else { return nil }
            
            do {
              try await Task.sleep(for: .seconds(1))
            } catch {
              return nil
            }
            
            defer { count -= 1 }
            
            if count == 0 {
              return "🎉" + message
            } else {
              return "\(count)..."
            }
        }
        
        for await countdownMessage in counter {
          print(countdownMessage)
        }
    }
}

let counter = Counter()

Task {
    try await counter.countdown(count: 3, message: "Hello, world!")
}

// 3...
// 2...
// 1...
// Hello, world!

 
count를 프로퍼티로 둔다면 여전히 race condition이 해결되지 않음. actor를 배울 때 확인해보자.
 

Adding an Asynchronous Stream to NotificationCenter

책에서 Notification을 AsyncStream으로 묶어 옵저빙하는 방법에 대해 소개하고 있다.
 
Apple에서 NotificationCenter로 부터 Notification을 AsyncSequnce의 형태로 받는 메서드를 지원하기 때문에 이를 먼저 간단히 확인하자.
 
NotificationCenter.notifications(named:object:)
 
기본 제공 메서드

    • 리턴 값은 NotificationCenter.Notifcations 로 AsyncSequnece를 채택
    • Notification 타입은 Sendable을 준수하지 않는데, 그 이유는 object나 userInfo 딕셔너리와 같은 구성 요소들이 Sendable이 될 수없기 때문
    • for-await-in 문법을 사용하여 Notification 시퀀스를 반복하며 사용
    • 이 시퀀스에서 Notifcation을 반복할 때 Xcode는 actor 경계를 넘는 것에 대한 워닝을 발생시킴

  • 이를 처리하기 위해 map(_:) 또는 compactMap(_:) 을 사용하여 Sendable한 속성으로 추출 필요

 

  func observeAppStatus() async {
    for await _ in NotificationCenter.default.notifications(named: UIApplication.willResignActiveNotification, object: nil)
      .compactMap({ _ in Void() }) {
        
    }
  }

 
 
책에서 AsyncStream을 직접 만들어 활용하는 예시

extension NotificationCenter {
  func notifications(for name: Notification.Name) -> AsyncStream<Notification> {
    AsyncStream<Notification> { contiunation in
      NotificationCenter.default.addObserver(
        forName: name,
        object: nil,
        queue: nil) { notification in
          contiunation.yield(notification)
        }
    }
  }
}

 

func observeAppStatus() async {
    for await _ in NotificationCenter.default.notifications(for: UIApplication.willResignActiveNotification)
      .compactMap({ _ in Void() }) {
      
    }
  }

 

Extending AsyncSequence

 
기존 타입을 확장하는 것은 async/await 기능 자체의 일부는 아니지만,
AsnycStream이나 AsyncSequence 프로토콜의 extension을 사용하면 더욱 간단한 코드 사용이 가능하다.
 
애플 오픈소스 swift-async-algorithms 

  • debounce, throttle, merge, zip 등 다양한 편리 기능이 포함됨.
  • asnyc/await을 Combine이나 RxSwift의 코드베이스에서 전환할 때 참고하면 도움이 될 것임

 
Sequence에 존재하는 forEach(_:) 구문을 AsyncSequence에 추가하는 예제

extension AsyncSequence {
  func forEach(_ body: (Element) async throws -> Void) async throws {
    for try await element in self {
      try await body(element)
    }
  }
}

try await counter.forEach {
  ...
}