apache-spark-sql- все статьи тега
Как я могу распараллелить цикл for в spark с scala?
Например, у нас есть файл parquet с ценой закрытия 2000 биржевых символов за последние 3 года, и мы хотим рассчитать 5-дневную скользящую среднюю для каждого символа. Поэтому я создаю spark SQLContext, а затем val marketData = sqlcontext.sql("select DATE, SYMBOL, PRICE from stockdata order by DATE").cache() Чтобы получить список символов, val symbols = marketData.select("SYMBOL").distinct().collect() А вот цикл for: for (symbol <- symbols) { marketData.filter(symbol).rdd.sliding( ...
Производительность программы Spark-GC & десериализация задач и параллельное выполнение
У меня есть кластер из 4 машин, 1 Мастер и три рабочих, каждый с памятью 128 г и 64 ядрами. Я использую Spark 1.5.0 в автономном режиме. Моя программа считывает данные из таблиц Oracle с помощью JDBC, затем делает ETL, манипулируя данными, и выполняет задачи машинного обучения, такие как k-means. У меня есть фрейм данных (myDF.cache ()), который объединяет результаты с двумя другими фреймами данных и кэшируется. Фрейм данных содержит 27 миллионов строк, а размер данных составляет около 1,5 G. ...
Как избежать того, чтобы Spark executor потерялся и контейнер пряжи убил его из-за ограничения памяти?
У меня есть следующий код, который срабатывает hiveContext.sql() большую часть времени. Моя задача состоит в том, чтобы создать несколько таблиц и вставить значения в после обработки для всех разделов таблицы hive. Поэтому я сначала запускаю show partitions и, используя его выход в цикле for, вызываю несколько методов, которые создают таблицу (если она не существует) и вставляют в них с помощью hiveContext.sql. Теперь мы не можем выполнить hiveContext в исполнителе, поэтому я должен выполнит ...
Кэширование временных таблиц с помощью spark-sql
Является таблицей, зарегистрированной с помощью registerTempTable (createOrReplaceTempView с искрой 2.+) кэшируется? Используя Zeppelin, я регистрирую DataFrame в своем коде scala, после тяжелых вычислений, а затем в %pyspark я хочу получить к нему доступ и далее фильтровать его. Будет ли он использовать кэшированную в памяти версию таблицы? Или он будет перестраиваться каждый раз? ...
Преобразования эпохи на сегодняшний день в Elasticsearch СПАРК
У меня есть фрейм данных, который я записываю в ES Прежде чем писать в ES, я преобразую столбец EVTExit в дату, которая находится в эпохе. workset = workset.withColumn("EVTExit", to_date(from_unixtime($"EVTExit".divide(1000)))) workset.select("EVTExit").show(10) +----------+ | EVTExit| +----------+ |2014-06-03| |null | |2012-10-23| |2014-06-03| |2015-11-05| Как я вижу, это EVTExit преобразуется в дату. workset.write.format("org.elasticsearch.spark.sql").save("workset/workset1") Но ...
Spark-загрузить CSV-файл в качестве фрейма данных?
Я хотел бы прочитать CSV в spark и преобразовать его в виде фрейма данных и сохранить его в HDFS с df.registerTempTable("table_name") Я пробовал: scala> val df = sqlContext.load("hdfs:///csv/file/dir/file.csv") ошибка, которую я получил: java.lang.RuntimeException: hdfs:///csv/file/dir/file.csv is not a Parquet file. expected magic number at tail [80, 65, 82, 49] but found [49, 59, 54, 10] at parquet.hadoop.ParquetFileReader.readFooter(ParquetFileReader.java:418) at org.apache.sp ...
Как я могу добавить новый столбец к таблице данных СПАРК (используя PySpark)?
У меня есть Spark DataFrame (используя PySpark 1.5.1) и хотел бы добавить новый столбец. Я пробовал следующее без всякого успеха: type(randomed_hours) # => list # Create in Python and transform to RDD new_col = pd.DataFrame(randomed_hours, columns=['new_col']) spark_new_col = sqlContext.createDataFrame(new_col) my_df_spark.withColumn("hours", spark_new_col["new_col"]) также получил ошибку, используя это: my_df_spark.withColumn("hours", sc.parallelize(randomed_hours)) Итак, как доб ...
Извлечение значений столбцов фрейма данных в виде списка в Apache Spark
Я хотел бы преобразовать строковый столбец фрейма данных в список. Что я могу найти от Dataframe API-это RDD, поэтому я попытался сначала преобразовать его обратно в RDD, а затем применить toArray функция для RDD. В этом случае длина и SQL работают просто отлично. Однако результат, который я получил от RDD, имеет квадратные скобки вокруг каждого элемента, подобного этому [A00001]. Мне было интересно, если есть способ преобразовать столбец в список или способ снять квадратные скобки. любой предл ...