synchronousqueue场景_【JUC】JDK1.8源码分析之SynchronousQueue(九)-程序员宅基地

技术标签: synchronousqueue场景  

一、前言

本篇是在分析Executors源码时,发现JUC集合框架中的一个重要类没有分析,SynchronousQueue,该类在线程池中的作用是非常明显的,所以很有必要单独拿出来分析一番,这对于之后理解线程池有很有好处,SynchronousQueue是一种阻塞队列,其中每个插入操作必须等待另一个线程的对应移除操作 ,反之亦然。同步队列没有任何内部容量,甚至连一个队列的容量都没有。

二、SynchronousQueue数据结构

由于SynchronousQueue的支持公平策略和非公平策略,所以底层可能两种数据结构:队列(实现公平策略)和栈(实现非公平策略),队列与栈都是通过链表来实现的。具体的数据结构如下

  说明:数据结构有两种类型,栈和队列;栈有一个头结点,队列有一个头结点和尾结点;栈用于实现非公平策略,队列用于实现公平策略。

三、SynchronousQueue源码分析

3.1 类的继承关系

public class SynchronousQueue extends AbstractQueue

implements BlockingQueue, java.io.Serializable {}

说明:SynchronousQueue继承了AbstractQueue抽象类,AbstractQueue定义了对队列的基本操作;同时实现了BlockingQueue接口,BlockingQueue表示阻塞型的队列,其对队列的操作可能会抛出异常;同时也实现了Searializable接口,表示可以被序列化。

3.2 类的内部类

SynchronousQueue的内部类框架图如下

说明:其中比较重要的类是左侧的三个类,Transferer是TransferStack栈和TransferQueue队列的公共类,定义了转移数据的公共操作,由TransferStack和TransferQueue具体实现,WaitQueue、LifoWaitQueue、FifoWaitQueue表示为了兼容JDK1.5版本中的SynchronousQueue的序列化策略所遗留的,这里不做具体的讲解。下面着重看左侧的三个类。

① Transferer

abstract static class Transferer{/*** Performs a put or take.

*

*@parame if non-null, the item to be handed to a consumer;

* if null, requests that transfer return an item

* offered by producer.

*@paramtimed if this operation should timeout

*@paramnanos the timeout, in nanoseconds

*@returnif non-null, the item provided or received; if null,

* the operation failed due to timeout or interrupt --

* the caller can distinguish which of these occurred

* by checking Thread.interrupted.*/

//转移数据,put或者take操作

abstract E transfer(E e, boolean timed, longnanos);

}

View Code

说明:Transferer定义了transfer操作,用于take或者put数据。transfer方法由子类实现。

② TransfererStack

1. 类的继承关系

static final class TransferStack extends Transferer {}

说明:TransferStack继承Transferer抽象类,其实现了transfer方法。

2. 类的属性

static final class TransferStack extends Transferer{/** This extends Scherer-Scott dual stack algorithm, differing,

* among other ways, by using "covering" nodes rather than

* bit-marked pointers: Fulfilling operations push on marker

* nodes (with FULFILLING bit set in mode) to reserve a spot

* to match a waiting node.*/

/*Modes for SNodes, ORed together in node fields*/

/**Node represents an unfulfilled consumer*/

//表示消费数据的消费者

static final int REQUEST = 0;/**Node represents an unfulfilled producer*/

//表示生产数据的生产者

static final int DATA = 1;/**Node is fulfilling another unfulfilled DATA or REQUEST*/

//表示匹配另一个生产者或消费者

static final int FULFILLING = 2;/**The head (top) of the stack*/

//头结点

volatileSNode head;

}

View Code

说明:TransferStack有三种不同的状态,REQUEST,表示消费数据的消费者;DATA,表示生产数据的生产者;FULFILLING,表示匹配另一个生产者或消费者。任何线程对TransferStack的操作都属于上述3种状态中的一种。同时还包含一个head域,表示头结点。

3. 类的内部类

SNode类

1. 类的属性

static final classSNode {//下一个结点

volatile SNode next; //next node in stack//相匹配的结点

volatile SNode match; //the node matched to this//等待的线程

volatile Thread waiter; //to control park/unpark//元素项

Object item; //data; or null for REQUESTs//模式

intmode;//Note: item and mode fields don't need to be volatile//since they are always written before, and read after,//other volatile/atomic operations.//item域和mode域不需要使用volatile修饰,因为它们在volatile/atomic操作之前写,之后读//Unsafe mechanics//反射机制

private static finalsun.misc.Unsafe UNSAFE;//match域的内存偏移地址

private static final longmatchOffset;//next域的偏移地址

private static final longnextOffset;static{try{

UNSAFE=sun.misc.Unsafe.getUnsafe();

Class> k = SNode.class;

matchOffset=UNSAFE.objectFieldOffset

(k.getDeclaredField("match"));

nextOffset=UNSAFE.objectFieldOffset

(k.getDeclaredField("next"));

}catch(Exception e) {throw newError(e);

}

}

}

View Code

说明:SNode类表示栈中的结点,使用了反射机制和CAS来保证原子性的改变相应的域值。

2. 类的构造函数

SNode(Object item) {this.item =item;

}

View Code

说明:该构造函数仅仅设置了SNode的item域,其他域为默认值。

3. 核心函数分析

3.1. tryMatch函数

booleantryMatch(SNode s) {if (match == null &&UNSAFE.compareAndSwapObject(this, matchOffset, null, s)) { //本结点的match域为null并且比较并替换match域成功//获取本节点的等待线程

Thread w =waiter;if (w != null) { //存在等待的线程//waiters need at most one unpark//将本结点的等待线程重新置为null

waiter = null;//unpark等待线程

LockSupport.unpark(w);

}return true;

}//如果match不为null或者CAS设置失败,则比较match域是否等于s结点,若相等,则表示已经完成匹配,匹配成功

return match ==s;

}

View Code

说明:将s结点与本结点进行匹配,匹配成功,则unpark等待线程。具体流程如下

① 判断本结点的match域是否为null,若为null,则进入步骤②,否则,进入步骤⑤

② CAS设置本结点的match域为s结点,若成功,则进入步骤③,否则,进入步骤⑤

③ 判断本结点的waiter域是否为null,若不为null,则进入步骤④,否则,进入步骤⑤

④ 重新设置本结点的waiter域为null,并且unparkwaiter域所代表的等待线程。进入步骤⑥

⑤ 比较本结点的match域是否为本结点,若是,则进入步骤⑥,否则,进入步骤⑦

⑥ 返回true

⑦ 返回false

4. 核心函数分析

4.1 isFulfilling函数

static boolean isFulfilling(int m) { return (m & FULFILLING) != 0; }

View Code

说明:表示是否包含FULFILLING标记。

4.2 transfer函数

E transfer(E e, boolean timed, longnanos) {/** Basic algorithm is to loop trying one of three actions:

*

* 1. If apparently empty or already containing nodes of same

* mode, try to push node on stack and wait for a match,

* returning it, or null if cancelled.

*

* 2. If apparently containing node of complementary mode,

* try to push a fulfilling node on to stack, match

* with corresponding waiting node, pop both from

* stack, and return matched item. The matching or

* unlinking might not actually be necessary because of

* other threads performing action 3:

*

* 3. If top of stack already holds another fulfilling node,

* help it out by doing its match and/or pop

* operations, and then continue. The code for helping

* is essentially the same as for fulfilling, except

* that it doesn't return the item.*/SNode s= null; //constructed/reused as needed//根据e确定此次转移的模式(是put or take)

int mode = (e == null) ?REQUEST : DATA;for (;;) { //无限循环//保存头结点

SNode h =head;if (h == null || h.mode == mode) { //头结点为null或者头结点的模式与此次转移的模式相同//empty or same-mode

if (timed && nanos <= 0) { //设置了timed并且等待时间小于等于0,表示不能等待,需要立即操作//can't wait

if (h != null && h.isCancelled()) //头结点不为null并且头结点被取消

casHead(h, h.next); //重新设置头结点(弹出之前的头结点)//pop cancelled node

else //头结点为null或者头结点没有被取消//返回null

return null;

}else if (casHead(h, s = snode(s, e, h, mode))) { //生成一个SNode结点;将原来的head头结点设置为该结点的next结点;将head头结点设置为该结点//Spins/blocks until node s is matched by a fulfill operation.//空旋或者阻塞直到s结点被FulFill操作所匹配

SNode m =awaitFulfill(s, timed, nanos);if (m == s) { //匹配的结点为s结点(s结点被取消)//wait was cancelled//清理s结点

clean(s);//返回

return null;

}if ((h = head) != null && h.next == s) //h重新赋值为head头结点,并且不为null;头结点的next域为s结点,表示有结点插入到s结点之前,完成了匹配//比较并替换head域(移除插入在s之前的结点和s结点)

casHead(h, s.next); //help s's fulfiller//根据此次转移的类型返回元素

return (E) ((mode == REQUEST) ?m.item : s.item);

}

}else if (!isFulfilling(h.mode)) { //没有FULFILLING标记,尝试匹配//try to fulfill

if (h.isCancelled()) //被取消//already cancelled//比较并替换head域(弹出头结点)

casHead(h, h.next); //pop and retry

else if (casHead(h, s=snode(s, e, h, FULFILLING|mode))) { //生成一个SNode结点;将原来的head头结点设置为该结点的next结点;将head头结点设置为该结点

for (;;) { //无限循环//loop until matched or waiters disappear//保存s的next结点

SNode m = s.next; //m is s's match

if (m == null) { //next域为null//all waiters are gone//比较并替换head域

casHead(s, null); //pop fulfill node//赋值s为null

s = null; //use new node next time

break; //restart main loop

}//m结点的next域

SNode mn =m.next;if (m.tryMatch(s)) { //尝试匹配,并且成功//比较并替换head域(弹出s结点和m结点)

casHead(s, mn); //pop both s and m//根据此次转移的类型返回元素

return (E) ((mode == REQUEST) ?m.item : s.item);

}else //匹配不成功//lost match//比较并替换next域(弹出m结点)

s.casNext(m, mn); //help unlink

}

}

}else { //头结点正在匹配//help a fulfiller//保存头结点的next域

SNode m = h.next; //m与h可以匹配//m is h's match

if (m == null) //next域为null//waiter is gone//比较并替换head域(m被其他结点匹配了,需要弹出h)

casHead(h, null); //pop fulfilling node

else { //next域不为null//获取m结点的next域

SNode mn =m.next;if (m.tryMatch(h)) //m与h匹配成功//help match//比较并替换head域(弹出h和m结点)

casHead(h, mn); //pop both h and m

else //匹配不成功//lost match//比较并替换next域(移除m结点)

h.casNext(m, mn); //help unlink

}

}

}

}

View Code

说明:此函数用于生产或者消费一个元素,并且transfer函数调用了awaitFulfill函数,之后会通过一个例子给出流程。

4.3 awaitFulfill函数

SNode awaitFulfill(SNode s, boolean timed, longnanos) {/** When a node/thread is about to block, it sets its waiter

* field and then rechecks state at least one more time

* before actually parking, thus covering race vs

* fulfiller noticing that waiter is non-null so should be

* woken.

*

* When invoked by nodes that appear at the point of call

* to be at the head of the stack, calls to park are

* preceded by spins to avoid blocking when producers and

* consumers are arriving very close in time. This can

* happen enough to bother only on multiprocessors.

*

* The order of checks for returning out of main loop

* reflects fact that interrupts have precedence over

* normal returns, which have precedence over

* timeouts. (So, on timeout, one last check for match is

* done before giving up.) Except that calls from untimed

* SynchronousQueue.{poll/offer} don't check interrupts

* and don't wait at all, so are trapped in transfer

* method rather than calling awaitFulfill.*/

//根据timed标识计算截止时间

final long deadline = timed ? System.nanoTime() + nanos : 0L;//获取当前线程

Thread w =Thread.currentThread();//根据s确定空旋等待的时间

int spins = (shouldSpin(s) ?(timed? maxTimedSpins : maxUntimedSpins) : 0);for (;;) { //无限循环,确保操作成功

if (w.isInterrupted()) //当前线程被中断//取消s结点

s.tryCancel();//获取s结点的match域

SNode m =s.match;if (m != null) //m不为null,存在匹配结点//返回m结点

returnm;if (timed) { //设置了timed//确定继续等待的时间

nanos = deadline -System.nanoTime();if (nanos <= 0L) { //继续等待的时间小于等于0,等待超时//取消s结点

s.tryCancel();//跳过后面的部分,继续

continue;

}

}if (spins > 0) //空旋等待的时间大于0//确实是否还需要继续空旋等待

spins = shouldSpin(s) ? (spins-1) : 0;else if (s.waiter == null) //等待线程为null//设置waiter线程为当前线程

s.waiter = w; //establish waiter so can park next iter

else if (!timed) //没有设置timed标识//禁用当前线程并设置了阻塞者

LockSupport.park(this);else if (nanos > spinForTimeoutThreshold) //继续等待的时间大于阈值//禁用当前线程,最多等待指定的等待时间,除非许可可用

LockSupport.parkNanos(this, nanos);

}

}

View Code

说明:此函数表示当前线程自旋或阻塞,直到结点被匹配。awaitFulfill函数调用了shouldSpin函数

4.4 shouldSpin函数

booleanshouldSpin(SNode s) {//获取头结点

SNode h =head;//s为头结点或者头结点为null或者h包含FULFILLING标记,返回true

return (h == s || h == null ||isFulfilling(h.mode));

}

View Code

说明:此函数表示是当前结点所包含的线程(当前线程)进行空旋等待,有如下情况需要进行空旋等待

① 当前结点为头结点

② 头结点为null

③ 头结点正在匹配中

4.5 clean函数

voidclean(SNode s) {//s结点的item设置为null

s.item = null; //forget item//waiter域设置为null

s.waiter = null; //forget thread

/** At worst we may need to traverse entire stack to unlink

* s. If there are multiple concurrent calls to clean, we

* might not see s if another thread has already removed

* it. But we can stop when we see any node known to

* follow s. We use s.next unless it too is cancelled, in

* which case we try the node one past. We don't check any

* further because we don't want to doubly traverse just to

* find sentinel.*/

//获取s结点的next域

SNode past =s.next;if (past != null && past.isCancelled()) //next域不为null并且next域被取消//重新设置past

past =past.next;//Absorb cancelled nodes at head

SNode p;while ((p = head) != null && p != past && p.isCancelled()) //从栈顶头结点开始到past结点(不包括),将连续的取消结点移除//比较并替换head域(弹出取消的结点)

casHead(p, p.next);//Unsplice embedded nodes

while (p != null && p != past) { //移除上一步骤没有移除的非连续的取消结点//获取p的next域

SNode n =p.next;if (n != null && n.isCancelled()) //n不为null并且n被取消//比较并替换next域

p.casNext(n, n.next);else

//设置p为n

p =n;

}

}

View Code

说明:此函数用于移除从栈顶头结点开始到该结点(不包括)之间的所有已取消结点。

③ TransferQueue

1. 类的继承关系

static final class TransferQueue extends Transferer {}

说明:TransferQueue继承Transferer抽象类,其实现了transfer方法。

2. 类的属性

static final class TransferQueue extends Transferer{/** This extends Scherer-Scott dual queue algorithm, differing,

* among other ways, by using modes within nodes rather than

* marked pointers. The algorithm is a little simpler than

* that for stacks because fulfillers do not need explicit

* nodes, and matching is done by CAS'ing QNode.item field

* from non-null to null (for put) or vice versa (for take).*/

/**Head of queue*/

//队列的头结点

transient volatileQNode head;/**Tail of queue*/

//队列的尾结点

transient volatileQNode tail;/*** Reference to a cancelled node that might not yet have been

* unlinked from queue because it was the last inserted node

* when it was cancelled.*/

//指向一个取消的结点,当一个结点是最后插入队列时,当被取消时,它可能还没有离开队列

transient volatileQNode cleanMe;

}

View Code

说明:队列存在一个头结点和一个尾节点,分别指示队头和队尾,还包含了一个指示取消结点的域。

3. 类的内部类

QNode类

QNode的源码如下

static final classQNode {//下一个结点

volatile QNode next; //next node in queue//元素项

volatile Object item; //CAS'ed to or from null//等待线程

volatile Thread waiter; //to control park/unpark//是否为数据

final booleanisData;//构造函数

QNode(Object item, booleanisData) {//初始化item域

this.item =item;//初始化isData域

this.isData =isData;

}//比较并替换next域

booleancasNext(QNode cmp, QNode val) {return next == cmp &&UNSAFE.compareAndSwapObject(this, nextOffset, cmp, val);

}//比较并替换item域

booleancasItem(Object cmp, Object val) {return item == cmp &&UNSAFE.compareAndSwapObject(this, itemOffset, cmp, val);

}/*** Tries to cancel by CAS'ing ref to this as item.*/

//取消本结点,将item域设置为自身

voidtryCancel(Object cmp) {

UNSAFE.compareAndSwapObject(this, itemOffset, cmp, this);

}//是否被取消

booleanisCancelled() {//item域是否等于自身

return item == this;

}/*** Returns true if this node is known to be off the queue

* because its next pointer has been forgotten due to

* an advanceHead operation.*/

//是否不在队列中

booleanisOffList() {//next与是否等于自身

return next == this;

}//Unsafe mechanics//反射机制

private static finalsun.misc.Unsafe UNSAFE;//item域的偏移地址

private static final longitemOffset;//next域的偏移地址

private static final longnextOffset;static{try{

UNSAFE=sun.misc.Unsafe.getUnsafe();

Class> k = QNode.class;

itemOffset=UNSAFE.objectFieldOffset

(k.getDeclaredField("item"));

nextOffset=UNSAFE.objectFieldOffset

(k.getDeclaredField("next"));

}catch(Exception e) {throw newError(e);

}

}

}

View Code

说明:QNode表示队列中的结点,并且通过反射和CAS原子性的修改对应的域值。

4. 类的构造函数

TransferQueue() {//初始化一个哨兵结点

QNode h = new QNode(null, false); //initialize to dummy node.//设置头结点

head =h;//设置尾结点

tail =h;

}

View Code

说明:该构造函数用于初始化一个队列,并且初始化了一个哨兵结点,头结点与尾节点均指向该哨兵结点。

5. 核心函数分析

5.1 transfer函数

E transfer(E e, boolean timed, longnanos) {/*Basic algorithm is to loop trying to take either of

* two actions:

*

* 1. If queue apparently empty or holding same-mode nodes,

* try to add node to queue of waiters, wait to be

* fulfilled (or cancelled) and return matching item.

*

* 2. If queue apparently contains waiting items, and this

* call is of complementary mode, try to fulfill by CAS'ing

* item field of waiting node and dequeuing it, and then

* returning matching item.

*

* In each case, along the way, check for and try to help

* advance head and tail on behalf of other stalled/slow

* threads.

*

* The loop starts off with a null check guarding against

* seeing uninitialized head or tail values. This never

* happens in current SynchronousQueue, but could if

* callers held non-volatile/final ref to the

* transferer. The check is here anyway because it places

* null checks at top of loop, which is usually faster

* than having them implicitly interspersed.*/QNode s= null; //constructed/reused as needed//确定此次转移的类型(put or take)

boolean isData = (e != null);for (;;) { //无限循环,确保操作成功//获取尾结点

QNode t =tail;//获取头结点

QNode h =head;if (t == null || h == null) //看到未初始化的头尾结点//saw uninitialized value//跳过后面的部分,继续

continue; //spin

if (h == t || t.isData == isData) { //头结点与尾结点相等或者尾结点的模式与当前结点模式相同//empty or same-mode//获取尾结点的next域

QNode tn =t.next;if (t != tail) //t不为尾结点,不一致,重试//inconsistent read

continue;if (tn != null) { //tn不为null,有其他线程添加了tn结点//lagging tail//设置新的尾结点为tn

advanceTail(t, tn);//跳过后面的部分,继续

continue;

}if (timed && nanos <= 0) //设置了timed并且等待时间小于等于0,表示不能等待,需要立即操作//can't wait//返回null

return null;if (s == null) //s为null//新生一个结点并赋值给s

s = newQNode(e, isData);if (!t.casNext(null, s)) //设置t结点的next域不成功//failed to link in//跳过后面的部分,继续

continue;//设置新的尾结点

advanceTail(t, s); //swing tail and wait//Spins/blocks until node s is fulfilled//空旋或者阻塞直到s结点被匹配

Object x =awaitFulfill(s, e, timed, nanos);if (x == s) { //x与s相等,表示已经取消//wait was cancelled//清除

clean(t, s);//返回null

return null;

}if (!s.isOffList()) { //s结点还没离开队列//not already unlinked//设置新的头结点

advanceHead(t, s); //unlink if head

if (x != null) //x不为null//and forget fields//设置s结点的item

s.item =s;//设置s结点的waiter域为null

s.waiter = null;

}return (x != null) ?(E)x : e;

}else { //模式互补//complementary-mode//获取头结点的next域(匹配的结点)

QNode m = h.next; //node to fulfill

if (t != tail || m == null || h != head) //t不为尾结点或者m为null或者h不为头结点(不一致)//跳过后面的部分,继续

continue; //inconsistent read//获取m结点的元素域

Object x =m.item;if (isData == (x != null) || //m结点被匹配//m already fulfilled

x == m || //m结点被取消//m cancelled

!m.casItem(x, e)) { //CAS操作失败//lost CAS

advanceHead(h, m); //队列头结点出队列,并重试//dequeue and retry

continue;

}//匹配成功,设置新的头结点

advanceHead(h, m); //successfully fulfilled//unpark m结点对应的等待线程

LockSupport.unpark(m.waiter);return (x != null) ?(E)x : e;

}

}

}

View Code

说明:此函数用于生产或者消费一个元素,并且transfer函数调用了awaitFulfill函数,之后会通过一个例子给出流程。

5.2 awaitFulfill函数

Object awaitFulfill(QNode s, E e, boolean timed, longnanos) {/*Same idea as TransferStack.awaitFulfill*/

//根据timed标识计算截止时间

final long deadline = timed ? System.nanoTime() + nanos : 0L;//获取当前线程

Thread w =Thread.currentThread();//计算空旋时间

int spins = ((head.next == s) ?(timed? maxTimedSpins : maxUntimedSpins) : 0);for (;;) { //无限循环,确保操作成功

if (w.isInterrupted()) //当前线程被中断//取消

s.tryCancel(e);//获取s的元素域

Object x =s.item;if (x != e) //元素不为e//返回

returnx;if (timed) { //设置了timed//计算继续等待的时间

nanos = deadline -System.nanoTime();if (nanos <= 0L) { //继续等待的时间小于等于0//取消

s.tryCancel(e);//跳过后面的部分,继续

continue;

}

}if (spins > 0) //空旋时间大于0//减少空旋时间

--spins;else if (s.waiter == null) //等待线程为null//设置等待线程

s.waiter =w;else if (!timed) //没有设置timed标识//禁用当前线程并设置了阻塞者

LockSupport.park(this);else if (nanos > spinForTimeoutThreshold) //继续等待的时间大于阈值//禁用当前线程,最多等待指定的等待时间,除非许可可用

LockSupport.parkNanos(this, nanos);

}

}

View Code

说明:此函数表示当前线程自旋或阻塞,直到结点被匹配。

5.3 clean函数

voidclean(QNode pred, QNode s) {//设置等待线程为null

s.waiter = null; //forget thread

/** At any given time, exactly one node on list cannot be

* deleted -- the last inserted node. To accommodate this,

* if we cannot delete s, we save its predecessor as

* "cleanMe", deleting the previously saved version

* first. At least one of node s or the node previously

* saved can always be deleted, so this always terminates.*/

/** 在任何时候,最后插入的结点不能删除,为了满足这个条件

* 如果不能删除s结点,我们将s结点的前驱设置为cleanMe结点

* 删除之前保存的版本,至少s结点或者之前保存的结点能够被删除

* 所以最后总是会结束*/

while (pred.next == s) { //pred的next域为s//Return early if already unlinked//获取头结点

QNode h =head;//获取头结点的next域

QNode hn = h.next; //Absorb cancelled first node as head

if (hn != null && hn.isCancelled()) { //hn不为null并且hn被取消//设置新的头结点

advanceHead(h, hn);//跳过后面的部分,继续

continue;

}//获取尾结点,保证对尾结点的读一致性

QNode t = tail; //Ensure consistent read for tail

if (t == h) //尾结点为头结点,表示队列为空//返回

return;//获取尾结点的next域

QNode tn =t.next;if (t != tail) //t不为尾结点,不一致,重试//跳过后面的部分,继续

continue;if (tn != null) { //tn不为null//设置新的尾结点

advanceTail(t, tn);//跳过后面的部分,继续

continue;

}if (s != t) { //s不为尾结点,移除s//If not tail, try to unsplice

QNode sn =s.next;if (sn == s || pred.casNext(s, sn)) //

return;

}//获取cleanMe结点

QNode dp =cleanMe;if (dp != null) { //dp不为null,断开前面被取消的结点//Try unlinking previous cancelled node//获取dp的next域

QNode d =dp.next;

QNode dn;if (d == null || //d is gone or

d == dp || //d is off list or

!d.isCancelled() || //d not cancelled or

(d != t && //d not tail and

(dn = d.next) != null && //has successor

dn != d && //that is on list

dp.casNext(d, dn))) //d unspliced

casCleanMe(dp, null);if (dp ==pred)return; //s is already saved node

} else if (casCleanMe(null, pred))return; //Postpone cleaning s

}

}

View Code

说明:此函数用于移除已经被取消的结点。

3.3 类的属性

public class SynchronousQueue extends AbstractQueue

implements BlockingQueue, java.io.Serializable {//版本序列号

private static final long serialVersionUID = -3223113410248163686L;//可用的处理器

static final int NCPUS =Runtime.getRuntime().availableProcessors();//最大空旋时间

static final int maxTimedSpins = (NCPUS < 2) ? 0 : 32;//无限时的等待的最大空旋时间

static final int maxUntimedSpins = maxTimedSpins * 16;//超时空旋等待阈值

static final long spinForTimeoutThreshold = 1000L;//用于序列化

privateReentrantLock qlock;privateWaitQueue waitingProducers;privateWaitQueue waitingConsumers;

}

View Code

说明:SynchronousQueue类的属性包含了空旋等待时间相关的属性。

3.4 类的构造函数

1. SynchronousQueue()型构造函数

publicSynchronousQueue() {//非公平策略(先进后出)

this(false);

}

View Code

说明:该构造函数用于创建一个具有非公平访问策略的 SynchronousQueue。

2. SynchronousQueue(boolean)型构造函数

public SynchronousQueue(booleanfair) {//根据指定的策略生成不同的结构

transferer = fair ? new TransferQueue() : new TransferStack();

}

View Code

说明:创建一个具有指定公平策略的 SynchronousQueue。

3.5 核心函数分析

在分析了TransferStack和TransferQueue的相关函数后,SynchronousQueue的函数的分析就非常简单。

//将指定元素添加到此队列,如有必要则等待另一个线程接收它

public void put(E e) throwsInterruptedException {//e为null则抛出异常

if (e == null) throw newNullPointerException();if (transferer.transfer(e, false, 0) == null) { //进行转移操作//中断当前线程

Thread.interrupted();throw newInterruptedException();

}

}//将指定元素插入到此队列,如有必要则等待指定的时间,以便另一个线程接收它

public boolean offer(E e, longtimeout, TimeUnit unit)throwsInterruptedException {//e为null则抛出异常

if (e == null) throw newNullPointerException();if (transferer.transfer(e, true, unit.toNanos(timeout)) != null) //进行转移操作

return true;if (!Thread.interrupted()) //当前线程没有被中断//返回

return false;throw newInterruptedException();

}//如果另一个线程正在等待以便接收指定元素,则将指定元素插入到此队列

public booleanoffer(E e) {//e为null则抛出异常

if (e == null) throw newNullPointerException();return transferer.transfer(e, true, 0) != null; //进行转移操作

}//获取并移除此队列的头,如有必要则等待另一个线程插入它

public E take() throwsInterruptedException {//进行转移操作

E e = transferer.transfer(null, false, 0);if (e != null)returne;

Thread.interrupted();throw newInterruptedException();

}//获取并移除此队列的头,如有必要则等待指定的时间,以便另一个线程插入它

public E poll(long timeout, TimeUnit unit) throwsInterruptedException {

E e= transferer.transfer(null, true, unit.toNanos(timeout));if (e != null || !Thread.interrupted()) //元素不为null或者当前线程没有被中断

returne;throw newInterruptedException();

}//如果另一个线程当前正要使用某个元素,则获取并移除此队列的头

publicE poll() {return transferer.transfer(null, true, 0);

}//始终返回 true

public booleanisEmpty() {return true;

}//始终返回 0

public intsize() {return 0;

}//始终返回 0

public intremainingCapacity() {return 0;

}//不执行任何操作

public voidclear() {

}//始终返回false

public booleancontains(Object o) {return false;

}//始终返回false

public booleanremove(Object o) {return false;

}//除非给定 collection 为空,否则返回 false

public boolean containsAll(Collection>c) {returnc.isEmpty();

}//始终返回 false

public boolean removeAll(Collection>c) {return false;

}//始终返回 false

public boolean retainAll(Collection>c) {return false;

}//始终返回 null

publicE peek() {return null;

}//返回一个空迭代器,其中 hasNext 始终返回 false

public Iteratoriterator() {returnCollections.emptyIterator();

}//

public Spliteratorspliterator() {returnSpliterators.emptySpliterator();

}//返回一个 0 长度的数组

publicObject[] toArray() {return new Object[0];

}//将指定数组的第 0 个元素设置为 null(如果该数组有非 0 的长度)并返回它

public T[] toArray(T[] a) {if (a.length > 0)

a[0] = null;returna;

}//移除此队列中所有可用的元素,并将它们添加到给定 collection 中

public int drainTo(Collection super E>c) {if (c == null)throw newNullPointerException();if (c == this)throw newIllegalArgumentException();int n = 0;for (E e; (e = poll()) != null;) {

c.add(e);++n;

}returnn;

}//最多从此队列中移除给定数量的可用元素,并将这些元素添加到给定 collection 中

public int drainTo(Collection super E> c, intmaxElements) {if (c == null)throw newNullPointerException();if (c == this)throw newIllegalArgumentException();int n = 0;for (E e; n < maxElements && (e = poll()) != null;) {

c.add(e);++n;

}returnn;

}

View Code

说明:SynchronousQueue的函数很大程度都是依托于TransferStack或TransferQueue的transfer函数,所以,了解transfer函数就可以了解SynchronousQueue的原理。

四、示例

下面通过一个示例来详细了解SynchronousQueue的使用。

packagecom.hust.grid.leesf.collections;importjava.util.concurrent.SynchronousQueue;importjava.util.concurrent.TimeUnit;public classSynchronousQueueDemo {public static voidmain(String[] args) {

SynchronousQueue queue = new SynchronousQueue();

Producer p1= new Producer("p1", queue, 10);

Producer p2= new Producer("p2", queue, 50);

Consumer c1= new Consumer("c1", queue);

Consumer c2= new Consumer("c2", queue);

c1.start();try{

TimeUnit.MILLISECONDS.sleep(100);

}catch(InterruptedException e) {

e.printStackTrace();

}

c2.start();try{

TimeUnit.MILLISECONDS.sleep(100);

}catch(InterruptedException e) {

e.printStackTrace();

}

p1.start();try{

TimeUnit.MILLISECONDS.sleep(100);

}catch(InterruptedException e) {

e.printStackTrace();

}

p2.start();

}static class Producer extendsThread {private SynchronousQueuequeue;private intn;public Producer(String name, SynchronousQueue queue, intn) {super(name);this.queue =queue;this.n =n;

}public voidrun() {

System.out.println(getName()+ " offer result " +queue.offer(n));

}

}static class Consumer extendsThread {private SynchronousQueuequeue;public Consumer(String name, SynchronousQueuequeue) {super(name);this.queue =queue;

}public voidrun() {try{

System.out.println(getName()+ " take result " +queue.take());

}catch(InterruptedException e) {

e.printStackTrace();

}

}

}

}

View Code

运行结果(某一次)

p1 offer result truec2 take result10p2 offer resulttruec1 take result50

说明:该示例中,有两个生产者p1、p2和两个消费者c1、c2,按照c1、c2、p1、p2的顺序启动,并且每个线程启动后休眠100ms,则可能有如下的时序图

说明:时序图中,c1线程的take操作早于c2线程的take操作早于p1线程的offer操作早于p2线程的offer操作。

根据示例源码可知,此SynchronousQueue采用非公平策略,即底层采用栈结构。

① c1执行take操作,主要的函数调用如下

说明:其中,c1线程进入awaitFulfill后,会空旋等待,直到空旋时间消逝,会调用LockSupport.park函数,会禁用当前线程(c1),直至许可可用。

② c1执行take操作,主要的函数调用如下

说明:其中,c2线程进入awaitFulfill后,会空旋等待,直到空旋时间消逝,会调用LockSupport.park函数,会禁用当前线程(c2),直至许可可用。并且此时栈中有两个节点,c2线程所在的结点和c1线程所在的结点。

③ p1线程执行offer(10)操作,主要的函数调用如下

说明:在执行offer(10)操作后,c2线程所在的结点与头结点进行了匹配(头结点生产数据,c2线程所在的结点消费数据),c2线程被unpark,可以继续运行,而c1线程还是被park中(非公平策略)。

③ c2线程被unpark后,继续运行,主要函数调用如下(由于c2线程是在awaitFulfill函数中被park的,所以,恢复也是在awaitFulfill函数中)

说明:c2线程从unpark恢复时,结构如上图所示,先从awaitFulfill函数中返回,然后再从transfer函数中返回10,再从take函数中返回10。

④ p2线程执行offer(50)操作,主要的函数调用如下

说明:在执行offer(50)操作后,c1线程所在的结点与头结点进行了匹配(头结点生产数据,c1线程所在的结点消费数据),c1线程被unpark,可以继续运行。

⑤ c1线程被unpark后,继续运行,主要函数调用如下(由于c1线程是在awaitFulfill函数中被park的,所以,恢复也是在awaitFulfill函数中)

说明:c1线程从unpark恢复时,结构如上图所示,先从awaitFulfill函数中返回,然后再从transfer函数中返回50,再从take函数中返回50。

上述是使用非公平策略的结果(首先匹配c2线程所在的结点,之后再匹配c1线程所在结点)。

修改示例,改用公平策略。

packagecom.hust.grid.leesf.collections;importjava.util.concurrent.SynchronousQueue;importjava.util.concurrent.TimeUnit;public classSynchronousQueueDemo {public static voidmain(String[] args) {

SynchronousQueue queue = new SynchronousQueue(true);

Producer p1= new Producer("p1", queue, 10);

Producer p2= new Producer("p2", queue, 50);

Consumer c1= new Consumer("c1", queue);

Consumer c2= new Consumer("c2", queue);

c1.start();try{

TimeUnit.MILLISECONDS.sleep(100);

}catch(InterruptedException e) {

e.printStackTrace();

}

c2.start();try{

TimeUnit.MILLISECONDS.sleep(100);

}catch(InterruptedException e) {

e.printStackTrace();

}

p1.start();try{

TimeUnit.MILLISECONDS.sleep(100);

}catch(InterruptedException e) {

e.printStackTrace();

}

p2.start();

}static class Producer extendsThread {private SynchronousQueuequeue;private intn;public Producer(String name, SynchronousQueue queue, intn) {super(name);this.queue =queue;this.n =n;

}public voidrun() {

System.out.println(getName()+ " offer result " +queue.offer(n));

}

}static class Consumer extendsThread {private SynchronousQueuequeue;public Consumer(String name, SynchronousQueuequeue) {super(name);this.queue =queue;

}public voidrun() {try{

System.out.println(getName()+ " take result " +queue.take());

}catch(InterruptedException e) {

e.printStackTrace();

}

}

}

}

View Code

运行结果(某一次)

p1 offer result truec1 take result10p2 offer resulttruec2 take result50

说明:从运行结果可知,c1线程会比c2线程先匹配(因为采用公平策略,先入队列先匹配,所以c1先得到匹配,然后再匹配c2)。具体的流程图与非公平策略类似,在此不再累赘。

当再次修改源码,还是使用非公平策略,只是改变c1、c2、p1、p2之间的启动顺序。更改为p1->c1->p2->c2。

packagecom.hust.grid.leesf.collections;importjava.util.concurrent.SynchronousQueue;importjava.util.concurrent.TimeUnit;public classSynchronousQueueDemo {public static voidmain(String[] args) {

SynchronousQueue queue = new SynchronousQueue();

Producer p1= new Producer("p1", queue, 10);

Producer p2= new Producer("p2", queue, 50);

Consumer c1= new Consumer("c1", queue);

Consumer c2= new Consumer("c2", queue);

p1.start();try{

TimeUnit.MILLISECONDS.sleep(100);

}catch(InterruptedException e) {

e.printStackTrace();

}

c1.start();try{

TimeUnit.MILLISECONDS.sleep(100);

}catch(InterruptedException e) {

e.printStackTrace();

}

p2.start();try{

TimeUnit.MILLISECONDS.sleep(100);

}catch(InterruptedException e) {

e.printStackTrace();

}

c2.start();

}static class Producer extendsThread {private SynchronousQueuequeue;private intn;public Producer(String name, SynchronousQueue queue, intn) {super(name);this.queue =queue;this.n =n;

}public voidrun() {

System.out.println(getName()+ " offer result " +queue.offer(n));

}

}static class Consumer extendsThread {private SynchronousQueuequeue;public Consumer(String name, SynchronousQueuequeue) {super(name);this.queue =queue;

}public voidrun() {try{

System.out.println(getName()+ " take result " +queue.take());

}catch(InterruptedException e) {

e.printStackTrace();

}

}

}

}

View Code

运行结果(某一次)

p1 offer result falsep2 offer resulttruec1 take result50

说明:此时,只有c1线程得到了匹配,p1线程存放元素,直接返回的false,因为此时没有消费者线程等待,而p2线程与c1线程进行了匹配,p2线程存放元素成功,c1线程获取元素成功,并且此时,c2线程还是处于park状态,此时应用程序无法正常结束。所以,可知,必须要先有取操作,然后存操作,两者才能正确的匹配,若先是存操作,然后再是取操作,此时无法匹配成功,会阻塞,取操作期待下一个存操作进行匹配。

五、总结

SynchronousQueue的源码就分析到这里,SynchronousQueue适合一对一的匹配场景,没有容量,无法缓存。有了这个基础,之后会方便分析线程池框架的源码,谢谢各位园友的观看~

版权声明:本文为博主原创文章,遵循 CC 4.0 BY-SA 版权协议,转载请附上原文出处链接和本声明。
本文链接:https://blog.csdn.net/weixin_39836530/article/details/111777826

智能推荐

c# 调用c++ lib静态库_c#调用lib-程序员宅基地

文章浏览阅读2w次,点赞7次,收藏51次。四个步骤1.创建C++ Win32项目动态库dll 2.在Win32项目动态库中添加 外部依赖项 lib头文件和lib库3.导出C接口4.c#调用c++动态库开始你的表演...①创建一个空白的解决方案,在解决方案中添加 Visual C++ , Win32 项目空白解决方案的创建:添加Visual C++ , Win32 项目这......_c#调用lib

deepin/ubuntu安装苹方字体-程序员宅基地

文章浏览阅读4.6k次。苹方字体是苹果系统上的黑体,挺好看的。注重颜值的网站都会使用,例如知乎:font-family: -apple-system, BlinkMacSystemFont, Helvetica Neue, PingFang SC, Microsoft YaHei, Source Han Sans SC, Noto Sans CJK SC, W..._ubuntu pingfang

html表单常见操作汇总_html表单的处理程序有那些-程序员宅基地

文章浏览阅读159次。表单表单概述表单标签表单域按钮控件demo表单标签表单标签基本语法结构<form action="处理数据程序的url地址“ method=”get|post“ name="表单名称”></form><!--action,当提交表单时,向何处发送表单中的数据,地址可以是相对地址也可以是绝对地址--><!--method将表单中的数据传送给服务器处理,get方式直接显示在url地址中,数据可以被缓存,且长度有限制;而post方式数据隐藏传输,_html表单的处理程序有那些

PHP设置谷歌验证器(Google Authenticator)实现操作二步验证_php otp 验证器-程序员宅基地

文章浏览阅读1.2k次。使用说明:开启Google的登陆二步验证(即Google Authenticator服务)后用户登陆时需要输入额外由手机客户端生成的一次性密码。实现Google Authenticator功能需要服务器端和客户端的支持。服务器端负责密钥的生成、验证一次性密码是否正确。客户端记录密钥后生成一次性密码。下载谷歌验证类库文件放到项目合适位置(我这边放在项目Vender下面)https://github.com/PHPGangsta/GoogleAuthenticatorPHP代码示例://引入谷_php otp 验证器

【Python】matplotlib.plot画图横坐标混乱及间隔处理_matplotlib更改横轴间距-程序员宅基地

文章浏览阅读4.3k次,点赞5次,收藏11次。matplotlib.plot画图横坐标混乱及间隔处理_matplotlib更改横轴间距

docker — 容器存储_docker 保存容器-程序员宅基地

文章浏览阅读2.2k次。①Storage driver 处理各镜像层及容器层的处理细节,实现了多层数据的堆叠,为用户 提供了多层数据合并后的统一视图②所有 Storage driver 都使用可堆叠图像层和写时复制(CoW)策略③docker info 命令可查看当系统上的 storage driver主要用于测试目的,不建议用于生成环境。_docker 保存容器

随便推点

网络拓扑结构_网络拓扑csdn-程序员宅基地

文章浏览阅读834次,点赞27次,收藏13次。网络拓扑结构是指计算机网络中各组件(如计算机、服务器、打印机、路由器、交换机等设备)及其连接线路在物理布局或逻辑构型上的排列形式。这种布局不仅描述了设备间的实际物理连接方式,也决定了数据在网络中流动的路径和方式。不同的网络拓扑结构影响着网络的性能、可靠性、可扩展性及管理维护的难易程度。_网络拓扑csdn

JS重写Date函数,兼容IOS系统_date.prototype 将所有 ios-程序员宅基地

文章浏览阅读1.8k次,点赞5次,收藏8次。IOS系统Date的坑要创建一个指定时间的new Date对象时,通常的做法是:new Date("2020-09-21 11:11:00")这行代码在 PC 端和安卓端都是正常的,而在 iOS 端则会提示 Invalid Date 无效日期。在IOS年月日中间的横岗许换成斜杠,也就是new Date("2020/09/21 11:11:00")通常为了兼容IOS的这个坑,需要做一些额外的特殊处理,笔者在开发的时候经常会忘了兼容IOS系统。所以就想试着重写Date函数,一劳永逸,避免每次ne_date.prototype 将所有 ios

如何将EXCEL表导入plsql数据库中-程序员宅基地

文章浏览阅读5.3k次。方法一:用PLSQL Developer工具。 1 在PLSQL Developer的sql window里输入select * from test for update; 2 按F8执行 3 打开锁, 再按一下加号. 鼠标点到第一列的列头,使全列成选中状态,然后粘贴,最后commit提交即可。(前提..._excel导入pl/sql

Git常用命令速查手册-程序员宅基地

文章浏览阅读83次。Git常用命令速查手册1、初始化仓库git init2、将文件添加到仓库git add 文件名 # 将工作区的某个文件添加到暂存区 git add -u # 添加所有被tracked文件中被修改或删除的文件信息到暂存区,不处理untracked的文件git add -A # 添加所有被tracked文件中被修改或删除的文件信息到暂存区,包括untracked的文件...

分享119个ASP.NET源码总有一个是你想要的_千博二手车源码v2023 build 1120-程序员宅基地

文章浏览阅读202次。分享119个ASP.NET源码总有一个是你想要的_千博二手车源码v2023 build 1120

【C++缺省函数】 空类默认产生的6个类成员函数_空类默认产生哪些类成员函数-程序员宅基地

文章浏览阅读1.8k次。版权声明:转载请注明出处 http://blog.csdn.net/irean_lau。目录(?)[+]1、缺省构造函数。2、缺省拷贝构造函数。3、 缺省析构函数。4、缺省赋值运算符。5、缺省取址运算符。6、 缺省取址运算符 const。[cpp] view plain copy_空类默认产生哪些类成员函数

推荐文章

热门文章

相关标签