Как createCombiner, mergeValue, mergeCombiner работает в CombineByKey в Spark (с использованием Scala)
9633 просмотра
1 ответ
Я пытаюсь понять, как работает каждый шаг combineByKeys
.
Может кто-нибудь, пожалуйста, помогите мне понять то же самое для ниже RDD?
val rdd = sc.parallelize(List(
("A", 3), ("A", 9), ("A", 12), ("A", 0), ("A", 5),("B", 4),
("B", 10), ("B", 11), ("B", 20), ("B", 25),("C", 32), ("C", 91),
("C", 122), ("C", 3), ("C", 55)), 2)
rdd.combineByKey(
(x:Int) => (x, 1),
(acc:(Int, Int), x) => (acc._1 + x, acc._2 + 1),
(acc1:(Int, Int), acc2:(Int, Int)) => (acc1._1 + acc2._1, acc1._2 + acc2._2))
Автор: Randomlogicx
Источник
Размещён: 12.11.2019 09:27
Ответы (1)
55 плюса
Во-первых, давайте разберем процесс:
Сначала createCombiner
создается начальное значение (объединитель) для первого обращения к ключу в разделе, если он не найден -->
(firstValueEncountered, 1)
. Таким образом, это просто инициализация кортежа с первым значением и счетчиком ключей, равным 1.
Затем mergeValue
запускается только в том случае, если для найденного ключа в этом разделе уже создан комбинатор (в нашем случае кортеж) -->
(existingTuple._1 + subSequentValue, existingTuple._2 + 1)
. Это добавляет существующее значение кортежа (в первом слоте) к вновь обнаруженному значению и берет счетчик существующего кортежа (во втором слоте) и увеличивает его. По этому мы
Затем mergeCombiner
берет объединители (кортежи), созданные на каждом разделе, и объединяет их вместе -->
(tupleFromPartition._1 + tupleFromPartition2._1, tupleFromPartition1._2 + tupleFromPartition2._2)
. Это просто сложение значений из каждого кортежа и счетчиков в один кортеж.
Затем давайте разберем подмножество ваших данных на разделы и увидим их в действии:
("A", 3), ("A", 9), ("A", 12),("B", 4), ("B", 10), ("B", 11)
Раздел 1
A=3 --> createCombiner(3) ==> accum[A] = (3, 1)
A=9 --> mergeValue(accum[A], 9) ==> accum[A] = (3 + 9, 1 + 1)
B=11 --> createCombiner(11) ==> accum[B] = (11, 1)
Раздел 2
A=12 --> createCombiner(12) ==> accum[A] = (12, 1)
B=4 --> createCombiner(4) ==> accum[B] = (4, 1)
B=10 --> mergeValue(accum[B], 10) ==> accum[B] = (4 + 10, 1 + 1)
Объединить разделы вместе
A ==> mergeCombiner((12, 2), (12, 1)) ==> (12 + 12, 2 + 1)
B ==> mergeCombiner((11, 1), (14, 2)) ==> (11 + 14, 1 + 2)
Итак, вы должны вернуть массив примерно так:
Array((A, (24, 3)), (B, (25, 3)))
Автор: Justin Pihony
Размещён: 25.03.2015 05:08
Вопросы из категории :
- apache-spark Как сделать акулу / искру очистить кеш?
- apache-spark (PySpark) Вложенные списки после reduByKey
- apache-spark Spark java.lang.OutOfMemoryError: пространство кучи Java
- apache-spark Apache Spark: map vs mapPartitions?
- apache-spark Spark: лучшая практика для извлечения больших данных из RDD на локальный компьютер
- apache-spark В чем разница между map и flatMap и хорошим вариантом использования для каждого?
- apache-spark Задача не сериализуема: java.io.NotSerializableException при вызове функции вне замыкания только для классов, а не объектов
- apache-spark Spark не хватает памяти при группировке по ключу
- apache-spark Как распечатать содержимое RDD?
- apache-spark импорт pyspark в оболочку python
- apache-spark Как мне запустить graphx с Python / pyspark?
- apache-spark Получение идентификатора запуска приложения для задания Spark
- apache-spark Модифицировать коллекцию внутри каждого Spark RDD
- apache-spark Запись в несколько выходов по ключу Spark - одно задание Spark
- apache-spark Как прочитать несколько текстовых файлов в одном RDD?
- apache-spark java + spark: org.apache.spark.SparkException: задание прервано: задача не сериализуема: java.io.NotSerializableException
- apache-spark Записывать и читать необработанные байтовые массивы в Spark - используя файл последовательности SequenceFile
- apache-spark Ошибка подключения EsOutputFormat при использовании искры
- apache-spark Как конвертировать CSV-файл в RDD
- apache-spark how to make saveAsTextFile NOT split output into multiple file?