linkedBlokingQueue源码学习

编程技术  /  houtizong 发布于 3年前   87

ReentrantLock是一个较为常用的锁对象 可重入的互斥锁定 与synchronized 相同的一些基本行为和语义,但功能更强大

 

Condition类与lock绑定,用newCondition()方法创建,提供了线程之间通信的方式(类似信号量)。

其使用基本与object类的wait,notify,notifyAll相同。

 

1,用condition.await()替换Object,wait(),调用时该线程阻塞,释放该线程的锁。

2,用condition.signal()替换Object.notify(),用condition.signalAll()替换Object.notifyAll(),唤醒该condition await方法所阻塞的线程

 

public class LinkedBlockingQueue<E> extends AbstractQueue<E> implements BlockingQueue<E>, java.io.Serializable {       //链表节点node类结构         static class Node<E> {            volatile E item;//volatile,保证了数据的可见性             Node<E> next;            Node(E x) { item = x; }        }        //容量      private final int capacity;        //用原子变量,当前元素个数        private final AtomicInteger count = new AtomicInteger(0);        //头节点      private transient Node<E> head;        //表尾节点       private transient Node<E> last;        //获取元素或删除元素时,要加的takeLock锁        private final ReentrantLock takeLock = new ReentrantLock();        //获取元素时若队列为空,线程阻塞,直至notEmpty条件满足(被通知)       private final Condition notEmpty = takeLock.newCondition();        //插入元素时 要加putLock锁        private final ReentrantLock putLock = new ReentrantLock();        //插入时,若队列已满,线程阻塞,直至notFull条件满足(被通知)      private final Condition notFull = putLock.newCondition();        // 唤醒等待的take操作,插入数据时若插入前链表中无数据,则调用,表示链表不再为空      private void signalNotEmpty() {            final ReentrantLock takeLock = this.takeLock;            takeLock.lock();            try {                notEmpty.signal();            } finally {                takeLock.unlock();            }        }        //唤醒等待插入操作,移除数据时若链表原先已满则调用,表示链表不再满       private void signalNotFull() {            final ReentrantLock putLock = this.putLock;            putLock.lock();            try {                notFull.signal();            } finally {                putLock.unlock();            }        }        // 插入到链表尾部       private void insert(E x) {            last = last.next = new Node<E>(x);        }        //获取并移除头元素       private E extract() {            Node<E> first = head.next;            head = first;            E x = first.item;            first.item = null;            return x;        }        //锁住两把锁,在remove,clear等方法中调用         private void fullyLock() {            putLock.lock();            takeLock.lock();        }        //和fullyLock成对使用       private void fullyUnlock() {            takeLock.unlock();            putLock.unlock();        }        //默认构造,容量为 Integer.MAX_VALUE         public LinkedBlockingQueue() {            this(Integer.MAX_VALUE);        }        //指定容量的构造         public LinkedBlockingQueue(int capacity) {            if (capacity <= 0) throw new IllegalArgumentException();            this.capacity = capacity;            last = head = new Node<E>(null);        }        //指定初始化集合的构造         public LinkedBlockingQueue(Collection<? extends E> c) {            this(Integer.MAX_VALUE);            for (E e : c)                add(e);        }        //获得大小               public int size() {            return count.get();        }        //剩余容量        public int remainingCapacity() {            return capacity - count.get();        }        // 将指定元素插入到此队列的尾部,如已满,阻塞至队列中有元素被移除       public void put(E e) throws InterruptedException {            if (e == null) throw new NullPointerException();            int c = -1;            final ReentrantLock putLock = this.putLock;            final AtomicInteger count = this.count;   //加put锁,多个线程不能同时进入            putLock.lockInterruptibly();            try {                try {      //容量已满,则一直阻塞                  while (count.get() == capacity)                        notFull.await();                } catch (InterruptedException ie) {                    notFull.signal(); // propagate to a non-interrupted thread                    throw ie;                }  //插入              insert(e);                c = count.getAndIncrement();//通知链表未满                if (c + 1 < capacity)                    notFull.signal();            } finally {  //解锁,注意必须在finally里调用,反正各种异常导致没有unlock使线程死锁              putLock.unlock();            }      //通知链表非空          if (c == 0)                signalNotEmpty();        }        // 将指定元素插入到此队列的尾部,如有必要,则等待一定时间以使空间变得可用。              public boolean offer(E e, long timeout, TimeUnit unit)            throws InterruptedException {            if (e == null) throw new NullPointerException();            long nanos = unit.toNanos(timeout);            int c = -1;            final ReentrantLock putLock = this.putLock;            final AtomicInteger count = this.count;     //加锁          putLock.lockInterruptibly();            try {                for (;;) {                  //未满可插入                    if (count.get() < capacity) {                        insert(e);                        c = count.getAndIncrement();    //通知未满                        if (c + 1 < capacity)                            notFull.signal();    //跳出循环                        break;                    }     //队列已满,未能插入,等待时间是负的,直接返回                  if (nanos <= 0)                        return false;                    try {      //等待一定时间后再次尝试                      nanos = notFull.awaitNanos(nanos);                    } catch (InterruptedException ie) {                        notFull.signal(); // propagate to a non-interrupted thread                        throw ie;                    }                }            } finally {  //解锁              putLock.unlock();            }  //通知已插入数据,链表非空          if (c == 0)                signalNotEmpty();            return true;        }        //将指定元素插入到此队列的尾部(如果立即可行且不会超出此队列的容量),        在成功时返回 true,如果此队列已满,则返回 false。               public boolean offer(E e) {            if (e == null) throw new NullPointerException();            final AtomicInteger count = this.count;            if (count.get() == capacity)                return false;            int c = -1;            final ReentrantLock putLock = this.putLock;            putLock.lock();            try {  //由于可能在lock被阻塞时其他线程进行了插入操作,需再次判断count              if (count.get() < capacity) {                    insert(e);                    c = count.getAndIncrement();    //通知未满                    if (c + 1 < capacity)                        notFull.signal();                }            } finally {                putLock.unlock();            }    //通知非空          if (c == 0)                signalNotEmpty();          // >0表示已成功插入            return c >= 0;        }        //获取并移除此队列的头部,若队列为空,则阻塞。        public E take() throws InterruptedException {            E x;            int c = -1;            final AtomicInteger count = this.count;            final ReentrantLock takeLock = this.takeLock;   //加锁           takeLock.lockInterruptibly();            try {                try {   //队列为空时阻塞                   while (count.get() == 0)                        notEmpty.await();                } catch (InterruptedException ie) {                    notEmpty.signal(); // propagate to a non-interrupted thread                    throw ie;                }  //获取数据              x = extract();                c = count.getAndDecrement();//通知非空                if (c > 1)                    notEmpty.signal();            } finally {                takeLock.unlock();            }      //通知未满          if (c == capacity)                signalNotFull();            return x;        }                //与offer方法结构基本一致,若队列为空,则阻塞一段时间,一段时间后仍为空,则返回null      public E poll(long timeout, TimeUnit unit) throws InterruptedException {            E x = null;            int c = -1;            long nanos = unit.toNanos(timeout);            final AtomicInteger count = this.count;            final ReentrantLock takeLock = this.takeLock;            takeLock.lockInterruptibly();            try {                for (;;) {                    if (count.get() > 0) {                        x = extract();                        c = count.getAndDecrement();                        if (c > 1)                            notEmpty.signal();                        break;                    }                    if (nanos <= 0)                        return null;                    try {                        nanos = notEmpty.awaitNanos(nanos);                    } catch (InterruptedException ie) {                        notEmpty.signal(); // propagate to a non-interrupted thread                        throw ie;                    }                }            } finally {                takeLock.unlock();            }            if (c == capacity)                signalNotFull();            return x;        }                ////与offer方法结构基本一致 队列为空,不阻塞,直接返回null      public E poll() {            final AtomicInteger count = this.count;            if (count.get() == 0)                return null;            E x = null;            int c = -1;            final ReentrantLock takeLock = this.takeLock;            takeLock.lock();            try {                if (count.get() > 0) {                    x = extract();                    c = count.getAndDecrement();                    if (c > 1)                        notEmpty.signal();                }            } finally {                takeLock.unlock();            }            if (c == capacity)                signalNotFull();            return x;        }        //获取但不移除此队列的头;如果此队列为空,则返回 null。        public E peek() {            if (count.get() == 0)                return null;            final ReentrantLock takeLock = this.takeLock;            takeLock.lock();            try {                Node<E> first = head.next;                if (first == null)                    return null;                else                    return first.item;            } finally {                takeLock.unlock();            }        }        /**        * 从此队列移除指定元素的单个实例(如果存在)。        */        public boolean remove(Object o) {            if (o == null) return false;            boolean removed = false;   //同时加锁,此时其他线程不能插入,不能移除          fullyLock();            try {                Node<E> trail = head;                Node<E> p = head.next;//遍历,获取到该元素                while (p != null) {                    if (o.equals(p.item)) {                        removed = true;                        break;                    }                    trail = p;                    p = p.next;                }  //删除该元素              if (removed) {                    p.item = null;                    trail.next = p.next;                    if (last == p)                        last = trail;                    if (count.getAndDecrement() == capacity)                        notFull.signalAll();                }            } finally {                fullyUnlock();            }            return removed;        }        ……    }

 

请勿发布不友善或者负能量的内容。与人为善,比聪明更重要!

留言需要登陆哦

技术博客集 - 网站简介:
前后端技术:
后端基于Hyperf2.1框架开发,前端使用Bootstrap可视化布局系统生成

网站主要作用:
1.编程技术分享及讨论交流,内置聊天系统;
2.测试交流框架问题,比如:Hyperf、Laravel、TP、beego;
3.本站数据是基于大数据采集等爬虫技术为基础助力分享知识,如有侵权请发邮件到站长邮箱,站长会尽快处理;
4.站长邮箱:[email protected];

      订阅博客周刊 去订阅

文章归档

文章标签

友情链接

Auther ·HouTiZong
侯体宗的博客
© 2020 zongscan.com
版权所有ICP证 : 粤ICP备20027696号
PHP交流群 也可以扫右边的二维码
侯体宗的博客