仓颉-并发编程

并发概述

并发编程是现代编程语言中不可或缺的特性,仓颉编程语言提供抢占式的线程模型作为并发编程机制。线程可以细分为两种不同概念,语言线程native 线程

  • 语言线程是编程语言中并发模型的基本执行单位。仓颉编程语言希望给开发者提供一个友好、高效、统一的并发编程界面,让开发者无需关心操作系统线程、用户态线程等差异,因此提供仓颉线程的概念。开发者在大多数情况下只需面向仓颉线程编写并发代码。
  • native 线程指语言实现中所使用到的线程(一般是操作系统线程),它们作为语言线程的具体实现载体。不同编程语言会以不同的方式实现语言线程。例如,一些编程语言直接通过操作系统调用来创建线程,这意味着每个语言线程对应一个 native 线程,这种实现方案一般被称之为 1:1 线程模型。此外,另有一些编程语言提供特殊的线程实现,它们允许多个语言线程在多个 native 线程上切换执行,这种也被称为 M:N 线程模型,即 M 个语言线程在 N 个 native 线程上调度执行,其中 M 和 N 不一定相等。当前,仓颉语言的实现同样采用 M:N 线程模型;因此,仓颉线程本质上是一种用户态的轻量级线程,支持抢占且相比操作系统线程更轻量化。

仓颉线程本质上是用户态的轻量级线程,每个仓颉线程都受到底层 native 线程的调度执行,并且多个仓颉线程可以由一个 native 线程执行。每个 native 线程会不断地选择一个就绪的仓颉线程完成执行,如果仓颉线程在执行过程中发生阻塞(例如等待互斥锁的释放),那么 native 线程会将当前的仓颉线程挂起,并继续选择下一个就绪的仓颉线程。发生阻塞的仓颉线程在重新就绪后会继续被 native 线程调度执行。

在大多数情况下,开发者只需要面向仓颉线程进行并发编程而不需要考虑这些细节。但在进行跨语言编程时,开发者需要谨慎调用可能发生阻塞的 foreign 函数,例如 IO 相关的操作系统调用等。例如,下列示例代码中的新线程会调用 foreign 函数 socket_read。在程序运行过程中,某一 native 线程将调度并执行该仓颉线程,在进入到 foreign 函数中后,系统调用会直接阻塞当前 native 线程直到函数执行完成。native 线程在阻塞期间将无法调度其他仓颉线程来执行,这会降低程序执行的吞吐量。

1
2
3
4
5
6
foreign socket_read(sock: Int64): CPointer<Int8>

let fut = spawn {
let sock: Int64 = ...
let ptr = socket_read(sock)
}

注意:

本文档在没有歧义的情况下将直接以线程简化对仓颉线程的指代。

创建线程

当开发者希望并发执行某一段代码时,只需创建一个仓颉线程即可。要创建一个新的仓颉线程,可以使用关键字 spawn 并传递一个无形参的 lambda 表达式,该 lambda 表达式即为在新线程中执行的代码。

下方示例代码中,主线程和新线程均会尝试打印一些文本:

1
2
3
4
5
6
7
8
9
10
11
main(): Int64 {
spawn { =>
println("New thread before sleeping")
sleep(100 * Duration.millisecond) // sleep for 100ms.
println("New thread after sleeping")
}

println("Main thread")

return 0
}

在上面的例子中,新线程会在主线程结束时一起停止,无论这个新线程是否已完成运行。上方示例的输出每次可能略有不同,有可能会输出类似如下的内容:

1
2
New thread before sleeping
Main thread

sleep() 函数会让当前线程睡眠指定的时长,之后再恢复执行,其时间由指定的 Duration 类型决定,详细介绍请参见线程睡眠指定时长章节。

访问线程

使用 Future 等待线程结束并获取返回值

在上面的例子中,新创建的线程会由于主线程结束而提前结束,在缺乏顺序保证的情况下,甚至可能会出现新创建的线程还来不及得到执行就退出了。可以通过 spawn 表达式的返回值,来等待线程执行结束。

spawn 表达式的返回类型是 Future<T>,其中 T 是类型变元,其类型与 lambda 表达式的返回类型一致。当调用 Future<T>get() 成员函数时,它将等待它的线程执行完成

Future<T> 的原型声明如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
public class Future<T> {
// Blocking the current thread, waiting for the result of the thread corresponding to the current Future object.
// If an exception occurs in the corresponding thread, the method will throw the exception.
public func get(): T

// Blocking the current thread, waiting for the result of the thread corresponding to the current Future object.
// If the corresponding thread has not completed execution within Duration, the method will throws TimeoutException.
// If `timeout` <= Duration.Zero, its behavior is the same as `get()`.
public func get(timeout: Duration): T

// Non-blocking method that immediately returns Option<T>.None if thread has not finished execution.
// Returns the computed result otherwise.
// If an exception occurs in the corresponding thread, the method will throw the exception.
public func tryGet(): Option<T>
}

下方示例代码演示了如何使用 Future<T>main 中等待新创建的线程执行完成:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
import std.sync.*
import std.time.*

main(): Int64 {
let fut: Future<Unit> = spawn { =>
println("New thread before sleeping")
sleep(100 * Duration.millisecond) // sleep for 100ms.
println("New thread after sleeping")
}

println("Main thread")

fut.get() // wait for the thread to finish.
return 0
}

调用 Future<T> 实例的 get() 会阻塞当前运行的线程,直到 Future<T> 实例所代表的线程运行结束。因此,上方示例有可能会输出类似如下内容:

1
2
3
New thread before sleeping
Main thread
New thread after sleeping

主线程在完成打印后会因为调用 get() 而等待新创建的线程执行结束。但主线程和新线程的打印顺序具有不确定性。

如果将 fut.get() 移动到主线程的打印之前,如下所示:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
import std.sync.*
import std.time.*

main(): Int64 {
let fut: Future<Unit> = spawn { =>
println("New thread before sleeping")
sleep(100 * Duration.millisecond) // sleep for 100ms.
println("New thread after sleeping")
}

fut.get() // wait for the thread to finish.

println("Main thread")
return 0
}

主线程将等待新创建的线程执行完成,然后再执行打印,因此程序的输出将变得确定,如下所示:

1
2
3
New thread before sleeping
New thread after sleeping
Main thread

可见,get() 的调用位置会影响线程是否能同时运行。

Future<T> 除了可以用于阻塞等待线程执行结束以外,还可以获取线程执行的结果。如下是它提供的具体成员函数:

  • get(): T:阻塞等待线程执行结束,并返回执行结果,如果该线程已经结束,则直接返回执行结果。

    示例代码如下:

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    import std.sync.*
    import std.time.*

    main(): Int64 {
    let fut: Future<Int64> = spawn {
    sleep(Duration.second) // sleep for 1s.
    return 1
    }

    try {
    // wait for the thread to finish, and get the result.
    let res: Int64 = fut.get()
    println("result = ${res}")
    } catch (_) {
    println("oops")
    }
    return 0
    }

    输出结果如下:

    1
    result = 1
  • get(timeout: Duration): T:阻塞等待该 Future<T> 所代表的线程执行结束,并返回执行结果,当到达超时时间 timeout 时,如果该线程还没有执行结束,将会抛出异常 TimeoutException。如果 timeout <= Duration.Zero, 其行为与 get() 相同。

    示例代码如下:

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    main(): Int64 {
    let fut = spawn {
    sleep(Duration.second) // sleep for 1s.
    return 1
    }

    // wait for the thread to finish, but only for 1 毫秒
    try {
    let res = fut.get(Duration.millisecond * 1)
    println("result: ${res}")
    } catch (_: TimeoutException) {
    println("oops")
    }
    return 0
    }

    输出结果如下:

    1
    oops

访问线程属性

每个 Future<T> 对象都有一个对应的仓颉线程,以 Thread 对象为表示。Thread 类主要被用于访问线程的属性信息,例如线程标识等。需要注意的是,Thread 无法直接被实例化构造对象,仅能从 Future<T>thread 成员属性获取对应的 Thread 对象,或是通过 Thread 的静态成员属性 currentThread 得到当前正在执行线程对应的 Thread 对象。

Thread 类的部分方法定义如下(完整的方法描述可参考《仓颉编程语言库 API》)。

1
2
3
4
5
6
7
8
9
10
class Thread {
// Get the currently running thread
static prop currentThread: Thread

// Get the unique identifier (represented as an integer) of the thread object
prop id: Int64

// Check whether the thread has any cancellation request
prop hasPendingCancellation: Bool
}

下列示例代码在创建新线程后分别通过两种方式获取线程标识。由于主线程和新线程获取的是同一个 Thread 对象,所以他们能够打印出相同的线程标识。

1
2
3
4
5
6
7
main(): Unit {
let fut = spawn {
println("Current thread id: ${Thread.currentThread.id}")
}
println("New thread id: ${fut.thread.id}")
fut.get()
}

输出结果如下(其中线程 id 会变化,也可能为其他值):

1
2
New thread id: 1
Current thread id: 1

终止线程

可以通过 Future<T>cancel() 方法向对应的线程发送终止请求,该方法不会停止线程执行。开发者需要使用 ThreadhasPendingCancellation 属性来检查线程是否存在终止请求。

一般而言,如果线程存在终止请求,那么开发者可以实施相应的线程终止逻辑。因此,如何终止线程都交由开发者自行处理,如果开发者忽略终止请求,那么线程继续执行直到正常结束。

示例代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
import std.sync.SyncCounter

main(): Unit {
let syncCounter = SyncCounter(1)
let fut = spawn {
syncCounter.waitUntilZero() // block until the syncCounter becomes zero
if (Thread.currentThread.hasPendingCancellation) { // Check cancellation request
println("cancelled")
return
}
println("hello")
}
fut.cancel() // Send cancellation request
syncCounter.dec()
fut.get() // Join thread
}

输出结果如下:

1
cancelled

同步机制

在并发编程中,如果缺少同步机制来保护多个线程共享的变量,很容易会出现数据竞争问题(data race)。

仓颉编程语言提供三种常见的同步机制来确保数据的线程安全:原子操作、互斥锁和条件变量。

原子操作 Atomic

仓颉提供整数类型、Bool 类型和引用类型的原子操作。

其中整数类型包括: Int8Int16Int32Int64UInt8UInt16UInt32UInt64

整数类型的原子操作支持基本的读写、交换以及算术运算操作:

操作 功能
load 读取
store 写入
swap 交换,返回交换前的值
compareAndSwap 比较再交换,交换成功返回 true,否则返回 false
fetchAdd 加法,返回执行加操作之前的值
fetchSub 减法,返回执行减操作之前的值
fetchAnd 与,返回执行与操作之前的值
fetchOr 或,返回执行或操作之前的值
fetchXor 异或,返回执行异或操作之前的值

需要注意的是:

  1. 交换操作和算术操作的返回值是修改前的值。
  2. compareAndSwap 是判断当前原子变量的值是否等于 old 值,如果等于,则使用 new 值替换;否则不替换。

Int8 类型为例,对应的原子操作类型声明如下:

1
2
3
4
5
6
7
8
9
10
11
class AtomicInt8 {
public func load(): Int8
public func store(val: Int8): Unit
public func swap(val: Int8): Int8
public func compareAndSwap(old: Int8, new: Int8): Bool
public func fetchAdd(val: Int8): Int8
public func fetchSub(val: Int8): Int8
public func fetchAnd(val: Int8): Int8
public func fetchOr(val: Int8): Int8
public func fetchXor(val: Int8): Int8
}

上述每一种原子类型的方法都有一个对应的方法可以接收内存排序参数,目前内存排序参数仅支持顺序一致性。

类似的,其他整数类型对应的原子操作类型有:

1
2
3
4
5
6
7
class AtomicInt16 {...}
class AtomicInt32 {...}
class AtomicInt64 {...}
class AtomicUInt8 {...}
class AtomicUInt16 {...}
class AtomicUInt32 {...}
class AtomicUInt64 {...}

下方示例演示了如何在多线程程序中,使用原子操作实现计数:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
import std.sync.*
import std.time.*
import std.collection.*

let count = AtomicInt64(0)

main(): Int64 {
let list = ArrayList<Future<Int64>>()

// create 1000 threads.
for (_ in 0..1000) {
let fut = spawn {
sleep(Duration.millisecond) // sleep for 1ms.
count.fetchAdd(1)
}
list.add(fut)
}

// Wait for all threads finished.
for (f in list) {
f.get()
}

let val = count.load()
println("count = ${val}")
return 0
}

输出结果应为:

1
count = 1000

以下是使用整数类型原子操作的一些其他正确示例:

1
2
3
4
5
6
7
8
var obj: AtomicInt32 = AtomicInt32(1)
var x = obj.load() // x: 1, the type is Int32
x = obj.swap(2) // x: 1
x = obj.load() // x: 2
var y = obj.compareAndSwap(2, 3) // y: true, the type is Bool.
y = obj.compareAndSwap(2, 3) // y: false, the value in obj is no longer 2 but 3. Therefore, the CAS operation fails.
x = obj.fetchAdd(1) // x: 3
x = obj.load() // x: 4

Bool 类型和引用类型的原子操作只提供读写和交换操作:

操作 功能
load 读取
store 写入
swap 交换,返回交换前的值
compareAndSwap 比较再交换,交换成功返回 true,否则返回 false

注意:

引用类型原子操作只对引用类型有效。

原子引用类型是 AtomicReference,以下是使用 Bool 类型、引用类型原子操作的一些正确示例:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
import std.sync.*

class A {}

main() {
var obj = AtomicBool(true)
var x1 = obj.load() // x1: true, the type is Bool
println(x1)
var t1 = A()
var obj2 = AtomicReference(t1)
var x2 = obj2.load() // x2 and t1 are the same object
var y1 = obj2.compareAndSwap(x2, t1) // x2 and t1 are the same object, y1: true
println(y1)
var t2 = A()
var y2 = obj2.compareAndSwap(t2, A()) // x and t1 are not the same object, CAS fails, y2: false
println(y2)
y2 = obj2.compareAndSwap(t1, A()) // CAS successes, y2: true
println(y2)
}

编译执行上述代码,输出结果为:

1
2
3
4
true
true
false
true

可重入互斥锁 Mutex

可重入互斥锁的作用是对临界区加以保护,使得任意时刻最多只有一个线程能够执行临界区的代码。当一个线程试图获取一个已被其他线程持有的锁时,该线程会被阻塞,直到锁被释放,该线程才会被唤醒,可重入是指线程获取该锁后可再次获得该锁。

使用可重入互斥锁时,必须牢记两条规则:

  1. 在访问共享数据之前,必须尝试获取锁;
  2. 处理完共享数据后,必须释放锁,以便其他线程可以获得锁。

Mutex 提供的主要成员函数如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
public class Mutex <: UniqueLock {
// Create a Mutex.
public init()

// Locks the mutex, blocks if the mutex is not available.
public func lock(): Unit

// Unlocks the mutex. If there are other threads blocking on this
// lock, then wake up one of them.
public func unlock(): Unit

// Tries to lock the mutex, returns false if the mutex is not
// available, otherwise returns true.
public func tryLock(): Bool

// Generate a Condition instance for the mutex.
public func condition(): Condition
}

下方示例演示了如何使用 Mutex 来保护对全局共享变量 count 的访问,对 count 的操作即属于临界区:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
import std.sync.*
import std.time.*
import std.collection.*

var count: Int64 = 0
let mtx = Mutex()

main(): Int64 {
let list = ArrayList<Future<Unit>>()

// create 1000 threads.
for (i in 0..1000) {
let fut = spawn {
sleep(Duration.millisecond) // sleep for 1ms.
mtx.lock()
count++
mtx.unlock()
}
list.add(fut)
}

// Wait for all threads finished.
for (f in list) {
f.get()
}

println("count = ${count}")
return 0
}

输出结果应为:

1
count = 1000

下方示例演示了如何使用 tryLock

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
import std.sync.*
import std.time.*

main(): Int64 {
let mtx: Mutex = Mutex()
var future: Future<Unit> = spawn {
mtx.lock()
println("get the lock, do something")
sleep(Duration.millisecond * 10)
mtx.unlock()
}
try {
future.get(Duration.millisecond * 10)
} catch (e: TimeoutException) {
if (mtx.tryLock()) {
println("tryLock success, do something")
mtx.unlock()
return 0
}
println("tryLock failed, do nothing")
return 0
}
return 0
}

一种可能的输出结果如下:

1
get the lock, do something

以下是互斥锁的一些错误示例:

错误示例 1:线程操作临界区后没有解锁,导致其他线程无法获得锁而阻塞。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
import std.sync.*

var sum: Int64 = 0
let mutex = Mutex()

main() {
let foo = spawn { =>
mutex.lock()
sum = sum + 1
}
let bar = spawn { =>
mutex.lock()
sum = sum + 1
}
foo.get()
println("${sum}")
bar.get() // Because the thread is not unlocked, other threads waiting to obtain the current mutex will be blocked.
}

错误示例 2:在本线程没有持有锁的情况下调用 unlock 将会抛出异常。

1
2
3
4
5
6
7
8
9
10
11
12
13
import std.sync.*

var sum: Int64 = 0
let mutex = Mutex()

main() {
let foo = spawn { =>
sum = sum + 1
mutex.unlock() // Error, Unlock without obtaining the lock and throw an exception: IllegalSynchronizationStateException.
}
foo.get()
0
}

错误示例 3:tryLock() 并不保证获取到锁,可能会造成不在锁的保护下操作临界区和在没有持有锁的情况下调用 unlock 抛出异常等行为。

1
2
3
4
5
6
7
8
9
10
11
12
13
import std.sync.*
var sum: Int64 = 0
let mutex = Mutex()

main() {
for (i in 0..100) {
spawn { =>
mutex.tryLock() // Error, `tryLock()` just trying to acquire a lock, there is no guarantee that the lock will be acquired, and this can lead to abnormal behavior.
sum = sum + 1
mutex.unlock()
}
}
}

另外,Mutex 在设计上是一个可重入锁,也就是说:在某个线程已经持有一个 Mutex 锁的情况下,再次尝试获取同一个 Mutex 锁,永远可以立即获得该 Mutex 锁。

注意:

虽然 Mutex 是一个可重入锁,但是调用 unlock() 的次数必须和调用 lock() 的次数相同,才能成功释放该锁。

下方示例代码演示了 Mutex 可重入的特性:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
import std.sync.*
import std.time.*

var count: Int64 = 0
let mtx = Mutex()

func foo() {
mtx.lock()
count += 10
bar()
mtx.unlock()
}

func bar() {
mtx.lock()
count += 100
mtx.unlock()
}

main(): Int64 {
let fut = spawn {
sleep(Duration.millisecond) // sleep for 1ms.
foo()
}

foo()

fut.get()

println("count = ${count}")
return 0
}

输出结果应为:

1
count = 220

在上方示例中,无论是主线程还是新创建的线程,如果在 foo() 中已经获得了锁,那么继续调用 bar() 的话,在 bar() 函数中由于是对同一个 Mutex 进行加锁,因此也是能立即获得该锁的,不会出现死锁。

Condition

Condition 是与某个互斥锁绑定的条件变量(也就是等待队列),Condition 实例由互斥锁创建,一个互斥锁可以创建多个 Condition 实例。Condition 可以使线程阻塞并等待来自另一个线程的信号以恢复执行。这是一种利用共享变量进行线程同步的机制,主要提供如下方法:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
public class Mutex <: UniqueLock {
// ...
// Generate a Condition instance for the mutex.
public func condition(): Condition
}

public interface Condition {
// Wait for a signal, blocking the current thread.
func wait(): Unit
func wait(timeout!: Duration): Bool

// Wait for a signal and predicate, blocking the current thread.
func waitUntil(predicate: ()->Bool): Unit
func waitUntil(predicate: ()->Bool, timeout!: Duration): Bool

// Wake up one thread of those waiting on the monitor, if any.
func notify(): Unit

// Wake up all threads waiting on the monitor, if any.
func notifyAll(): Unit
}

调用 Condition 接口的 waitnotifynotifyAll 方法前,需要确保当前线程已经持有绑定的锁。wait 方法包含如下动作:

  1. 添加当前线程到对应锁的等待队列中;
  2. 阻塞当前线程,同时完全释放该锁,并记录锁的重入次数;
  3. 等待某个其他线程使用同一个 Condition 实例的 notifynotifyAll 方法向该线程发出信号;
  4. 当前线程被唤醒后,会自动尝试重新获取锁,且持有锁的重入状态与第 2 步记录的重入次数相同;但是如果尝试获取锁失败,则当前线程会阻塞在该锁上。

wait 方法接受一个可选参数 timeout。需要注意的是,业界很多常用的常规操作系统不保证调度的实时性,因此无法保证一个线程会被阻塞“精确的 N 纳秒”——可能会观察到与系统相关的不精确情况。此外,当前语言规范明确允许实现产生虚假唤醒——在这种情况下,wait 返回值是由实现决定的——可能为 truefalse。因此鼓励开发者始终将 wait 包在一个循环中:

1
2
3
4
5
synchronized (obj) {
while (<condition is not true>) {
obj.wait()
}
}

以下是使用 Condition 的一个正确示例:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
import std.sync.*
import std.time.*

let mtx = Mutex()
let condition = synchronized(mtx) {
mtx.condition()
}
var flag: Bool = true

main(): Int64 {
let fut = spawn {
mtx.lock()
while (flag) {
println("New thread: before wait")
condition.wait()
println("New thread: after wait")
}
mtx.unlock()
}

// Sleep for 10ms, to make sure the new thread can be executed.
sleep(10 * Duration.millisecond)

mtx.lock()
println("Main thread: set flag")
flag = false
mtx.unlock()

mtx.lock()
println("Main thread: notify")
condition.notifyAll()
mtx.unlock()

// wait for the new thread finished.
fut.get()
return 0
}

输出结果应为:

1
2
3
4
New thread: before wait
Main thread: set flag
Main thread: notify
New thread: after wait

Condition 对象执行 wait 时,必须在锁的保护下进行,否则 wait 中释放锁的操作会抛出异常。

以下是使用条件变量的一些错误示例:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
import std.sync.*

let m1 = Mutex()
let c1 = synchronized(m1) {
m1.condition()
}
let m2 = Mutex()
var flag: Bool = true
var count: Int64 = 0

func foo1() {
spawn {
m2.lock()
while (flag) {
c1.wait() // Error:The lock used together with the condition variable must be the same lock and in the locked state. Otherwise, the unlock operation in `wait` throws an exception.
}
count = count + 1
m2.unlock()
}
m1.lock()
flag = false
c1.notifyAll()
m1.unlock()
}

func foo2() {
spawn {
while (flag) {
c1.wait() // Error:The `wait` of a conditional variable must be called with a lock held.
}
count = count + 1
}
m1.lock()
flag = false
c1.notifyAll()
m1.unlock()
}

main() {
foo1()
foo2()
c1.wait()
return 0
}

有时在复杂的线程间同步的场景下需要对同一个锁对象生成多个 Condition 实例,以下示例实现了一个长度固定的有界 FIFO 队列。当队列为空,get() 会被阻塞;当队列已满,put() 会被阻塞。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
import std.sync.*

class BoundedQueue {
// Create a Mutex, two Conditions.
let m: Mutex = Mutex()
var notFull: Condition
var notEmpty: Condition

var count: Int64 // Object count in buffer.
var head: Int64 // Write index.
var tail: Int64 // Read index.

// Queue's length is 100.
let items: Array<Object> = Array<Object>(100, {i => Object()})

init() {
count = 0
head = 0
tail = 0

synchronized(m) {
notFull = m.condition()
notEmpty = m.condition()
}
}

// Insert an object, if the queue is full, block the current thread.
public func put(x: Object) {
// Acquire the mutex.
synchronized(m) {
while (count == 100) {
// If the queue is full, wait for the "queue notFull" event.
notFull.wait()
}
items[head] = x
head++
if (head == 100) {
head = 0
}
count++

// An object has been inserted and the current queue is no longer
// empty, so wake up the thread previously blocked on get()
// because the queue was empty.
notEmpty.notify()
} // Release the mutex.
}

// Pop an object, if the queue is empty, block the current thread.
public func get(): Object {
// Acquire the mutex.
synchronized(m) {
while (count == 0) {
// If the queue is empty, wait for the "queue notEmpty" event.
notEmpty.wait()
}
let x: Object = items[tail]
tail++
if (tail == 100) {
tail = 0
}
count--

// An object has been popped and the current queue is no longer
// full, so wake up the thread previously blocked on put()
// because the queue was full.
notFull.notify()

return x
} // Release the mutex.
}
}

synchronized 关键字

Lock 提供了一种便利灵活的加锁的方式,同时因为它的灵活性,也可能引起忘记解锁,或者在持有锁的情况下抛出异常不能自动释放持有的锁的问题。因此,仓颉编程语言提供一个 synchronized 关键字,搭配 Lock 一起使用,可以在其后跟随的作用域内自动进行加锁解锁操作,用来解决类似的问题。

下方示例代码演示了如何使用 synchronized 关键字来保护共享数据:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
import std.sync.*
import std.time.*
import std.collection.*

var count: Int64 = 0
let mtx = Mutex()

main(): Int64 {
let list = ArrayList<Future<Unit>>()

// create 1000 threads.
for (i in 0..1000) {
let fut = spawn {
sleep(Duration.millisecond) // sleep for 1ms.
// Use synchronized(mtx), instead of mtx.lock() and mtx.unlock().
synchronized(mtx) {
count++
}
}
list.add(fut)
}

// Wait for all threads finished.
for (f in list) {
f.get()
}

println("count = ${count}")
return 0
}

输出结果应为:

1
count = 1000

通过在 synchronized 后面加上一个 Lock 实例,对其后面修饰的代码块进行保护,可以使得任意时刻最多只有一个线程可以执行被保护的代码:

  1. 一个线程在进入 synchronized 修饰的代码块之前,会自动获取 Lock 实例对应的锁,如果无法获取锁,则当前线程被阻塞;
  2. 一个线程在退出 synchronized 修饰的代码块之前,会自动释放该 Lock 实例的锁。

对于控制转移表达式(如 breakcontinuereturnthrow),在导致程序的执行跳出 synchronized 代码块时,也符合上面第 2 条的说明,也就说也会自动释放 synchronized 表达式对应的锁。

下方示例演示了在 synchronized 代码块中出现 break 语句的情况:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
import std.sync.*
import std.collection.*

var count: Int64 = 0
var mtx: Mutex = Mutex()

main(): Int64 {
let list = ArrayList<Future<Unit>>()
for (i in 0..10) {
let fut = spawn {
while (true) {
synchronized(mtx) {
count = count + 1
break
println("in thread")
}
}
}
list.add(fut)
}

// Wait for all threads finished.
for (f in list) {
f.get()
}

synchronized(mtx) {
println("in main, count = ${count}")
}
return 0
}

输出结果应为:

1
in main, count = 10

实际上 in thread 这行不会被打印,因为 break 语句实际上会让程序执行跳出 while 循环(在跳出 while 循环之前,先跳出 synchronized 代码块)。

线程局部变量 ThreadLocal

使用 core 包中的 ThreadLocal 可以创建并使用线程局部变量,每一个线程都有它独立的一个存储空间来保存这些线程局部变量。因此,在每个线程可以安全地访问他们各自的线程局部变量,而不受其他线程的影响。

1
2
3
4
5
6
7
8
9
10
public class ThreadLocal<T> {
/* 构造一个携带空值的仓颉线程局部变量 */
public init()

/* 获得仓颉线程局部变量的值 */
public func get(): Option<T> // 如果值不存在,则返回 Option<T>.None。返回值 Option<T> - 仓颉线程局部变量的值

/* 通过 value 设置仓颉线程局部变量的值 */
public func set(value: Option<T>): Unit // 如果传入 Option<T>.None,该局部变量的值将被删除,在线程后续操作中将无法获取。参数 value - 需要设置的局部变量的值。
}

下方示例代码演示了如何通过 ThreadLocal类来创建并使用各自线程的局部变量:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
main(): Int64 {
let tl = ThreadLocal<Int64>()
let fut1 = spawn {
tl.set(123)
println("tl in spawn1 = ${tl.get().getOrThrow()}")
}
let fut2 = spawn {
tl.set(456)
println("tl in spawn2 = ${tl.get().getOrThrow()}")
}
fut1.get()
fut2.get()
0
}

可能的输出结果如下:

1
2
tl in spawn1 = 123
tl in spawn2 = 456

或者

1
2
tl in spawn2 = 456
tl in spawn1 = 123

线程睡眠指定时长 sleep

sleep 函数会阻塞当前运行的线程,该线程会主动睡眠一段时间,之后再恢复执行,其参数类型为 Duration 类型。函数原型为:

1
func sleep(dur: Duration): Unit // Sleep for at least `dur`.

注意:

如果 dur <= Duration.Zero, 那么当前线程只会让出执行资源,并不会进入睡眠。

以下是使用 sleep 的示例:

1
2
3
4
5
6
7
8
9
import std.sync.*
import std.time.*

main(): Int64 {
println("Hello")
sleep(Duration.second) // sleep for 1s.
println("World")
return 0
}

输出结果如下:

1
2
Hello
World