Как я могу добавить новый столбец к таблице данных СПАРК (используя PySpark)?



У меня есть Spark DataFrame (используя PySpark 1.5.1) и хотел бы добавить новый столбец.



Я пробовал следующее без всякого успеха:



type(randomed_hours) # => list

# Create in Python and transform to RDD

new_col = pd.DataFrame(randomed_hours, columns=['new_col'])

spark_new_col = sqlContext.createDataFrame(new_col)

my_df_spark.withColumn("hours", spark_new_col["new_col"])


также получил ошибку, используя это:



my_df_spark.withColumn("hours",  sc.parallelize(randomed_hours))


Итак, как добавить новый столбец (на основе вектора Python) в существующий фрейм данных с помощью PySpark?

1297   6  

6 ответов:

вы не можете добавить произвольную колонку в DataFrame в Искра. Новые столбцы могут быть созданы только с помощью литералов (другие типы литералов описаны в как добавить постоянный столбец в фрейм данных Spark?)

from pyspark.sql.functions import lit

df = sqlContext.createDataFrame(
    [(1, "a", 23.0), (3, "B", -23.0)], ("x1", "x2", "x3"))

df_with_x4 = df.withColumn("x4", lit(0))
df_with_x4.show()

## +---+---+-----+---+
## | x1| x2|   x3| x4|
## +---+---+-----+---+
## |  1|  a| 23.0|  0|
## |  3|  B|-23.0|  0|
## +---+---+-----+---+

преобразование существующего столбца:

from pyspark.sql.functions import exp

df_with_x5 = df_with_x4.withColumn("x5", exp("x3"))
df_with_x5.show()

## +---+---+-----+---+--------------------+
## | x1| x2|   x3| x4|                  x5|
## +---+---+-----+---+--------------------+
## |  1|  a| 23.0|  0| 9.744803446248903E9|
## |  3|  B|-23.0|  0|1.026187963170189...|
## +---+---+-----+---+--------------------+

С помощью join:

from pyspark.sql.functions import exp

lookup = sqlContext.createDataFrame([(1, "foo"), (2, "bar")], ("k", "v"))
df_with_x6 = (df_with_x5
    .join(lookup, col("x1") == col("k"), "leftouter")
    .drop("k")
    .withColumnRenamed("v", "x6"))

## +---+---+-----+---+--------------------+----+
## | x1| x2|   x3| x4|                  x5|  x6|
## +---+---+-----+---+--------------------+----+
## |  1|  a| 23.0|  0| 9.744803446248903E9| foo|
## |  3|  B|-23.0|  0|1.026187963170189...|null|
## +---+---+-----+---+--------------------+----+

или генерируется с помощью функции / udf:

from pyspark.sql.functions import rand

df_with_x7 = df_with_x6.withColumn("x7", rand())
df_with_x7.show()

## +---+---+-----+---+--------------------+----+-------------------+
## | x1| x2|   x3| x4|                  x5|  x6|                 x7|
## +---+---+-----+---+--------------------+----+-------------------+
## |  1|  a| 23.0|  0| 9.744803446248903E9| foo|0.41930610446846617|
## |  3|  B|-23.0|  0|1.026187963170189...|null|0.37801881545497873|
## +---+---+-----+---+--------------------+----+-------------------+

производительность, встроенные функции (pyspark.sql.functions), к которому относится карта Выражение Catalyst, как правило, предпочтительнее, чем пользовательские функции Python.

если вы хотите добавить содержимое произвольного RDD в качестве столбца, вы можете

чтобы добавить столбец с помощью UDF:

df = sqlContext.createDataFrame(
    [(1, "a", 23.0), (3, "B", -23.0)], ("x1", "x2", "x3"))

from pyspark.sql.functions import udf
from pyspark.sql.types import *

def valueToCategory(value):
   if   value == 1: return 'cat1'
   elif value == 2: return 'cat2'
   ...
   else: return 'n/a'

# NOTE: it seems that calls to udf() must be after SparkContext() is called
udfValueToCategory = udf(valueToCategory, StringType())
df_with_cat = df.withColumn("category", udfValueToCategory("x1"))
df_with_cat.show()

## +---+---+-----+---------+
## | x1| x2|   x3| category|
## +---+---+-----+---------+
## |  1|  a| 23.0|     cat1|
## |  3|  B|-23.0|      n/a|
## +---+---+-----+---------+

на СПАРК 2.0

# assumes schema has 'age' column 
df.select('*', (df.age + 10).alias('agePlusTen'))

Я хотел бы предложить обобщенный пример очень похожий пример:

случай использования: у меня есть csv, состоящий из:

First|Third|Fifth
data|data|data
data|data|data
...billion more lines

мне нужно выполнить некоторые преобразования и окончательный csv должен выглядеть

First|Second|Third|Fourth|Fifth
data|null|data|null|data
data|null|data|null|data
...billion more lines

мне нужно сделать это, потому что это схема, определенная некоторой моделью, и мне нужно, чтобы мои окончательные данные были совместимы с массовыми вставками SQL и такими вещами.

так:

1) я читаю оригинальный csv с помощью искра.прочитайте и назовите его "df".

2) я делаю что-то с данными.

3) я добавляю пустые столбцы с помощью этого скрипта:

outcols = []
for column in MY_COLUMN_LIST:
    if column in df.columns:
        outcols.append(column)
    else:
        outcols.append(lit(None).cast(StringType()).alias('{0}'.format(column)))

df = df.select(outcols)

таким образом, вы можете структурировать свою схему после загрузки csv (также будет работать для переупорядочивания столбцов, если вам нужно сделать это для многих таблиц).

вы можете определить новый udf при добавлении column_name:

u_f = F.udf(lambda :yourstring,StringType())
a.select(u_f().alias('column_name')
from pyspark.sql.functions import udf
from pyspark.sql.types import *
func_name = udf(
    lambda val: val, # do sth to val
    StringType()
)
df.withColumn('new_col', func_name(df.old_col))

Comments

    Ничего не найдено.