您好,登錄后才能下訂單哦!
本篇內容介紹了“Java并行處理的實現方法”的有關知識,在實際案例的操作過程中,不少人都會遇到這樣的困境,接下來就讓小編帶領大家學習一下如何處理這些情況吧!希望大家仔細閱讀,能夠學有所成!
1. 背景
2.知識
3. Java 中的并行處理
4. 擴展
線程池方式實現并行處理
使用 fork/join框架
5.參考:
本文是一個短文章,介紹Java 中的并行處理。
說明:10多分鐘讀完的文章我稱之為短文章,適合快速閱讀。
并行計算(parallel computing)一般是指許多指令得以同時進行的計算模式。在同時進行的前提下,可以將計算的過程分解成小部分,之后以并發方式來加以解決。
也就是分解為幾個過程:
1、將一個大任務拆分成多個子任務,子任務還可以繼續拆分。
2、各個子任務同時進行運算執行。
3、在執行完畢后,可能會有個 " 歸納 " 的任務,比如 求和,求平均等。
再簡化一點的理解就是: 先拆分 --> 在同時進行計算 --> 最后“歸納”
為什么要“并行”,優點呢?
1、為了獲得 “節省時間”,“快”。適合用于大規模運算的場景。從理論上講,在 n 個并行處理的執行速度可能會是在單一處理機上執行的速度的 n 倍。
2、以前的計算機是單核的,現代的計算機Cpu都是多核的,服務器甚至都是多Cpu的,并行計算可以充分利用硬件的性能。
JDK 8 新增的Stream API(java.util.stream)將生成環境的函數式編程引入了Java庫中,可以方便開發者能夠寫出更加有效、更加簡潔的代碼。
steam 的另一個價值是創造性地支持并行處理(parallel processing)。示例:
final Collection< Task > tasks = Arrays.asList( new Task( Status.OPEN, 5 ), new Task( Status.OPEN, 13 ), new Task( Status.CLOSED, 8 ) ); // 并行執行多個任務,并 求和 final double totalPoints = tasks .stream() .parallel() .map( task -> task.getPoints() ) // or map( Task::getPoints ) .reduce( 0, Integer::sum ); System.out.println( "Total points (all tasks): " + totalPoints );
對于上面的tasks集合,上面的代碼計算所有任務的點數之和。
它使用 parallel 方法并行處理所有的task,并使用 reduce 方法計算最終的結果。
jdk1.5引入了并發包,其中包括了ThreadPoolExecutor,相關代碼如下:
public class ExecutorServiceTest { public static final int THRESHOLD = 10_000; public static long[] numbers; public static void main(String[] args) throws Exception { numbers = LongStream.rangeClosed(1, 10_000_000).toArray(); ExecutorService executor = Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors() + 1); CompletionService<Long> completionService = new ExecutorCompletionService<Long>(executor); int taskSize = (int) (numbers.length / THRESHOLD); for (int i = 1; i <= taskSize; i++) { final int key = i; completionService.submit(new Callable<Long>() { @Override public Long call() throws Exception { return sum((key - 1) * THRESHOLD, key * THRESHOLD); } }); } long sumValue = 0; for (int i = 0; i < taskSize; i++) { sumValue += completionService.take().get(); } // 所有任務已經完成,關閉線程池 System.out.println("sumValue = " + sumValue); executor.shutdown(); } private static long sum(int start, int end) { long sum = 0; for (int i = start; i < end; i++) { sum += numbers[i]; } return sum; } }
分支/合并框架的目的是以遞歸的方式將可以并行的認為拆分成更小的任務,然后將每個子任務的結果合并起來生成整體結果;相關代碼如下:
public class ForkJoinTest extends java.util.concurrent.RecursiveTask<Long> { private static final long serialVersionUID = 1L; private final long[] numbers; private final int start; private final int end; public static final long THRESHOLD = 10_000; public ForkJoinTest(long[] numbers) { this(numbers, 0, numbers.length); } private ForkJoinTest(long[] numbers, int start, int end) { this.numbers = numbers; this.start = start; this.end = end; } @Override protected Long compute() { int length = end - start; if (length <= THRESHOLD) { return computeSequentially(); } ForkJoinTest leftTask = new ForkJoinTest(numbers, start, start + length / 2); leftTask.fork(); ForkJoinTest rightTask = new ForkJoinTest(numbers, start + length / 2, end); Long rightResult = rightTask.compute(); // 注:join方法會阻塞,因此有必要在兩個子任務的計算都開始之后才執行join方法 Long leftResult = leftTask.join(); return leftResult + rightResult; } private long computeSequentially() { long sum = 0; for (int i = start; i < end; i++) { sum += numbers[i]; } return sum; } public static void main(String[] args) { System.out.println(forkJoinSum(10_000_000)); } public static long forkJoinSum(long n) { long[] numbers = LongStream.rangeClosed(1, n).toArray(); ForkJoinTask<Long> task = new ForkJoinTest(numbers); return new ForkJoinPool().invoke(task); } }
上面的代碼實現了 遞歸方式拆分子任務,并放入到線程池中執行。
https://zh.wikipedia.org/wiki/%E5%B9%B6%E8%A1%8C%E8%AE%A1%E7%AE%97
“Java并行處理的實現方法”的內容就介紹到這里了,感謝大家的閱讀。如果想了解更多行業相關的知識可以關注億速云網站,小編將為大家輸出更多高質量的實用文章!
免責聲明:本站發布的內容(圖片、視頻和文字)以原創、轉載和分享為主,文章觀點不代表本網站立場,如果涉及侵權請聯系站長郵箱:is@yisu.com進行舉報,并提供相關證據,一經查實,將立刻刪除涉嫌侵權內容。