Kotlin coroutines-использовать основной поток в блокировке запуска



Я пытаюсь выполнить следующий код:



 val jobs = listOf(...)
return runBlocking(CommonPool) {
val executed = jobs.map {
async { it.execute() }
}.toTypedArray()
awaitAll(*executed)
}


Где jobs - Список некоторых Supplier s-в мире synchronus это должно просто создать, например, список ints.
Все работает нормально, но проблема в том, что основной поток не используется. Ниже скриншот из YourKit:
Введите описание изображения здесь



Итак, вопрос в том, как я могу также использовать основной поток?



Я предполагаю, что runBlocking здесь проблема, но есть ли другой способ получить тот же результат? С параллельным потоком Java он выглядит намного лучше, но основной поток все еще не используется полностью (задачи полностью независимы).



Обновление



Хорошо, может быть, я сказал вам слишком мало вещей.
Мои вопросы пришли через некоторое время после просмотра презентации Ванканта Субраманьяма: https://youtu.be/0hQvWIdwnw4 .
Мне нужна максимальная производительность, нет IO, нет Ui и т. д. Только вычисления. Есть только запрос, и мне нужно использовать все мои доступные ресурсы.

Одна мысль, которая у меня есть, - это установить параллелизм к потоку count + 1, но я думаю, что это довольно глупо.

722   4  

4 ответов:

Я протестировал решение с параллельными потоками Java 8:

jobs.parallelStream().forEach { it.execute() }

Я обнаружил, что загрузка процессора была надежно на 100%. Для справки я использовал следующее вычислительное Задание:

class MyJob {
    fun execute(): Double {
        val rnd = ThreadLocalRandom.current()
        var d = 1.0
        (1..rnd.nextInt(100_000_000)).forEach {
            d *= 1 + rnd.nextDouble(0.0000001)
        }
        return d
    }
}
Обратите внимание, что его длительность случайным образом изменяется от нуля до времени, необходимого для выполнения 100 000 000 умножения FP. Из любопытства я также изучил код, который вы добавили к своему вопросу, как решение, которое работает для вас. Я нашел ряд проблем с ним, таких как:
  • накопление все результаты в список вместо того, чтобы обрабатывать их по мере их поступления
  • закрытие канала результатов сразу после отправки последнего задания вместо ожидания всех результатов

Я написал свой собственный код и добавил его, чтобы сравнить Stream API one-liner с ним. Вот оно:

const val NUM_JOBS = 10_000
val jobs = (0 until NUM_JOBS).map { MyJob() }

fun parallelStream(): Double =
        jobs.parallelStream().map { it.execute() }.collect(summingDouble { it })

fun channels(): Double {
    val resultChannel = Channel<Double>(UNLIMITED)

    val mainComputeChannel = Channel<MyJob>()
    val poolComputeChannels = (1..commonPool().parallelism).map {
        actor<MyJob>(CommonPool) {
            for (job in channel) {
                job.execute().also { resultChannel.send(it) }
            }
        }
    }
    val allComputeChannels = poolComputeChannels + mainComputeChannel

    // Launch a coroutine that submits the jobs
    launch(CommonPool) {
        jobs.forEach { job ->
            select {
                allComputeChannels.forEach { chan ->
                    chan.onSend(job) {}
                }
            }
        }
    }

    // Run the main loop which takes turns between running a job
    // submitted to the main thread channel and receiving a result
    return runBlocking {
        var completedCount = 0
        var sum = 0.0
        while (completedCount < NUM_JOBS) {
            select<Unit> {
                mainComputeChannel.onReceive { job ->
                    job.execute().also { resultChannel.send(it) }
                }
                resultChannel.onReceive { result ->
                    sum += result
                    completedCount++
                }
            }
        }
        sum
    }
}

fun main(args: Array<String>) {
    measure("Parallel Stream", ::parallelStream)
    measure("Channels", ::channels)
    measure("Parallel Stream", ::parallelStream)
    measure("Channels", ::channels)
}

fun measure(task: String, measuredCode: () -> Double) {
    val block = { print(measuredCode().toString().substringBefore('.')) }
    println("Warming up $task")
    (1..20).forEach { block() }
    println("\nMeasuring $task")
    val average = (1..20).map { measureTimeMillis(block) }.average()
    println("\n$task took $average ms")
}

Вот мой типичный результат:

Parallel Stream took 396.85 ms
Channels took 398.1 ms

Результаты похожи, но одна строка кода все равно превосходит 50 строк кода:)

Во-первых, я хотел бы сопереживать тому, что использование основного потока обычно не служит никакой практической цели.

Если ваше приложение полностью асинхронно, то у вас будет заблокирован только один (основной) поток. Этот поток действительно потребляет некоторую память и добавляет немного дополнительного давления на планировщик, но дополнительное влияние на производительность будет незначительным и даже невозможно измерить.

В практическом мире java практически невозможно иметь фиксированное количество потоков в JVM. Есть системные потоки (gc), есть потоки nio и т. д.

Одна нить не имеет значения. Вы в порядке, пока количество потоков в вашем приложении не растет беспрепятственно с увеличением нагрузки.


Вернемся к первоначальному вопросу.

Я не думаю, что есть краткий способ использовать основной поток в такого рода задачах параллельной обработки.

Например, вы можете сделать следующее:

data class Job(val res: Int) {
    fun execute(): Int {
        Thread.sleep(100)
        println("execute $res in ${Thread.currentThread().name}")
        return res
    }
}

fun main() {
    val jobs = (1..100).map { Job(it) }
    val resultChannel = Channel<Int>(Channel.UNLIMITED)
    val mainInputChannel = Channel<Job>()

    val workers = (1..10).map {
        actor<Job>(CommonPool) {
            for (j in channel) {
                resultChannel.send(j.execute())
            }
        }
    }

    val res: Deferred<List<Int>> = async(CommonPool) {
        val allChannels = (listOf(mainInputChannel) + workers)

        jobs.forEach { job ->
            select {
                allChannels.forEach {
                    it.onSend(job) {}
                }
            }
        }

        allChannels.forEach { it.close() }
        (1..jobs.size).map { resultChannel.receive() }
    }

    runBlocking {
        for (j in mainInputChannel) {
            resultChannel.send(j.execute())
        }
    }

    runBlocking {
        res.await().forEach { println(it) }
    }
}

В основном, это простой производитель / потребитель реализация, где основной поток выступает в качестве одного из потребителей. Но это приводит к большому количеству шаблонных тем не менее.

Вывод:

execute 1 in main @coroutine#12
execute 5 in ForkJoinPool.commonPool-worker-1 @coroutine#4
execute 6 in ForkJoinPool.commonPool-worker-2 @coroutine#5
execute 7 in ForkJoinPool.commonPool-worker-7 @coroutine#6
execute 2 in ForkJoinPool.commonPool-worker-6 @coroutine#1
execute 8 in ForkJoinPool.commonPool-worker-4 @coroutine#7
execute 4 in ForkJoinPool.commonPool-worker-5 @coroutine#3
execute 3 in ForkJoinPool.commonPool-worker-3 @coroutine#2
execute 12 in main @coroutine#12
execute 10 in ForkJoinPool.commonPool-worker-7 @coroutine#9
execute 15 in ForkJoinPool.commonPool-worker-5 @coroutine#6
execute 11 in ForkJoinPool.commonPool-worker-3 @coroutine#10
execute 16 in ForkJoinPool.commonPool-worker-6 @coroutine#1
execute 9 in ForkJoinPool.commonPool-worker-1 @coroutine#8
execute 14 in ForkJoinPool.commonPool-worker-4 @coroutine#5
execute 13 in ForkJoinPool.commonPool-worker-2 @coroutine#4
execute 20 in main @coroutine#12
execute 17 in ForkJoinPool.commonPool-worker-5 @coroutine#2
execute 18 in ForkJoinPool.commonPool-worker-3 @coroutine#3
execute 24 in ForkJoinPool.commonPool-worker-1 @coroutine#6
execute 23 in ForkJoinPool.commonPool-worker-4 @coroutine#5
execute 22 in ForkJoinPool.commonPool-worker-2 @coroutine#4
execute 19 in ForkJoinPool.commonPool-worker-7 @coroutine#7
execute 21 in ForkJoinPool.commonPool-worker-6 @coroutine#1
execute 25 in ForkJoinPool.commonPool-worker-5 @coroutine#8
execute 28 in main @coroutine#12
execute 29 in ForkJoinPool.commonPool-worker-2 @coroutine#2
execute 30 in ForkJoinPool.commonPool-worker-7 @coroutine#3
execute 27 in ForkJoinPool.commonPool-worker-4 @coroutine#10
execute 26 in ForkJoinPool.commonPool-worker-1 @coroutine#9
execute 32 in ForkJoinPool.commonPool-worker-3 @coroutine#4
execute 31 in ForkJoinPool.commonPool-worker-6 @coroutine#1
execute 36 in ForkJoinPool.commonPool-worker-5 @coroutine#8
execute 35 in ForkJoinPool.commonPool-worker-4 @coroutine#7
execute 33 in ForkJoinPool.commonPool-worker-2 @coroutine#5
execute 38 in ForkJoinPool.commonPool-worker-3 @coroutine#2
execute 37 in main @coroutine#12
execute 34 in ForkJoinPool.commonPool-worker-7 @coroutine#6
execute 39 in ForkJoinPool.commonPool-worker-6 @coroutine#3
execute 40 in ForkJoinPool.commonPool-worker-1 @coroutine#1
execute 44 in ForkJoinPool.commonPool-worker-5 @coroutine#8
execute 41 in ForkJoinPool.commonPool-worker-4 @coroutine#4
execute 46 in ForkJoinPool.commonPool-worker-1 @coroutine#2
execute 47 in ForkJoinPool.commonPool-worker-6 @coroutine#1
execute 45 in main @coroutine#12
execute 42 in ForkJoinPool.commonPool-worker-2 @coroutine#9
execute 43 in ForkJoinPool.commonPool-worker-7 @coroutine#10
execute 48 in ForkJoinPool.commonPool-worker-3 @coroutine#3
execute 52 in ForkJoinPool.commonPool-worker-5 @coroutine#8
execute 49 in ForkJoinPool.commonPool-worker-1 @coroutine#5
execute 54 in ForkJoinPool.commonPool-worker-2 @coroutine#1
execute 53 in main @coroutine#12
execute 50 in ForkJoinPool.commonPool-worker-4 @coroutine#6
execute 51 in ForkJoinPool.commonPool-worker-6 @coroutine#7
execute 56 in ForkJoinPool.commonPool-worker-3 @coroutine#3
execute 55 in ForkJoinPool.commonPool-worker-7 @coroutine#2
execute 60 in ForkJoinPool.commonPool-worker-5 @coroutine#8
execute 61 in ForkJoinPool.commonPool-worker-1 @coroutine#5
execute 57 in ForkJoinPool.commonPool-worker-4 @coroutine#4
execute 59 in ForkJoinPool.commonPool-worker-3 @coroutine#10
execute 64 in ForkJoinPool.commonPool-worker-7 @coroutine#2
execute 58 in ForkJoinPool.commonPool-worker-6 @coroutine#9
execute 62 in ForkJoinPool.commonPool-worker-2 @coroutine#1
execute 63 in main @coroutine#12
execute 68 in ForkJoinPool.commonPool-worker-5 @coroutine#8
execute 65 in ForkJoinPool.commonPool-worker-1 @coroutine#3
execute 66 in ForkJoinPool.commonPool-worker-4 @coroutine#6
execute 67 in ForkJoinPool.commonPool-worker-7 @coroutine#7
execute 69 in ForkJoinPool.commonPool-worker-6 @coroutine#4
execute 70 in ForkJoinPool.commonPool-worker-3 @coroutine#2
execute 74 in ForkJoinPool.commonPool-worker-2 @coroutine#1
execute 75 in main @coroutine#12
execute 71 in ForkJoinPool.commonPool-worker-5 @coroutine#5
execute 76 in ForkJoinPool.commonPool-worker-7 @coroutine#3
execute 73 in ForkJoinPool.commonPool-worker-6 @coroutine#10
execute 78 in ForkJoinPool.commonPool-worker-4 @coroutine#6
execute 72 in ForkJoinPool.commonPool-worker-1 @coroutine#9
execute 77 in ForkJoinPool.commonPool-worker-3 @coroutine#8
execute 79 in ForkJoinPool.commonPool-worker-2 @coroutine#1
execute 83 in main @coroutine#12
execute 84 in ForkJoinPool.commonPool-worker-4 @coroutine#3
execute 85 in ForkJoinPool.commonPool-worker-5 @coroutine#5
execute 82 in ForkJoinPool.commonPool-worker-1 @coroutine#7
execute 81 in ForkJoinPool.commonPool-worker-6 @coroutine#4
execute 80 in ForkJoinPool.commonPool-worker-7 @coroutine#2
execute 89 in ForkJoinPool.commonPool-worker-3 @coroutine#8
execute 90 in ForkJoinPool.commonPool-worker-2 @coroutine#1
execute 91 in main @coroutine#12
execute 86 in ForkJoinPool.commonPool-worker-5 @coroutine#6
execute 88 in ForkJoinPool.commonPool-worker-6 @coroutine#10
execute 87 in ForkJoinPool.commonPool-worker-1 @coroutine#9
execute 92 in ForkJoinPool.commonPool-worker-7 @coroutine#2
execute 93 in ForkJoinPool.commonPool-worker-4 @coroutine#3
execute 99 in main @coroutine#12
execute 97 in ForkJoinPool.commonPool-worker-3 @coroutine#8
execute 98 in ForkJoinPool.commonPool-worker-2 @coroutine#1
execute 95 in ForkJoinPool.commonPool-worker-1 @coroutine#5
execute 100 in ForkJoinPool.commonPool-worker-4 @coroutine#6
execute 94 in ForkJoinPool.commonPool-worker-5 @coroutine#4
execute 96 in ForkJoinPool.commonPool-worker-7 @coroutine#7
1
5
6
7
2
8
4
3
12
10
15
11
16
9
14
13
20
17
18
24
23
22
19
21
25
28
29
30
27
26
32
31
36
35
33
38
37
34
39
40
44
41
46
47
45
42
43
48
52
49
54
53
50
51
56
55
60
61
57
59
64
58
62
63
68
65
66
67
69
70
74
75
71
76
73
78
72
77
79
83
84
85
82
81
80
89
90
91
86
88
87
92
93
99
97
98
95
100
94
96

Только потому, что в этом явном потоке не выполняется никакая работа, не означает, что устройство не выполняет другие потоки на том же ядре.

На самом деле лучше, чтобы ваш MainThread бездействовал, что сделает ваш пользовательский интерфейс более отзывчивым.

Async () без каких-либо параметров использует DefaultDispatcher и будет принимать пул от родителя, поэтому все асинхронные вызовы выполняются в CommonPool. Если вам нужен другой набор потоков для выполнения кода, создайте свой собственный пул. В то время как это обычно хорошая практика, чтобы не использовать основной поток с вычислениями, но зависит от вашего usecase.

Comments

    Ничего не найдено.