Spark: лучшая практика для извлечения больших данных из RDD на локальный компьютер

apache-spark

40046 просмотра

6 ответа

У меня есть большой RDD (1 ГБ) в кластере пряжи. На локальной машине, где используется этот кластер, у меня всего 512 мб. Я хотел бы перебрать значения в RDD на моей локальной машине. Я не могу использовать collect (), потому что это создаст слишком большой массив локально, что больше, чем моя куча. Мне нужен какой-то итерационный способ. Есть метод iterator (), но он требует дополнительной информации, которую я не могу предоставить.

UDP: передал метод toLocalIterator

Автор: epahomov Источник Размещён: 12.09.2019 02:39

Ответы (6)


43 плюса

Решение

Обновление: RDD.toLocalIterator метод, который появился после написания исходного ответа, является более эффективным способом выполнения работы. Он использует runJobдля оценки только один раздел на каждом шаге.

TL; DR И оригинальный ответ может дать приблизительное представление о том, как это работает:

Прежде всего, получите массив индексов разделов:

val parts = rdd.partitions

Затем создайте меньшие rdds, отфильтровывая все, кроме одного раздела. Соберите данные из меньшего числа и переберите значения одного раздела:

for (p <- parts) {
    val idx = p.index
    val partRdd = rdd.mapPartitionsWithIndex(a => if (a._1 == idx) a._2 else Iterator(), true)
    //The second argument is true to avoid rdd reshuffling
    val data = partRdd.collect //data contains all values from a single partition 
                               //in the form of array
    //Now you can do with the data whatever you want: iterate, save to a file, etc.
}

Я не пробовал этот код, но он должен работать. Пожалуйста, напишите комментарий, если он не скомпилируется. Конечно, это будет работать, только если разделы достаточно малы. Если это не так, вы всегда можете увеличить количество разделов rdd.coalesce(numParts, true).

Автор: Wildfire Размещён: 15.02.2014 06:33

14 плюса

Ответ Wildfire кажется семантически правильным, но я уверен, что вы сможете значительно повысить эффективность, используя API Spark. Если вы хотите обрабатывать каждый раздел по очереди, я не понимаю, почему вы не можете использовать map/ filter/ reduce/ reduceByKey/ mapPartitionsоперации. Единственный раз, когда вы хотите, чтобы все было в одном месте в одном массиве, - это когда вы собираетесь выполнять немоноидальную операцию - но это не то, что вам нужно. Вы должны быть в состоянии сделать что-то вроде:

rdd.mapPartitions(recordsIterator => your code that processes a single chunk)

Или это

rdd.foreachPartition(partition => {
  partition.toArray
  // Your code
})
Автор: samthebest Размещён: 30.03.2014 11:05

9 плюса

Вот тот же подход, который был предложен @Wildlife, но написан в pyspark.

Хорошая вещь в этом подходе - он позволяет пользователю получать доступ к записям в RDD по порядку. Я использую этот код для подачи данных из RDD в STDIN процесса инструмента машинного обучения.

rdd = sc.parallelize(range(100), 10)
def make_part_filter(index):
    def part_filter(split_index, iterator):
        if split_index == index:
            for el in iterator:
                yield el
    return part_filter

for part_id in range(rdd.getNumPartitions()):
    part_rdd = rdd.mapPartitionsWithIndex(make_part_filter(part_id), True)
    data_from_part_rdd = part_rdd.collect()
    print "partition id: %s elements: %s" % (part_id, data_from_part_rdd)

Производит продукцию:

partition id: 0 elements: [0, 1, 2, 3, 4, 5, 6, 7, 8, 9]
partition id: 1 elements: [10, 11, 12, 13, 14, 15, 16, 17, 18, 19]
partition id: 2 elements: [20, 21, 22, 23, 24, 25, 26, 27, 28, 29]
partition id: 3 elements: [30, 31, 32, 33, 34, 35, 36, 37, 38, 39]
partition id: 4 elements: [40, 41, 42, 43, 44, 45, 46, 47, 48, 49]
partition id: 5 elements: [50, 51, 52, 53, 54, 55, 56, 57, 58, 59]
partition id: 6 elements: [60, 61, 62, 63, 64, 65, 66, 67, 68, 69]
partition id: 7 elements: [70, 71, 72, 73, 74, 75, 76, 77, 78, 79]
partition id: 8 elements: [80, 81, 82, 83, 84, 85, 86, 87, 88, 89]
partition id: 9 elements: [90, 91, 92, 93, 94, 95, 96, 97, 98, 99]
Автор: vvladymyrov Размещён: 05.06.2015 08:07

2 плюса

Решение pispark для обработки данных с использованием RDD.toLocalIterator () :

separator  = '|'
df_results = hiveCtx.sql(sql)
columns    = df_results.columns
print separator.join(columns)

# Use toLocalIterator() rather than collect(), as this avoids pulling all of the
# data to the driver at one time.  Rather, "the iterator will consume as much memory
# as the largest partition in this RDD."
MAX_BUFFERED_ROW_COUNT = 10000
row_count              = 0
output                 = cStringIO.StringIO()
for record in df_results.rdd.toLocalIterator():
    d = record.asDict()
    output.write(separator.join([str(d[c]) for c in columns]) + '\n')
    row_count += 1
    if row_count % MAX_BUFFERED_ROW_COUNT== 0:
        print output.getvalue().rstrip()
        # it is faster to create a new StringIO rather than clear the existing one
        # http://stackoverflow.com/questions/4330812/how-do-i-clear-a-stringio-object
        output = cStringIO.StringIO()
if row_count % MAX_BUFFERED_ROW_COUNT:
    print output.getvalue().rstrip()
Автор: Mark Rajcok Размещён: 28.09.2016 06:07

1 плюс

Карта / фильтр / уменьшить с помощью Spark и загрузить результаты позже? Я думаю, что обычный подход Hadoop будет работать.

Api говорит, что есть команды map - filter - saveAsFile: https://spark.incubator.apache.org/docs/0.8.1/scala-programming-guide.html#transformations

Автор: ya_pulser Размещён: 11.02.2014 10:09

1 плюс

Для Spark 1.3.1 формат выглядит следующим образом

val parts = rdd.partitions
    for (p <- parts) {
        val idx = p.index
        val partRdd = data.mapPartitionsWithIndex { 
           case(index:Int,value:Iterator[(String,String,Float)]) => 
             if (index == idx) value else Iterator()}
        val dataPartitioned = partRdd.collect 
        //Apply further processing on data                      
    }
Автор: agankur21 Размещён: 05.06.2015 07:54
Вопросы из категории :
32x32