# [Dev Diaries] Tasks in parallel

##### Context

Back in the days of the old blog, I had posted a way to manage my asynchronous tasks by grouping them and basically having the kernel of something akin to promises in other languages/frameworks.

It was mostly based on operation queues and locks, and basically handled only the equivalent of "guarantees" in promise parlance.

A few months ago, John Sundell published a task based system that tickled me greatly. I immediately proceeded to forget about it until I got an Urge again to optimize my K-Means implementation. I tweaked his code to use my terminology and avoid rewriting everything where I used the concurrency stuff, as well as added a bunch of things I needed. Without further ado, here is some code and some comments.

##### Core feature: perform on queue

First, an alias that is inherited from the past and facilitates some operations further down the line:

public typealias constantFunction = (() throws -> ())

Then the main meat of the system: the Task class and ancillary utilities. My code was already fairly close from Sundell's, so I mostly adopted his style.

public class Task {
// MARK: Ancillary stuff
case success
case failure(Error)
}

fileprivate let queue : DispatchQueue
fileprivate let handler : (TaskResult) -> Void

func finish() {
handler(.success)
}

func fail(with error: Error) {
handler(.failure(error))
}
}

//MARK: Init

public init( _ closure: @escaping TaskFunction) {
self.closure = closure
}

public convenience init(_ f: @escaping constantFunction) {
self.init { manager in
do {
try f()
manager.finish()
} catch {
manager.fail(with: error)
}
}
}

//MARK: Core
public func perform(on queue: DispatchQueue = .global(),
handler: @escaping (TaskResult) -> Void) {
queue.async {
queue: queue,
handler: handler
)

self.closure(manager)
}
}
}

In order to understand the gist of it, I really recommend reading the article, but in essence, it's "just" something that executes a function, then signals the manager that the task is complete.

I added an initializer that allows me to write my code like this, for backwards compatibility and stylistic reasons:

Task {
print("Hello")
}.perform { result in
switch result {
case .success: // do something
case .failure(let err): // here too, probably
}
}

It's important to note that the block passed to task must take no argument and return nothing. But it doesn't prevent it from doing block stuff:

var nt = 0
nt += 42
}.perform { result in
switch result {
case .success: print(nt)
case .failure(let err): break // not probable
}
}

Outputs: 42

Of course, the block can throw, in which case we'll end up in the .failure case.

##### Sequence stuff

The task sequencing mechanism wasn't of any particular interest to my project, but I decided to treat it the same way I did the parallel one. Sundell's code is perfectly fine, I just wanted operators to have some syntactic sugar:

//MARK: Sequential
// replaces "then"
infix operator •: MultiplicationPrecedence
var index = 0

guard index < tasks.count else {
// We’ve reached the end of our array of tasks,
// time to finish the sequence.
controller.finish()
return
}

index += 1

switch outcome {
case .success:
performNext(using: controller)
case .failure(let error):
// As soon as an error was occurred, we’ll
// fail the entire sequence.
controller.fail(with: error)
}
}
}

}

}
}

The comments say it all: we take the tasks one by one, essentially having a Task(Task(Task(...))) system that handles failure gracefully. I wanted to have a • operator because I like writing code like this:

(Task {
print("Hello")
print("You")
print("Beautiful")
print("Syntax!")
}
).perform { (_) in
print("done")
}

Outputs:

Hello
You
Beautiful
Syntax!
done

Because of the structure of the project I'm using parallelism in, I tend to manipulate [Task] objects a lot, so I added an operator on the array manipulation as well:

// [Task...]••
postfix operator ••
static postfix func ••(_ f: Array<Element>) -> Task {
}
}

This allows me to write code like this:

var tasks = [Task]()
for i in 1..<10 {
print(i)
})
}
// this space for rent
}

Outputs the numbers from 1 to 9 sequentially. It is, admittedly, a fairly useless feature to be able to create tasks in a loop that will execute one after the other, instead of "just" looping in a more regular fashion, but I tend to like symmetry, which leads me to the main meat of the code.

##### Parallelism

Similarly to the sequence way of doing things, Sundell's approach is pitch perfect, and much more efficient than my own, especially in regards to error handling, so I modified my code to follow his recommendations.

Before reading the code, there are two things you should be aware of:

• DispatchGroup allows for aggregate synchronization of work. It's a fairly unknown tool that you should read about
• Sundell's code did not include a mechanism for waiting for the group's completion. I included a DispatchSemaphore that optionally lets me wait for the group to be done ( nil by default, meaning I do not wait for completion with the syntactic sugar)
// MARK: Parallel
// Replaces "enqueue"
let group = DispatchGroup()

// To avoid race conditions with errors, we set up a private
// queue to sync all assignments to our error variable
var anyError: Error?

group.enter()

// It’s important to make the sub-tasks execute
// on the same DispatchQueue as the group, since
// we might cause unexpected threading issues otherwise.
switch outcome {
case .success:
break
case .failure(let error):
errorSyncQueue.sync {
anyError = anyError ?? error
}
}

group.leave()
}
}

group.notify(queue: controller.queue) {
if let error = anyError {
controller.fail(with: error)
} else {
controller.finish()
}
if let semaphore = semaphore {
semaphore.signal()
}
}
}
}

}
}

Just like with the sequential code, it allows me to write:

(Task {
print("Hello")
print("You")
print("Beautiful")
print("Syntax!")
}
).perform { (_) in
print("done")
}

Note that even though the tasks are marked as being parallel, because of the way operators work, you end up grouping the tasks for parallel execution two by two, which is fairly useless in general. The above code outputs (sometimes):

Syntax!
Beautiful
Hello
You
done

This highlights the point I was making: the first "pair" to be executed is the 3 first tasks together, in parallel with the last one. Since the latter finishes early in comparison to a group of tasks, it is output first. But I included this operator for symmetry (and because I can).

Much more interestingly, grouping tasks in an array performs them all in parallel, and is the only way to have them work in a way that resembles the instinct you probably have regarding parallel tasks:

// [Task...]||
postfix operator ||
static postfix func ||(_ f: Array<Element>) -> Task {
}
}

This allows to write:

var tasks = [Task]()
for i in 1..<10 {
// for simulation purposes
usleep(UInt32.random(in: 100...500))
print(i)
})
}
// this space for rent
}

This outputs the following text after the longest task is done:

2
5
1
8
3
4
6
7
9

I'd like to include a ternary operator to wait for the group to be finished, but it's not currently possible in swift (in the same way a n-ary operator is currently impossible). This means a fairly sad syntax:

infix operator ~~
static func ~~(_ f: Array<Element>, _ s: DispatchSemaphore) -> Task {
return g
}
}

The following test code works:

var tasks = [Task]()
let s = DispatchSemaphore(value: 0)
for i in 1..<10 {
usleep(UInt32.random(in: 100...5500))
print(i)
})
}
// this space for rent
}
s.wait()

Sadly, we now need to parenthesize tasks~~s, which is why I'm bothered. But at least my code can be synchronous or asynchronous, as needed.

##### One last thing

Because I played a lot with syntactic stuff and my algorithms, I decided to make a sort of meta function that handles a lot of things in one go:

• it allows me to collect the output of the functions in an array
• it works like a group
• it is optionally synchronous
//MARK: Syntactic sugar
static func parallel(handler: @escaping (([Any], Error?)->()), wait: Bool = false, functions: (() throws -> Any)...) {
var result = [Any]()
let lock = NSLock()
for f in functions {
let r = try f()
lock.lock()
result.append(r)
lock.unlock()
}
group.append(t)
}
if !wait {
group||.perform { (local) in
switch local {
case .success:
handler(result, nil)
case .failure(let e):
handler(result,e)
}
}
} else {
let sem = DispatchSemaphore(value: 0)
Task.group(group, semaphore: sem).perform { (local) in
switch local {
case .success:
handler(result, nil)
case .failure(let e):
handler(result,e)
}
}
sem.wait()
}
}
}

And it can be used like this:

var n = 0
print(result)
print(error?.localizedDescription ?? "no error")
}, functions: {
throw TE()
}, {
n += 1
return n
}, {
n += 1
return n
}, {
n += 1
return n
}, {
n += 1
return n
},...
)

And it will output something like:

[1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19, 20, 21, 22, 23, 24, 25, 26, 27, 28, 29, 30, 31, 32, 33, 45, 35, 36, 37, 38, 39, 40, 41, 42, 43, 44, 34, 46]
The operation couldn’t be completed.

Of course, you can wait for all the tasks to be complete by using the wait parameter:

Task.parallel(handler: { (result, error) in
print(result)
print(error)
}, wait: true, functions: {
throw TE()
}, {
n += 1
return n
},...
)
##### Conclusion

Thanks to John Sundell's excellent write-up, I refactored my code and made it more efficient and fairly less convoluted than it was before.

I also abstained from using OperationQueue, which has some quirks on Linux, whereas this implementation works just fine.