Java对阻塞队列的实现

什么是阻塞队列?

阻塞队列与队列基本一致,额外的支持阻塞添加和阻塞删除方法.

  • 阻塞添加: 当队列满时,线程不断尝试向其中添加,直到有其他线程取走元素,使添加操作成功,在此期间,线程阻塞.
  • 阻塞删除: 当队列为空时,线程不断尝试取出队头元素,直到有其他线程添加元素,使删除操作成功,在此期间,线程阻塞.

怎么实现阻塞呢?可以使用Java中Object类的wait(),notify(),notifyAll()等方法来实现.

  • 阻塞添加: 当队列满的时候,当前线程阻塞,当生产成功之后,唤醒消费者(此时队列中至少有一个元素).
  • 阻塞删除: 等队列为空的时候,当前线程阻塞,当消费成功后,唤醒生产者(此时队列中只有有一个空的位置可以用来添加元素).

更多的原理让注释体现吧!

下面的代码是一个简易版本的实现,仅仅实现了阻塞方法,对于队列常规的添加和移除方法没有实现:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
import mian.AbstractMain;

import java.util.LinkedList;
import java.util.concurrent.atomic.AtomicInteger;

/**
* Created by pfliu on 2019/04/28.
*/
public class BlockingQueueT extends AbstractMain {

// 存放元素的linkedlist
private LinkedList<Integer> items = new LinkedList<>();
// 计数,使用AtomicInteger,防止冲突
private AtomicInteger count = new AtomicInteger(0);

//定义队列的最大值与最小值,也就是(满/空)的定义,当然这里可以用其他方式实现,比如用一个定长的数组.
private final int max = 100;
private final int min = 0;

// 新建一个对象,用来充当锁的作用
private final Object lock = new Object();

public void put(Integer integer) throws InterruptedException {
// 加锁
synchronized (lock) {
// 如果队列是满的,则当前线程不断的等待
while (count.get() == max) {
lock.wait();
}
// 添加元素,计数增加并且唤醒消费者
items.add(integer);
count.incrementAndGet();
lock.notifyAll();
}
}

public Integer pop() throws InterruptedException {
// 加锁
synchronized (lock) {
// 如果队列是空的,则当前线程不断的等待
while (count.get() == min) {
lock.wait();
}
// 获取结果值,计数减少,唤醒消费者,返回结果
Integer ret = items.getFirst();
items.removeFirst();
count.decrementAndGet();
lock.notifyAll();
return ret;
}
}


public static void main(String[] args) throws InterruptedException {
new BlockingQueueT().parseArgsAndRun(args);
}

@Override
public void run() throws InterruptedException {
BlockingQueueT bt = new BlockingQueueT();

// 生成这线程,生成1000个元素
Thread producer = new Thread(() -> {
for (int i = 0; i < 1000; i++) {
logger.info("add : {}", i);
try {
bt.put(i);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
});
producer.setName("producer");
producer.start();
// 消费者线程,秩序的进行消费
Thread consumer = new Thread(() -> {
while (true) {
try {
logger.info("get : {}", bt.pop());
} catch (InterruptedException e) {
e.printStackTrace();
}
}
});
consumer.setName("consumer");
consumer.start();
}
}

在main方法中,我们进行了一些测试,启动了一个生产者线程,不断的向阻塞队列中添加元素,同时启动了一个消费者线程,无限的从队列中读取.可以预期的是,在程序刚开始运行的时候,读写都会运行,而当生产者到1000之后停止,消费者会阻塞.

标准输出太多了不贴了,但是通过arthas可以看到当前的线程状态,可以看到消费者是出于wait状态的.

2019-04-28-17-58-38

当然我们自己实现的这个考虑肯定不是很周全,那么就来看一下Java对阻塞队列的一些实现.

ArrayBlockingQueue

首先来看一下ArrayBlockingQueue,它是一个使用定长的数组来实现的有界的阻塞队列,和我们实现的基本类似,只是加锁使用ReentrantLock实现,且存储结构使用数组,需要记忆当前的添加位置以及弹出位置.队列中的顺序使用FIFO策略.

此外,当多个线程阻塞等待入队或者出队时候,ArrayBlockingQueue支持公平和非公平两种形式.

构造方法

由于是有界的阻塞队列,所以构造时都需要传入队列的大小.

ArrayblockingQueue有三个构造方法,如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
public ArrayBlockingQueue(int capacity) {
this(capacity, false);
}

public ArrayBlockingQueue(int capacity, boolean fair) {
if (capacity <= 0)
throw new IllegalArgumentException();
this.items = new Object[capacity];
lock = new ReentrantLock(fair);
notEmpty = lock.newCondition();
notFull = lock.newCondition();
}

public ArrayBlockingQueue(int capacity, boolean fair,
Collection<? extends E> c) {
this(capacity, fair);

final ReentrantLock lock = this.lock;
lock.lock(); // Lock only for visibility, not mutual exclusion
try {
int i = 0;
try {
for (E e : c) {
checkNotNull(e);
items[i++] = e;
}
} catch (ArrayIndexOutOfBoundsException ex) {
throw new IllegalArgumentException();
}
count = i;
putIndex = (i == capacity) ? 0 : i;
} finally {
lock.unlock();
}
}

可以发现,第一个和第三个构造方法都是对第二个的调用,而第二个构造方法中,初始化了存放元素的数组,以及用于实现阻塞机制的锁等.

插入方法

add(E)

如果队列不满则添加元素,如果队列满则抛出IllegalStateException异常.在阻塞队列中不建议使用.

1
2
3
4
5
6
7
8
9
10
public boolean add(E e) {
return super.add(e);
}

public boolean add(E e) {
if (offer(e))
return true;
else
throw new IllegalStateException("Queue full");
}

offer(E)

如果队列不满,则添加元素,队列满则返回false.不抛异常.

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
public boolean offer(E e) {
checkNotNull(e);
final ReentrantLock lock = this.lock;
lock.lock();
try {
if (count == items.length)
return false;
else {
enqueue(e);
return true;
}
} finally {
lock.unlock();
}
}

offer(E,int,TimeUnit)

上一个offer方法的带有超时时间的版本,当队列满时,会尝试知道超时时间结束才返回false.

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
public boolean offer(E e, long timeout, TimeUnit unit)
throws InterruptedException {

checkNotNull(e);
long nanos = unit.toNanos(timeout);
final ReentrantLock lock = this.lock;
lock.lockInterruptibly();
try {
while (count == items.length) {
if (nanos <= 0)
return false;
nanos = notFull.awaitNanos(nanos);
}
enqueue(e);
return true;
} finally {
lock.unlock();
}
}

put

当队列满时,线程等待,知道可以放入元素再执行操作.

1
2
3
4
5
6
7
8
9
10
11
12
public void put(E e) throws InterruptedException {
checkNotNull(e);
final ReentrantLock lock = this.lock;
lock.lockInterruptibly();
try {
while (count == items.length)
notFull.await();
enqueue(e);
} finally {
lock.unlock();
}
}

移除方法

poll

当队列为空时,返回null.不为空则返回队头元素.

1
2
3
4
5
6
7
8
9
public E poll() {
final ReentrantLock lock = this.lock;
lock.lock();
try {
return (count == 0) ? null : dequeue();
} finally {
lock.unlock();
}
}

poll(long,TimeUnit)

上一个poll方法的超时版本.当队列为空时,尝试获取元素,知道超时时间到达,返回null.

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
public E poll(long timeout, TimeUnit unit) throws InterruptedException {
long nanos = unit.toNanos(timeout);
final ReentrantLock lock = this.lock;
lock.lockInterruptibly();
try {
while (count == 0) {
if (nanos <= 0)
return null;
nanos = notEmpty.awaitNanos(nanos);
}
return dequeue();
} finally {
lock.unlock();
}
}

take

弹出元素的阻塞实现,当队列为空时,阻塞等待,知道可以获取到元素.

1
2
3
4
5
6
7
8
9
10
11
public E take() throws InterruptedException {
final ReentrantLock lock = this.lock;
lock.lockInterruptibly();
try {
while (count == 0)
notEmpty.await();
return dequeue();
} finally {
lock.unlock();
}
}

remove

循环删除某个元素.

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
public boolean remove(Object o) {
if (o == null) return false;
final Object[] items = this.items;
final ReentrantLock lock = this.lock;
lock.lock();
try {
if (count > 0) {
final int putIndex = this.putIndex;
int i = takeIndex;
do {
if (o.equals(items[i])) {
removeAt(i);
return true;
}
if (++i == items.length)
i = 0;
} while (i != putIndex);
}
return false;
} finally {
lock.unlock();
}
}

其他方法

peek

返回队头的元素,但是该元素不出队.

1
2
3
4
5
6
7
8
9
public E peek() {
final ReentrantLock lock = this.lock;
lock.lock();
try {
return itemAt(takeIndex); // null when queue is empty
} finally {
lock.unlock();
}
}

size

返回当前队列中的元素数量.

1
2
3
4
5
6
7
8
9
public int size() {
final ReentrantLock lock = this.lock;
lock.lock();
try {
return count;
} finally {
lock.unlock();
}
}

remainingCapacity

返回当前队列中空闲的位置的数量.

1
2
3
4
5
6
7
8
9
public int remainingCapacity() {
final ReentrantLock lock = this.lock;
lock.lock();
try {
return items.length - count;
} finally {
lock.unlock();
}
}

LinkedBlockingQueue

LinkedBlockingQueue的实现思路与ArrayBlockingQueue基本一致,只是将锁分为了取出锁插入锁.当插入和取出数据时,可以分开加锁,互不影响.且它可以是无界的.





ChangeLog

2019-04-28 完成

以上皆为个人所思所得,如有错误欢迎评论区指正。

欢迎转载,烦请署名并保留原文链接。

联系邮箱:huyanshi2580@gmail.com

更多学习笔记见个人博客——>呼延十