Как я могу распараллелить цикл for в spark с scala?



Например, у нас есть файл parquet с ценой закрытия 2000 биржевых символов за последние 3 года, и мы хотим рассчитать 5-дневную скользящую среднюю для каждого символа.



Поэтому я создаю spark SQLContext, а затем



val marketData = sqlcontext.sql("select DATE, SYMBOL, PRICE from stockdata order by DATE").cache()


Чтобы получить список символов,



val symbols = marketData.select("SYMBOL").distinct().collect()


А вот цикл for:



for (symbol <- symbols) {
marketData.filter(symbol).rdd.sliding(5).map(...calculating the avg...).save()
}


Очевидно, что выполнение цикла for на spark является медленным, и save() для каждого малого результата также замедляет процесс (я попытался определить var result вне цикла for и объединить все вывод, чтобы сделать операцию ввода-вывода вместе, но я получил исключение stackoverflow), так как я могу распараллелить цикл for и оптимизировать операцию ввода-вывода?

849   2  

2 ответов:

Программа, которую вы пишете, выполняется в узле Spark драйвера ("master"). Выражения в этой программе могут быть распараллелены только в том случае, если вы работаете с параллельными структурами (RDDs).

Попробуйте это:

marketdata.rdd.map(symbolize).reduceByKey{ case (symbol, days) => days.sliding(5).map(makeAvg)  }.foreach{ case (symbol,averages) => averages.save() }

Где symbolize берет строку символа x day и возвращает кортеж (символ, день).

Что касается первой части ответа, то я не согласен с Карлосом. Программа не запускается в драйвере ("master").

Цикл выполняется последовательно, но для каждого символа выполняется:

marketData.filter(symbol).rdd.sliding(5).map(...calculating the avg...).save()

Выполняется параллельно, так как markedData является фреймом данных Spark и он распределен.

Comments

    Ничего не найдено.