По умолчанию ForkJoinPool executor занимает много времени



Я работаю с CompletableFuture для асинхронного выполнения потока, генерируемого из источника списка.



Таким образом, я тестирую перегруженный метод, т. е. "supplyAsync" CompletableFuture, в котором один метод принимает только один параметр поставщика, а другой-параметр поставщика и параметр исполнителя.
Вот документация для обоих:

Один




SupplyAsync (поставщик поставщик)



Возвращает новый CompletableFuture, который асинхронно завершены задачи, работающей в ForkJoinPool.commonPool() со значением, полученным при вызове данного поставщика.




Второй




SupplyAsync (поставщик поставщик, исполнитель исполнитель)



Возвращает новую функцию CompletableFuture, асинхронно выполняемую задачей, запущенной в данном исполнителе, со значением, полученным при вызове данного поставщика.




А вот мой тестовый класс:



public class TestCompleteableAndParallelStream {

public static void main(String[] args) {
List<MyTask> tasks = IntStream.range(0, 10)
.mapToObj(i -> new MyTask(1))
.collect(Collectors.toList());

useCompletableFuture(tasks);

useCompletableFutureWithExecutor(tasks);

}

public static void useCompletableFutureWithExecutor(List<MyTask> tasks) {
long start = System.nanoTime();
ExecutorService executor = Executors.newFixedThreadPool(Math.min(tasks.size(), 10));
List<CompletableFuture<Integer>> futures =
tasks.stream()
.map(t -> CompletableFuture.supplyAsync(() -> t.calculate(), executor))
.collect(Collectors.toList());

List<Integer> result =
futures.stream()
.map(CompletableFuture::join)
.collect(Collectors.toList());
long duration = (System.nanoTime() - start) / 1_000_000;
System.out.printf("Processed %d tasks in %d millisn", tasks.size(), duration);
System.out.println(result);
executor.shutdown();
}

public static void useCompletableFuture(List<MyTask> tasks) {
long start = System.nanoTime();
List<CompletableFuture<Integer>> futures =
tasks.stream()
.map(t -> CompletableFuture.supplyAsync(() -> t.calculate()))
.collect(Collectors.toList());

List<Integer> result =
futures.stream()
.map(CompletableFuture::join)
.collect(Collectors.toList());
long duration = (System.nanoTime() - start) / 1_000_000;
System.out.printf("Processed %d tasks in %d millisn", tasks.size(), duration);
System.out.println(result);
}



}


class MyTask {
private final int duration;
public MyTask(int duration) {
this.duration = duration;
}
public int calculate() {
System.out.println(Thread.currentThread().getName());
try {
Thread.sleep(duration * 1000);
} catch (final InterruptedException e) {
throw new RuntimeException(e);
}
return duration;
}
}


В то время как метод "useCompletableFuture" принимает около 4 секунд для завершения, метод "useCompletableFutureWithExecutor" занимает всего 1 секунду для завершения.



Нет, мой вопрос в том, какая разная обработка делает ForkJoinPool.commonPool (), который мог бы сделать накладные расходы? В том, что мы не всегда предпочитают выполненный исполнителем бассейн за ForkJoinPool?

622   2  

2 ответов:

Проверьте размер ForkJoinPool.commonPool(). По умолчанию он создает пул размером

Runtime.getRuntime().availableProcessors() - 1

Я запускаю ваш пример на моем Intel i7-4800MQ (4 ядра + 4 виртуальных ядра) и размер общего пула в моем случае равен 7, поэтому все вычисления заняли ~2000 МС:

ForkJoinPool.commonPool-worker-1
ForkJoinPool.commonPool-worker-4
ForkJoinPool.commonPool-worker-2
ForkJoinPool.commonPool-worker-6
ForkJoinPool.commonPool-worker-5
ForkJoinPool.commonPool-worker-3
ForkJoinPool.commonPool-worker-7
ForkJoinPool.commonPool-worker-4
ForkJoinPool.commonPool-worker-2
ForkJoinPool.commonPool-worker-1
Processed 10 tasks in 2005 millis
[1, 1, 1, 1, 1, 1, 1, 1, 1, 1]

Во втором случае вы использовали

Executors.newFixedThreadPool(Math.min(tasks.size(), 10));

Таким образом, пул имеет 10 потоков, готовых к вычислению, поэтому все задачи выполняются за ~1000 мс:

pool-1-thread-1
pool-1-thread-2
pool-1-thread-3
pool-1-thread-4
pool-1-thread-5
pool-1-thread-6
pool-1-thread-7
pool-1-thread-8
pool-1-thread-9
pool-1-thread-10
Processed 10 tasks in 1002 millis
[1, 1, 1, 1, 1, 1, 1, 1, 1, 1]

Разница между ForkJoinPool и ExecutorService

Евгений в своем комментарий упомянул также еще одну важную вещь. ForkJoinPool использует подход кражи работы:

AForkJoinPool отличается от других видов ExecutorService главным образом тем, что использует воровство работы: все потоки в пуле пытаются найти и выполнить задачи, переданные в пул и/или созданные другими активными задачами (в конечном итоге блокируя ожидание работы, если таковой не существует). Это дает возможность эффективной обработки, когда большинство задач наплодить других подзадач (как и большинство ForkJoinTasks), а а также когда в пул поступает много мелких задач от внешних клиентов. Особенно при установке asyncMode в true в конструкторах, ForkJoinPools также может быть подходящим для использования с задачами в стиле событий, которые никогда не соединяются.

В то время как ExecutorService создано с .newFixedThreadPool() использует подход "разделяй и властвуй".

Как определить размер пула?

Возник вопрос о том, какой размер пула потоков лучше, возможно, вы найдете полезную информацию там:

Установка идеального размера пула потоков

Также этот поток является хорошим местом для исследования:

Пользовательский пул потоков в параллельном потоке Java 8

Далее проверяя решения в Интернете, я обнаружил, что мы можем изменить размер пула по умолчанию, который принимает ForkJoinPool, используя следующие свойства:

-Djava.util.concurrent.ForkJoinPool.common.parallelism=16

Таким образом, это свойство может дополнительно помочь в создании ForkJoinPool, который будет использоваться более эффективным образом и с большим параллелизмом.

Comments

    Ничего не найдено.