[Dev Diaries] Tasks in parallel

Context

Back in the days of the old blog, I posted a way to manage my asynchronous tasks by grouping them, which was basically the kernel of something akin to promises in other languages and frameworks.

It was mostly based on operation queues and locks, and basically handled only the equivalent of "guarantees" in promise parlance.

A few months ago, John Sundell published a task-based system that tickled me greatly. I immediately proceeded to forget about it until I got an Urge again to optimize my K-Means implementation. I tweaked his code to use my terminology, so that I wouldn't have to rewrite everything where I used the concurrency stuff, and added a bunch of things I needed. Without further ado, here is some code and some comments.

Core feature: perform on queue

First, an alias that is inherited from the past and facilitates some operations further down the line:

public typealias constantFunction = (() throws -> ())

Then the main meat of the system: the Task class and ancillary utilities. My code was already fairly close to Sundell's, so I mostly adopted his style.

public class Task {
    // MARK: Ancillary stuff
    public enum TaskResult {
        case success
        case failure(Error)
    }
    
    public struct TaskManager {
        fileprivate let queue : DispatchQueue
        fileprivate let handler : (TaskResult) -> Void
        
        func finish() {
            handler(.success)
        }
        
        func fail(with error: Error) {
            handler(.failure(error))
        }
    }
    public typealias TaskFunction = (TaskManager) -> Void
    
    //MARK: Init
    private let closure: TaskFunction
    
    public init( _ closure: @escaping TaskFunction) {
        self.closure = closure
    }
    
    public convenience init(_ f: @escaping constantFunction) {
        self.init { manager in
            do {
                try f()
                manager.finish()
            } catch {
                manager.fail(with: error)
            }
        }
    }
    
    //MARK: Core
    public func perform(on queue: DispatchQueue = .global(),
                 handler: @escaping (TaskResult) -> Void) {
        queue.async {
            let manager = TaskManager(
                queue: queue,
                handler: handler
            )
            
            self.closure(manager)
        }
    }
}

In order to understand the gist of it, I really recommend reading Sundell's article, but in essence, it's "just" something that executes a function on a queue, then signals the manager that the task is complete (or that it failed).
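To make the "signals the manager" part concrete, here is a minimal sketch. It assumes a trimmed stand-in class, MiniTask (a hypothetical name, not the class from this post), and a hypothetical Timeout error, so that the example compiles on its own. The point it tries to show is that the closure, not the caller, decides when the task is over.

```swift
import Foundation

// MiniTask is a trimmed stand-in for the Task class above (hypothetical
// name, used here only to keep the example self-contained).
final class MiniTask {
    enum Outcome { case success, failure(Error) }

    struct Manager {
        let handler: (Outcome) -> Void
        func finish() { handler(.success) }
        func fail(with error: Error) { handler(.failure(error)) }
    }

    private let closure: (Manager) -> Void
    init(_ closure: @escaping (Manager) -> Void) { self.closure = closure }

    func perform(on queue: DispatchQueue = .global(),
                 handler: @escaping (Outcome) -> Void) {
        queue.async { self.closure(Manager(handler: handler)) }
    }
}

struct Timeout: Error {}   // hypothetical error type

// Runs a task and blocks until its handler fires (demonstration only).
func describeOutcome(of task: MiniTask) -> String {
    let done = DispatchSemaphore(value: 0)
    var text = ""
    task.perform { outcome in
        switch outcome {
        case .success: text = "success"
        case .failure: text = "failure"
        }
        done.signal()
    }
    done.wait()
    return text
}

// This closure hops to another queue first and only then reports back
// through the manager: the task is not over when the closure returns.
let delayed = MiniTask { manager in
    DispatchQueue.global().asyncAfter(deadline: .now() + 0.05) {
        manager.finish()
    }
}
// This one reports a failure straight away.
let failing = MiniTask { manager in manager.fail(with: Timeout()) }

print(describeOutcome(of: delayed))   // success
print(describeOutcome(of: failing))   // failure
```

The semaphore in describeOutcome is only there so that the script does not exit before the handlers run.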

I added an initializer that allows me to write my code like this, for backwards compatibility and stylistic reasons:

Task {
    print("Hello")
}.perform { result in
    switch result {
    case .success: break // do something
    case .failure(let err): print(err) // here too, probably
    }
}

It's important to note that the block passed to Task must take no argument and return nothing. But that doesn't prevent it from doing regular block stuff, such as capturing and modifying variables:

var nt = 0
Task {
    nt += 42
}.perform { result in
    switch result {
    case .success: print(nt)
    case .failure: break // not probable
    }
}

Outputs: 42

Of course, the block can throw, in which case we'll end up in the .failure case.

Sequence stuff

The task sequencing mechanism wasn't of any particular interest to my project, but I decided to treat it the same way I did the parallel one. Sundell's code is perfectly fine; I just wanted operators as syntactic sugar:

//MARK: Sequential
// FROM: https://www.swiftbysundell.com/posts/task-based-concurrency-in-swift
// replaces "then"
infix operator •: MultiplicationPrecedence
extension Task {
    static func sequence(_ tasks: [Task]) -> Task {
        var index = 0
        
        func performNext(using controller: TaskManager) {
            guard index < tasks.count else {
                // We’ve reached the end of our array of tasks,
                // time to finish the sequence.
                controller.finish()
                return
            }
            
            let task = tasks[index]
            index += 1
            
            task.perform(on: controller.queue) { outcome in
                switch outcome {
                case .success:
                    performNext(using: controller)
                case .failure(let error):
                    // As soon as an error has occurred, we’ll
                    // fail the entire sequence.
                    controller.fail(with: error)
                }
            }
        }
        
        return Task(performNext)
    }
    
    // Task • Task
    static func •(_ t1:  Task, _ t2 : Task ) -> Task {
        return Task.sequence([t1,t2])
    }
}

The comments say it all: we take the tasks one by one, essentially having a Task(Task(Task(...))) system that handles failure gracefully. I wanted to have an operator because I like writing code like this:

(Task {
    print("Hello")
    } • Task {
        print("You")
    } • Task {
        print("Beautiful")
    } • Task {
        print("Syntax!")
    }
).perform { (_) in
        print("done")
}

Outputs:

Hello
You
Beautiful
Syntax!
done
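The recursion at the heart of Task.sequence can be sketched without the Task class at all. In the snippet below, Step, StepError, runInSequence and demoSequence are hypothetical names made up for illustration: a step is just a function that does its work and then reports nil or an error. It is a simplified model of the idea, not Sundell's implementation.

```swift
import Foundation

// Hypothetical stand-in for a task: a function that does some work and
// then reports nil (success) or an error through `done`.
typealias Step = (_ done: @escaping (Error?) -> Void) -> Void

struct StepError: Error {}

// Runs the steps one after the other and stops at the first error.
func runInSequence(_ steps: [Step], completion: @escaping (Error?) -> Void) {
    var index = 0

    func next() {
        guard index < steps.count else {
            completion(nil)          // every step succeeded
            return
        }
        let step = steps[index]
        index += 1
        step { error in
            if let error = error {
                completion(error)    // fail the whole sequence
            } else {
                next()               // move on to the following step
            }
        }
    }

    next()
}

func demoSequence() -> [String] {
    var log = [String]()
    let steps: [Step] = [
        { done in log.append("first");  done(nil) },
        { done in log.append("second"); done(StepError()) },
        { done in log.append("third");  done(nil) }   // never runs
    ]
    runInSequence(steps) { error in
        log.append(error == nil ? "done" : "failed")
    }
    return log
}

print(demoSequence())   // ["first", "second", "failed"]
```

The steps here are synchronous to keep the output deterministic; the real thing hands each step to a DispatchQueue instead.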

Because of the structure of the project I'm using parallelism in, I tend to manipulate [Task] objects a lot, so I added an operator on the array manipulation as well:

// [Task...]••
postfix operator ••
extension Array where Element:Task {
    var sequenceTask : Task { return Task.sequence(self) }
    static postfix func ••(_ f: Array<Element>) -> Task {
        return f.sequenceTask
    }
}

This allows me to write code like this:

var tasks = [Task]()
for i in 1..<10 {
    tasks.append(Task {
        print(i)
    })
}
tasks••.perform { _ in
    // this space for rent
}

Outputs the numbers from 1 to 9 sequentially. It is, admittedly, a fairly useless feature to be able to create tasks in a loop that will execute one after the other, instead of "just" looping in a more regular fashion, but I tend to like symmetry, which leads me to the main meat of the code.

Parallelism

As with sequencing, Sundell's approach is pitch perfect, and much more efficient than my own, especially with regard to error handling, so I modified my code to follow his recommendations.

Before reading the code, there are two things you should be aware of:

  • DispatchGroup allows for aggregate synchronization of work. It's a little-known tool that you should read about.
  • Sundell's code did not include a mechanism for waiting for the group's completion. I included a DispatchSemaphore that optionally lets me wait for the group to be done (nil by default, meaning I do not wait for completion with the syntactic sugar).

// MARK: Parallel
infix operator |: AdditionPrecedence
extension Task {
    // Replaces "enqueue"
    static func group(_ tasks: [Task], semaphore: DispatchSemaphore? = nil) -> Task {
        return Task { controller in
            let group = DispatchGroup()
            
            // From: https://www.swiftbysundell.com/posts/task-based-concurrency-in-swift
            // To avoid race conditions with errors, we set up a private
            // queue to sync all assignments to our error variable
            let errorSyncQueue = DispatchQueue(label: "Task.ErrorSync")
            var anyError: Error?
            
            for task in tasks {
                group.enter()
                
                // It’s important to make the sub-tasks execute
                // on the same DispatchQueue as the group, since
                // we might cause unexpected threading issues otherwise.
                task.perform(on: controller.queue) { outcome in
                    switch outcome {
                    case .success:
                        break
                    case .failure(let error):
                        errorSyncQueue.sync {
                            anyError = anyError ?? error
                        }
                    }
                    
                    group.leave()
                }
            }
            
            group.notify(queue: controller.queue) {
                if let error = anyError {
                    controller.fail(with: error)
                } else {
                    controller.finish()
                }
                if let semaphore = semaphore {
                    semaphore.signal()
                }
            }
        }
    }
    
    // Task | Task
    static func |(_ t1:  Task, _ t2 : Task ) -> Task {
        return Task.group([t1,t2])
    }
}

Just like with the sequential code, it allows me to write:

(Task {
    print("Hello")
    } | Task {
        print("You")
    } | Task {
        print("Beautiful")
    } | Task {
        print("Syntax!")
    }
).perform { (_) in
        print("done")
}

Note that even though the tasks are marked as being parallel, | is a binary, left-associative operator, so a | b | c | d is parsed as ((a | b) | c) | d. You end up grouping the tasks for parallel execution two by two, which is fairly useless in general. The above code outputs (sometimes):

Syntax!
Beautiful
Hello
You
done

This highlights the point I was making: the outermost "pair" is the first three tasks taken together, in parallel with the last one. Since the latter finishes early in comparison to a group of tasks, it is output first. But I included this operator for symmetry (and because I can).
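The nesting is easier to see with a toy type that records how its operands get grouped. Grouping is a hypothetical stand-in for Task, made up for this illustration; the only thing it borrows from the real code is the | operator and its precedence.

```swift
// Toy stand-in for tasks: it only records how operands get grouped.
indirect enum Grouping: CustomStringConvertible {
    case leaf(String)
    case pair(Grouping, Grouping)

    var description: String {
        switch self {
        case .leaf(let name): return name
        case .pair(let l, let r): return "(\(l) | \(r))"
        }
    }

    // `|` is already declared by the standard library as a left-associative
    // infix operator with AdditionPrecedence, so only the function is needed.
    static func | (lhs: Grouping, rhs: Grouping) -> Grouping {
        return .pair(lhs, rhs)
    }
}

let a = Grouping.leaf("Hello"), b = Grouping.leaf("You")
let c = Grouping.leaf("Beautiful"), d = Grouping.leaf("Syntax!")
print(a | b | c | d)   // (((Hello | You) | Beautiful) | Syntax!)
```

With Task in place of Grouping, each pair of parentheses is one Task.group call with exactly two members.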

Much more interestingly, grouping tasks in an array performs them all in parallel, and is the only way to have them work in a way that resembles the instinct you probably have regarding parallel tasks:

// [Task...]||
postfix operator ||
extension Array where Element:Task {
    var groupTask : Task { return Task.group(self) }
    static postfix func ||(_ f: Array<Element>) -> Task {
        return f.groupTask
    }
}

This allows me to write:

var tasks = [Task]()
for i in 1..<10 {
    tasks.append(Task {
        // for simulation purposes
        usleep(UInt32.random(in: 100...500))
        print(i)
    })
}
tasks||.perform { _ in
    // this space for rent
}

This outputs something like the following (the order varies from run to run), with the last line appearing once the longest task is done:

2
5
1
8
3
4
6
7
9

I'd like to include a ternary operator to wait for the group to be finished, but it's not currently possible in Swift (in the same way an n-ary operator is currently impossible). This means a fairly sad syntax:

infix operator ~~
extension Array where Element:Task {
    static func ~~(_ f: Array<Element>, _ s: DispatchSemaphore) -> Task {
        let g = Task.group(f,semaphore: s)
        return g
    }
}

The following test code works:

var tasks = [Task]()
let s = DispatchSemaphore(value: 0)
for i in 1..<10 {
    tasks.append(Task {
        usleep(UInt32.random(in: 100...5500))
        print(i)
    })
}
(tasks~~s).perform { _ in
    // this space for rent
}
s.wait()

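Sadly, we now need to parenthesize tasks~~s: member access binds tighter than any infix operator, so tasks~~s.perform would be parsed as tasks ~~ (s.perform ...). That's what bothers me. But at least my code can be synchronous or asynchronous, as needed.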
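The mechanics behind the optional wait can be sketched without any of the Task machinery: every unit of work enters the group, leaves it when done, and the notify block signals a semaphore the caller may be waiting on. Counter and runGroup are hypothetical names used only for this illustration.

```swift
import Foundation

// Small thread-safe counter so the concurrent work items have something
// to update; the lock is what makes it safe to share across threads.
final class Counter: @unchecked Sendable {
    private let lock = NSLock()
    private var value = 0
    func increment() { lock.lock(); value += 1; lock.unlock() }
    var current: Int { lock.lock(); defer { lock.unlock() }; return value }
}

func runGroup(workItems: Int) -> Int {
    let queue = DispatchQueue.global()
    let group = DispatchGroup()
    let semaphore = DispatchSemaphore(value: 0)
    let counter = Counter()

    for _ in 0..<workItems {
        group.enter()                 // one enter per unit of work...
        queue.async {
            counter.increment()
            group.leave()             // ...balanced by exactly one leave
        }
    }

    // Runs once every enter() has been matched by a leave().
    group.notify(queue: queue) {
        semaphore.signal()
    }

    semaphore.wait()                  // blocks the caller until notify fires
    return counter.current
}

print(runGroup(workItems: 10))   // 10
```

Dropping the semaphore.wait() line gives the asynchronous flavor: the function would return immediately and the notify block would run later.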

One last thing

Because I played a lot with syntactic stuff and my algorithms, I decided to make a sort of meta function that handles a lot of things in one go:

  • it allows me to collect the output of the functions in an array
  • it works like a group
  • it is optionally synchronous

//MARK: Syntactic sugar
extension Task {
    static func parallel(handler: @escaping (([Any], Error?)->()), wait: Bool = false, functions: (() throws -> Any)...) {
        var group = [Task]()
        var result = [Any]()
        let lock = NSLock()
        for f in functions {
            let t = Task {
                let r = try f()
                lock.lock()
                result.append(r)
                lock.unlock()
            }
            group.append(t)
        }
        if !wait {
            group||.perform { (local) in
                switch local {
                case .success:
                    handler(result, nil)
                case .failure(let e):
                    handler(result,e)
                }
            }
        } else {
            let sem = DispatchSemaphore(value: 0)
            Task.group(group, semaphore: sem).perform { (local) in
                switch local {
                case .success:
                    handler(result, nil)
                case .failure(let e):
                    handler(result,e)
                }
            }
            sem.wait()
        }
    }
}

And it can be used like this (TE stands for any type conforming to Error):

var n = 0
Task.parallel(handler: { (result, error) in
    print(result)
    print(error?.localizedDescription ?? "no error")
}, functions: {
    throw TE()
}, {
    n += 1
    return n
}, {
    n += 1
    return n
}, {
    n += 1
    return n
}, {
    n += 1
    return n
},...
)

And it will output something like the following (the order of the results depends on how the tasks get scheduled):

[1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19, 20, 21, 22, 23, 24, 25, 26, 27, 28, 29, 30, 31, 32, 33, 45, 35, 36, 37, 38, 39, 40, 41, 42, 43, 44, 34, 46]
The operation couldn’t be completed.
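One caveat about the test above: the result array is protected by a lock inside Task.parallel, but the closures themselves all mutate n concurrently without any synchronization. If the values matter, the counter needs its own protection. A minimal sketch follows, in which SafeCounter, ResultList and collect are hypothetical helpers, and DispatchQueue.concurrentPerform replaces Task.parallel purely to keep the example self-contained.

```swift
import Foundation

// Hypothetical helper: hands out 1, 2, 3, ... with each increment
// protected by a lock, so concurrent callers never get the same value.
final class SafeCounter: @unchecked Sendable {
    private let lock = NSLock()
    private var value = 0

    func next() -> Int {
        lock.lock()
        defer { lock.unlock() }
        value += 1
        return value
    }
}

// Hypothetical helper: an array that can be appended to from any thread.
final class ResultList: @unchecked Sendable {
    private let lock = NSLock()
    private var storage = [Int]()

    func append(_ element: Int) {
        lock.lock()
        storage.append(element)
        lock.unlock()
    }

    var values: [Int] {
        lock.lock()
        defer { lock.unlock() }
        return storage
    }
}

func collect(count: Int) -> [Int] {
    let counter = SafeCounter()
    let results = ResultList()
    // Runs the closure `count` times, possibly on several threads, and
    // returns once every iteration has finished.
    DispatchQueue.concurrentPerform(iterations: count) { _ in
        results.append(counter.next())
    }
    return results.values
}

let values = collect(count: 46)
// The order still depends on scheduling, but no value is lost or duplicated.
print(values.sorted() == Array(1...46))   // true
```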

Of course, you can wait for all the tasks to be complete by using the wait parameter:

Task.parallel(handler: { (result, error) in
    print(result)
    print(error)
}, wait: true, functions: {
    throw TE()
}, {
    n += 1
    return n
},...
)

Conclusion

Thanks to John Sundell's excellent write-up, I refactored my code and made it more efficient and a fair bit less convoluted than it was before.

I also abstained from using OperationQueue, which has some quirks on Linux, whereas this implementation works just fine.