본문 바로가기

iOS/Concurrency

Concurrency (7) Concurrent Code With TaskGroup

 

이전글 - Concurrency(6) Testing Asynchronous Code
 
Modern Concurrency in Swift를 읽고 간단 정리


 

현재 챕터까지 진행하면서 아래 내용을 학습하였다.

  • async/await를 사용한 코드 설계
  • Asynchronous Sequence 생성
  • async let 바인딩을 통한 Task 병렬 실행

 

특히 asnyc let 바인딩은 일부 태스크를 병렬로 실행하면서 (Task3, Task4

다른 태스크는 서로 의존하며 순차적으로 실행해야 할 때(Task1, Task2, Task5) 비동기 흐름을 설계하는 강력한 메커니즘을 지원한다.

 

https://www.kodeco.com/books/modern-concurrency-in-swift/v1.0/chapters/7-concurrent-code-with-taskgroup

 

 

  • async let 구문은 여러 비동기 태스크를 병렬로 실행하는데 유용하지만, 이는 고정된 개수의 태스크를 다룰 때 적합하다.
  • 병렬로 실행해야 하는 태스크의 수가 매우 많거나 동적으로 결정되는 경우 TaskGroup을 사용하는 것이 적합하다.
    • 1,000개의 태스크를 병렬로 실행하는 경우 async let을 1,000번 작성하겠는가?
    • 태스크의 개수를 미리 알지 못하면 런타임에 그 결정을 어떻게 처리하겠는가?

[TaskGroup]

  • 동적으로 병렬 코드를 만들어주는 API이다.
  • 데이터 레이스 방지해 준다.
  • 각 작업의 결과를 안전하게 수집하고 처리할 수 있다.

https://www.kodeco.com/books/modern-concurrency-in-swift/v1.0/chapters/7-concurrent-code-with-taskgroup

 

Introducing TaskGroup

[TaskGroup 소개]

 

https://developer.apple.com/documentation/swift/taskgroup

 

TaskGroup | Apple Developer Documentation

A group that contains dynamically created child tasks.

developer.apple.com

 

https://developer.apple.com/documentation/swift/taskgroup

 

 

  • TaskGroup동적으로 생성된 자식 태스크들을 포함하는 그룹이다.
  • TaskGroup을 구성하는 데 사용되는 두 가지 변형 - TaskGroup, ThrowingTaskGroup
    • 여태 살펴본 다른 API들처럼, 에러를 발생시킬 수 있는 태스크를 허용한다는 점 이외에는 거의 동일
  • TaskGroup은 public 생성자를 지원하지 않는다.
    • 대신 컴파일러가 올바르게 타입 체크할 수 있도록 돕는 제네릭 함수를 사용한다.
  • withTaskGroup(of:returning:body:) 
    • 주어진 태스크 반환 타입, 그룹 내 태스크로부터 구성할 최종 결과의 반환 타입, 그룹을 초기화하고 실행하는 코드인 클로저를 인자로 받아 그룹을 생성
  • withThrowingTaskGroup(of:returning:body:)
    • 유사한 매개변수를 사용하지만, 각 태스크 및 전체 그룹이 에러를 발생시킬 수 있음
  • 태스크 그룹가 생성된 태스크의 범위를 벗어난 곳에서 태스크 그룹을 사용하지 마라
    • 자식 태스크를 태스크 그룹에 추가하는 것은 변경 작업(mutation operation)이다.  
    • Swift 타입 시스템은 태스크 그룹이 벗어나지 못하도록 방지한다.
    • 그 이유는, 자식 태스크가 독립적으로 실행되기 때문에 공유 자원을 안전하게 변경할 수 없다.
    • 또한 변경 작업은 자식 태스크와 같은 병렬 실행 컨텍스트에서도 수행할 수 없다.

[생성된 범위 밖에서 태스크 그룹의 mutation operation 시도]

func performTask() async {
    var outerGroup: TaskGroup<Void>?
    
    await withTaskGroup(of: Void.self) { group in
        outerGroup = group
        
        group.addTask {
            print("첫 번째 자식 태스크 시작")
        }
    }
    
    // TaskGroup을 범위 밖에서 사용하려는 시도
    outerGroup?.addTask {
        print("외부에서 태스크 그룹 사용")
    }
    
}

Task {
    await performTask()
}

// 첫 번째 자식 태스크 시작
// libc++abi: Pure virtual function called! 런타임 에러 발생

 

[자식 태스크 내에서 태스크 그룹의 mutation operation 시도]

func performTask() async {
    await withTaskGroup(of: Void.self) { group in
        group.addTask {
            ...
            
            group.addTask { // 자식 작업 내에서 또 다른 작업을 추가하려고 함 (올바르지 않음)
              ...
            }
        }
    }
}

  • 태스크 수행 순서
    • 태스크 그룹에 추가된 태스크들은 병렬로 실행된다.
    • 어떤 순서로 예약될지는 정해져 있지 않다.
  • 취소 동작
    • cancelAll()이 호출되거나 TaskGroup을 실행하는 Task가 취소되었을 때 TaskGroup은 취소된다.
    • TaskGroup이 취소되면 취소가 모든 자식 태스크로 자동으로 전파된다.
    • 취소된 태스크 그룹에 여전히 태스크를 추가할 수 있지만, 추가되는 태스크는 즉시 취소되기 시작한다.
    • 이미 취소된 태스크 그룹에 새로운 태스크를 추가하는 것을 피하기 위해 addTaskUnlessCancelled(priority:body:)를 사용하면 좋다.

중요한 점은 그룹 내 모든 태스크가 완료된 후에만 결과를 반환한다.

 

[책에서 소개하는 간단한 사용 예시]

// 1
let images = try await withThrowingTaskGroup(
  of: Data.self,
  returning: [UIImage].self
) { group in
  // 2
  for index in 0..<numberOfImages {
    let url = baseURL.appendingPathComponent("image\(index).png")
    // 3
    group.addTask {
      // 4
      return try await URLSession.shared
	    .data(from: url, delegate: nil)
    	    .0
    }
  }
  // 5
  return try await group.reduce(into: [UIImage]()) { result, data in
    if let image = UIImage(data: data) {
      result.append(image)
    }
  }
}

 

  1. 각 자식 태스크의 반환 타입을 Data로 설정하며, 그룹 전체는 [UIImage]로 반환한다. 클로저 선언에서 명시적으로 반환 타입을 설정하면 returning 인자를 생략할 수도 있다.
  2. 코드의 다른 곳에서 불러올 이미지 개수를 계산한 후, 이곳에서 반복문을 통해 각 이미지를 불러온다.
  3. group은 바로 사용할 수 있는 ThrowingTaskGroup이다. for 루프 안에서 group.addTask { ... } 를 사용하여 태스크를 그룹에 추가한다.
  4. 태스크는 API에서 데이터를 가져오는 작업을 수행한다.
  5. 태스크 그룹은 AsyncSequence를 준수하므로, 그룹 내의 각 태스크가 완료도리 때마다 결과를 수집하여 이미지 배열에 추가하고 반환한다.

 

위 예제는 가변 개수의 병렬 태스크를 시작하고, 각 태스크는 하나의 이미지를 다운로드한다. 마지막으로, 모든 이미지를 담은 배열을 images에 할당한다.

 

[TaskGroup 주요 API]

  • addTask(priority:operation:)
    • 주어진 우선순위로 그룹에 태스크를 추가하여 병렬로 실행
  • addTaskUnlessCancelled(priority:operation:)
    • 그룹이 이미 취소된 경우 아무 작업도 하지 않음
  • cancelAll()
    • 그룹을 취소. 즉, 현재 실행 중인 모든 태스크와 앞으로 추가될 모든 태스크를 취소
    • 태스크 그룹이 취소된 이후에 새로운 태스크가 추가된다면 그 태스크는 그룹에 추가 즉시 취소된다.
    • 즉시 취소된 자식 태스크에 대해서 CancellationError를 가능한 한 빨리 던지거나 취소를 적절하게 처리해야 한다.
  • isCancelled
    • 그룹이 취소된 경우 true를 반환
    • cancelAll()이 호출되었거나 그룹을 실행시킨 태스크가 취소된 경우
  • isEmpty
    • 그룹이 모든 태스크를 완료했거나 처음부터 태스크가 없을 경우 true를 반환
import Foundation

func performConcurrentTasks() async {
    await withTaskGroup(of: Int.self) { group in
        print("Task group is empty at start: \(group.isEmpty)")  // true

        for i in 0..<5 {
            group.addTask {
                return i * 2
            }
            print("Task group is not empty after addTask: \(group.isEmpty)")  // false
        }

        for await result in group {
            print("Result: \(result)")
            print("Task group is empty ?: \(group.isEmpty)")  // false, 마지막은 result 수행 이후 true
        }
		
        print("Task group is empty at end: \(group.isEmpty)")  // true
    }
}

Task {
    await performConcurrentTasks()
}
  • waitForAll()
    • 모든 태스크가 완료될 때까지 대기. 그룹 작업이 끝난 후 실행해야 할 코드가 있을 때 사용

 

또한 TaskGroupAsyncSequence를 준수하므로, 그룹을 비동기적으로 반복하여 일반적인 Swift 시퀀스처럼 태스크 반환 값을 가져올 수 있다. 병렬로 태스크를 실행하면서 시퀀스로 결과를 반복 처리할 수 있으므로 비병렬 컨텍스트에서 실행된다. 이 덕분에 결과를 반복 처리하여 배열 같은 가변 상태에 안전하게 반영할 수 있다.

 

Getting Result From a TaskGroup

전체 TaskGroup에 대한 결과를 처리하고 싶으면, withTaskGroup의 리턴 값을 활용하면 된다.

  func runAllTasks() async throws {
    ...
    let scans = await withTaskGroup(of: String.self) { [unowned self] group -> [String] in
      for number in 0..<self.total {
        group.addTask {
          await self.worker(number: number)
        }
      }
      
      return await group
        .reduce(into: [String]()) { result, string in
          result.append(string)
        }
    }
    
    print(scans)
  }

Mutating Shared State

[태스크 공유 상태 수정]

  • 일반적으로 TaskGroup에서 병렬 태스크의 결과는 태스크가 완료된 후 그룹을 반복하며 수집된다.
  • 하지만 특정 상황에서는 각 병렬 태스크가 직접 공유 상태(예: 로그 객체)를 업데이트해야 할 때가 있다.
    • 예를 들어 파일 다운로드와 같은 경우, 다운로드 결과를 앱 로거 객체를 통해 즉시 기록할 수 있다. 만약 파일들 중 하나가 다운로드에 실패하여 에러를 발생시키더라도, 나머지 요청들은 파일을 받는 즉시 성공적으로 로그를 남긴다.

[데이터 레이스]

  • 만약 인스턴스 프러퍼티나 최 상위변수와 같은 공유 상태를 병렬로 변경하게 된다면, 데이터 레이스(data race)가 발생할 수 있다.
    • 이는 데이터의 일관성을 깨트리거나 앱의 크래시로 이어지게 된다.
  • 데이터 레이스가 까다로운 점은, Xcode debug 모드로 앱을 실행할 때는 거의 크래시가 발생하지 않는다.
  • 주로 앱을 release 모드로 컴파일하고 실제 기기에서 실행할 때 크래시를 발생시킨다.
  • 따라서 개발자보다 최종 사용자에게 더 자주 발생할 가능성이 크다.

[주의를 기울일 점]

  • 태스크 그룹 코드의 어떤 부분이 동시에 실행되는지에 대해 주의 깊게 살펴봐야 한다.
  • Swift 컴파일러는 비동기 컨텍스테에서 공유 상태를 변경하지 못하도록 점점 더 개선되고 있지만, 개발자가 정확히 어떤 일이 일어나는지 알고 있는 것이 중요하다.
  • TaskGroup 코드는 대략 concurrent, asynchronous, synchronus 실행흐름으로 나눠 생각해 볼 수 있다.

https://www.kodeco.com/books/modern-concurrency-in-swift/v1.0/chapters/7-concurrent-code-with-taskgroup

 

Synchronous

  • 가장 안전하게 공유 상태를 수정할 수 있는 곳이다.
  • 일반적으로 순차적으로 실행되므로, 여러 스레드에서 동시 접근에 문제가 없다.
  • 예를 들어, 태스크 그룹 외부에서 상태를 수정하는 것이 가장 안전하다.

Asynchronous

  • 비동기 코드에서 공유 상태를 수정하는 것은 약간의 위험이 따른다.
  • Swift 컴파일러가 문제가 없다고 판단할 때는 어느 정도 안전하지만, 이 경우에도 상태 수정이 동시 실행될 가능성이 있는지 확실히 알아야 한다. 데이터 레이스가 발생하지 않도록 주의해야 한다.

Concurrent

  • 동시 실행 부분에서 공유 상태를 수행하는 것은 매우 위험하다.
  • 여러 스레드가 동시에 접근할 수 있기 때문에 데이터 레이스가 발생할 가능성이 높다. 이런 상황에서는 안전 메커니즘을 사용해야 한다. 예를 들면 Swift Actor를 사용할 수 있는데 이는 Chapter8에서 자세히 다룬다.

Processing Task Results in Real Time

  • 전체 태스크 그룹의 결과가 아닌 각 태스크의 결과에 즉시 반응하고 싶을 수 있다.
    • 예를 들어 UI를 업데이트하여 진행 상황을 보여주거나, 태스크 결과에 따라 그룹 실행 흐름을 제어하고 싶을 때.
  • 다행히도 TaskGroup은 그룹의 작업량을 동적으로 제어할 수 있도록 지원한다. 필요에 따라 기존 태스크를 취소하거나, 새로운 태스크를 추가하는 등의 작업이 가능하다.
  • group이 AsyncSequence를 준수하기 때문에, for 루프로 반복할 수 있다.
  func runAllTasks() async throws {
    ...
    await withTaskGroup(of: String.self) { [unowned self] group in
      for number in 0..<self.total {
        group.addTask {
          await self.worker(number: number)
        }
      }
      
      for await result in group {
        print("Completed: \(result)") // 태스크 결과에 따라 즉각 처리
      }
      print("Done.")
    }
  }

 

Controlling the Group Flow

지금까지 살펴본 예제는 모든 태스크를 런타임이 몇 개의 태스크를 실행할지, 언제 실행할지를 결정하도록 하고, 그룹의 태스크가 모두 완료될 때까지 진행하였다.

 

하지만 때로는 동시에 수행되는 태스크의 개수를 제한하고 싶을 수 있다.

  func runAllTasks() async throws {
    self.started = Date()
    
    await withTaskGroup(of: String.self) { [unowned self] group in
      let batchSize = 4
      for index in 0..<batchSize {
        group.addTask {
          await self.worker(number: index)
        }
      }
      
      // 1
      var index = batchSize
      
      // 2
      for await result in group {
        print("Completed: \(result)")
        
        // 3
        if index < self.total {
          group.addTask { [index] in
            await self.worker(number: index)
          }
          index += 1
        }
      }
    }
  }

 

그룹이 실행 중일 때 태스크를 추가하면 여러 흥미로운 작업들을 수행할 수 있다.

  • 계속해서 태스크를 추가하여 그룹을 무기한 실행
  • 실패한 태스크를 다시 그룹에 추가하여 재시도
  • 특정 개수의 태스크가 완료되거나, 원하는 결과를 찾으면 높은 우선순위의 UI 태스크 삽입

Group Error Handling

태스크 그룹 내의 태스크가 에러가 발생할 때 이 처리가 되어있지 않다면, 그룹 전체가 중단된다.

이후의 태스크가 실행되지 않을 뿐만 아니라, 이미 완료된 태스크의 결과도 얻을 수 없기 때문에 주의를 기울여야 한다.