多线程之并发工具类(七)

  1. 云栖社区>
  2. 博客>
  3. 正文

多线程之并发工具类(七)

艾贺 2018-04-09 22:03:00 浏览647
展开阅读全文

“工欲善其事必先利其器”,有了这些并发工具,多线程控制变得So easy。

img_9fe2ab5b1dbdf89cf5222a327177cd2c.png
与文无关

JDK中已经给我们内置了很多并发工具,都属于应用类型,知道具体如何使用就好,主要讲以下几个类:

  • CountDownLatch
  • CyclicBarrier
  • Semaphore
  • LockSupport
  • BlockingQueue

这次的几个案例都需要实际运行,看运行效果才明白怎么回事,代码可以直接复制粘贴。

CountDownLatch

多线程控制类,计数器栅栏,当计数器满足条件的时候,再开始执行接下来的操作。

public class CountDownLatchTest {
    static final int THREAD_COUNT = 10;
    static final CountDownLatch end = new CountDownLatch(THREAD_COUNT);

    public static void main(String[] args) throws InterruptedException {
        Runnable demo = new Runnable() {
            @Override
            public void run() {
                try {
                    Thread.sleep(1000);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                System.out.println("检查完成");
                end.countDown();
            }
        };
        
        //线程池内有5个线程方便看效果
        ExecutorService executorService = Executors.newFixedThreadPool(5);
        for (int i = 0; i < THREAD_COUNT; i++) {
            executorService.submit(demo);
        }

        end.await();
        System.out.println("一切就绪");
        executorService.shutdown();
    }
}
img_8dbcb3cfe4f5e4aa3bd087c485952333.gif
CountDownLatch 运行效果

CyclicBarrier

循环栅栏,可以看做CountDownLatch的重复利用。当满足一定的条件时候,才开始执行某线程。

// 当线程的数量满足条件时候,才开始执行。
public class CyclicBarrierTest {

    public static void main(String[] args) {
        CyclicBarrier cyclicBarrier = new CyclicBarrier(4, new Runnable() {
            @Override
            public void run() {
                System.out.println("一切就绪,准备出发");
            }
        });

        Runnable task = new Runnable() {
            @Override
            public void run() {
                try {
                    Thread.sleep(1000);
                    System.out.println(Thread.currentThread().getId() + ":就绪");
                    cyclicBarrier.await();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                } catch (BrokenBarrierException e) {
                    e.printStackTrace();
                }
            }
        };

        ExecutorService executorService = Executors.newFixedThreadPool(4);
        for (int i = 0; i < 4; i++) {
            executorService.submit(task);
        }


        executorService.shutdown();

    }
}
img_b0c7cd9143c258e1d7af536d0198cd8e.gif
CyclicBarrier运行结果

所有的线程都在等待,当等待的线程达到一定的数量,然后开始执行接下来的操作。

Semaphore

Semaphore,也是控制线程的一种手段,可以控制并发线程的数量,某些时候我们线程数过多,在访问有限的资源时候,可以使用Semaphore来控制线程的数量。

public class SemaphoreDemo implements Runnable {
    Semaphore mSemaphore = new Semaphore(5);

    @Override
    public void run() {
        try {
            mSemaphore.acquire();
            Thread.sleep(2000);
            System.out.println(Thread.currentThread().getId() + " done!");
            mSemaphore.release();
            
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }

    public static void main(String[] args) {
        ExecutorService executorService = Executors.newFixedThreadPool(20);
        SemaphoreDemo demo = new SemaphoreDemo();
        for (int i = 0; i < 20; i++) {
            executorService.submit(demo);
        }
        executorService.shutdown();
        ;
    }
}
img_1b52c38eb83cba26a5d8a8ab50cff614.gif
Semaphore 运行案例

LockSupport

LockSupport提供了一些静态方法用于阻塞线程,和唤醒线程的功能。
处于park()挂起状态的线程是Waiting状态,park()方法阻塞的线程还支持中断,不抛出中断异常的同时设置中断标志位,然后我们可以通过中断标志位来检查。

public class LockDemo implements Runnable{
    public static Object sObject = new Object();

    @Override
    public void run() {
        synchronized (sObject){
            System.out.println("当前线程名称:" + Thread.currentThread().getName());
            LockSupport.park();

            if (Thread.currentThread().isInterrupted()){
                System.out.println( Thread.currentThread().getName() +  "被中断了");
            }
            System.out.println("执行结束");
        }
    }

    public static void main(String[] args) throws InterruptedException {
        LockDemo demo = new LockDemo();
        Thread t1 = new Thread(demo,"t1");
        Thread t2 = new Thread(demo,"t2");
        t1.start();
        Thread.sleep(3000);
        t2.start();
        t1.interrupt();
        LockSupport.unpark(t2);
    }
}
img_c2f9d6e659ddf66ce9fe446d2f0b455d.gif
LockSupport的demo

BlockingQueue

Java的Queue也是面试中经常提到的知识点,这次因为我们只涉及到并发相关知识,所以只提一些并发相关的Queue,关于Queue的具体分析等后面的数据结构系列的时候再详细解说。

BlockingQueue是Java中的阻塞队列,JDK中提供了7个阻塞队列

  • ArrayBlockingQueue : 数组实现的有界队列,对元素进行FIFO(先进先出)的原则排序。
  • LinkedBlockingQueue: 链表组成的有界队列,长度默认最大值为Integer.MAX_VALUE,元素按FIFO原则排序,性能好于ArrayBlockingQueue。
  • PriorityBlockingQueue:支持优先级的无界阻塞队列。
  • DelayQueue: 支持延迟获取元素的无界阻塞队列
  • SynchronousQueue:不存储元素的阻塞队列。每一个put操作必须等待take操作,否则不能继续添加元素。
  • LinkedTransferQueue:链表组成的无界传输队列
  • LinkedBlockingDeque:由链表组成的双向阻塞队列。可以从两段插入和移除元素。

带大家看一下LinkedBlockingQueue的几个关键方法:

   //LinkedBlockingQueue 方法探索
  // 添加元素
    public boolean offer(E e) {
        if (e == null) throw new NullPointerException();
        //如果队列满了,直接返回false
        final AtomicInteger count = this.count;
        if (count.get() == capacity)
            return false;
        // 创建新的节点
        int c = -1;
        Node<E> node = new Node<E>(e);
        final ReentrantLock putLock = this.putLock;
        putLock.lock();
        try {
            // 如果队列不满的话,就让元素加入队列。
            //然后判断,当前队列元素是否满了,不满的话,通知notFull条件。
            if (count.get() < capacity) {
                enqueue(node);
                c = count.getAndIncrement();
                if (c + 1 < capacity)
                    notFull.signal();
            }
        } finally {
            putLock.unlock();
        }
        // 假如添加的是第一个元素,通知队列不为空了。
        if (c == 0)
            signalNotEmpty();
        return c >= 0;
    }

    public void put(E e) throws InterruptedException {
        if (e == null) throw new NullPointerException();
       
        int c = -1;
        Node<E> node = new Node<E>(e);
        final ReentrantLock putLock = this.putLock;
        final AtomicInteger count = this.count;
        putLock.lockInterruptibly();
        try {
            // 当队列满的时候进行等待。若不满入队
            while (count.get() == capacity) {
                notFull.await();
            }
            enqueue(node);
            c = count.getAndIncrement();
          
           // 同offer
            if (c + 1 < capacity)
                notFull.signal();
        } finally {
            putLock.unlock();
        }
        // 同offer
        if (c == 0)
            signalNotEmpty();
    }

可以看出添加元素上:

  • 当队列满的时候,offer不添加元素,立刻返回。put则会阻塞操作,直到队列为不满。
  • 还有一个带参数的offer方法,和put的唯一区别就是有超时时间,在一段时间内队列还不空的话,就返回。
   //LinkedBlockingQueue 方法探索
  // 移除
    public E poll() {
        final AtomicInteger count = this.count;
        // 队列为空,返回null
        if (count.get() == 0)
            return null;
        E x = null;
        int c = -1;
        final ReentrantLock takeLock = this.takeLock;
        takeLock.lock();
        try {
            //队列有元素的话,取出元素
            //取出元素后如果队列是不为空,发出不为空的信号。
            if (count.get() > 0) {
                x = dequeue();
                c = count.getAndDecrement();
                if (c > 1)
                    notEmpty.signal();
            }
        } finally {
            takeLock.unlock();
        }
        //如果取出元素之前,队列是满的,因为取出了元素,现在发出不满的信号
        if (c == capacity)
            signalNotFull();
        return x;
    }

    public E take() throws InterruptedException {
        E x;
        int c = -1;
        final AtomicInteger count = this.count;
        final ReentrantLock takeLock = this.takeLock;
        takeLock.lockInterruptibly();
        try {
           // 队列为空的话,就等待队列不为空。
            while (count.get() == 0) {
                notEmpty.await();
            }
            x = dequeue();
            c = count.getAndDecrement();
            if (c > 1)
                notEmpty.signal();
        } finally {
            takeLock.unlock();
        }
        if (c == capacity)
            signalNotFull();
        return x;
    }

可以看出LinkedBlockingQueue的移除操作poll和take方法:

  • poll不阻塞,take会阻塞
  • poll(long timeout, TimeUnit unit),当队列为空的时候,等待指定的时间,如果队列扔为空,那么就返回。

这次是带领大家一起看了下LinkedBlockingQueue的关键方法,其它的队列的操作也都类似,望大家自行查看,JDK中Queue的实现并不难理解。

最后

这次主要介绍了几个并发中可能会用到的工具类,最后说了下JDK并发包中的阻塞队列,阻塞队列相对比较重要,就简单的分析了其实现。
希望能帮助到大家。

参考

  • 《Java并发实战》
  • 《Java高并发程序设计》
  • 《并发编程的艺术》

网友评论

登录后评论
0/500
评论
艾贺
+ 关注