Java对阻塞队列的实现

Posted1 by 呼延十 on April 27, 2019 Hot:

什么是阻塞队列?

阻塞队列与队列基本一致,额外的支持阻塞添加和阻塞删除方法.

  • 阻塞添加: 当队列满时,线程不断尝试向其中添加,直到有其他线程取走元素,使添加操作成功,在此期间,线程阻塞.
  • 阻塞删除: 当队列为空时,线程不断尝试取出队头元素,直到有其他线程添加元素,使删除操作成功,在此期间,线程阻塞.

怎么实现阻塞呢?可以使用Java中Object类的wait(),notify(),notifyAll()等方法来实现.

  • 阻塞添加: 当队列满的时候,当前线程阻塞,当生产成功之后,唤醒消费者(此时队列中至少有一个元素).
  • 阻塞删除: 等队列为空的时候,当前线程阻塞,当消费成功后,唤醒生产者(此时队列中只有有一个空的位置可以用来添加元素).

更多的原理让注释体现吧!

下面的代码是一个简易版本的实现,仅仅实现了阻塞方法,对于队列常规的添加和移除方法没有实现:

import mian.AbstractMain;

import java.util.LinkedList;
import java.util.concurrent.atomic.AtomicInteger;

/**
 * Created by pfliu on 2019/04/28.
 */
public class BlockingQueueT extends AbstractMain {

    // 存放元素的linkedlist
    private LinkedList<Integer> items = new LinkedList<>();
    // 计数,使用AtomicInteger,防止冲突
    private AtomicInteger count = new AtomicInteger(0);

    //定义队列的最大值与最小值,也就是(满/空)的定义,当然这里可以用其他方式实现,比如用一个定长的数组.
    private final int max = 100;
    private final int min = 0;

    // 新建一个对象,用来充当锁的作用
    private final Object lock = new Object();

    public void put(Integer integer) throws InterruptedException {
        // 加锁
        synchronized (lock) {
            // 如果队列是满的,则当前线程不断的等待
            while (count.get() == max) {
                lock.wait();
            }
            // 添加元素,计数增加并且唤醒消费者
            items.add(integer);
            count.incrementAndGet();
            lock.notifyAll();
        }
    }

    public Integer pop() throws InterruptedException {
        // 加锁
        synchronized (lock) {
            // 如果队列是空的,则当前线程不断的等待
            while (count.get() == min) {
                lock.wait();
            }
            // 获取结果值,计数减少,唤醒消费者,返回结果
            Integer ret = items.getFirst();
            items.removeFirst();
            count.decrementAndGet();
            lock.notifyAll();
            return ret;
        }
    }


    public static void main(String[] args) throws InterruptedException {
        new BlockingQueueT().parseArgsAndRun(args);
    }

    @Override
    public void run() throws InterruptedException {
        BlockingQueueT bt = new BlockingQueueT();

        // 生成这线程,生成1000个元素
        Thread producer = new Thread(() -> {
            for (int i = 0; i < 1000; i++) {
                logger.info("add : {}", i);
                try {
                    bt.put(i);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        });
        producer.setName("producer");
        producer.start();
        // 消费者线程,秩序的进行消费
        Thread consumer = new Thread(() -> {
            while (true) {
                try {
                    logger.info("get : {}", bt.pop());
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        });
        consumer.setName("consumer");
        consumer.start();
    }
}

在main方法中,我们进行了一些测试,启动了一个生产者线程,不断的向阻塞队列中添加元素,同时启动了一个消费者线程,无限的从队列中读取.可以预期的是,在程序刚开始运行的时候,读写都会运行,而当生产者到1000之后停止,消费者会阻塞.

标准输出太多了不贴了,但是通过arthas可以看到当前的线程状态,可以看到消费者是出于wait状态的.

2019-04-28-17-58-38

当然我们自己实现的这个考虑肯定不是很周全,那么就来看一下Java对阻塞队列的一些实现.

ArrayBlockingQueue

首先来看一下ArrayBlockingQueue,它是一个使用定长的数组来实现的有界的阻塞队列,和我们实现的基本类似,只是加锁使用ReentrantLock实现,且存储结构使用数组,需要记忆当前的添加位置以及弹出位置.队列中的顺序使用FIFO策略.

此外,当多个线程阻塞等待入队或者出队时候,ArrayBlockingQueue支持公平和非公平两种形式.

构造方法

由于是有界的阻塞队列,所以构造时都需要传入队列的大小.

ArrayblockingQueue有三个构造方法,如下:

    public ArrayBlockingQueue(int capacity) {
        this(capacity, false);
    }

    public ArrayBlockingQueue(int capacity, boolean fair) {
        if (capacity <= 0)
            throw new IllegalArgumentException();
        this.items = new Object[capacity];
        lock = new ReentrantLock(fair);
        notEmpty = lock.newCondition();
        notFull =  lock.newCondition();
    }

    public ArrayBlockingQueue(int capacity, boolean fair,
                              Collection<? extends E> c) {
        this(capacity, fair);

        final ReentrantLock lock = this.lock;
        lock.lock(); // Lock only for visibility, not mutual exclusion
        try {
            int i = 0;
            try {
                for (E e : c) {
                    checkNotNull(e);
                    items[i++] = e;
                }
            } catch (ArrayIndexOutOfBoundsException ex) {
                throw new IllegalArgumentException();
            }
            count = i;
            putIndex = (i == capacity) ? 0 : i;
        } finally {
            lock.unlock();
        }
    }

可以发现,第一个和第三个构造方法都是对第二个的调用,而第二个构造方法中,初始化了存放元素的数组,以及用于实现阻塞机制的锁等.

插入方法

add(E)

如果队列不满则添加元素,如果队列满则抛出IllegalStateException异常.在阻塞队列中不建议使用.

    public boolean add(E e) {
        return super.add(e);
    }

        public boolean add(E e) {
        if (offer(e))
            return true;
        else
            throw new IllegalStateException("Queue full");
    }

offer(E)

如果队列不满,则添加元素,队列满则返回false.不抛异常.

    public boolean offer(E e) {
        checkNotNull(e);
        final ReentrantLock lock = this.lock;
        lock.lock();
        try {
            if (count == items.length)
                return false;
            else {
                enqueue(e);
                return true;
            }
        } finally {
            lock.unlock();
        }
    }

offer(E,int,TimeUnit)

上一个offer方法的带有超时时间的版本,当队列满时,会尝试知道超时时间结束才返回false.

    public boolean offer(E e, long timeout, TimeUnit unit)
        throws InterruptedException {

        checkNotNull(e);
        long nanos = unit.toNanos(timeout);
        final ReentrantLock lock = this.lock;
        lock.lockInterruptibly();
        try {
            while (count == items.length) {
                if (nanos <= 0)
                    return false;
                nanos = notFull.awaitNanos(nanos);
            }
            enqueue(e);
            return true;
        } finally {
            lock.unlock();
        }
    }

put

当队列满时,线程等待,知道可以放入元素再执行操作.

    public void put(E e) throws InterruptedException {
        checkNotNull(e);
        final ReentrantLock lock = this.lock;
        lock.lockInterruptibly();
        try {
            while (count == items.length)
                notFull.await();
            enqueue(e);
        } finally {
            lock.unlock();
        }
    }

移除方法

poll

当队列为空时,返回null.不为空则返回队头元素.

    public E poll() {
        final ReentrantLock lock = this.lock;
        lock.lock();
        try {
            return (count == 0) ? null : dequeue();
        } finally {
            lock.unlock();
        }
    }

poll(long,TimeUnit)

上一个poll方法的超时版本.当队列为空时,尝试获取元素,知道超时时间到达,返回null.

    public E poll(long timeout, TimeUnit unit) throws InterruptedException {
        long nanos = unit.toNanos(timeout);
        final ReentrantLock lock = this.lock;
        lock.lockInterruptibly();
        try {
            while (count == 0) {
                if (nanos <= 0)
                    return null;
                nanos = notEmpty.awaitNanos(nanos);
            }
            return dequeue();
        } finally {
            lock.unlock();
        }
    }

take

弹出元素的阻塞实现,当队列为空时,阻塞等待,知道可以获取到元素.

    public E take() throws InterruptedException {
        final ReentrantLock lock = this.lock;
        lock.lockInterruptibly();
        try {
            while (count == 0)
                notEmpty.await();
            return dequeue();
        } finally {
            lock.unlock();
        }
    }

remove

循环删除某个元素.

    public boolean remove(Object o) {
        if (o == null) return false;
        final Object[] items = this.items;
        final ReentrantLock lock = this.lock;
        lock.lock();
        try {
            if (count > 0) {
                final int putIndex = this.putIndex;
                int i = takeIndex;
                do {
                    if (o.equals(items[i])) {
                        removeAt(i);
                        return true;
                    }
                    if (++i == items.length)
                        i = 0;
                } while (i != putIndex);
            }
            return false;
        } finally {
            lock.unlock();
        }
    }

其他方法

peek

返回队头的元素,但是该元素不出队.

    public E peek() {
        final ReentrantLock lock = this.lock;
        lock.lock();
        try {
            return itemAt(takeIndex); // null when queue is empty
        } finally {
            lock.unlock();
        }
    }

size

返回当前队列中的元素数量.

    public int size() {
        final ReentrantLock lock = this.lock;
        lock.lock();
        try {
            return count;
        } finally {
            lock.unlock();
        }
    }

remainingCapacity

返回当前队列中空闲的位置的数量.

    public int remainingCapacity() {
        final ReentrantLock lock = this.lock;
        lock.lock();
        try {
            return items.length - count;
        } finally {
            lock.unlock();
        }
    }

LinkedBlockingQueue

LinkedBlockingQueue的实现思路与ArrayBlockingQueue基本一致,只是将锁分为了取出锁插入锁.当插入和取出数据时,可以分开加锁,互不影响.且它可以是无界的.





ChangeLog

2019-04-28 完成

以上皆为个人所思所得,如有错误欢迎评论区指正。

欢迎转载,烦请署名并保留原文链接。

联系邮箱:huyanshi2580@gmail.com

更多学习笔记见个人博客——>呼延十