并发编程之-线程协作工具类

在并发编程的时候,Doug Lea 大师为我们准备了很多的工具,都在 JDK 1.5 版本后的java.util.concurrent 包下,今天楼主就和大家分享一些常用的线程协作的工具。
  1. Semaphore 信号量
  2. CountDownLatch 倒计时器
  3. CyclicBarrier 循环栅栏
  4. Exchanger 交换器

1. Semaphore 信号量

我们在上一篇文章中说到了3把锁,无论是 synchronized 还是 重入锁还是读写锁,一次只能允许一个线程进行访问。当然这是为了保证线程的安全。但是,如果我们有的时候想一次让多个线程访问同一个代码呢?并且指定线程数量。

在 JDK 1.5 中,doug lea 大师已经为我们写好了这个工具类,什么呢?就是 Semephore,信号量。信号量为多线程协作提供了更为强大的控制方法。从某种程度上说:信号量是对锁的扩展。信号量可以指定多个线程,同时访问某一个资源。

该类有2个构造函数:

1
2
3
4
5

public Semaphore(int permits)

public Semaphore(int permits, boolean fair)

permits 表示的是信号量的数量,简单点说就是指定了同时又多少个线程可以同时访问某一个资源。而 fair 参数表示的是否是公平的。那么信号量还有哪些方法呢?

下面是阻塞方法,也就是会无限等待的方法:

  1. public void acquire() throws InterruptedException { } //获取一个许可
  2. public void acquire(int permits) throws InterruptedException { } //获取permits个许可
  3. public void release() { } //释放一个许可
  4. public void release(int permits) { } //释放permits个许可

下面是非阻塞方法:

  1. public boolean tryAcquire() { }; //尝试获取一个许可,若获取成功,则立即返回true,若获取失败,则立即返回false
  2. public boolean tryAcquire(long timeout, TimeUnit unit) throws InterruptedException { }; //尝试获取一个许可,若在指定的时间内获取成功,则立即返回true,否则则立即返回false
  3. public boolean tryAcquire(int permits) { }; //尝试获取permits个许可,若获取成功,则立即返回true,若获取失败,则立即返回false
  4. public boolean tryAcquire(int permits, long timeout, TimeUnit unit) throws InterruptedException { }; //尝试获取permits个许可,若在指定的时间内获取成功,则立即返回true,否则则立即返回false

如何使用呢?我们还是来个例子吧:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
public class SemaphoreTest implements Runnable {

// 需要指定信号量的准入数,相当于指定了同时有多少个线程可以同时访问某一个资源
final Semaphore semaphore = new Semaphore(5);

@Override
public void run() {
try {
semaphore.acquire();
// 模拟耗时操作
Thread.sleep(2000);
System.out.println(Thread.currentThread().getId() + ":done!");
semaphore.release();
} catch (InterruptedException e) {
e.printStackTrace();
}
}

public static void main(String[] args) {
ExecutorService exec = Executors.newFixedThreadPool(20);
final SemaphoreTest test = new SemaphoreTest();
for (int i = 0; i < 20; i++) {
exec.execute(test);
}


}
}

在上面的代码中,我们指定了可以有5个信号量的实例,在线程池中被20个线程执行,打印的结果都是5个一组,5个一组,说明,的确是每5个线程同时访问该段代码。

其实信号量就是一种限制策略,在 web 服务器中,信号量就是一种限流策略,限制多少线程执行,和这个模式差不多。

2. CountDownLatch 倒计时器

从名字上来看,可以翻译成倒计时门闩,但我们其实不必管门闩,他其实就是个倒计时器。门闩的含义是什么呢?把们锁起来,不让里面的线程跑出来,因此,这个工具常用来控制线程等待,有点像我们的 join 方法,可以让某一个线程等待知道倒计时结束,再开始执行。

CountDownLatch 的构造函数:

public CountDownLatch(int count) 其中 int 类型的参数表示当前这个计时器的计数个数。

我们还是直接来个例子吧:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
/**
* 相当于join功能,让调用 await 方法的线程等待 countdownlatch 的线程执行完毕
*/
public class CountDownLatchTest implements Runnable {

/**
* 表示需要10个线程完成任务,等待在倒计时上的线程才能继续运行。
*/
static final CountDownLatch end = new CountDownLatch(10);
static final CountDownLatchTest test = new CountDownLatchTest();

@Override
public void run() {
try {
// 模拟检查任务
Thread.sleep(new Random().nextInt(10) * 1000);
System.out.println("check complete");
end.countDown();
} catch (InterruptedException e) {
e.printStackTrace();
}
}

public static void main(String[] args) throws InterruptedException {
ExecutorService exec = Executors.newFixedThreadPool(10);
for (int i = 0; i < 20; i++) {
exec.submit(test);
}

end.await();

System.out.println("Fire");

exec.shutdown();
}
}

我们模拟了火箭发射的场景,火箭发射前,都需要做一些检查任务,等到所有的检查任务完成了才能反射。那我们这里怎么实现的呢?开启20个线程执行 检查任务,注意,任务中,调用了 CountdownLatch 的 countDown 方法,就是倒计时方法,每个线程执行到这里,都会让该倒计时减一,直到为0.

而再完美的main 线程中,有一行则是 await 方法,该方法让 main 线程等待 countdown 的线程都执行完毕。当20个线程全都成功调用了 run 方法,并且调用了 countdown 的 countdown 方法,countdown 此时为0,main 线程就可以执行 “发射” 了。所以该方法的使用就是让 调用 await 方法的线程等待 调用 countdown 方法的线程,和 join 很相似,join 是调用方等待被调用方。

3. CyclicBarrier 循环栅栏

循环栅栏也是控制多线程并发的工具。和 CountDownLatch 非常类似,但是比 CountdDownLatch 复杂,强大。

看名字也很奇怪,CyclicBarrier,循环栅栏。栅栏是用来拦住别人不要进来的,在我们这里,其实就是拦住线程不要进来,而且可以循环使用。

加入有一个场景:司令下达命令,要求10个士兵一起去完成一项任务,这是,就会要求10个士兵先集合报道,接着,再一起去执行任务,当10个士兵的任务都完成了,司令对外宣布,任务完成。

我们是不是想到了使用 coundownLatch 来完成,注意,我们这里需要两次计数,而 countdown 是无法实现的,这就是循环栅栏比倒计时器强大的地方—–可以循环。

如何使用呢?

CyclicBarrier 循环栅栏提供了2 个构造方法:

public CyclicBarrier(int parties) int 类型表示计数的数量

public CyclicBarrier(int parties, Runnable barrierAction) Runnable 表示每次计数结束需要执行的任务(执行一次)

我们就各个司令士兵的例子写一段代码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
public class CyclicBarrierDemo {

static class Soldier implements Runnable {

// 军人
String soldier;
// 循环栅栏
final CyclicBarrier cyclic;

public Soldier(CyclicBarrier cyclic, String soldier) {
this.cyclic = cyclic;
this.soldier = soldier;
}

@Override
public void run() {
// 等待所有士兵到齐
try {
System.out.println("准备");
cyclic.await();// 到了 10 才开始走,否则线程等待

doWork();
// 等待所有士兵完成工作
cyclic.await();
} catch (InterruptedException | BrokenBarrierException e) {
e.printStackTrace();
}

}

private void doWork() {
try {
Thread.sleep(Math.abs(new Random().nextInt() % 10000));
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println(soldier + ": 任务完成");
}
}

/**
* 当计数器一次计数完成后,系统会派一个线程执行的这个线程的run方法。
*/
static class BarrierRun implements Runnable {

boolean flag;
int N;

public BarrierRun(boolean flag, int n) {
this.flag = flag;
N = n;
}

@Override
public void run() {
if (flag) {
System.out.println("司令:【士兵 " + N + "个, 任务完成!】");
} else {
System.out.println("司令:【士兵 " + N + "个, 集合完毕!】");
flag = true;
}
}
}


public static void main(String[] args) {
final int n = 10;

Thread[] allSoldier = new Thread[n];
boolean flag = false;
// parties 表示计数总数,也就是参与的线程总数, barrierAction 就是当计数器一次计数完成后,系统会执行的动作
CyclicBarrier cyclic = new CyclicBarrier(n, new BarrierRun(false, n));

// 设置屏障点,主要是为了执行这个方法
System.out.println("集合队伍");
for (int i = 0; i < n; i++) {
System.out.println("士兵" + i + "报道");
allSoldier[i] = new Thread(new Soldier(cyclic, "士兵" + i));
allSoldier[i].start();
// if (i== 5){ // 会导致所有的线程全部停止 BrokenBarrierException * 9 + InterruptedException * 1
// allSoldier[i].interrupt();
//
// }
}
}

}

代码不少,我们场景10个士兵线程,每个士兵线程都含有同一个循环栅栏,再士兵调用run方法的时候,需要调用循环栅栏的 await 方法,此时,线程就开始等待,直到栅栏的数字变成了0,因为我们在创建的时候设置的是10,因此,需要10个线程触发此方法,当10个线程全部都触发了该方法,也就是计数器归零了,注意,此时循环栅栏会随机调用一个线程执行栅栏处发生的行动。就是我们的 BarrierRun 任务。在执行完该任务后,所有的士兵执行 doWork 方法,下面又开始 执行栅栏的 await 方法,所有的士兵又开始等待,等待所有的士兵都执行完毕,可以看到,该栅栏被循环使用了,而 countdown 是做不到的。等到所有的士兵都是调用了 await 方法,循环栅栏再次随机抽取一个线程调用 BarrierRun 的 run 方法。最后完成了所有的任务。

可以说,CyclicBarrier 和 CountDownLatch 还有 Samephore 类都是协作多个线程同时工作的工具。什么时候使用什么工具,各位可以自己思考。

只需要记住:countDown 和 CyclicBarrier 很相似,但不能循环,而 Samephore 可以控制每次又多少个线程进入某个代码块。相当于多线程的锁。具体使用场景自己看。

4. Exchanger 交换器

Exchanger 是要给交换器,也是用于线程间协作的工具类。什么用处呢?假如现在有一个需求,需要你将两个线程的数据进行狡猾,你该怎么做?

我猜测大家肯定使用类似 wait notify 之类的方法进行线程之间的通信,或者使用消息机制。但 Doug Lea 为我们提供另外一种选择:交换器,直接交换两个线程的数据。6不6?

两个线程可以通过 exchange 方法交换数据,如果第一个线程先执行 exchange 方法,他会一直等待第二个线程也执行 exchange 方法, 当两个线程都到达同步点时,这两个线程就可以交换数据,将本线程生产出来的数据传递给对方。

写个例子大家看看:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
public class ExchangerDemo {

static final Exchanger<String> exgr = new Exchanger<>();

static ExecutorService threadPool = Executors.newFixedThreadPool(2);

public static void main(String[] args) {
threadPool.execute(new Runnable() {
@Override
public void run() {
try {
String a = "银行流水A";
String b = exgr.exchange(a);
System.err.println("A 和 B 数据是否一致: " + a.equals(b) + ", A 录入的是:" + a + ", B 录入的是:" + b);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
});

threadPool.execute(new Runnable() {
@Override
public void run() {
try {
String b = "银行流水B";
String a = exgr.exchange(b);
System.out.println("A 和 B 数据是否一致: " + a.equals(b) + ", A 录入的是:" + a + ", B 录入的是:" + b);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
});

threadPool.shutdown();
}

执行结果

执行结果

可以看到,两个线程都得到了对方的数据,可以说非常的牛逼。如果两个线程有一个没有执行 exchange 方法,另一个则会一直等待,如果担心 exchanger 时间过长,可以设置过长时间 exchange(V x, long timeout, TimeUnit unit)。

总结

好了,我们今天介绍了 java.util.concurrent 包下的4个多线程协作工具类,让我们在今后并发编程中可以有更顺手的工具,有些业务场景完全可以使用这些现成的工具。比如 Samephore,CountDownLatch, CyclicBarrier,Exchanger,每个工具都有自己的应用场景。

好了,今天的并发工具使用就介绍到这里。

good luck !!!!


并发编程之-线程协作工具类
http://thinkinjava.cn/2018/01/03/2018/2018-01-03-并发编程之-线程协作工具类/
作者
莫那·鲁道
发布于
2018年1月3日
许可协议