Вопрос:

Spark SQL-запросы к многораздельным данным с использованием диапазонов дат

apache-spark apache-spark-sql

5016 просмотра

2 ответа

338 Репутация автора

Мой набор данных разделен таким образом:

Year=yyyy
 |---Month=mm
 |   |---Day=dd
 |   |   |---<parquet-files>

Какой самый простой и эффективный способ создания фрейма данных в спарк, загруженный данными между двумя датами?

Автор: r4ravi2008 Источник Размещён: 08.11.2017 10:52

Ответы (2)


3 плюса

135 Репутация автора

Отредактировано, чтобы добавить несколько путей загрузки для адреса комментария.

Вы можете использовать синтаксис в стиле регулярных выражений.

val dataset = spark
  .read
  .format("parquet")
  .option("filterPushdown", "true")
  .option("basePath", "hdfs:///basepath/")
  .load("hdfs:///basepath/Year=2017/Month=10/Day={0[6-9],[1-3][0-9]}/*/",
    "hdfs:///basepath/Year=2017/Month=11/Day={0[1-3]}/*/")

Как использовать регулярные выражения для включения / исключения некоторых входных файлов в sc.textFile?

Примечание : вам не нужно то, что X=*вы можете сделать, *если хотите все дни, месяцы и т. Д.

Вы, вероятно, также должны немного почитать о Predicate Pushdown (т. Е. FilterPushdown имеет значение true выше).

Наконец, вы заметите вышеуказанный параметр basepath, причину этого можно найти здесь: Предотвратить DataFrame.partitionBy () от удаления разделенных столбцов из схемы

Автор: Robert Beatty Размещён: 09.11.2017 03:56

8 плюса

7748 Репутация автора

Решение

Если вам абсолютно необходимо придерживаться этой стратегии разделения, ответ зависит от того, готовы ли вы нести расходы на обнаружение раздела или нет.

Если вы хотите, чтобы Spark обнаружил все разделы, что должно происходить только один раз (до тех пор, пока вы не добавите новые файлы), вы можете загрузить базовый путь, а затем выполнить фильтрацию, используя столбцы разделов.

Если вы не хотите, чтобы Spark обнаруживал все разделы, например, поскольку у вас есть миллионы файлов, единственное эффективное общее решение состоит в том, чтобы разбить интервал, по которому вы хотите выполнить запрос, на несколько подинтервалов, которые вы можете легко запросить, используя подход @ r0bb23 а затем объединение, то вместе.

Если вы хотите получить лучшее из обоих описанных выше случаев и имеете стабильную схему, вы можете зарегистрировать разделы в метастазах, определив внешнюю многораздельную таблицу. Не делайте этого, если вы ожидаете, что ваша схема будет развиваться, поскольку таблицы, управляемые метастазами, в настоящее время довольно плохо управляют развитием схемы.

Например, для запроса между вами 2017-10-06и 2017-11-03вы должны сделать:

// With full discovery
spark.read.parquet("hdfs:///basepath")
  .where('Year === 2017 && (
    ('Month === 10 && 'Day >= 6') || ('Month === 11 && 'Day <= 3')
  ))

// With partial discovery
val df1 = spark.read.option("basePath", "hdfs:///basepath/")
  .parquet("hdfs:///basepath/Year=2017/Month=10/Day={0[6-9], [1-3][0-9]}/*/")
val df2 = spark.read.option("basePath", "hdfs:///basepath/")
  .parquet("hdfs:///basepath/Year=2017/Month=11/Day={0[1-3]}/*/")
val df = df1.union(df2)

Написание общего кода для этого, безусловно, возможно, но я не сталкивался с этим. Лучшим подходом является разделение в порядке, изложенном в комментарии, который я сделал к вопросу. Если ваша таблица была разделена, используя что-то вроде /basepath/ts=yyyymmddhhmm/*.parquetэтого, то ответ будет простым:

spark.read.parquet("hdfs:///basepath")
  .where('ts >= 201710060000L && 'ts <= 201711030000L)

Причина, по которой стоит добавлять часы и минуты, заключается в том, что вы можете написать общий код, который обрабатывает интервалы независимо от того, разбиты ли данные на неделю, день, час или каждые 15 минут. Фактически вы можете даже управлять данными с разной гранулярностью в одной и той же таблице, например, более старые данные агрегируются на более высоких уровнях, чтобы уменьшить общее количество разделов, которые необходимо обнаружить.

Автор: Sim Размещён: 11.11.2017 10:00
Вопросы из категории :
32x32