并发编程之-SynchronousQueue-核心源码分析
它是一个非常特殊的阻塞队列,他的模式是:在 offer
的时候,如果没有另一个线程在 take 或者 poll
的话,就会失败,反之,如果在 take
或者 poll
的时候,没有线程在 offer
,则也会失败,而这种特性,则非常适合用来做高响应并且线程不固定的线程池的 Queue
。所以,在很多高性能服务器中,如果并发很高,这时候,普通的 LinkedQueue
就会成为瓶颈,性能就会出现毛刺,当换上 SynchronousQueue
后,性能就会好很多。
今天就看看这个特殊的 Queue 是怎么实现的。友情提示:代码有些小复杂。。。请做好心理准备。
源码实现
SynchronousQueue 内部分为公平(队列)和非公平(栈),队列的性能相对而言会好点。构造方法中,就看出来了。默认是非公平的,通常非公平(栈 FIFO)的性能会高那么一点点。
构造方法:
1 | public SynchronousQueue(boolean fair) { |
offer 方法
该方法我们通常建议使用带有超时机制的 offer
方法。
1 | public boolean offer(E e, long timeout, TimeUnit unit) |
从上面的代码中,可以看到核心方法就是 transfer
方法。如果该方法返回 true
,表示,插入成功,如果失败,就返回 false
。
poll 方法
1 | public E poll(long timeout, TimeUnit unit) throws InterruptedException { |
同样的该方法也是调用了 transfer
方法。结果返回得到的值或者 null
。区别在于,offer
方法的 e
参数是实体的。而 poll
方法 e
参数是 null
,我们猜测,方法内部肯定根据这个做了判断。所以,重点在于transfer
方法的实现。
而 transferer 有 2 种,队列和栈,我们就研究一种,知晓其原理,另一种有时间在看。
TransferQueue 源码实现
构造方法:
1 | TransferQueue() { |
构造一个 Node 节点,注释说这是一个加的 node。并赋值给 head 和 tail 节点。形成一个初始化的链表。
看看这个 node:
1 | /** Node class for TransferQueue. */ |
node 持有队列中下一个 node,node 对应的值 value,持有该 node 的线程,拥有 park 或者 unpark,这里用的是 JUC 的工具类 LockSupport,还有一个布尔类型,isData,这个非常重要,需要好好理解,到后面我们会好好讲解。
我们更关注的是这个类的 transfer 方法,该方法是 SynchronousQueue 的核心。
该方法接口定义如下:
1 | /** |
注释说道 e 参数的作用:
如果 e 不是 null(说明是生产者调用) ,将 item 交给消费者,并返回 e;反之,如果是 null(说明是消费者调用),将生产者提供的 item 返回给消费者。
看看 TransferQueue 类的 transfer 方法实现,楼主写了很多的注释尝试解读:
1 | QNode s = null; // constructed/reused as needed |
说实话,代码还是比较复杂的。JDK 中注释是这么说的:
基本算法是死循环采取 2 种方式中的其中一种。
1 如果队列是空的,或者持有相同的模式节点(isData
相同),就尝试添加节点到队列中,并让当前线程等待。
2 如果队列中有线程在等待,那么就使用一种互补
的方式,使用 CAS 和等待者交换数据。并返回。
什么意思呢?
首先明确一点,队列中,数据有 2 种情况(但同时只存在一种),要么 QNode
中有实际数据(offer
的时候,是有数据的,但没有“人”来取),要么没有实际数据(poll
的时候,队列中没有数据,线程只好等待)。队列在哪一种状态取决于他为空后,第一个插入的是什么类型的数据
。
楼主画了点图来表示:
- 队列初始化的时候,只有一个空的
Node
。
- 此时,一个线程尝试
offer
或者poll
数据,都会插入一个Node
插入到节点中。
- 假设刚刚发生的是 offer 操作,这个时候,另一个线程也来 offer,这时就会有 2 个节点。
- 这个时候,队列中有 2 个有真实数据(offer 操作)的节点了,注意,这个时候,那 2 个线程都是
wait
的,因为没有人接受他们的数据。此时,又来一个线程,做 poll 操作。
从上图可以看出, poll
线程从 head
开始取数据,因为它的 isData
和 tail
节点的 isData 不同,那么就会从 head 开始找节点,并尝试将自己的 null 值和节点中的真实数据进行交换。并唤醒等待中的线程。
这 4 幅图就是 SynchronousQueue
的精华。
既然叫做同步队列,一定是 A 线程生产数据的时候,有 B 线程在消费,否则 A 线程就需要等待,反之,如果 A 线程准备消费数据,但队列中没有数据,线程也会等待,直到有 B 线程存放数据。
而 JDK 的实现原理则是:使用一个队列,队列中的用一个 isData
来区分生产还是消费,所有新操作都根据 tail 节点的模式来决定到底是追加到 tail
节点还是和 tail
节点(从 head
开始)交换数据。
而所谓的交换是从head
开始,取出节点的实际数据,然后使用 CAS
和匹配到的节点进行交换。从而完成两个线程直接交换数据的操作。
为什么他在某些情况下,比LinkedBlockingQueue
性能高呢?其中有个原因就是没有使用锁,减少了线程上下文切换。第二则是线程之间交换数据的方式更加的高效。
好,重点部分讲完了,再看看其中线程是如何等待的。逻辑在 awaitFulfill
方法中:
1 | // 自旋或者等待,直到填充完毕 |
该方法逻辑如下:
- 默认自旋 32 次,如果没有超时机制,则 512 次。
- 如果时间到了,或者线程被中断,则取消这次的操作,将
item
设置成自己。供后面判断。 - 如果自旋结束,且剩余时间还超过 1 秒,则阻塞等待至剩余时间。
- 当线程被其他的线程唤醒,说明数据被交换了。则
return
,返回的是交换后的数据。
总结
好了,关于 SynchronousQueue
的核心源码分析就到这里了,楼主没有分析这个类的所有源码,只研究了核心部分代码,这足够我们理解这个 Queue
的内部实现了。
总结下来就是:
JDK 使用了队列或者栈来实现公平或非公平模型。其中,isData
属性极为重要,标识这这个线程的这次操作,决定了他到底应该是追加到队列中,还是从队列中交换数据。
每个线程在没有遇到自己的另一半时,要么快速失败,要么进行阻塞,阻塞等待自己的另一半来,至于对方是给数据还是取数据,取决于她自己,如果她是消费者,那么他就是生产者。
good luck!!!!