0%

CountDownLatch

概述

CountDownLatch 是 Java 并发包下的一个工具类,可以看做一个计数器,它的操作是原子性的,也就是同一时刻只能有一个线程去操作这个计数器。初始化的时候我们会设置一个计数值,计数器的计数值被线程 countdown()减为 0 时,任何调用 CountDownLatch 对象上的 await()方法都会被阻塞,直至 countdown 减到 status = 0 或者超时时,才会执行 await()后面的程序。

例子

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

public class CountDownLatchTest {
public static void main(String[] args) throws InterruptedException {
CountDownLatch countDownLatch = new CountDownLatch(3);
Thread thread = new Thread(new Runnable() {
@Override
public void run() {
try {
TimeUnit.MILLISECONDS.sleep(20L);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
System.out.println("证据1收集完成...");
countDownLatch.countDown();
}
});
thread.start();
Thread thread1 = new Thread(new Runnable() {
@Override
public void run() {
try {
TimeUnit.MILLISECONDS.sleep(20L);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
System.out.println("证据2收集完成....");
countDownLatch.countDown();
}
});
thread1.start();
Thread thread2 = new Thread(new Runnable() {
@Override
public void run() {
try {
TimeUnit.MILLISECONDS.sleep(20L);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
System.out.println("证据3收集完成...");
countDownLatch.countDown();
}
});
thread2.start();

System.out.println("收据收集未完成,无法提起申诉...");
countDownLatch.await();
System.out.println("证据1、2、3收集完毕,可以提起了申诉了,这次要让让坏人被绳之以法....");


}
}


// 运行结果

收据收集未完成,无法提起申诉...
收集证据1
收集证据2
收集证据3
证据收集完毕,可以提起了申诉了,这次要让让坏人被绳之以法....

源码浅析

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
// 构造器,初始化计数值
public CountDownLatch(int count) {
if (count < 0) throw new IllegalArgumentException("count < 0");
this.sync = new Sync(count);
}

/*
* 1、Sync继承AQS,重写tryAcquireShared和tryReleaseShared方法
*/
private static final class Sync extends AbstractQueuedSynchronizer {
private static final long serialVersionUID = 4982264981922014374L;

Sync(int count) {
setState(count);
}

int getCount() {
return getState();
}

//获取共享变量status status为0返回1,反之返回-1
protected int tryAcquireShared(int acquires) {
return (getState() == 0) ? 1 : -1;
}

protected boolean tryReleaseShared(int releases) {
// Decrement count; signal when transition to zero
for (;;) {
int c = getState();
if (c == 0)
return false;
int nextc = c - 1;
if (compareAndSetState(c, nextc))
return nextc == 0;
}
}
}

// 等待
public void await() throws InterruptedException {
sync.acquireSharedInterruptibly(1);
}

// 带超时的等待方法
public boolean await(long timeout, TimeUnit unit)
throws InterruptedException {
return sync.tryAcquireSharedNanos(1, unit.toNanos(timeout));
}

// status = status - 1
public void countDown() {
sync.releaseShared(1);
}