이전글 - Concurrency(6) Testing Asynchronous Code
Modern Concurrency in Swift를 읽고 간단 정리
현재 챕터까지 진행하면서 아래 내용을 학습하였다.
- async/await를 사용한 코드 설계
- Asynchronous Sequence 생성
- async let 바인딩을 통한 Task 병렬 실행
특히 asnyc let 바인딩은 일부 태스크를 병렬로 실행하면서 (Task3, Task4)
다른 태스크는 서로 의존하며 순차적으로 실행해야 할 때(Task1, Task2, Task5) 비동기 흐름을 설계하는 강력한 메커니즘을 지원한다.
- async let 구문은 여러 비동기 태스크를 병렬로 실행하는데 유용하지만, 이는 고정된 개수의 태스크를 다룰 때 적합하다.
- 병렬로 실행해야 하는 태스크의 수가 매우 많거나 동적으로 결정되는 경우 TaskGroup을 사용하는 것이 적합하다.
- 1,000개의 태스크를 병렬로 실행하는 경우 async let을 1,000번 작성하겠는가?
- 태스크의 개수를 미리 알지 못하면 런타임에 그 결정을 어떻게 처리하겠는가?
[TaskGroup]
- 동적으로 병렬 코드를 만들어주는 API이다.
- 데이터 레이스 방지해 준다.
- 각 작업의 결과를 안전하게 수집하고 처리할 수 있다.
Introducing TaskGroup
[TaskGroup 소개]
https://developer.apple.com/documentation/swift/taskgroup
- TaskGroup은 동적으로 생성된 자식 태스크들을 포함하는 그룹이다.
- TaskGroup을 구성하는 데 사용되는 두 가지 변형 - TaskGroup, ThrowingTaskGroup
- 여태 살펴본 다른 API들처럼, 에러를 발생시킬 수 있는 태스크를 허용한다는 점 이외에는 거의 동일
- TaskGroup은 public 생성자를 지원하지 않는다.
- 대신 컴파일러가 올바르게 타입 체크할 수 있도록 돕는 제네릭 함수를 사용한다.
- withTaskGroup(of:returning:body:)
- 주어진 태스크 반환 타입, 그룹 내 태스크로부터 구성할 최종 결과의 반환 타입, 그룹을 초기화하고 실행하는 코드인 클로저를 인자로 받아 그룹을 생성
- withThrowingTaskGroup(of:returning:body:)
- 유사한 매개변수를 사용하지만, 각 태스크 및 전체 그룹이 에러를 발생시킬 수 있음
- 태스크 그룹가 생성된 태스크의 범위를 벗어난 곳에서 태스크 그룹을 사용하지 마라
- 자식 태스크를 태스크 그룹에 추가하는 것은 변경 작업(mutation operation)이다.
- Swift 타입 시스템은 태스크 그룹이 벗어나지 못하도록 방지한다.
- 그 이유는, 자식 태스크가 독립적으로 실행되기 때문에 공유 자원을 안전하게 변경할 수 없다.
- 또한 변경 작업은 자식 태스크와 같은 병렬 실행 컨텍스트에서도 수행할 수 없다.
[생성된 범위 밖에서 태스크 그룹의 mutation operation 시도]
func performTask() async {
var outerGroup: TaskGroup<Void>?
await withTaskGroup(of: Void.self) { group in
outerGroup = group
group.addTask {
print("첫 번째 자식 태스크 시작")
}
}
// TaskGroup을 범위 밖에서 사용하려는 시도
outerGroup?.addTask {
print("외부에서 태스크 그룹 사용")
}
}
Task {
await performTask()
}
// 첫 번째 자식 태스크 시작
// libc++abi: Pure virtual function called! 런타임 에러 발생
[자식 태스크 내에서 태스크 그룹의 mutation operation 시도]
func performTask() async {
await withTaskGroup(of: Void.self) { group in
group.addTask {
...
group.addTask { // 자식 작업 내에서 또 다른 작업을 추가하려고 함 (올바르지 않음)
...
}
}
}
}
- 태스크 수행 순서
- 태스크 그룹에 추가된 태스크들은 병렬로 실행된다.
- 어떤 순서로 예약될지는 정해져 있지 않다.
- 취소 동작
- cancelAll()이 호출되거나 TaskGroup을 실행하는 Task가 취소되었을 때 TaskGroup은 취소된다.
- TaskGroup이 취소되면 취소가 모든 자식 태스크로 자동으로 전파된다.
- 취소된 태스크 그룹에 여전히 태스크를 추가할 수 있지만, 추가되는 태스크는 즉시 취소되기 시작한다.
- 이미 취소된 태스크 그룹에 새로운 태스크를 추가하는 것을 피하기 위해 addTaskUnlessCancelled(priority:body:)를 사용하면 좋다.
중요한 점은 그룹 내 모든 태스크가 완료된 후에만 결과를 반환한다.
[책에서 소개하는 간단한 사용 예시]
// 1
let images = try await withThrowingTaskGroup(
of: Data.self,
returning: [UIImage].self
) { group in
// 2
for index in 0..<numberOfImages {
let url = baseURL.appendingPathComponent("image\(index).png")
// 3
group.addTask {
// 4
return try await URLSession.shared
.data(from: url, delegate: nil)
.0
}
}
// 5
return try await group.reduce(into: [UIImage]()) { result, data in
if let image = UIImage(data: data) {
result.append(image)
}
}
}
- 각 자식 태스크의 반환 타입을 Data로 설정하며, 그룹 전체는 [UIImage]로 반환한다. 클로저 선언에서 명시적으로 반환 타입을 설정하면 returning 인자를 생략할 수도 있다.
- 코드의 다른 곳에서 불러올 이미지 개수를 계산한 후, 이곳에서 반복문을 통해 각 이미지를 불러온다.
- group은 바로 사용할 수 있는 ThrowingTaskGroup이다. for 루프 안에서 group.addTask { ... } 를 사용하여 태스크를 그룹에 추가한다.
- 태스크는 API에서 데이터를 가져오는 작업을 수행한다.
- 태스크 그룹은 AsyncSequence를 준수하므로, 그룹 내의 각 태스크가 완료도리 때마다 결과를 수집하여 이미지 배열에 추가하고 반환한다.
위 예제는 가변 개수의 병렬 태스크를 시작하고, 각 태스크는 하나의 이미지를 다운로드한다. 마지막으로, 모든 이미지를 담은 배열을 images에 할당한다.
[TaskGroup 주요 API]
- addTask(priority:operation:)
- 주어진 우선순위로 그룹에 태스크를 추가하여 병렬로 실행
- addTaskUnlessCancelled(priority:operation:)
- 그룹이 이미 취소된 경우 아무 작업도 하지 않음
- cancelAll()
- 그룹을 취소. 즉, 현재 실행 중인 모든 태스크와 앞으로 추가될 모든 태스크를 취소
- 태스크 그룹이 취소된 이후에 새로운 태스크가 추가된다면 그 태스크는 그룹에 추가 즉시 취소된다.
- 즉시 취소된 자식 태스크에 대해서 CancellationError를 가능한 한 빨리 던지거나 취소를 적절하게 처리해야 한다.
- isCancelled
- 그룹이 취소된 경우 true를 반환
- cancelAll()이 호출되었거나 그룹을 실행시킨 태스크가 취소된 경우
- isEmpty
- 그룹이 모든 태스크를 완료했거나 처음부터 태스크가 없을 경우 true를 반환
import Foundation
func performConcurrentTasks() async {
await withTaskGroup(of: Int.self) { group in
print("Task group is empty at start: \(group.isEmpty)") // true
for i in 0..<5 {
group.addTask {
return i * 2
}
print("Task group is not empty after addTask: \(group.isEmpty)") // false
}
for await result in group {
print("Result: \(result)")
print("Task group is empty ?: \(group.isEmpty)") // false, 마지막은 result 수행 이후 true
}
print("Task group is empty at end: \(group.isEmpty)") // true
}
}
Task {
await performConcurrentTasks()
}
- waitForAll()
- 모든 태스크가 완료될 때까지 대기. 그룹 작업이 끝난 후 실행해야 할 코드가 있을 때 사용
또한 TaskGroup은 AsyncSequence를 준수하므로, 그룹을 비동기적으로 반복하여 일반적인 Swift 시퀀스처럼 태스크 반환 값을 가져올 수 있다. 병렬로 태스크를 실행하면서 시퀀스로 결과를 반복 처리할 수 있으므로 비병렬 컨텍스트에서 실행된다. 이 덕분에 결과를 반복 처리하여 배열 같은 가변 상태에 안전하게 반영할 수 있다.
Getting Result From a TaskGroup
전체 TaskGroup에 대한 결과를 처리하고 싶으면, withTaskGroup의 리턴 값을 활용하면 된다.
func runAllTasks() async throws {
...
let scans = await withTaskGroup(of: String.self) { [unowned self] group -> [String] in
for number in 0..<self.total {
group.addTask {
await self.worker(number: number)
}
}
return await group
.reduce(into: [String]()) { result, string in
result.append(string)
}
}
print(scans)
}
Mutating Shared State
[태스크 공유 상태 수정]
- 일반적으로 TaskGroup에서 병렬 태스크의 결과는 태스크가 완료된 후 그룹을 반복하며 수집된다.
- 하지만 특정 상황에서는 각 병렬 태스크가 직접 공유 상태(예: 로그 객체)를 업데이트해야 할 때가 있다.
- 예를 들어 파일 다운로드와 같은 경우, 다운로드 결과를 앱 로거 객체를 통해 즉시 기록할 수 있다. 만약 파일들 중 하나가 다운로드에 실패하여 에러를 발생시키더라도, 나머지 요청들은 파일을 받는 즉시 성공적으로 로그를 남긴다.
[데이터 레이스]
- 만약 인스턴스 프러퍼티나 최 상위변수와 같은 공유 상태를 병렬로 변경하게 된다면, 데이터 레이스(data race)가 발생할 수 있다.
- 이는 데이터의 일관성을 깨트리거나 앱의 크래시로 이어지게 된다.
- 데이터 레이스가 까다로운 점은, Xcode debug 모드로 앱을 실행할 때는 거의 크래시가 발생하지 않는다.
- 주로 앱을 release 모드로 컴파일하고 실제 기기에서 실행할 때 크래시를 발생시킨다.
- 따라서 개발자보다 최종 사용자에게 더 자주 발생할 가능성이 크다.
[주의를 기울일 점]
- 태스크 그룹 코드의 어떤 부분이 동시에 실행되는지에 대해 주의 깊게 살펴봐야 한다.
- Swift 컴파일러는 비동기 컨텍스테에서 공유 상태를 변경하지 못하도록 점점 더 개선되고 있지만, 개발자가 정확히 어떤 일이 일어나는지 알고 있는 것이 중요하다.
- TaskGroup 코드는 대략 concurrent, asynchronous, synchronus 실행흐름으로 나눠 생각해 볼 수 있다.
Synchronous
- 가장 안전하게 공유 상태를 수정할 수 있는 곳이다.
- 일반적으로 순차적으로 실행되므로, 여러 스레드에서 동시 접근에 문제가 없다.
- 예를 들어, 태스크 그룹 외부에서 상태를 수정하는 것이 가장 안전하다.
Asynchronous
- 비동기 코드에서 공유 상태를 수정하는 것은 약간의 위험이 따른다.
- Swift 컴파일러가 문제가 없다고 판단할 때는 어느 정도 안전하지만, 이 경우에도 상태 수정이 동시 실행될 가능성이 있는지 확실히 알아야 한다. 데이터 레이스가 발생하지 않도록 주의해야 한다.
Concurrent
- 동시 실행 부분에서 공유 상태를 수행하는 것은 매우 위험하다.
- 여러 스레드가 동시에 접근할 수 있기 때문에 데이터 레이스가 발생할 가능성이 높다. 이런 상황에서는 안전 메커니즘을 사용해야 한다. 예를 들면 Swift Actor를 사용할 수 있는데 이는 Chapter8에서 자세히 다룬다.
Processing Task Results in Real Time
- 전체 태스크 그룹의 결과가 아닌 각 태스크의 결과에 즉시 반응하고 싶을 수 있다.
- 예를 들어 UI를 업데이트하여 진행 상황을 보여주거나, 태스크 결과에 따라 그룹 실행 흐름을 제어하고 싶을 때.
- 다행히도 TaskGroup은 그룹의 작업량을 동적으로 제어할 수 있도록 지원한다. 필요에 따라 기존 태스크를 취소하거나, 새로운 태스크를 추가하는 등의 작업이 가능하다.
- group이 AsyncSequence를 준수하기 때문에, for 루프로 반복할 수 있다.
func runAllTasks() async throws {
...
await withTaskGroup(of: String.self) { [unowned self] group in
for number in 0..<self.total {
group.addTask {
await self.worker(number: number)
}
}
for await result in group {
print("Completed: \(result)") // 태스크 결과에 따라 즉각 처리
}
print("Done.")
}
}
Controlling the Group Flow
지금까지 살펴본 예제는 모든 태스크를 런타임이 몇 개의 태스크를 실행할지, 언제 실행할지를 결정하도록 하고, 그룹의 태스크가 모두 완료될 때까지 진행하였다.
하지만 때로는 동시에 수행되는 태스크의 개수를 제한하고 싶을 수 있다.
func runAllTasks() async throws {
self.started = Date()
await withTaskGroup(of: String.self) { [unowned self] group in
let batchSize = 4
for index in 0..<batchSize {
group.addTask {
await self.worker(number: index)
}
}
// 1
var index = batchSize
// 2
for await result in group {
print("Completed: \(result)")
// 3
if index < self.total {
group.addTask { [index] in
await self.worker(number: index)
}
index += 1
}
}
}
}
그룹이 실행 중일 때 태스크를 추가하면 여러 흥미로운 작업들을 수행할 수 있다.
- 계속해서 태스크를 추가하여 그룹을 무기한 실행
- 실패한 태스크를 다시 그룹에 추가하여 재시도
- 특정 개수의 태스크가 완료되거나, 원하는 결과를 찾으면 높은 우선순위의 UI 태스크 삽입
Group Error Handling
태스크 그룹 내의 태스크가 에러가 발생할 때 이 처리가 되어있지 않다면, 그룹 전체가 중단된다.
이후의 태스크가 실행되지 않을 뿐만 아니라, 이미 완료된 태스크의 결과도 얻을 수 없기 때문에 주의를 기울여야 한다.
'iOS > Concurrency' 카테고리의 다른 글
Concurrency (9) Global Actors (2) | 2024.10.13 |
---|---|
Concurrency (8) Getting Started With Actors (0) | 2024.09.29 |
Concurrency (6) Testing Asynchronous Code (1) | 2024.09.08 |
Concurrency (5) Intermediate async/await & Checked Continuation (0) | 2024.09.01 |
Concurrency (4) Custom Asynchronous Sequences With AsyncStream (0) | 2024.08.24 |