Первичные ключи с Apache Spark



У меня есть соединение JDBC с Apache Spark и PostgreSQL, и я хочу вставить некоторые данные в свою базу данных. Когда я использую режим append, мне нужно указать id для каждого DataFrame.Row. Существует ли какой-либо способ для Spark создавать первичные ключи?

728   3  

3 ответов:

Scala :

Если все, что вам нужно, - это уникальные числа, вы можете использовать zipWithUniqueId и воссоздать фрейм данных. Сначала некоторые импортные и фиктивные данные:
import sqlContext.implicits._
import org.apache.spark.sql.Row
import org.apache.spark.sql.types.{StructType, StructField, LongType}

val df = sc.parallelize(Seq(
    ("a", -1.0), ("b", -2.0), ("c", -3.0))).toDF("foo", "bar")

Извлечение схемы для дальнейшего использования:

val schema = df.schema

Добавить поле id:

val rows = df.rdd.zipWithUniqueId.map{
   case (r: Row, id: Long) => Row.fromSeq(id +: r.toSeq)}

Создать Фрейм Данных:

val dfWithPK = sqlContext.createDataFrame(
  rows, StructType(StructField("id", LongType, false) +: schema.fields))

То же самое в Python :

from pyspark.sql import Row
from pyspark.sql.types import StructField, StructType, LongType

row = Row("foo", "bar")
row_with_index = Row(*["id"] + df.columns)

df = sc.parallelize([row("a", -1.0), row("b", -2.0), row("c", -3.0)]).toDF()

def make_row(columns):
    def _make_row(row, uid):
        row_dict = row.asDict()
        return row_with_index(*[uid] + [row_dict.get(c) for c in columns])
    return _make_row

f = make_row(df.columns)

df_with_pk = (df.rdd
    .zipWithUniqueId()
    .map(lambda x: f(*x))
    .toDF(StructType([StructField("id", LongType(), False)] + df.schema.fields)))

Если вы предпочитаете последовательное число, вы можете заменить zipWithUniqueId на zipWithIndex, но это немного дороже.

Непосредственно с DataFrame API :

(универсальные Scala, Python, Java, R с практически одинаковым синтаксисом)

Ранее я пропустил функцию monotonicallyIncreasingId, которая должна работать просто отлично, пока вам не нужны последовательные числа:

import org.apache.spark.sql.functions.monotonicallyIncreasingId

df.withColumn("id", monotonicallyIncreasingId).show()
// +---+----+-----------+
// |foo| bar|         id|
// +---+----+-----------+
// |  a|-1.0|17179869184|
// |  b|-2.0|42949672960|
// |  c|-3.0|60129542144|
// +---+----+-----------+

В то время как полезный monotonicallyIncreasingId недетерминирован. Не только идентификаторы могут отличаться от выполнения к выполнению, но и без дополнительных уловок не могут использоваться для идентификации строк, когда последующие операции содержат фильтры.

Примечание :

Также можно использовать функцию окна rowNumber:

from pyspark.sql.window import Window
from pyspark.sql.functions import rowNumber

w = Window().orderBy()
df.withColumn("id", rowNumber().over(w)).show()

К сожалению:

Предупреждать окно: нет раздела, определенного для работы окна! Перемещение всех данных в один раздел может привести к серьезному снижению производительности.

Таким образом, если у вас нет естественного способа разделения данных и обеспечения уникальности, это не особенно полезно в данный момент.
from pyspark.sql.functions import monotonically_increasing_id

df.withColumn("id", monotonically_increasing_id()).show()
Заметим, что 2-й аргумент df.withColumn-это monotonically_increasing_id (), а не monotonically_increasing_id .

Я нашел следующее решение относительно простым для случая, когда zipWithIndex() является желаемым поведением, т. е. для тех, кто желает последовательных целых чисел.

В этом случае мы используем pyspark и полагаемся на понимание словаря, чтобы сопоставить исходный объект строки с новым словарем, который соответствует новой схеме, включая уникальный индекс.

# read the initial dataframe without index
dfNoIndex = sqlContext.read.parquet(dataframePath)
# Need to zip together with a unique integer

# First create a new schema with uuid field appended
newSchema = StructType([StructField("uuid", IntegerType(), False)]
                       + dfNoIndex.schema.fields)
# zip with the index, map it to a dictionary which includes new field
df = dfNoIndex.rdd.zipWithIndex()\
                      .map(lambda (row, id): {k:v
                                              for k, v
                                              in row.asDict().items() + [("uuid", id)]})\
                      .toDF(newSchema)

Comments

    Ничего не найдено.