public class Future<T> { // Blocking the current thread, waiting for the result of the thread corresponding to the current Future object. // If an exception occurs in the corresponding thread, the method will throw the exception. public func get(): T
// Blocking the current thread, waiting for the result of the thread corresponding to the current Future object. // If the corresponding thread has not completed execution within Duration, the method will throws TimeoutException. // If `timeout` <= Duration.Zero, its behavior is the same as `get()`. public func get(timeout: Duration): T
// Non-blocking method that immediately returns Option<T>.None if thread has not finished execution. // Returns the computed result otherwise. // If an exception occurs in the corresponding thread, the method will throw the exception. public func tryGet(): Option<T> }
下方示例代码演示了如何使用 Future<T> 在 main 中等待新创建的线程执行完成:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15
import std.sync.* import std.time.*
main(): Int64 { let fut: Future<Unit> = spawn { => println("New thread before sleeping") sleep(100 * Duration.millisecond) // sleep for 100ms. println("New thread after sleeping") }
println("Main thread")
fut.get() // wait for the thread to finish. return 0 }
main(): Int64 { let fut: Future<Int64> = spawn { sleep(Duration.second) // sleep for 1s. return 1 }
try { // wait for the thread to finish, and get the result. let res: Int64 = fut.get() println("result = ${res}") } catch (_) { println("oops") } return 0 }
main(): Int64 { let fut = spawn { sleep(Duration.second) // sleep for 1s. return 1 }
// wait for the thread to finish, but only for 1 毫秒 try { let res = fut.get(Duration.millisecond * 1) println("result: ${res}") } catch (_: TimeoutException) { println("oops") } return 0 }
compareAndSwap 是判断当前原子变量的值是否等于 old 值,如果等于,则使用 new 值替换;否则不替换。
以 Int8 类型为例,对应的原子操作类型声明如下:
1 2 3 4 5 6 7 8 9 10 11
class AtomicInt8 { public func load(): Int8 public func store(val: Int8): Unit public func swap(val: Int8): Int8 public func compareAndSwap(old: Int8, new: Int8): Bool public func fetchAdd(val: Int8): Int8 public func fetchSub(val: Int8): Int8 public func fetchAnd(val: Int8): Int8 public func fetchOr(val: Int8): Int8 public func fetchXor(val: Int8): Int8 }
上述每一种原子类型的方法都有一个对应的方法可以接收内存排序参数,目前内存排序参数仅支持顺序一致性。
类似的,其他整数类型对应的原子操作类型有:
1 2 3 4 5 6 7
class AtomicInt16 {...} class AtomicInt32 {...} class AtomicInt64 {...} class AtomicUInt8 {...} class AtomicUInt16 {...} class AtomicUInt32 {...} class AtomicUInt64 {...}
main(): Int64 { let list = ArrayList<Future<Int64>>()
// create 1000 threads. for (_ in 0..1000) { let fut = spawn { sleep(Duration.millisecond) // sleep for 1ms. count.fetchAdd(1) } list.add(fut) }
// Wait for all threads finished. for (f in list) { f.get() }
let val = count.load() println("count = ${val}") return 0 }
输出结果应为:
1
count = 1000
以下是使用整数类型原子操作的一些其他正确示例:
1 2 3 4 5 6 7 8
var obj: AtomicInt32 = AtomicInt32(1) var x = obj.load() // x: 1, the type is Int32 x = obj.swap(2) // x: 1 x = obj.load() // x: 2 var y = obj.compareAndSwap(2, 3) // y: true, the type is Bool. y = obj.compareAndSwap(2, 3) // y: false, the value in obj is no longer 2 but 3. Therefore, the CAS operation fails. x = obj.fetchAdd(1) // x: 3 x = obj.load() // x: 4
main() { var obj = AtomicBool(true) var x1 = obj.load() // x1: true, the type is Bool println(x1) var t1 = A() var obj2 = AtomicReference(t1) var x2 = obj2.load() // x2 and t1 are the same object var y1 = obj2.compareAndSwap(x2, t1) // x2 and t1 are the same object, y1: true println(y1) var t2 = A() var y2 = obj2.compareAndSwap(t2, A()) // x and t1 are not the same object, CAS fails, y2: false println(y2) y2 = obj2.compareAndSwap(t1, A()) // CAS successes, y2: true println(y2) }
main(): Int64 { let list = ArrayList<Future<Unit>>()
// create 1000 threads. for (i in 0..1000) { let fut = spawn { sleep(Duration.millisecond) // sleep for 1ms. mtx.lock() count++ mtx.unlock() } list.add(fut) }
// Wait for all threads finished. for (f in list) { f.get() }
main(): Int64 { let mtx: Mutex = Mutex() var future: Future<Unit> = spawn { mtx.lock() println("get the lock, do something") sleep(Duration.millisecond * 10) mtx.unlock() } try { future.get(Duration.millisecond * 10) } catch (e: TimeoutException) { if (mtx.tryLock()) { println("tryLock success, do something") mtx.unlock() return 0 } println("tryLock failed, do nothing") return 0 } return 0 }
一种可能的输出结果如下:
1
get the lock, do something
以下是互斥锁的一些错误示例:
错误示例 1:线程操作临界区后没有解锁,导致其他线程无法获得锁而阻塞。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18
import std.sync.*
var sum: Int64 = 0 let mutex = Mutex()
main() { let foo = spawn { => mutex.lock() sum = sum + 1 } let bar = spawn { => mutex.lock() sum = sum + 1 } foo.get() println("${sum}") bar.get() // Because the thread is not unlocked, other threads waiting to obtain the current mutex will be blocked. }
错误示例 2:在本线程没有持有锁的情况下调用 unlock 将会抛出异常。
1 2 3 4 5 6 7 8 9 10 11 12 13
import std.sync.*
var sum: Int64 = 0 let mutex = Mutex()
main() { let foo = spawn { => sum = sum + 1 mutex.unlock() // Error, Unlock without obtaining the lock and throw an exception: IllegalSynchronizationStateException. } foo.get() 0 }
import std.sync.* var sum: Int64 = 0 let mutex = Mutex()
main() { for (i in 0..100) { spawn { => mutex.tryLock() // Error, `tryLock()` just trying to acquire a lock, there is no guarantee that the lock will be acquired, and this can lead to abnormal behavior. sum = sum + 1 mutex.unlock() } } }
public class Mutex <: UniqueLock { // ... // Generate a Condition instance for the mutex. public func condition(): Condition }
public interface Condition { // Wait for a signal, blocking the current thread. func wait(): Unit func wait(timeout!: Duration): Bool
// Wait for a signal and predicate, blocking the current thread. func waitUntil(predicate: ()->Bool): Unit func waitUntil(predicate: ()->Bool, timeout!: Duration): Bool
// Wake up one thread of those waiting on the monitor, if any. func notify(): Unit
// Wake up all threads waiting on the monitor, if any. func notifyAll(): Unit }
let mtx = Mutex() let condition = synchronized(mtx) { mtx.condition() } var flag: Bool = true
main(): Int64 { let fut = spawn { mtx.lock() while (flag) { println("New thread: before wait") condition.wait() println("New thread: after wait") } mtx.unlock() }
// Sleep for 10ms, to make sure the new thread can be executed. sleep(10 * Duration.millisecond)
mtx.lock() println("Main thread: set flag") flag = false mtx.unlock()
let m1 = Mutex() let c1 = synchronized(m1) { m1.condition() } let m2 = Mutex() var flag: Bool = true var count: Int64 = 0
func foo1() { spawn { m2.lock() while (flag) { c1.wait() // Error:The lock used together with the condition variable must be the same lock and in the locked state. Otherwise, the unlock operation in `wait` throws an exception. } count = count + 1 m2.unlock() } m1.lock() flag = false c1.notifyAll() m1.unlock() }
func foo2() { spawn { while (flag) { c1.wait() // Error:The `wait` of a conditional variable must be called with a lock held. } count = count + 1 } m1.lock() flag = false c1.notifyAll() m1.unlock() }
// Insert an object, if the queue is full, block the current thread. public func put(x: Object) { // Acquire the mutex. synchronized(m) { while (count == 100) { // If the queue is full, wait for the "queue notFull" event. notFull.wait() } items[head] = x head++ if (head == 100) { head = 0 } count++
// An object has been inserted and the current queue is no longer // empty, so wake up the thread previously blocked on get() // because the queue was empty. notEmpty.notify() } // Release the mutex. }
// Pop an object, if the queue is empty, block the current thread. public func get(): Object { // Acquire the mutex. synchronized(m) { while (count == 0) { // If the queue is empty, wait for the "queue notEmpty" event. notEmpty.wait() } let x: Object = items[tail] tail++ if (tail == 100) { tail = 0 } count--
// An object has been popped and the current queue is no longer // full, so wake up the thread previously blocked on put() // because the queue was full. notFull.notify()
main(): Int64 { let list = ArrayList<Future<Unit>>()
// create 1000 threads. for (i in 0..1000) { let fut = spawn { sleep(Duration.millisecond) // sleep for 1ms. // Use synchronized(mtx), instead of mtx.lock() and mtx.unlock(). synchronized(mtx) { count++ } } list.add(fut) }
// Wait for all threads finished. for (f in list) { f.get() }
main(): Int64 { let list = ArrayList<Future<Unit>>() for (i in 0..10) { let fut = spawn { while (true) { synchronized(mtx) { count = count + 1 break println("in thread") } } } list.add(fut) }
// Wait for all threads finished. for (f in list) { f.get() }