Apache Spark: map vs mapPartitions?
91296 просмотра
3 ответа
What's the difference between an RDD's map
and mapPartitions
method? And does flatMap
behave like map
or like mapPartitions
? Thanks.
(edit) i.e. what is the difference (either semantically or in terms of execution) between
def map[A, B](rdd: RDD[A], fn: (A => B))
(implicit a: Manifest[A], b: Manifest[B]): RDD[B] = {
rdd.mapPartitions({ iter: Iterator[A] => for (i <- iter) yield fn(i) },
preservesPartitioning = true)
}
And:
def map[A, B](rdd: RDD[A], fn: (A => B))
(implicit a: Manifest[A], b: Manifest[B]): RDD[B] = {
rdd.map(fn)
}
Автор: Nicholas White
Источник
Размещён: 14.07.2019 11:00
Ответы (3)
105 плюса
What's the difference between an RDD's map and mapPartitions method?
The method map converts each element of the source RDD into a single element of the result RDD by applying a function. mapPartitions converts each partition of the source RDD into multiple elements of the result (possibly none).
And does flatMap behave like map or like mapPartitions?
Кроме того, flatMap работает с одним элементом (как map
) и создает несколько элементов результата (как mapPartitions
).
96 плюса
Настоятельный СОВЕТ :
Всякий раз, когда у вас есть тяжелая инициализация, которая должна быть выполнена один раз для многих
RDD
элементов, а не один раз для каждогоRDD
элемента, и если эта инициализация, такая как создание объектов из сторонней библиотеки, не может быть сериализована (так что Spark может передать ее через кластер в рабочие узлы), используйтеmapPartitions()
вместоmap()
.mapPartitions()
предусматривает инициализацию, которая выполняется один раз для каждой рабочей задачи / потока / раздела, а не один раз для каждогоRDD
элемента данных, например: см. ниже.
val newRd = myRdd.mapPartitions(partition => {
val connection = new DbConnection /*creates a db connection per partition*/
val newPartition = partition.map(record => {
readMatchingFromDB(record, connection)
}).toList // consumes the iterator, thus calls readMatchingFromDB
connection.close() // close dbconnection here
newPartition.iterator // create a new iterator
})
Q2. ведет
flatMap
себя как карта или какmapPartitions
?
Да. пожалуйста, смотрите пример 2 из flatmap
.. самоочевидный.
Q1. В чем разница между RDD
map
иmapPartitions
map
работает функция, используемая на уровне элемента, в то время какmapPartitions
выполняет функцию на уровне раздела.
Пример сценария : если у нас есть 100K элементов в определенномRDD
разделе, то мы будем запускать функцию, используемую преобразованием отображения 100K раз при использованииmap
.
И наоборот, если мы используем mapPartitions
то, мы будем вызывать определенную функцию только один раз, но мы передадим все 100К записей и получим все ответы за один вызов функции.
Это приведет к увеличению производительности, так как map
работает с определенной функцией очень много раз, особенно если функция каждый раз делает что-то дорогое, что не потребовалось бы, если бы мы передали все элементы одновременно (в случае mappartitions
).
карта
Применяет функцию преобразования к каждому элементу СДР и возвращает результат как новый СДР.
Листинг Варианты
карта отображения [U: ClassTag] (f: T => U): RDD [U]
Пример :
val a = sc.parallelize(List("dog", "salmon", "salmon", "rat", "elephant"), 3)
val b = a.map(_.length)
val c = a.zip(b)
c.collect
res0: Array[(String, Int)] = Array((dog,3), (salmon,6), (salmon,6), (rat,3), (elephant,8))
mapPartitions
Это специализированная карта, которая вызывается только один раз для каждого раздела. Все содержимое соответствующих разделов доступно в виде последовательного потока значений через входной аргумент (Iterarator [T]). Пользовательская функция должна возвращать еще один Iterator [U]. Объединенные итераторы результатов автоматически преобразуются в новый СДР. Обратите внимание, что кортежи (3,4) и (6,7) отсутствуют в следующем результате из-за выбранного нами разбиения.
preservesPartitioning
указывает, сохраняет ли функция ввода разделитель, что должно быть,false
если только это не пара RDD, а функция ввода не изменяет ключи.Листинг Варианты
def mapPartitions [U: ClassTag] (f: Iterator [T] => Iterator [U], preservedPartitioning: Boolean = false): RDD [U]
Пример 1
val a = sc.parallelize(1 to 9, 3)
def myfunc[T](iter: Iterator[T]) : Iterator[(T, T)] = {
var res = List[(T, T)]()
var pre = iter.next
while (iter.hasNext)
{
val cur = iter.next;
res .::= (pre, cur)
pre = cur;
}
res.iterator
}
a.mapPartitions(myfunc).collect
res0: Array[(Int, Int)] = Array((2,3), (1,2), (5,6), (4,5), (8,9), (7,8))
Пример 2
val x = sc.parallelize(List(1, 2, 3, 4, 5, 6, 7, 8, 9,10), 3)
def myfunc(iter: Iterator[Int]) : Iterator[Int] = {
var res = List[Int]()
while (iter.hasNext) {
val cur = iter.next;
res = res ::: List.fill(scala.util.Random.nextInt(10))(cur)
}
res.iterator
}
x.mapPartitions(myfunc).collect
// some of the number are not outputted at all. This is because the random number generated for it is zero.
res8: Array[Int] = Array(1, 2, 2, 2, 2, 3, 3, 3, 3, 3, 3, 3, 3, 3, 4, 4, 4, 4, 4, 4, 4, 5, 7, 7, 7, 9, 9, 10)
Вышеуказанная программа также может быть написана с использованием flatMap следующим образом.
Пример 2 с использованием flatmap
val x = sc.parallelize(1 to 10, 3)
x.flatMap(List.fill(scala.util.Random.nextInt(10))(_)).collect
res1: Array[Int] = Array(1, 2, 3, 3, 3, 4, 4, 4, 4, 4, 4, 4, 4, 4, 5, 5, 6, 6, 6, 6, 6, 6, 6, 6, 7, 7, 7, 8, 8, 8, 8, 8, 8, 8, 8, 9, 9, 9, 9, 9, 10, 10, 10, 10, 10, 10, 10, 10)
Заключение :
mapPartitions
преобразование происходит быстрее, чем map
когда оно вызывает вашу функцию один раз / раздел, а не один раз / элемент.
Дальнейшее чтение: foreach против foreachPartitions Когда использовать Что?
Автор: Ram Ghadiyaram Размещён: 29.08.2016 10:1715 плюса
Карта :
- Он обрабатывает одну строку за раз, очень похоже на метод map () MapReduce.
- Вы возвращаетесь из трансформации после каждой строки.
MapPartitions
Автор: KrazyGautam Размещён: 13.03.2017 12:09
- Обрабатывает полный раздел за один раз.
- Вы можете вернуться из функции только один раз после обработки всего раздела.
- Все промежуточные результаты должны храниться в памяти, пока вы не обработаете весь раздел.
- Предоставляет вам как функцию setup () map () и cleanup () функции MapReduce
Map Vs mapPartitions
http://bytepadding.com/big-data/spark/spark-map-vs-mappartitions/
Spark Map
http://bytepadding.com/big-data/spark/spark-map/
Spark mapPartitions
http://bytepadding.com/big-data/spark/spark-mappartitions/
Вопросы из категории :
- performance Как создать новый экземпляр объекта из Типа
- performance Как работает индексация базы данных?
- performance Действительно ли опечатанные классы действительно предлагают преимущества?
- performance Big O, как вы рассчитываете / приближаете это?
- performance MyISAM против InnoDB
- scala Есть ли реальный опыт использования программной транзакционной памяти?
- scala Преобразование коллекции Java в коллекцию Scala
- scala Что означает один апостроф в Scala?
- scala Самый масштабируемый веб-стек для высокопроизводительного приложения Flash / Flex / AIR?
- scala Разбиение макета Scala на инфиксном операторе
- apache-spark Как сделать акулу / искру очистить кеш?
- apache-spark (PySpark) Вложенные списки после reduByKey
- apache-spark Spark java.lang.OutOfMemoryError: пространство кучи Java
- apache-spark Apache Spark: map vs mapPartitions?
- apache-spark Spark: лучшая практика для извлечения больших данных из RDD на локальный компьютер
- rdd Модифицировать коллекцию внутри каждого Spark RDD
- rdd Преобразование простой строки строки в RDD в Spark
- rdd Какая разница между кешем и персистом?
- rdd Как я могу получить SQL row_number, эквивалентный для Spark RDD?