美文网首页
Java并发编程之CountDownLatch的使用

Java并发编程之CountDownLatch的使用

作者: polyau | 来源:发表于2020-06-16 01:24 被阅读0次

1. 介绍

在这篇文章中,将给出一个对CountDownLatch的指导性教程,以实际的例子说明其基本使用方法。

本质上,通过CountDownLatch可以使得一个线程一直阻塞直到其它线程已经完成指定的任务。

2. 并发编程中的使用

简单地来说,CountDownLatch有一个counter域,你可以根据需要对它进行递减。我们可以使用它来阻塞一个正在被调用的线程直到它被递减为0。

如果我们正在做一些并行处理工作,可以以目标工作线程数量作为计数器的初值实例化CountDownLatch对象。然后我们就可以在每个线程结束的时候调用countdown()来保证每一个调用了await()的独立线程被阻塞直到所有的工作进程执行结束。

3. 等待线程结束

让我们来尝试下这种模式,通过创建一个Worker,使用CountDownLatch字段来通知我们其何时结束:

public class Worker implements Runnable {
    private List<String> outputScraper;
    private CountDownLatch countDownLatch;
 
    public Worker(List<String> outputScraper, CountDownLatch countDownLatch) {
        this.outputScraper = outputScraper;
        this.countDownLatch = countDownLatch;
    }
 
    @Override
    public void run() {
        doSomeWork();
        outputScraper.add("Counted down");
        countDownLatch.countDown();
    }
}

接下来,让我们创建测试用例验证我们可以让CountDownLatch一直等待Worker实例结束:

@Test
public void whenParallelProcessing_thenMainThreadWillBlockUntilCompletion()
  throws InterruptedException {
 
    List<String> outputScraper = Collections.synchronizedList(new ArrayList<>());
    CountDownLatch countDownLatch = new CountDownLatch(5);
    List<Thread> workers = Stream
      .generate(() -> new Thread(new Worker(outputScraper, countDownLatch)))
      .limit(5)
      .collect(toList());
 
      workers.forEach(Thread::start);
      countDownLatch.await(); 
      outputScraper.add("Latch released");
 
      assertThat(outputScraper)
        .containsExactly(
          "Counted down",
          "Counted down",
          "Counted down",
          "Counted down",
          "Counted down",
          "Latch released"
        );
    }

自然而然,"Latch released"将会一直是最后的输出 - 因为它取决于CountDownLatch何时释放。

请注意如果没有调用await(),我们将不能保证线程的执行次序,导致测试将会发生随机性的失败。

4. 多个线程等待同时开始运行

之前的例子使用了5个线程,而这次将会启动数千个线程,这样就很有可能出现一种情形:在后面启动的线程中调用start()前,很多早先一些启动的线程已经执行完毕。这有可能使得一个并发问题难以被重现,因为我们不能够让这些线程并行运行。

为了解决这个问题,我们在之前的例子的基础上稍微改变一下CountdownLatch的工作方式。并不是阻塞父线程直到一些子线程执行结束,我们可以阻塞子线程直到其它的子线程都已经开始运行。

让我们修改一下run()方法让它在继续执行前先进行阻塞:

public class WaitingWorker implements Runnable {
 
    private List<String> outputScraper;
    private CountDownLatch readyThreadCounter;
    private CountDownLatch callingThreadBlocker;
    private CountDownLatch completedThreadCounter;
 
    public WaitingWorker(
      List<String> outputScraper,
      CountDownLatch readyThreadCounter,
      CountDownLatch callingThreadBlocker,
      CountDownLatch completedThreadCounter) {
 
        this.outputScraper = outputScraper;
        this.readyThreadCounter = readyThreadCounter;
        this.callingThreadBlocker = callingThreadBlocker;
        this.completedThreadCounter = completedThreadCounter;
    }
 
    @Override
    public void run() {
        readyThreadCounter.countDown();
        try {
            callingThreadBlocker.await();
            doSomeWork();
            outputScraper.add("Counted down");
        } catch (InterruptedException e) {
            e.printStackTrace();
        } finally {
            completedThreadCounter.countDown();
        }
    }
}

现在,让我们修改一下测试用例,这会使得主线程一直阻塞,直到所有Worker线程都开始运行。接着主线程会解禁所有的Worker线程,然后再阻塞直到所有的Worker线程都执行结束:

@Test
public void whenDoingLotsOfThreadsInParallel_thenStartThemAtTheSameTime()
 throws InterruptedException {
  
    List<String> outputScraper = Collections.synchronizedList(new ArrayList<>());
    CountDownLatch readyThreadCounter = new CountDownLatch(5);
    CountDownLatch callingThreadBlocker = new CountDownLatch(1);
    CountDownLatch completedThreadCounter = new CountDownLatch(5);
    List<Thread> workers = Stream
      .generate(() -> new Thread(new WaitingWorker(
        outputScraper, readyThreadCounter, callingThreadBlocker, completedThreadCounter)))
      .limit(5)
      .collect(toList());
 
    workers.forEach(Thread::start);
    readyThreadCounter.await(); 
    outputScraper.add("Workers ready");
    callingThreadBlocker.countDown(); 
    completedThreadCounter.await(); 
    outputScraper.add("Workers complete");
 
    assertThat(outputScraper)
      .containsExactly(
        "Workers ready",
        "Counted down",
        "Counted down",
        "Counted down",
        "Counted down",
        "Counted down",
        "Workers complete"
      );
}

这个模式对于重现并发bug很有用,因为能够被用来强制数千个线程并行执行相同的逻辑。

5. 提前终止CountdownLatch

有时候,我们可能遇到这样的情形,Worker在递减CountDownLatch之前由于某些错误发生意外的终止。这会导致计数器永远不会递减为0,然后await()永远不会终止:

@Override
public void run() {
    if (true) {
        throw new RuntimeException("Oh dear, I'm a BrokenWorker");
    }
    countDownLatch.countDown();
    outputScraper.add("Counted down");
}

让我们修改之前的测试代码,使用BrokenWorker,这显示await()将会一直阻塞:

@Test
public void whenFailingToParallelProcess_thenMainThreadShouldGetNotGetStuck()
  throws InterruptedException {
  
    List<String> outputScraper = Collections.synchronizedList(new ArrayList<>());
    CountDownLatch countDownLatch = new CountDownLatch(5);
    List<Thread> workers = Stream
      .generate(() -> new Thread(new BrokenWorker(outputScraper, countDownLatch)))
      .limit(5)
      .collect(toList());
 
    workers.forEach(Thread::start);
    countDownLatch.await();
}

显而易见,这并不是我们所期望的 - 如果能让程序继续运行会比永远地陷入阻塞好得多。

为了解决这个问题,我们添加一个超时参数去调用await()

boolean completed = countDownLatch.await(3L, TimeUnit.SECONDS);
assertThat(completed).isFalse();

如我们所见,测试代码最终将会超时,await()将会返回false

6. 总结

在这篇快速指导文章中,我们说明了如何使用CountDownLatch去阻塞一个线程直到其它线程已经完成其任务。

我们也展示了它如何被用来帮助调试并发问题(通过保证线程并行运行)。

这些例子的实现代码可以在over on GitHub上找到; 这是一个基于Maven的项目,因此应该很容易按原样运行。

翻译自https://www.baeldung.com/java-countdown-latch
,如有错误欢迎指正!

相关文章

网友评论

      本文标题:Java并发编程之CountDownLatch的使用

      本文链接:https://www.haomeiwen.com/subject/wlikxktx.html