开发者

Java CyclicBarrier源码层分析与应用

开发者 https://www.devze.com 2024-01-02 10:17 出处:网络 作者: The-Venus
目录前言CyclicBarrier源码解析以及示例主要成员变量核心方法应用场景任务分解与合并并行计算游戏开发数据加载并发工具的协同CyclicBarrier和CountDownLatch的区别循环性计数器的变化用途构造函数参数总结前言
目录
  • 前言
  • CyclicBarrier源码解析以及示例
    • 主要成员变量
    • 核心方法
  • 应用场景
    • 任务分解与合并
    • 并行计算
    • 游戏开发
    • 数据加载
    • 并发工具的协同
  • CyclicBarrier和CountDownLatch的区别
    • 循环性
    • 计数器的变化
    • 用途
    • 构造函数参数
  • 总结

    前言

    在多线程编程中,同步工具是确保线程之间协同工作的重要组成部分。

    CyclicB编程客栈arrier(循环屏障)是Java中的一个强大的同步工具,它允许一组线程在达到某个共同点之前互相等待。

    在本文中,我们将深入探讨CyclicBarrier的源码实现以及提供一些示例,以帮助您更好地理解和应用这个有趣的同步工具。

    CyclicBarrier源码解析以及示例

    主要成员变量

    public class CyclicBarrier {
        private final ReentrantLock lock = new ReentrantLock();
        private final Condition trip = lock.newCondition();
        private final int parties;
        private int count;
        private final Runnable barrierCommand;
    }
    • lock: 用于控制并发访问的重入锁。
    • trip: 条件变量,用于在屏障点上等待。
    • parties: 表示需要等待的线程数。
    • count: 表示当前已经到达屏障点的线程数。
    • barrierCommand: 在所有线程到达屏障点之后执行的命令,可以为null。

    核心方法

    await方法

    public int await() throws InterruptedException, BrokenBarrierException {
        try {
            lock.lock();
            if (Thread.interrupted())
                throw new InterruptedException();
            int index = --count;
            if (index == 0) { // 如果是最后一个到达的线程
                boolean ranAction = false;
                try {
                    final Runnable command = barrierCommand;
                    if (command != null)
                        command.run();
                    ranAction = true;
                    return 0;
                } finally {
                    if (!ranAction)
                        breakBarrier(); // 执行失败,重置屏障状态
                }
            }
            while (index > 0) {
                try {
                    trip.await();
                } catch (InterruptedException ie) {
                    if (index == 1 && !broken)
                        breakBarrier();
                    throw ie;
                }
            }
            if (broken)
                throw new BrokenBarrierException();
            return index;
        } finally {
            lock.unlock();
        }
    }

    上述代码主要完成以下几个任务:

    • 减小计数器,表示有一个线程到达了屏障点。
    • 如果是最后一个到达的线程,执行屏障命令(如果有),然后唤醒所有等待的线程。
    • 如果不是最后一个到达的线程,进入等待状态,直到被唤醒。
    • 处理中断异常和屏障破坏异常。

    应用场景

    任务分解与合并

    当一个大任务可以分解为多个子任务,每个子任务独立执行,但在某个点上需要等待所有子任务完成后再继续执行父任务。CyclicBarrier可以用来同步这些子任务的执行,确保它们在特定的屏障点上等待,然后一起继续执行。

    • 应用示例

    假设我们有一个大型的数据处理任务,需要将数据分解为若干子任务并行处理,然后在所有子任务完成后进行结果的合并。CyclicBarrier 可以用来同步子任务的执行,确保在所有子任务都完成后再进行合并操作。

    import java.util.concurrent.BrokenBarrierException;
    import java.util.concurrent.CyclicBarrier;
    public class TaskDecompositionAndMergeExample {
        private static final int NUM_SUBTASKS = 3;
        private static final CyclicBarrier barrier = new CyclicBarrier(NUM_SUBTASKS, () -> {
            System.out.println("All subtasks have been completed. Merging results...");
        });
        public static void main(String[] args) {
            for (int i = 0; i < NUM_SUBTASKS; i++) {
                final int subtaskId = i;
                new Thread(() -> {
                    // Perform individual subtask
                    System.out.println("Subtask " + subtaskId + " is processing.");
                    // Simulate some computation for the subtask
                    try {
                        Thread.sleep((long) (Math.random() * 1000));
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                    System.out.println("Subtask " + subtaskId + " has completed.");
                    try {
                        // Wait for all subtasks to complete
                        barrier.await();
                    } catch (InterruptedException | BrokenBarrierException e) {
                        e.printStackTrace();
                    }
                }).start();
            }
        }
    }

    并行计算

    在并行计算中,当多个计算节点完成局部计算后,需要将它们的结果合并。CyclicBarrier可以用来等待所有计算节点完成局部计算,然后执行合并操作。

    • 应用示例
    import java.util.concurrent.BrokenBarrierException;
    import java.util.concurrent.CyclicBarrier;
    public class ParallelComputingExample {
        private static final int NUM_THREADS = 4;
        private static final CyclicBarrier barrier = new CyclicBarrier(NUM_THREADS, () -> {
            System.out.println("All threads have completed the computation. Merging results...");
        });
        public static void main(String[] args) {
            for (int i = 0; i < NUM_THREADS; i++) {
                final int threadId = i;
                new Thread(() -> {
                    // Perform individual computation
                    System.out.println("Thread " + threadId + " is performing computation.");php
                    // Simulate some computation for the thread
                    try {
                        Thread.sleep((long) (Math.random() * 1000));
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                    System.out.print编程ln("Thread " + threadId + " has completed computation.");
                    try {
                        // Wait for all threads to complete computation
                        barrier.await();
                    } catch (InterruptedException | BrokenBarrierException e) {
                        e.printStackTrace();
                    }
                }).start();
            }
        }
    }

    游戏开发

    在多线程游戏开发中,可能存在多个线程分别负责不同的任务,比如渲染、物理模拟、AI计算等。

    在每一帧结束时,这些线程需要同步,确保下一帧开始时所有任务都已完成。CyclicBarrier可以在每一帧结束时等待所有任务完成,然后统一开始下一帧的计算。

    比如我们在打匹配游戏的时候,十个人必须全部加载到100%,才可以开局。否则只要有一个人没有加载到100%,那这个游戏就不能开始。先加载完成的玩家必须等待最后一个玩家加载成功才可以。

    • 应用示例
    public class CyclicBarrierDemo {
        private static CyclicBarrier cyclicBarrier;
        static class CyclicBarrierThread extends Thread{
            @Override
            public void run() {
                System.out.println("玩家 " + Thread.currentThread().getName() + " 加载100%");
                //等待
                try {
                    cyclicBarrier.await();
                } catch (Exception e) {
                    e.printStackTrace();
                }
            }
        }
        public static void main(String[] args){
            cyclicBarrier = new CyclicBarrier(10, new Runnable() {
                public void run() {
                    System.out.println("玩家都加载好了,开始游戏....");
                }
            });
            for(int i = 0 ; i < 10 ; i++){
                new CyclicBarrierThread().start();
            }
        }
    }
    • 输出结果

    玩家 Thread-0 加载100%

    玩家 Thread-2 加载100%

    玩家 Thread-3 加载100%

    玩家 Thread-6 加载100%

    玩家 Thread-1 加载100%

    玩家 Thread-4 加载100%

    玩家 Thread-5 加载100%

    玩家 Thread-8 加载100%

    玩家 Thread-7 加载100%

    玩家 Thread-9 加载100%

    玩家都加载好了,开始游戏....

    数据加载

    在某些应用中,可能需要同时加载多个数据源,但要确保所有数据加载完成后再继续执行。CyclicBarrier可以用来等待所有数据加载完成,然后执行后续操作。

    • 应用示例
    import java.util.concurrent.BrokenBarrierException;
    import java.util.concurrent.CyclicBarrier;
    public class DataLoaderExample {
        private static final int NUM_THREADS = 3;
        private static final CyclicBarrier barrier = new CyclicBarrier(NUM_THREADS, () -> {
            System.out.println("All data loading threads have completed. Initiating further processing...");
        });
        public static void main(String[] args) {
            for (int i = 0; i < NUM_THREADS; i++) {
                final int threadId = i;
                new Thread(() -> {
                    // Simulate data loading
                    System.out.println("Thread " + threadId + " is loading data.");
                    // Simulate data loading time
                    try {
                        Thread.sleep((long) (Math.random() * 1000));
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                    System.out.println("Thread " + threadId + " has completed data loading.");
                    try {
                        // Wait for all data loajavascriptding threads to complete
                        barrier.await();
                    } catch (InterruptedException | BrokenBarrierException e) {
                        e.printStackTrace();
                    }
                    // Perform further processing after data loading is complete
                    System.out.println("Thread " + threadId + " is performing further processing.");
                }).start();
            }
        }
    }

    并发工具的协同

    CyclicBarrier可以与其他并发工具一起使用,例如 ExecutorServiceCountDownLatch,以实现更复杂的多线程控制逻辑。

    • 应用示例
    import java.util.concurrent.BrokenBarrierException;
    import java.util.concurrent.CyclicBarrier;
    public class CyclicBarrierExample {
        private static final int NUM_THREADS = 3;
        private static final CyclicBarrier barrier = new CyclicBarrier(NUM_THREADS, () -> {
            System.out.println("All threads have reached the barrier. Let's continue!");
        });
        public static void main(String[] args) {
            for (int i = 0; i < NUM_THREADS; i++) {
                new Thread(() -> {
                    try {
                        // Perform individual tasks
                        System.out.println(Thread.currentThread().getName() + " is performing individual tasks.");
                        // Wait for all threads to reach the barrier
                        barrier.await();
                        // Continue with collective tasks after reaching the barrier
                        System.out.println(Thread.currentThread().getName() + " is performing collective tasks.");
                    } catch (InterruptedException | BrokenBarrierException e) {
                        e.printStackTrace();
                    }
                }).start();
            }
        }
    }

    CyclicBarrier和CountDownLatch的区别

    循环性

    • CyclicBarrier 具有循环的特性,可以被重复使用。一旦所有线程都到达屏障点,它会自动重置并再次等待下一轮。这使得 CyclicBarrier 更适合用于一组线程多次协同工作的场景。
    • CountDownLatch 是一次性的,一旦计数到达零,就无法重新设置。如果需要多次等待,就需要创建新的 CountDownLatch 实例。

    计数器的变化

    • CyclicBarrier 中,计数器的递减是由到达屏障点的线程执行的,而且在所有线程都到达之前,任何线程都不会继续执行。
    • CountDownLatch 中,计数器的递减是由任意线程执行的,而且线程在递减计数器php后可以继续执行,不必等待其他线程。

    用途

    • CyclicBarrier 通常用于一组线程并行执行任务,然后在某个点上等待彼此,然后再一起继续执行下一轮任务。例如,任务分解与合并、并行计算等场景。
    • CountDownLatch 用于等待一组线程完成某个任务后再执行其他任务。例如,主线程等待所有工作线程完成工作后再继续执行。

    构造函数参数

    • CyclicBarrier 的构造函数需要指定参与同步的线程数,以及在屏障点上执行的可选操作(Runnable)。
    • CountDownLatch 的构造函数需要指定计数的初始值。

    总结

    通过本文,我们深入了解了CyclicBarrier的源码实现,并通过一个简单的示例演示了它的用法。

    CyclicBarrier是一个强大的同步工具,可以帮助我们实现复杂的多线程协同任务。

    在多线程编程中,理解和熟练使用这样的同步工具是至关重要的,能够确保线程之间的协同工作更加高效和可靠。

    以上就是Java CyclicBarrier源码层分析与应用的详细内容,更多关于Java CyclicBarrier的资料请关注编程客栈(www.devze.com)其它相关文章!

    0

    精彩评论

    暂无评论...
    验证码 换一张
    取 消