По умолчанию ForkJoinPool executor занимает много времени
Я работаю с CompletableFuture для асинхронного выполнения потока, генерируемого из источника списка.
Таким образом, я тестирую перегруженный метод, т. е. "supplyAsync" CompletableFuture, в котором один метод принимает только один параметр поставщика, а другой-параметр поставщика и параметр исполнителя.
Вот документация для обоих:
Один
SupplyAsync (поставщик поставщик)
Возвращает новый CompletableFuture, который асинхронно завершены задачи, работающей в ForkJoinPool.commonPool() со значением, полученным при вызове данного поставщика.
Второй
SupplyAsync (поставщик поставщик, исполнитель исполнитель)
Возвращает новую функцию CompletableFuture, асинхронно выполняемую задачей, запущенной в данном исполнителе, со значением, полученным при вызове данного поставщика.
А вот мой тестовый класс:
public class TestCompleteableAndParallelStream {
public static void main(String[] args) {
List<MyTask> tasks = IntStream.range(0, 10)
.mapToObj(i -> new MyTask(1))
.collect(Collectors.toList());
useCompletableFuture(tasks);
useCompletableFutureWithExecutor(tasks);
}
public static void useCompletableFutureWithExecutor(List<MyTask> tasks) {
long start = System.nanoTime();
ExecutorService executor = Executors.newFixedThreadPool(Math.min(tasks.size(), 10));
List<CompletableFuture<Integer>> futures =
tasks.stream()
.map(t -> CompletableFuture.supplyAsync(() -> t.calculate(), executor))
.collect(Collectors.toList());
List<Integer> result =
futures.stream()
.map(CompletableFuture::join)
.collect(Collectors.toList());
long duration = (System.nanoTime() - start) / 1_000_000;
System.out.printf("Processed %d tasks in %d millisn", tasks.size(), duration);
System.out.println(result);
executor.shutdown();
}
public static void useCompletableFuture(List<MyTask> tasks) {
long start = System.nanoTime();
List<CompletableFuture<Integer>> futures =
tasks.stream()
.map(t -> CompletableFuture.supplyAsync(() -> t.calculate()))
.collect(Collectors.toList());
List<Integer> result =
futures.stream()
.map(CompletableFuture::join)
.collect(Collectors.toList());
long duration = (System.nanoTime() - start) / 1_000_000;
System.out.printf("Processed %d tasks in %d millisn", tasks.size(), duration);
System.out.println(result);
}
}
class MyTask {
private final int duration;
public MyTask(int duration) {
this.duration = duration;
}
public int calculate() {
System.out.println(Thread.currentThread().getName());
try {
Thread.sleep(duration * 1000);
} catch (final InterruptedException e) {
throw new RuntimeException(e);
}
return duration;
}
}
В то время как метод "useCompletableFuture" принимает около 4 секунд для завершения, метод "useCompletableFutureWithExecutor" занимает всего 1 секунду для завершения.
Нет, мой вопрос в том, какая разная обработка делает ForkJoinPool.commonPool (), который мог бы сделать накладные расходы? В том, что мы не всегда предпочитают выполненный исполнителем бассейн за ForkJoinPool?
2 ответов:
Проверьте размер
ForkJoinPool.commonPool(). По умолчанию он создает пул размеромRuntime.getRuntime().availableProcessors() - 1Я запускаю ваш пример на моем Intel i7-4800MQ (4 ядра + 4 виртуальных ядра) и размер общего пула в моем случае равен
7, поэтому все вычисления заняли ~2000 МС:ForkJoinPool.commonPool-worker-1 ForkJoinPool.commonPool-worker-4 ForkJoinPool.commonPool-worker-2 ForkJoinPool.commonPool-worker-6 ForkJoinPool.commonPool-worker-5 ForkJoinPool.commonPool-worker-3 ForkJoinPool.commonPool-worker-7 ForkJoinPool.commonPool-worker-4 ForkJoinPool.commonPool-worker-2 ForkJoinPool.commonPool-worker-1 Processed 10 tasks in 2005 millis [1, 1, 1, 1, 1, 1, 1, 1, 1, 1]Во втором случае вы использовали
Executors.newFixedThreadPool(Math.min(tasks.size(), 10));Таким образом, пул имеет 10 потоков, готовых к вычислению, поэтому все задачи выполняются за ~1000 мс:
pool-1-thread-1 pool-1-thread-2 pool-1-thread-3 pool-1-thread-4 pool-1-thread-5 pool-1-thread-6 pool-1-thread-7 pool-1-thread-8 pool-1-thread-9 pool-1-thread-10 Processed 10 tasks in 1002 millis [1, 1, 1, 1, 1, 1, 1, 1, 1, 1]Разница между
ForkJoinPoolиExecutorServiceЕвгений в своем комментарий упомянул также еще одну важную вещь.
ForkJoinPoolиспользует подход кражи работы:AForkJoinPool отличается от других видов ExecutorService главным образом тем, что использует воровство работы: все потоки в пуле пытаются найти и выполнить задачи, переданные в пул и/или созданные другими активными задачами (в конечном итоге блокируя ожидание работы, если таковой не существует). Это дает возможность эффективной обработки, когда большинство задач наплодить других подзадач (как и большинство ForkJoinTasks), а а также когда в пул поступает много мелких задач от внешних клиентов. Особенно при установке asyncMode в true в конструкторах, ForkJoinPools также может быть подходящим для использования с задачами в стиле событий, которые никогда не соединяются.
В то время как
ExecutorServiceсоздано с.newFixedThreadPool()использует подход "разделяй и властвуй".Как определить размер пула?
Возник вопрос о том, какой размер пула потоков лучше, возможно, вы найдете полезную информацию там:
Также этот поток является хорошим местом для исследования:
Далее проверяя решения в Интернете, я обнаружил, что мы можем изменить размер пула по умолчанию, который принимает ForkJoinPool, используя следующие свойства:
-Djava.util.concurrent.ForkJoinPool.common.parallelism=16Таким образом, это свойство может дополнительно помочь в создании ForkJoinPool, который будет использоваться более эффективным образом и с большим параллелизмом.
Comments