多线程之并发工具类(七)

简介: “工欲善其事必先利其器”,有了这些并发工具,多线程控制变得So easy。与文无关JDK中已经给我们内置了很多并发工具,都属于应用类型,知道具体如何使用就好,主要讲以下几个类:CountDownLatchCyclicBarrierSemaphoreLockSupportBlockingQueue这次的几个案例都需要实际运行,看运行效果才明白怎么回事,代码可以直接复制粘贴。

“工欲善其事必先利其器”,有了这些并发工具,多线程控制变得So easy。

img_9fe2ab5b1dbdf89cf5222a327177cd2c.png
与文无关

JDK中已经给我们内置了很多并发工具,都属于应用类型,知道具体如何使用就好,主要讲以下几个类:

  • CountDownLatch
  • CyclicBarrier
  • Semaphore
  • LockSupport
  • BlockingQueue

这次的几个案例都需要实际运行,看运行效果才明白怎么回事,代码可以直接复制粘贴。

CountDownLatch

多线程控制类,计数器栅栏,当计数器满足条件的时候,再开始执行接下来的操作。

public class CountDownLatchTest {
    static final int THREAD_COUNT = 10;
    static final CountDownLatch end = new CountDownLatch(THREAD_COUNT);

    public static void main(String[] args) throws InterruptedException {
        Runnable demo = new Runnable() {
            @Override
            public void run() {
                try {
                    Thread.sleep(1000);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                System.out.println("检查完成");
                end.countDown();
            }
        };
        
        //线程池内有5个线程方便看效果
        ExecutorService executorService = Executors.newFixedThreadPool(5);
        for (int i = 0; i < THREAD_COUNT; i++) {
            executorService.submit(demo);
        }

        end.await();
        System.out.println("一切就绪");
        executorService.shutdown();
    }
}
img_8dbcb3cfe4f5e4aa3bd087c485952333.gif
CountDownLatch 运行效果

CyclicBarrier

循环栅栏,可以看做CountDownLatch的重复利用。当满足一定的条件时候,才开始执行某线程。

// 当线程的数量满足条件时候,才开始执行。
public class CyclicBarrierTest {

    public static void main(String[] args) {
        CyclicBarrier cyclicBarrier = new CyclicBarrier(4, new Runnable() {
            @Override
            public void run() {
                System.out.println("一切就绪,准备出发");
            }
        });

        Runnable task = new Runnable() {
            @Override
            public void run() {
                try {
                    Thread.sleep(1000);
                    System.out.println(Thread.currentThread().getId() + ":就绪");
                    cyclicBarrier.await();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                } catch (BrokenBarrierException e) {
                    e.printStackTrace();
                }
            }
        };

        ExecutorService executorService = Executors.newFixedThreadPool(4);
        for (int i = 0; i < 4; i++) {
            executorService.submit(task);
        }


        executorService.shutdown();

    }
}
img_b0c7cd9143c258e1d7af536d0198cd8e.gif
CyclicBarrier运行结果

所有的线程都在等待,当等待的线程达到一定的数量,然后开始执行接下来的操作。

Semaphore

Semaphore,也是控制线程的一种手段,可以控制并发线程的数量,某些时候我们线程数过多,在访问有限的资源时候,可以使用Semaphore来控制线程的数量。

public class SemaphoreDemo implements Runnable {
    Semaphore mSemaphore = new Semaphore(5);

    @Override
    public void run() {
        try {
            mSemaphore.acquire();
            Thread.sleep(2000);
            System.out.println(Thread.currentThread().getId() + " done!");
            mSemaphore.release();
            
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }

    public static void main(String[] args) {
        ExecutorService executorService = Executors.newFixedThreadPool(20);
        SemaphoreDemo demo = new SemaphoreDemo();
        for (int i = 0; i < 20; i++) {
            executorService.submit(demo);
        }
        executorService.shutdown();
        ;
    }
}
img_1b52c38eb83cba26a5d8a8ab50cff614.gif
Semaphore 运行案例

LockSupport

LockSupport提供了一些静态方法用于阻塞线程,和唤醒线程的功能。
处于park()挂起状态的线程是Waiting状态,park()方法阻塞的线程还支持中断,不抛出中断异常的同时设置中断标志位,然后我们可以通过中断标志位来检查。

public class LockDemo implements Runnable{
    public static Object sObject = new Object();

    @Override
    public void run() {
        synchronized (sObject){
            System.out.println("当前线程名称:" + Thread.currentThread().getName());
            LockSupport.park();

            if (Thread.currentThread().isInterrupted()){
                System.out.println( Thread.currentThread().getName() +  "被中断了");
            }
            System.out.println("执行结束");
        }
    }

    public static void main(String[] args) throws InterruptedException {
        LockDemo demo = new LockDemo();
        Thread t1 = new Thread(demo,"t1");
        Thread t2 = new Thread(demo,"t2");
        t1.start();
        Thread.sleep(3000);
        t2.start();
        t1.interrupt();
        LockSupport.unpark(t2);
    }
}
img_c2f9d6e659ddf66ce9fe446d2f0b455d.gif
LockSupport的demo

BlockingQueue

Java的Queue也是面试中经常提到的知识点,这次因为我们只涉及到并发相关知识,所以只提一些并发相关的Queue,关于Queue的具体分析等后面的数据结构系列的时候再详细解说。

BlockingQueue是Java中的阻塞队列,JDK中提供了7个阻塞队列

  • ArrayBlockingQueue : 数组实现的有界队列,对元素进行FIFO(先进先出)的原则排序。
  • LinkedBlockingQueue: 链表组成的有界队列,长度默认最大值为Integer.MAX_VALUE,元素按FIFO原则排序,性能好于ArrayBlockingQueue。
  • PriorityBlockingQueue:支持优先级的无界阻塞队列。
  • DelayQueue: 支持延迟获取元素的无界阻塞队列
  • SynchronousQueue:不存储元素的阻塞队列。每一个put操作必须等待take操作,否则不能继续添加元素。
  • LinkedTransferQueue:链表组成的无界传输队列
  • LinkedBlockingDeque:由链表组成的双向阻塞队列。可以从两段插入和移除元素。

带大家看一下LinkedBlockingQueue的几个关键方法:

   //LinkedBlockingQueue 方法探索
  // 添加元素
    public boolean offer(E e) {
        if (e == null) throw new NullPointerException();
        //如果队列满了,直接返回false
        final AtomicInteger count = this.count;
        if (count.get() == capacity)
            return false;
        // 创建新的节点
        int c = -1;
        Node<E> node = new Node<E>(e);
        final ReentrantLock putLock = this.putLock;
        putLock.lock();
        try {
            // 如果队列不满的话,就让元素加入队列。
            //然后判断,当前队列元素是否满了,不满的话,通知notFull条件。
            if (count.get() < capacity) {
                enqueue(node);
                c = count.getAndIncrement();
                if (c + 1 < capacity)
                    notFull.signal();
            }
        } finally {
            putLock.unlock();
        }
        // 假如添加的是第一个元素,通知队列不为空了。
        if (c == 0)
            signalNotEmpty();
        return c >= 0;
    }

    public void put(E e) throws InterruptedException {
        if (e == null) throw new NullPointerException();
       
        int c = -1;
        Node<E> node = new Node<E>(e);
        final ReentrantLock putLock = this.putLock;
        final AtomicInteger count = this.count;
        putLock.lockInterruptibly();
        try {
            // 当队列满的时候进行等待。若不满入队
            while (count.get() == capacity) {
                notFull.await();
            }
            enqueue(node);
            c = count.getAndIncrement();
          
           // 同offer
            if (c + 1 < capacity)
                notFull.signal();
        } finally {
            putLock.unlock();
        }
        // 同offer
        if (c == 0)
            signalNotEmpty();
    }

可以看出添加元素上:

  • 当队列满的时候,offer不添加元素,立刻返回。put则会阻塞操作,直到队列为不满。
  • 还有一个带参数的offer方法,和put的唯一区别就是有超时时间,在一段时间内队列还不空的话,就返回。
   //LinkedBlockingQueue 方法探索
  // 移除
    public E poll() {
        final AtomicInteger count = this.count;
        // 队列为空,返回null
        if (count.get() == 0)
            return null;
        E x = null;
        int c = -1;
        final ReentrantLock takeLock = this.takeLock;
        takeLock.lock();
        try {
            //队列有元素的话,取出元素
            //取出元素后如果队列是不为空,发出不为空的信号。
            if (count.get() > 0) {
                x = dequeue();
                c = count.getAndDecrement();
                if (c > 1)
                    notEmpty.signal();
            }
        } finally {
            takeLock.unlock();
        }
        //如果取出元素之前,队列是满的,因为取出了元素,现在发出不满的信号
        if (c == capacity)
            signalNotFull();
        return x;
    }

    public E take() throws InterruptedException {
        E x;
        int c = -1;
        final AtomicInteger count = this.count;
        final ReentrantLock takeLock = this.takeLock;
        takeLock.lockInterruptibly();
        try {
           // 队列为空的话,就等待队列不为空。
            while (count.get() == 0) {
                notEmpty.await();
            }
            x = dequeue();
            c = count.getAndDecrement();
            if (c > 1)
                notEmpty.signal();
        } finally {
            takeLock.unlock();
        }
        if (c == capacity)
            signalNotFull();
        return x;
    }

可以看出LinkedBlockingQueue的移除操作poll和take方法:

  • poll不阻塞,take会阻塞
  • poll(long timeout, TimeUnit unit),当队列为空的时候,等待指定的时间,如果队列扔为空,那么就返回。

这次是带领大家一起看了下LinkedBlockingQueue的关键方法,其它的队列的操作也都类似,望大家自行查看,JDK中Queue的实现并不难理解。

最后

这次主要介绍了几个并发中可能会用到的工具类,最后说了下JDK并发包中的阻塞队列,阻塞队列相对比较重要,就简单的分析了其实现。
希望能帮助到大家。

参考

  • 《Java并发实战》
  • 《Java高并发程序设计》
  • 《并发编程的艺术》
目录
相关文章
|
1月前
|
安全 数据库连接 数据库
连接池的并发和线程安全
连接池的并发和线程安全
|
5天前
|
安全 Java
深入理解 Java 多线程和并发工具类
【4月更文挑战第19天】本文探讨了Java多线程和并发工具类在实现高性能应用程序中的关键作用。通过继承`Thread`或实现`Runnable`创建线程,利用`Executors`管理线程池,以及使用`Semaphore`、`CountDownLatch`和`CyclicBarrier`进行线程同步。保证线程安全、实现线程协作和性能调优(如设置线程池大小、避免不必要同步)是重要环节。理解并恰当运用这些工具能提升程序效率和可靠性。
|
7天前
|
Java 开发者
Java中多线程并发控制的实现与优化
【4月更文挑战第17天】 在现代软件开发中,多线程编程已成为提升应用性能和响应能力的关键手段。特别是在Java语言中,由于其平台无关性和强大的运行时环境,多线程技术的应用尤为广泛。本文将深入探讨Java多线程的并发控制机制,包括基本的同步方法、死锁问题以及高级并发工具如java.util.concurrent包的使用。通过分析多线程环境下的竞态条件、资源争夺和线程协调问题,我们提出了一系列实现和优化策略,旨在帮助开发者构建更加健壮、高效的多线程应用。
7 0
|
11天前
|
Java API 调度
安卓多线程和并发处理:提高应用效率
【4月更文挑战第13天】本文探讨了安卓应用中多线程和并发处理的优化方法,包括使用Thread、AsyncTask、Loader、IntentService、JobScheduler、WorkManager以及线程池。此外,还介绍了RxJava和Kotlin协程作为异步编程工具。理解并恰当运用这些技术能提升应用效率,避免UI卡顿,确保良好用户体验。随着安卓技术发展,更高级的异步处理工具将助力开发者构建高性能应用。
|
23天前
|
安全 Java
Java中的多线程并发控制
在Java中,多线程是实现并发执行任务的一种重要方式。然而,随着多个线程同时访问共享资源,可能会导致数据不一致和其他并发问题。因此,了解并掌握Java中的多线程并发控制机制显得尤为重要。本文将深入探讨Java的多线程并发控制,包括synchronized关键字、Lock接口、Semaphore类以及CountDownLatch类等,并通过实例代码演示其使用方法和注意事项。
12 2
|
30天前
|
算法 安全 Unix
【C++ 20 信号量 】C++ 线程同步新特性 C++ 20 std::counting_semaphore 信号量的用法 控制对共享资源的并发访问
【C++ 20 信号量 】C++ 线程同步新特性 C++ 20 std::counting_semaphore 信号量的用法 控制对共享资源的并发访问
30 0
|
1月前
|
负载均衡 Java 数据处理
【C++ 并发 线程池】轻松掌握C++线程池:从底层原理到高级应用(三)
【C++ 并发 线程池】轻松掌握C++线程池:从底层原理到高级应用
52 2
|
1月前
|
存储 监控 Java
【C++ 并发 线程池】轻松掌握C++线程池:从底层原理到高级应用(二)
【C++ 并发 线程池】轻松掌握C++线程池:从底层原理到高级应用
42 1
|
1月前
|
负载均衡 安全 Java
【C++ 并发 线程池】轻松掌握C++线程池:从底层原理到高级应用(一)
【C++ 并发 线程池】轻松掌握C++线程池:从底层原理到高级应用
57 2
|
1月前
|
人工智能 缓存 前端开发
ai对话---多线程并发处理问题
ai对话---多线程并发处理问题