Преобразования эпохи на сегодняшний день в Elasticsearch СПАРК
У меня есть фрейм данных, который я записываю в ES
Прежде чем писать в ES, я преобразую столбец EVTExit в дату, которая находится в эпохе.
workset = workset.withColumn("EVTExit", to_date(from_unixtime($"EVTExit".divide(1000))))
workset.select("EVTExit").show(10)
+----------+
| EVTExit|
+----------+
|2014-06-03|
|null |
|2012-10-23|
|2014-06-03|
|2015-11-05|
Как я вижу, это EVTExit преобразуется в дату.
workset.write.format("org.elasticsearch.spark.sql").save("workset/workset1")
Но после написания его в ES, я все еще получаю его в формате EPOC.
"EVTExit" : 1401778800000
Может ли кто-нибудь иметь представление о том, что здесь происходит не так?
Спасибо
1 ответ:
Давайте рассмотрим пример
DataFrameиз вашего вопроса:scala> val df = workset.select("EVTExit") // df: org.apache.spark.sql.DataFrame = [EVTExit: date] scala> df.printSchema // root // |-- EVTExit: date (nullable = true)Вам нужно будет привести столбец в строку и отключить
es.mapping.date.rich, который являетсяtrueпо умолчанию.Параметр определяет, следует ли создавать объект типа rich Date для полей даты в Elasticsearch или возвращать их в виде примитивов (String или long). Фактический тип объекта основан на используемой библиотеке; noteable исключение Map / Reduce, которая не предоставляет встроенный объект Date и как таковой LongWritable и текст возвращается независимо от этой настройки.
Я согласен, это противоречит интуиции, но это единственное решение на данный момент, если вы хотите, чтобыelasticsearchне преобразовывал его в форматlong. На самом деле это довольно болезненно.scala> val df2 = df.withColumn("EVTExit_1", $"EVTExit".cast("string")) // df2: org.apache.spark.sql.DataFrame = [EVTExit: date, EVTExit_1: string] scala> df2.show // +----------+----------+ // | EVTExit| EVTExit_1| // +----------+----------+ // |2014-06-03|2014-06-03| // | null| null| // |2012-10-23|2012-10-23| // |2014-06-03|2014-06-03| // |2015-11-05|2015-11-05| // +----------+----------+Теперь вы можете записать свои данные в
elasticsearch:scala> df2.write.format("org.elasticsearch.spark.sql").option("es.mapping.date.rich", "false").save("workset/workset1")Теперь давайте проверим, что на ES. Сначала давайте посмотрим на отображение:
$ curl -XGET localhost:9200/workset?pretty=true { "workset" : { "aliases" : { }, "mappings" : { "workset1" : { "properties" : { "EVTExit" : { "type" : "long" }, "EVTExit_1" : { "type" : "date", "format" : "strict_date_optional_time||epoch_millis" } } } }, "settings" : { "index" : { "creation_date" : "1475063310916", "number_of_shards" : "5", "number_of_replicas" : "1", "uuid" : "i3Rb014sSziCmYm9LyIc5A", "version" : { "created" : "2040099" } } }, "warmers" : { } } }Похоже, у нас есть наши свидания. Теперь давайте проверим содержание :
$ curl -XGET localhost:9200/workset/_search?pretty=true -d '{ "size" : 1 }' { "took" : 2, "timed_out" : false, "_shards" : { "total" : 5, "successful" : 5, "failed" : 0 }, "hits" : { "total" : 5, "max_score" : 1.0, "hits" : [ { "_index" : "workset", "_type" : "workset1", "_id" : "AVdwn-vFWzMbysX5OjMA", "_score" : 1.0, "_source" : { "EVTExit" : 1401746400000, "EVTExit_1" : "2014-06-03" } } ] } }Примечание 1: Я сохранил оба поля для демонстрации, но я думаю, что вы поняли суть.
Примечание 2: испытано с Elasticsearch 2.4, Spark 1.6.2, scala 2.10 и elasticsearch-spark 2.3.2 внутри
spark-shell$ spark-shell --master local[*] --packages org.elasticsearch:elasticsearch-spark_2.10:2.3.2Примечание 3: то же решение в
pyspark:from pyspark.sql.functions import col df2 = df.withColumn("EVTExit_1",col("EVTExit").cast("string")) df2.write.format("org.elasticsearch.spark.sql") \ .option("es.mapping.date.rich", "false").save("workset/workset1")
Comments