温馨提示×

温馨提示×

您好,登录后才能下订单哦!

密码登录×
登录注册×
其他方式登录
点击 登录注册 即表示同意《亿速云用户服务条款》

如何理解线程安全的队列ArrayBlockingQueue源码

发布时间:2021-10-21 16:52:38 阅读:109 作者:iii 栏目:编程语言
开发者测试专用服务器限时活动,0元免费领,库存有限,领完即止! 点击查看>>

本篇内容介绍了“如何理解线程安全的队列ArrayBlockingQueue源码”的有关知识,在实际案例的操作过程中,不少人都会遇到这样的困境,接下来就让小编带领大家学习一下如何处理这些情况吧!希望大家仔细阅读,能够学有所成!

一,ArrayBlockingQueue源码分析

ArrayBlockingQueue是队列的一种,队列的特点嘛,先出先出,然而这种队列是一种线程安全阻塞式的队列,为什么是阻塞式队列?我想,这正好是我写和分析这篇文章的内容所在。

由于本篇内容涉及的内容比较多,所以有些地方自己不会特地讲的很详细,但是足够自己使自己明白了,一般文章出来的时候,如果连自己读起来都费劲,或者有些不懂的地方,我想,这样的文章,一种是写作者自己遗漏了或者写的有的时候含糊其辞了。

但是,我不会,因为,我的文章基本上主要是写给自己的,如果可以帮助需要的人,自己还是比较开心的,因为,你们或许也看到了,我之前写的文章风格与别人不一样,自己觉得我把当时的想法写出来就可以了,如果不完美也没事,以后自己在改进就可以了,我想这就是我与别的创作者不同的一点,我也不是很刻意追求阅读量如何如何,当然了,如果你们关注我,或者分享我写的内容,我还是很感谢你的,哈哈,下面我们分析这个队列集合的源码了。

 

二,方法分析

 

2.0,左右可以滑动查看

 

2.1,构造函数

//默认必须给定队列的容量public ArrayBlockingQueue(int capacity) {        this(capacity, false);    }//第二步 public ArrayBlockingQueue(int capacity, boolean fair) {     //当然了,容量不能小于等于0,因为队列是用来装填元素     //初始化容量为0,没有意义撒        if (capacity <= 0)            throw new IllegalArgumentException();     //创建一个容量为capacity大小的数组空间赋值给成员变量items        this.items = new Object[capacity];     //创建一个非公平锁的实例对象     //这里如果对ReentrantLock不了解的话,可以自己查看一下哈     //lock锁与synchronized关键字的锁的区别还是要知道一下的        lock = new ReentrantLock(fair);     //下面的newCondition操作,就是为了后面的线程间通信做准备的        notEmpty = lock.newCondition();        notFull =  lock.newCondition();    }
 

上面的分析过程中,我们了解了如何实现一个锁,以及线程间通信的内容,这里简单提及下,往后看,自己会对这部分进行详尽的说明的。

 

2.2,add()方法

public boolean add(E e) {    //调用共用的方法add        return super.add(e);    }//第二步操作public boolean add(E e) {    //复用offer()方法的实现逻辑        if (offer(e))            return true;        else    //若队列添加失败,说明队列已经满了,不可能装填数据元素了    //此时抛出队列已满的异常即可            throw new IllegalStateException("Queue full");    }//第三步操作public boolean offer(E e) {    //这个队列也是不可以装填元素为null的元素的,所以需要进行检查元素是否为空的逻辑校验        checkNotNull(e);    //获取锁实例对象        final ReentrantLock lock = this.lock;    //进行加锁操作,由于后面的大部分方法都会用到锁,所以这里可以看出这是一个线程安全的队列        lock.lock();        try {     //队列的容量,在创建的时候就已经指定了,如果队列的元素个数count和数组的空间相等了     //说明队列已经没有容量装填数据元素了,此时返回false即可            if (count == items.length)                return false;            else {      //进行入队列操作                enqueue(e);                return true;            }        } finally {            //释放锁的逻辑            lock.unlock();        }    }//第四步操作private static void checkNotNull(Object v) {        if (v == null)            throw new NullPointerException();    }//第五步操作private void enqueue(E x) {        //将实例变量items赋值给临时变量items,主要也是编程中常见的写法        final Object[] items = this.items;    //将元素x装载到队列的末尾,此时的putIndex可以数组索引下标的,我个人理解        items[putIndex] = x;    //这里为啥要加这么一句呢?我想这是因为数组空间满了,又要重新开始了,所以这里putIndex要置为0        if (++putIndex == items.length)            putIndex = 0;    //count表示队列的元素个数,是个成员变量,入队列之后,count加一是必须的        count++;    //发出一个信号通知,说明队列不空,有元素可以从队列进行获取    //这里主要是线程间通信的,等下后面会介绍线程间通信的        notEmpty.signal();    }
 

线程间通信,你知道有哪种方式吗,后面自己会单独介绍的,后面自己慢慢会介绍的,不要着急哦

 

2.3,peek()方法

public E peek() {    //加锁lock.lock        final ReentrantLock lock = this.lock;        lock.lock();        try {     //根据数组的索引下标获取指定位置的元素,此时元素并没有出队列,不同于后面要分析的poll方法            return itemAt(takeIndex);          } finally {    //解锁            lock.unlock();        }    }//第二步操作//这是一个final关键字修饰的方法,fianl关键字修饰变量,方法,类的作用都可以去回顾一下的哈 final E itemAt(int i) {     //根据索引下标获取指定位置元素,时间复杂度为o(1)        return (E) items[i];    }
 

final修饰方法,如果一个类不允许其子类覆盖某个方法,即,不允许被子类重写,则可以把这个方法声明为final方法

 

2.4,size()方法

public int size() {        final ReentrantLock lock = this.lock;        lock.lock();        try {            //直接返回队列元素个数的实例变量count即可,是不是很简单            //于此同时,这也是一个线程安全的方法            return count;        } finally {            lock.unlock();        }    
   

2.5,contains()方法

public boolean contains(Object o) {    //因为这个队列里面不包含null元素,所以若元素o为null,则直接返回false        if (o == null) return false;        final Object[] items = this.items;        final ReentrantLock lock = this.lock;    //这是一个线程安全的方法        lock.lock();        try {            if (count > 0) {                //当队列的元素个数增加时,此时的putIndex值是增加的                final int putIndex = this.putIndex;                int i = takeIndex;                do {                    //若元素o,等于数组的其中一个元素,则直接返回false                    if (o.equals(items[i]))                        return true;                    if (++i == items.length)                        i = 0;                } while (i != putIndex);            }            //队列里面没有元素,则直接返回false            return false;        } finally {            lock.unlock();        }    }
   

2.6,take()方法

public E take() throws InterruptedException {    //线程安全的方法        final ReentrantLock lock = this.lock;        lock.lockInterruptibly();        try {            //如果队列的元素个数为0,则此时需要等待,所以这里是一个阻塞式队列            //此时这里就相当于一条指令一直在循环判断count的值是否不等于0            while (count == 0)                notEmpty.await();            //进行出队列操作            return dequeue();        } finally {            lock.unlock();        }    }//第二步操作private E dequeue() {        //默认takeIndex是从0开始的,如果出队列了,takeIndex值就会增加一        final Object[] items = this.items;        E x = (E) items[takeIndex];    //出队列之后,元素就需要置为null了,等待gc在某个时刻进行回收不可达对象的回收        items[takeIndex] = null;    //如果takeIndex等于了数组空间的大小了,说明,队列的元素个数已经取完了,此时需要重置takeIndex值为0        if (++takeIndex == items.length)            takeIndex = 0;    //每取出一个数组元素,元素个数减一        count--;        if (itrs != null)            itrs.elementDequeued();    //发出一个信号通知,"队列不满,还可以put操作的信号"        notFull.signal();        return x;    }
   

2.7,poll()方法

public E poll() {      //线程安全        final ReentrantLock lock = this.lock;        lock.lock();        try {            //若队列的元素个数为0,则队列的元素是没有的,就返回了null            //否则就执行出队列操作,上面已经分析过了,这里就不分析了            return (count == 0) ? null : dequeue();        } finally {            lock.unlock();        }    }
   

2.8,clear()方法

public void clear() {        final Object[] items = this.items;        final ReentrantLock lock = this.lock;        lock.lock();        try {            int k = count;            //如果队列存在元素大于0,直接执行下面的操作            if (k > 0) {                //putIndex的位置,就是需要移动到的位置                final int putIndex = this.putIndex;                int i = takeIndex;                do {                    //循环将每个元素值置为null,等待gc在某个时刻触发                    items[i] = null;                    if (++i == items.length)                        i = 0;                } while (i != putIndex);                //这里为啥要赋值呢?思考一下                takeIndex = putIndex;                //元素个数置为0                count = 0;                if (itrs != null)                    itrs.queueIsEmpty();                //进行通知,此时队列里是可以装填元素了                for (; k > 0 && lock.hasWaiters(notFull); k--)                    notFull.signal();            }        } finally {            lock.unlock();        }    }
   

2.9,toString()方法

public String toString() {    //线程安全的方法        final ReentrantLock lock = this.lock;        lock.lock();        try {            int k = count;            //队列里面的元素个数为0,则返回"[]"            if (k == 0)                return "[]";            final Object[] items = this.items;            //使用StringBuilder方法进行拼接队列的每一个元素            StringBuilder sb = new StringBuilder();            sb.append('[');            for (int i = takeIndex; ; ) {                Object e = items[i];                sb.append(e == this ? "(this Collection)" : e);                if (--k == 0)                    return sb.append(']').toString();                sb.append(',').append(' ');                if (++i == items.length)                    i = 0;            }        } finally {            lock.unlock();        }    }
   

2.10,remainingCapacity()方法

public int remainingCapacity() {    //获取lock实例对象        final ReentrantLock lock = this.lock;        lock.lock();        try {     //剩余空间等于数组空间大小减去元素就是剩余空间的大小            return items.length - count;        } finally {            lock.unlock();        }    }
   

三,总结一下

 

3.1,线程间通信

基于Condition的await()和singal()方法来实现

import java.util.concurrent.ExecutorService;import java.util.concurrent.Executors;import java.util.concurrent.locks.Condition;import java.util.concurrent.locks.Lock;import java.util.concurrent.locks.ReentrantLock;/** * @author pc */public class ConditionTest {    public Lock lock = new ReentrantLock();    public Condition condition = lock.newCondition();    public static void main(String[] args) {        ConditionTest test = new ConditionTest();        ExecutorService executorService = Executors.newFixedThreadPool(2);        executorService.execute(() -> test.conditionWait());        executorService.execute(() -> test.conditionSignal());    }    public void conditionWait() {        lock.lock();        try {            System.out.println(Thread.currentThread().getName() + "拿到锁了");            System.out.println(Thread.currentThread().getName() + "等待信号");            condition.await();            System.out.println(Thread.currentThread().getName() + "拿到信号");        } catch (Exception e) {        } finally {            lock.unlock();        }    }    public void conditionSignal() {        lock.lock();        try {            Thread.sleep(3000);            System.out.println(Thread.currentThread().getName() + "拿到锁了");            condition.signal();            System.out.println(Thread.currentThread().getName() + "发出信号");        } catch (Exception e) {        } finally {            lock.unlock();        }    }}

“如何理解线程安全的队列ArrayBlockingQueue源码”的内容就介绍到这里了,感谢大家的阅读。如果想了解更多行业相关的知识可以关注亿速云网站,小编将为大家输出更多高质量的实用文章!

亿速云「云服务器」,即开即用、新一代英特尔至强铂金CPU、三副本存储NVMe SSD云盘,价格低至29元/月。点击查看>>

向AI问一下细节

免责声明:本站发布的内容(图片、视频和文字)以原创、转载和分享为主,文章观点不代表本网站立场,如果涉及侵权请联系站长邮箱:is@yisu.com进行举报,并提供相关证据,一经查实,将立刻删除涉嫌侵权内容。

原文链接:https://my.oschina.net/u/3934278/blog/4795527

AI

开发者交流群×