什么是阻塞队列?
阻塞队列与队列基本一致,额外的支持阻塞添加和阻塞删除方法.
- 阻塞添加: 当队列满时,线程不断尝试向其中添加,直到有其他线程取走元素,使添加操作成功,在此期间,线程阻塞.
- 阻塞删除: 当队列为空时,线程不断尝试取出队头元素,直到有其他线程添加元素,使删除操作成功,在此期间,线程阻塞.
怎么实现阻塞呢?可以使用Java中Object类的wait(),notify(),notifyAll()等方法来实现.
- 阻塞添加: 当队列满的时候,当前线程阻塞,当生产成功之后,唤醒消费者(此时队列中至少有一个元素).
- 阻塞删除: 等队列为空的时候,当前线程阻塞,当消费成功后,唤醒生产者(此时队列中只有有一个空的位置可以用来添加元素).
更多的原理让注释体现吧!
下面的代码是一个简易版本的实现,仅仅实现了阻塞方法,对于队列常规的添加和移除方法没有实现:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88
| import mian.AbstractMain;
import java.util.LinkedList; import java.util.concurrent.atomic.AtomicInteger;
public class BlockingQueueT extends AbstractMain {
private LinkedList<Integer> items = new LinkedList<>(); private AtomicInteger count = new AtomicInteger(0);
private final int max = 100; private final int min = 0;
private final Object lock = new Object();
public void put(Integer integer) throws InterruptedException { synchronized (lock) { while (count.get() == max) { lock.wait(); } items.add(integer); count.incrementAndGet(); lock.notifyAll(); } }
public Integer pop() throws InterruptedException { synchronized (lock) { while (count.get() == min) { lock.wait(); } Integer ret = items.getFirst(); items.removeFirst(); count.decrementAndGet(); lock.notifyAll(); return ret; } }
public static void main(String[] args) throws InterruptedException { new BlockingQueueT().parseArgsAndRun(args); }
@Override public void run() throws InterruptedException { BlockingQueueT bt = new BlockingQueueT();
Thread producer = new Thread(() -> { for (int i = 0; i < 1000; i++) { logger.info("add : {}", i); try { bt.put(i); } catch (InterruptedException e) { e.printStackTrace(); } } }); producer.setName("producer"); producer.start(); Thread consumer = new Thread(() -> { while (true) { try { logger.info("get : {}", bt.pop()); } catch (InterruptedException e) { e.printStackTrace(); } } }); consumer.setName("consumer"); consumer.start(); } }
|
在main方法中,我们进行了一些测试,启动了一个生产者线程,不断的向阻塞队列中添加元素,同时启动了一个消费者线程,无限的从队列中读取.可以预期的是,在程序刚开始运行的时候,读写都会运行,而当生产者到1000之后停止,消费者会阻塞.
标准输出太多了不贴了,但是通过arthas可以看到当前的线程状态,可以看到消费者是出于wait状态的.
当然我们自己实现的这个考虑肯定不是很周全,那么就来看一下Java对阻塞队列的一些实现.
ArrayBlockingQueue
首先来看一下ArrayBlockingQueue,它是一个使用定长的数组来实现的有界的阻塞队列,和我们实现的基本类似,只是加锁使用ReentrantLock
实现,且存储结构使用数组,需要记忆当前的添加位置以及弹出位置.队列中的顺序使用FIFO策略.
此外,当多个线程阻塞等待入队或者出队时候,ArrayBlockingQueue
支持公平和非公平两种形式.
构造方法
由于是有界的阻塞队列,所以构造时都需要传入队列的大小.
ArrayblockingQueue
有三个构造方法,如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35
| public ArrayBlockingQueue(int capacity) { this(capacity, false); }
public ArrayBlockingQueue(int capacity, boolean fair) { if (capacity <= 0) throw new IllegalArgumentException(); this.items = new Object[capacity]; lock = new ReentrantLock(fair); notEmpty = lock.newCondition(); notFull = lock.newCondition(); }
public ArrayBlockingQueue(int capacity, boolean fair, Collection<? extends E> c) { this(capacity, fair);
final ReentrantLock lock = this.lock; lock.lock(); try { int i = 0; try { for (E e : c) { checkNotNull(e); items[i++] = e; } } catch (ArrayIndexOutOfBoundsException ex) { throw new IllegalArgumentException(); } count = i; putIndex = (i == capacity) ? 0 : i; } finally { lock.unlock(); } }
|
可以发现,第一个和第三个构造方法都是对第二个的调用,而第二个构造方法中,初始化了存放元素的数组,以及用于实现阻塞机制的锁等.
插入方法
add(E)
如果队列不满则添加元素,如果队列满则抛出IllegalStateException
异常.在阻塞队列中不建议使用.
1 2 3 4 5 6 7 8 9 10
| public boolean add(E e) { return super.add(e); }
public boolean add(E e) { if (offer(e)) return true; else throw new IllegalStateException("Queue full"); }
|
offer(E)
如果队列不满,则添加元素,队列满则返回false.不抛异常.
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15
| public boolean offer(E e) { checkNotNull(e); final ReentrantLock lock = this.lock; lock.lock(); try { if (count == items.length) return false; else { enqueue(e); return true; } } finally { lock.unlock(); } }
|
offer(E,int,TimeUnit)
上一个offer方法的带有超时时间的版本,当队列满时,会尝试知道超时时间结束才返回false.
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19
| public boolean offer(E e, long timeout, TimeUnit unit) throws InterruptedException {
checkNotNull(e); long nanos = unit.toNanos(timeout); final ReentrantLock lock = this.lock; lock.lockInterruptibly(); try { while (count == items.length) { if (nanos <= 0) return false; nanos = notFull.awaitNanos(nanos); } enqueue(e); return true; } finally { lock.unlock(); } }
|
put
当队列满时,线程等待,知道可以放入元素再执行操作.
1 2 3 4 5 6 7 8 9 10 11 12
| public void put(E e) throws InterruptedException { checkNotNull(e); final ReentrantLock lock = this.lock; lock.lockInterruptibly(); try { while (count == items.length) notFull.await(); enqueue(e); } finally { lock.unlock(); } }
|
移除方法
poll
当队列为空时,返回null.不为空则返回队头元素.
1 2 3 4 5 6 7 8 9
| public E poll() { final ReentrantLock lock = this.lock; lock.lock(); try { return (count == 0) ? null : dequeue(); } finally { lock.unlock(); } }
|
poll(long,TimeUnit)
上一个poll方法的超时版本.当队列为空时,尝试获取元素,知道超时时间到达,返回null.
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15
| public E poll(long timeout, TimeUnit unit) throws InterruptedException { long nanos = unit.toNanos(timeout); final ReentrantLock lock = this.lock; lock.lockInterruptibly(); try { while (count == 0) { if (nanos <= 0) return null; nanos = notEmpty.awaitNanos(nanos); } return dequeue(); } finally { lock.unlock(); } }
|
take
弹出元素的阻塞实现,当队列为空时,阻塞等待,知道可以获取到元素.
1 2 3 4 5 6 7 8 9 10 11
| public E take() throws InterruptedException { final ReentrantLock lock = this.lock; lock.lockInterruptibly(); try { while (count == 0) notEmpty.await(); return dequeue(); } finally { lock.unlock(); } }
|
remove
循环删除某个元素.
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23
| public boolean remove(Object o) { if (o == null) return false; final Object[] items = this.items; final ReentrantLock lock = this.lock; lock.lock(); try { if (count > 0) { final int putIndex = this.putIndex; int i = takeIndex; do { if (o.equals(items[i])) { removeAt(i); return true; } if (++i == items.length) i = 0; } while (i != putIndex); } return false; } finally { lock.unlock(); } }
|
其他方法
peek
返回队头的元素,但是该元素不出队.
1 2 3 4 5 6 7 8 9
| public E peek() { final ReentrantLock lock = this.lock; lock.lock(); try { return itemAt(takeIndex); } finally { lock.unlock(); } }
|
size
返回当前队列中的元素数量.
1 2 3 4 5 6 7 8 9
| public int size() { final ReentrantLock lock = this.lock; lock.lock(); try { return count; } finally { lock.unlock(); } }
|
remainingCapacity
返回当前队列中空闲的位置的数量.
1 2 3 4 5 6 7 8 9
| public int remainingCapacity() { final ReentrantLock lock = this.lock; lock.lock(); try { return items.length - count; } finally { lock.unlock(); } }
|
LinkedBlockingQueue
LinkedBlockingQueue
的实现思路与ArrayBlockingQueue
基本一致,只是将锁分为了取出锁
和插入锁
.当插入和取出数据时,可以分开加锁,互不影响.且它可以是无界的.
ChangeLog
2019-04-28 完成
以上皆为个人所思所得,如有错误欢迎评论区指正。
欢迎转载,烦请署名并保留原文链接。
联系邮箱:huyanshi2580@gmail.com
更多学习笔记见个人博客——>呼延十