Как я могу распараллелить цикл for в spark с scala?
Например, у нас есть файл parquet с ценой закрытия 2000 биржевых символов за последние 3 года, и мы хотим рассчитать 5-дневную скользящую среднюю для каждого символа.
Поэтому я создаю spark SQLContext, а затем
val marketData = sqlcontext.sql("select DATE, SYMBOL, PRICE from stockdata order by DATE").cache()
Чтобы получить список символов,
val symbols = marketData.select("SYMBOL").distinct().collect()
А вот цикл for:
for (symbol <- symbols) {
marketData.filter(symbol).rdd.sliding(5).map(...calculating the avg...).save()
}
Очевидно, что выполнение цикла for на spark является медленным, и save() для каждого малого результата также замедляет процесс (я попытался определить var result вне цикла for и объединить все вывод, чтобы сделать операцию ввода-вывода вместе, но я получил исключение stackoverflow), так как я могу распараллелить цикл for и оптимизировать операцию ввода-вывода?
2 ответов:
Программа, которую вы пишете, выполняется в узле Spark драйвера ("master"). Выражения в этой программе могут быть распараллелены только в том случае, если вы работаете с параллельными структурами (RDDs).
Попробуйте это:
marketdata.rdd.map(symbolize).reduceByKey{ case (symbol, days) => days.sliding(5).map(makeAvg) }.foreach{ case (symbol,averages) => averages.save() }Где
symbolizeберет строку символа x day и возвращает кортеж (символ, день).
Что касается первой части ответа, то я не согласен с Карлосом. Программа не запускается в драйвере ("master").
Цикл выполняется последовательно, но для каждого символа выполняется:
marketData.filter(symbol).rdd.sliding(5).map(...calculating the avg...).save()Выполняется параллельно, так как
markedDataявляется фреймом данных Spark и он распределен.
Comments