3 ответов:
Scala :
Если все, что вам нужно, - это уникальные числа, вы можете использоватьzipWithUniqueIdи воссоздать фрейм данных. Сначала некоторые импортные и фиктивные данные:import sqlContext.implicits._ import org.apache.spark.sql.Row import org.apache.spark.sql.types.{StructType, StructField, LongType} val df = sc.parallelize(Seq( ("a", -1.0), ("b", -2.0), ("c", -3.0))).toDF("foo", "bar")Извлечение схемы для дальнейшего использования:
val schema = df.schemaДобавить поле id:
val rows = df.rdd.zipWithUniqueId.map{ case (r: Row, id: Long) => Row.fromSeq(id +: r.toSeq)}Создать Фрейм Данных:
val dfWithPK = sqlContext.createDataFrame( rows, StructType(StructField("id", LongType, false) +: schema.fields))То же самое в Python :
from pyspark.sql import Row from pyspark.sql.types import StructField, StructType, LongType row = Row("foo", "bar") row_with_index = Row(*["id"] + df.columns) df = sc.parallelize([row("a", -1.0), row("b", -2.0), row("c", -3.0)]).toDF() def make_row(columns): def _make_row(row, uid): row_dict = row.asDict() return row_with_index(*[uid] + [row_dict.get(c) for c in columns]) return _make_row f = make_row(df.columns) df_with_pk = (df.rdd .zipWithUniqueId() .map(lambda x: f(*x)) .toDF(StructType([StructField("id", LongType(), False)] + df.schema.fields)))Если вы предпочитаете последовательное число, вы можете заменить
zipWithUniqueIdнаzipWithIndex, но это немного дороже.Непосредственно с
DataFrameAPI :(универсальные Scala, Python, Java, R с практически одинаковым синтаксисом)
Ранее я пропустил функцию
monotonicallyIncreasingId, которая должна работать просто отлично, пока вам не нужны последовательные числа:import org.apache.spark.sql.functions.monotonicallyIncreasingId df.withColumn("id", monotonicallyIncreasingId).show() // +---+----+-----------+ // |foo| bar| id| // +---+----+-----------+ // | a|-1.0|17179869184| // | b|-2.0|42949672960| // | c|-3.0|60129542144| // +---+----+-----------+В то время как полезный
monotonicallyIncreasingIdнедетерминирован. Не только идентификаторы могут отличаться от выполнения к выполнению, но и без дополнительных уловок не могут использоваться для идентификации строк, когда последующие операции содержат фильтры.Примечание :
Также можно использовать функцию окна
rowNumber:from pyspark.sql.window import Window from pyspark.sql.functions import rowNumber w = Window().orderBy() df.withColumn("id", rowNumber().over(w)).show()К сожалению:
Таким образом, если у вас нет естественного способа разделения данных и обеспечения уникальности, это не особенно полезно в данный момент.Предупреждать окно: нет раздела, определенного для работы окна! Перемещение всех данных в один раздел может привести к серьезному снижению производительности.
Заметим, что 2-й аргумент df.withColumn-это monotonically_increasing_id (), а не monotonically_increasing_id .from pyspark.sql.functions import monotonically_increasing_id df.withColumn("id", monotonically_increasing_id()).show()
Я нашел следующее решение относительно простым для случая, когда zipWithIndex() является желаемым поведением, т. е. для тех, кто желает последовательных целых чисел.
В этом случае мы используем pyspark и полагаемся на понимание словаря, чтобы сопоставить исходный объект строки с новым словарем, который соответствует новой схеме, включая уникальный индекс.
# read the initial dataframe without index dfNoIndex = sqlContext.read.parquet(dataframePath) # Need to zip together with a unique integer # First create a new schema with uuid field appended newSchema = StructType([StructField("uuid", IntegerType(), False)] + dfNoIndex.schema.fields) # zip with the index, map it to a dictionary which includes new field df = dfNoIndex.rdd.zipWithIndex()\ .map(lambda (row, id): {k:v for k, v in row.asDict().items() + [("uuid", id)]})\ .toDF(newSchema)
Comments