Преобразования эпохи на сегодняшний день в Elasticsearch СПАРК



У меня есть фрейм данных, который я записываю в ES



Прежде чем писать в ES, я преобразую столбец EVTExit в дату, которая находится в эпохе.



workset = workset.withColumn("EVTExit", to_date(from_unixtime($"EVTExit".divide(1000))))

workset.select("EVTExit").show(10)

+----------+
| EVTExit|
+----------+
|2014-06-03|
|null |
|2012-10-23|
|2014-06-03|
|2015-11-05|


Как я вижу, это EVTExit преобразуется в дату.



workset.write.format("org.elasticsearch.spark.sql").save("workset/workset1")


Но после написания его в ES, я все еще получаю его в формате EPOC.

"EVTExit" : 1401778800000


Может ли кто-нибудь иметь представление о том, что здесь происходит не так?

Спасибо

610   1  

1 ответ:

Давайте рассмотрим пример DataFrame из вашего вопроса:

scala> val df = workset.select("EVTExit")
// df: org.apache.spark.sql.DataFrame = [EVTExit: date]

scala> df.printSchema
// root
//  |-- EVTExit: date (nullable = true)

Вам нужно будет привести столбец в строку и отключить es.mapping.date.rich, который является true по умолчанию.

Параметр определяет, следует ли создавать объект типа rich Date для полей даты в Elasticsearch или возвращать их в виде примитивов (String или long). Фактический тип объекта основан на используемой библиотеке; noteable исключение Map / Reduce, которая не предоставляет встроенный объект Date и как таковой LongWritable и текст возвращается независимо от этой настройки.

Я согласен, это противоречит интуиции, но это единственное решение на данный момент, если вы хотите, чтобы elasticsearch не преобразовывал его в формат long. На самом деле это довольно болезненно.
scala> val df2 = df.withColumn("EVTExit_1", $"EVTExit".cast("string"))
// df2: org.apache.spark.sql.DataFrame = [EVTExit: date, EVTExit_1: string]

scala> df2.show
// +----------+----------+
// |   EVTExit| EVTExit_1|
// +----------+----------+
// |2014-06-03|2014-06-03|
// |      null|      null|
// |2012-10-23|2012-10-23|
// |2014-06-03|2014-06-03|
// |2015-11-05|2015-11-05|
// +----------+----------+

Теперь вы можете записать свои данные в elasticsearch:

scala> df2.write.format("org.elasticsearch.spark.sql").option("es.mapping.date.rich", "false").save("workset/workset1")

Теперь давайте проверим, что на ES. Сначала давайте посмотрим на отображение:

$ curl -XGET localhost:9200/workset?pretty=true
{
  "workset" : {
    "aliases" : { },
    "mappings" : {
      "workset1" : {
        "properties" : {
          "EVTExit" : {
            "type" : "long"
          },
          "EVTExit_1" : {
            "type" : "date",
            "format" : "strict_date_optional_time||epoch_millis"
          }
        }
      }
    },
    "settings" : {
      "index" : {
        "creation_date" : "1475063310916",
        "number_of_shards" : "5",
        "number_of_replicas" : "1",
        "uuid" : "i3Rb014sSziCmYm9LyIc5A",
        "version" : {
          "created" : "2040099"
        }
      }
    },
    "warmers" : { }
  }
}

Похоже, у нас есть наши свидания. Теперь давайте проверим содержание :

$ curl -XGET localhost:9200/workset/_search?pretty=true -d '{ "size" : 1 }'
{
  "took" : 2,
  "timed_out" : false,
  "_shards" : {
    "total" : 5,
    "successful" : 5,
    "failed" : 0
  },
  "hits" : {
    "total" : 5,
    "max_score" : 1.0,
    "hits" : [ {
      "_index" : "workset",
      "_type" : "workset1",
      "_id" : "AVdwn-vFWzMbysX5OjMA",
      "_score" : 1.0,
      "_source" : {
        "EVTExit" : 1401746400000,
        "EVTExit_1" : "2014-06-03"
      }
    } ]
  }
}

Примечание 1: Я сохранил оба поля для демонстрации, но я думаю, что вы поняли суть.

Примечание 2: испытано с Elasticsearch 2.4, Spark 1.6.2, scala 2.10 и elasticsearch-spark 2.3.2 внутри spark-shell

$ spark-shell --master local[*] --packages org.elasticsearch:elasticsearch-spark_2.10:2.3.2

Примечание 3: то же решение в pyspark:

from pyspark.sql.functions import col
df2 = df.withColumn("EVTExit_1",col("EVTExit").cast("string"))
df2.write.format("org.elasticsearch.spark.sql") \
   .option("es.mapping.date.rich", "false").save("workset/workset1")

Comments

    Ничего не найдено.