Как я могу добавить новый столбец к таблице данных СПАРК (используя PySpark)?
У меня есть Spark DataFrame (используя PySpark 1.5.1) и хотел бы добавить новый столбец.
Я пробовал следующее без всякого успеха:
type(randomed_hours) # => list
# Create in Python and transform to RDD
new_col = pd.DataFrame(randomed_hours, columns=['new_col'])
spark_new_col = sqlContext.createDataFrame(new_col)
my_df_spark.withColumn("hours", spark_new_col["new_col"])
также получил ошибку, используя это:
my_df_spark.withColumn("hours", sc.parallelize(randomed_hours))
Итак, как добавить новый столбец (на основе вектора Python) в существующий фрейм данных с помощью PySpark?
6 ответов:
вы не можете добавить произвольную колонку в
DataFrameв Искра. Новые столбцы могут быть созданы только с помощью литералов (другие типы литералов описаны в как добавить постоянный столбец в фрейм данных Spark?)from pyspark.sql.functions import lit df = sqlContext.createDataFrame( [(1, "a", 23.0), (3, "B", -23.0)], ("x1", "x2", "x3")) df_with_x4 = df.withColumn("x4", lit(0)) df_with_x4.show() ## +---+---+-----+---+ ## | x1| x2| x3| x4| ## +---+---+-----+---+ ## | 1| a| 23.0| 0| ## | 3| B|-23.0| 0| ## +---+---+-----+---+преобразование существующего столбца:
from pyspark.sql.functions import exp df_with_x5 = df_with_x4.withColumn("x5", exp("x3")) df_with_x5.show() ## +---+---+-----+---+--------------------+ ## | x1| x2| x3| x4| x5| ## +---+---+-----+---+--------------------+ ## | 1| a| 23.0| 0| 9.744803446248903E9| ## | 3| B|-23.0| 0|1.026187963170189...| ## +---+---+-----+---+--------------------+С помощью
join:from pyspark.sql.functions import exp lookup = sqlContext.createDataFrame([(1, "foo"), (2, "bar")], ("k", "v")) df_with_x6 = (df_with_x5 .join(lookup, col("x1") == col("k"), "leftouter") .drop("k") .withColumnRenamed("v", "x6")) ## +---+---+-----+---+--------------------+----+ ## | x1| x2| x3| x4| x5| x6| ## +---+---+-----+---+--------------------+----+ ## | 1| a| 23.0| 0| 9.744803446248903E9| foo| ## | 3| B|-23.0| 0|1.026187963170189...|null| ## +---+---+-----+---+--------------------+----+или генерируется с помощью функции / udf:
from pyspark.sql.functions import rand df_with_x7 = df_with_x6.withColumn("x7", rand()) df_with_x7.show() ## +---+---+-----+---+--------------------+----+-------------------+ ## | x1| x2| x3| x4| x5| x6| x7| ## +---+---+-----+---+--------------------+----+-------------------+ ## | 1| a| 23.0| 0| 9.744803446248903E9| foo|0.41930610446846617| ## | 3| B|-23.0| 0|1.026187963170189...|null|0.37801881545497873| ## +---+---+-----+---+--------------------+----+-------------------+производительность, встроенные функции (
pyspark.sql.functions), к которому относится карта Выражение Catalyst, как правило, предпочтительнее, чем пользовательские функции Python.если вы хотите добавить содержимое произвольного RDD в качестве столбца, вы можете
- добавить номера строк в существующий фрейм данных
- вызов
zipWithIndexна RDD и преобразовать его в фрейм данных- соедините оба с помощью индекса в качестве ключа соединения
чтобы добавить столбец с помощью UDF:
df = sqlContext.createDataFrame( [(1, "a", 23.0), (3, "B", -23.0)], ("x1", "x2", "x3")) from pyspark.sql.functions import udf from pyspark.sql.types import * def valueToCategory(value): if value == 1: return 'cat1' elif value == 2: return 'cat2' ... else: return 'n/a' # NOTE: it seems that calls to udf() must be after SparkContext() is called udfValueToCategory = udf(valueToCategory, StringType()) df_with_cat = df.withColumn("category", udfValueToCategory("x1")) df_with_cat.show() ## +---+---+-----+---------+ ## | x1| x2| x3| category| ## +---+---+-----+---------+ ## | 1| a| 23.0| cat1| ## | 3| B|-23.0| n/a| ## +---+---+-----+---------+
на СПАРК 2.0
# assumes schema has 'age' column df.select('*', (df.age + 10).alias('agePlusTen'))
Я хотел бы предложить обобщенный пример очень похожий пример:
случай использования: у меня есть csv, состоящий из:
First|Third|Fifth data|data|data data|data|data ...billion more linesмне нужно выполнить некоторые преобразования и окончательный csv должен выглядеть
First|Second|Third|Fourth|Fifth data|null|data|null|data data|null|data|null|data ...billion more linesмне нужно сделать это, потому что это схема, определенная некоторой моделью, и мне нужно, чтобы мои окончательные данные были совместимы с массовыми вставками SQL и такими вещами.
так:
1) я читаю оригинальный csv с помощью искра.прочитайте и назовите его "df".
2) я делаю что-то с данными.
3) я добавляю пустые столбцы с помощью этого скрипта:
outcols = [] for column in MY_COLUMN_LIST: if column in df.columns: outcols.append(column) else: outcols.append(lit(None).cast(StringType()).alias('{0}'.format(column))) df = df.select(outcols)таким образом, вы можете структурировать свою схему после загрузки csv (также будет работать для переупорядочивания столбцов, если вам нужно сделать это для многих таблиц).
вы можете определить новый
udfпри добавленииcolumn_name:u_f = F.udf(lambda :yourstring,StringType()) a.select(u_f().alias('column_name')
from pyspark.sql.functions import udf from pyspark.sql.types import * func_name = udf( lambda val: val, # do sth to val StringType() ) df.withColumn('new_col', func_name(df.old_col))
Comments