引言:
提到Java8,我们不得不说的就是Lambda表达式和Stream API。而在Java8中,对于并行流和串行流同样做了大量的优化。对于并行流和串行流的知识,也是在面试过程中,经常被问到的知识点。当然,我们不能只是为了应付面试来学习这些知识,更重要的是将这些知识运用到实际的工作中,更好的提高我们的工作效率和工作质量。
什么是并行流?
简单来说,并行流就是把一个内容分成多个数据块,并用不同的线程分别处理每个数据块的流。
Java 8 中将并行进行了优化,我们可以很容易的对数据进行并行操作。 Stream API 可以声明性地通过 parallel() 与sequential() 在并行流与顺序流之间进行切换 。
Fork/Join 框架
Fork/Join 框架: 就是在必要的情况下,将一个大任务,进行拆分(fork)成若干个小任务(拆到不可再拆时),再将一个个的小任务运算的结果进行 join 汇总 。
Fork/Join 框架与传统线程池有啥区别?
采用 “工作窃取”模式(work-stealing):
当执行新的任务时它可以将其拆分成更小的任务执行,并将小任务加到线程队列中,然后再从一个随机线程的队列中偷一个并把它放在自己的队列中。
相对于一般的线程池实现,fork/join框架的优势体现在对其中包含的任务的处理方式上。在一般的线程池中,如果一个线程正在执行的任务由于某些原因无法继续运行,那么该线程会处于等待状态。而在fork/join框架的实现中,如果某个子任务由于等待另外一个子任务的完成而无法继续运行。那么处理该子问题的线程会主动寻找其他尚未运行的子任务来执行。这种方式减少了线程的等待时间,提高了程序的性能。
Fork/Join框架实例
了解了ForJoin框架的原理之后,我们就来手动写一个使用Fork/Join框架实现累加和的示例程序,以帮助读者更好的理解Fork/Join框架。好了,不废话了,上代码,大家通过下面的代码好好体会下Fork/Join框架的强大。
package io.binghe.concurrency.example.aqs;
import lombok.extern.slf4j.Slf4j;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.Future;
import java.util.concurrent.RecursiveTask;
@Slf4j
public class ForkJoinTaskExample extends RecursiveTask<Integer> {
public static final int threshold = 2;
private int start;
private int end;
public ForkJoinTaskExample(int start, int end) {
this.start = start;
this.end = end;
}
@Override
protected Integer compute() {
int sum = 0;
//如果任务足够小就计算任务
boolean canCompute = (end - start) <= threshold;
if (canCompute) {
for (int i = start; i <= end; i++) {
sum += i;
}
} else {
// 如果任务大于阈值,就分裂成两个子任务计算
int middle = (start + end) / 2;
ForkJoinTaskExample leftTask = new ForkJoinTaskExample(start, middle);
ForkJoinTaskExample rightTask = new ForkJoinTaskExample(middle + 1, end);
// 执行子任务
leftTask.fork();
rightTask.fork();
// 等待任务执行结束合并其结果
int leftResult = leftTask.join();
int rightResult = rightTask.join();
// 合并子任务
sum = leftResult + rightResult;
}
return sum;
}
public static void main(String[] args) {
ForkJoinPool forkjoinPool = new ForkJoinPool();
//生成一个计算任务,计算1+2+3+4
ForkJoinTaskExample task = new ForkJoinTaskExample(1, 100);
//执行一个任务
Future<Integer> result = forkjoinPool.submit(task);
try {
log.info("result:{}", result.get());
} catch (Exception e) {
log.error("exception", e);
}
}
}
Java8中的并行流实例
Java8对并行流进行了大量的优化,并且在开发上也极大的简化了程序员的工作量,我们只需要使用类似如下的代码就可以使用Java8中的并行流来处理我们的数据。
LongStream.rangeClosed(0, 10000000L).parallel().reduce(0, Long::sum);
在Java8中如何优雅的切换并行流和串行流呢?
Stream API 可以声明性地通过 parallel() 与sequential() 在并行流与串行流之间进行切换 。