Apache Spark: map vs mapPartitions?

performance scala apache-spark rdd

91296 просмотра

3 ответа

What's the difference between an RDD's map and mapPartitions method? And does flatMap behave like map or like mapPartitions? Thanks.

(edit) i.e. what is the difference (either semantically or in terms of execution) between

  def map[A, B](rdd: RDD[A], fn: (A => B))
               (implicit a: Manifest[A], b: Manifest[B]): RDD[B] = {
    rdd.mapPartitions({ iter: Iterator[A] => for (i <- iter) yield fn(i) },
      preservesPartitioning = true)
  }

And:

  def map[A, B](rdd: RDD[A], fn: (A => B))
               (implicit a: Manifest[A], b: Manifest[B]): RDD[B] = {
    rdd.map(fn)
  }
Автор: Nicholas White Источник Размещён: 14.07.2019 11:00

Ответы (3)


105 плюса

Решение

What's the difference between an RDD's map and mapPartitions method?

The method map converts each element of the source RDD into a single element of the result RDD by applying a function. mapPartitions converts each partition of the source RDD into multiple elements of the result (possibly none).

And does flatMap behave like map or like mapPartitions?

Кроме того, flatMap работает с одним элементом (как map) и создает несколько элементов результата (как mapPartitions).

Автор: Alexey Romanov Размещён: 17.01.2014 07:46

96 плюса

Настоятельный СОВЕТ :

Всякий раз, когда у вас есть тяжелая инициализация, которая должна быть выполнена один раз для многих RDDэлементов, а не один раз для каждого RDDэлемента, и если эта инициализация, такая как создание объектов из сторонней библиотеки, не может быть сериализована (так что Spark может передать ее через кластер в рабочие узлы), используйте mapPartitions()вместо map(). mapPartitions()предусматривает инициализацию, которая выполняется один раз для каждой рабочей задачи / потока / раздела, а не один раз для каждого RDDэлемента данных, например: см. ниже.

val newRd = myRdd.mapPartitions(partition => {
  val connection = new DbConnection /*creates a db connection per partition*/

  val newPartition = partition.map(record => {
    readMatchingFromDB(record, connection)
  }).toList // consumes the iterator, thus calls readMatchingFromDB 

  connection.close() // close dbconnection here
  newPartition.iterator // create a new iterator
})

Q2. ведет flatMapсебя как карта или как mapPartitions?

Да. пожалуйста, смотрите пример 2 из flatmap.. самоочевидный.

Q1. В чем разница между RDD mapиmapPartitions

mapработает функция, используемая на уровне элемента, в то время как mapPartitionsвыполняет функцию на уровне раздела.

Пример сценария : если у нас есть 100K элементов в определенномRDDразделе, то мы будем запускать функцию, используемую преобразованием отображения 100K раз при использованииmap.

И наоборот, если мы используем mapPartitionsто, мы будем вызывать определенную функцию только один раз, но мы передадим все 100К записей и получим все ответы за один вызов функции.

Это приведет к увеличению производительности, так как mapработает с определенной функцией очень много раз, особенно если функция каждый раз делает что-то дорогое, что не потребовалось бы, если бы мы передали все элементы одновременно (в случае mappartitions).

карта

Применяет функцию преобразования к каждому элементу СДР и возвращает результат как новый СДР.

Листинг Варианты

карта отображения [U: ClassTag] (f: T => U): RDD [U]

Пример :

val a = sc.parallelize(List("dog", "salmon", "salmon", "rat", "elephant"), 3)
 val b = a.map(_.length)
 val c = a.zip(b)
 c.collect
 res0: Array[(String, Int)] = Array((dog,3), (salmon,6), (salmon,6), (rat,3), (elephant,8)) 

mapPartitions

Это специализированная карта, которая вызывается только один раз для каждого раздела. Все содержимое соответствующих разделов доступно в виде последовательного потока значений через входной аргумент (Iterarator [T]). Пользовательская функция должна возвращать еще один Iterator [U]. Объединенные итераторы результатов автоматически преобразуются в новый СДР. Обратите внимание, что кортежи (3,4) и (6,7) отсутствуют в следующем результате из-за выбранного нами разбиения.

preservesPartitioningуказывает, сохраняет ли функция ввода разделитель, что должно быть, falseесли только это не пара RDD, а функция ввода не изменяет ключи.

Листинг Варианты

def mapPartitions [U: ClassTag] (f: Iterator [T] => Iterator [U], preservedPartitioning: Boolean = false): RDD [U]

Пример 1

val a = sc.parallelize(1 to 9, 3)
 def myfunc[T](iter: Iterator[T]) : Iterator[(T, T)] = {
   var res = List[(T, T)]()
   var pre = iter.next
   while (iter.hasNext)
   {
     val cur = iter.next;
     res .::= (pre, cur)
     pre = cur;
   }
   res.iterator
 }
 a.mapPartitions(myfunc).collect
 res0: Array[(Int, Int)] = Array((2,3), (1,2), (5,6), (4,5), (8,9), (7,8)) 

Пример 2

val x = sc.parallelize(List(1, 2, 3, 4, 5, 6, 7, 8, 9,10), 3)
 def myfunc(iter: Iterator[Int]) : Iterator[Int] = {
   var res = List[Int]()
   while (iter.hasNext) {
     val cur = iter.next;
     res = res ::: List.fill(scala.util.Random.nextInt(10))(cur)
   }
   res.iterator
 }
 x.mapPartitions(myfunc).collect
 // some of the number are not outputted at all. This is because the random number generated for it is zero.
 res8: Array[Int] = Array(1, 2, 2, 2, 2, 3, 3, 3, 3, 3, 3, 3, 3, 3, 4, 4, 4, 4, 4, 4, 4, 5, 7, 7, 7, 9, 9, 10) 

Вышеуказанная программа также может быть написана с использованием flatMap следующим образом.

Пример 2 с использованием flatmap

val x  = sc.parallelize(1 to 10, 3)
 x.flatMap(List.fill(scala.util.Random.nextInt(10))(_)).collect

 res1: Array[Int] = Array(1, 2, 3, 3, 3, 4, 4, 4, 4, 4, 4, 4, 4, 4, 5, 5, 6, 6, 6, 6, 6, 6, 6, 6, 7, 7, 7, 8, 8, 8, 8, 8, 8, 8, 8, 9, 9, 9, 9, 9, 10, 10, 10, 10, 10, 10, 10, 10) 

Заключение :

mapPartitionsпреобразование происходит быстрее, чем mapкогда оно вызывает вашу функцию один раз / раздел, а не один раз / элемент.

Дальнейшее чтение: foreach против foreachPartitions Когда использовать Что?

Автор: Ram Ghadiyaram Размещён: 29.08.2016 10:17

15 плюса

Карта :

  1. Он обрабатывает одну строку за раз, очень похоже на метод map () MapReduce.
  2. Вы возвращаетесь из трансформации после каждой строки.

MapPartitions

  1. Обрабатывает полный раздел за один раз.
  2. Вы можете вернуться из функции только один раз после обработки всего раздела.
  3. Все промежуточные результаты должны храниться в памяти, пока вы не обработаете весь раздел.
  4. Предоставляет вам как функцию setup () map () и cleanup () функции MapReduce

Map Vs mapPartitions http://bytepadding.com/big-data/spark/spark-map-vs-mappartitions/

Spark Map http://bytepadding.com/big-data/spark/spark-map/

Spark mapPartitions http://bytepadding.com/big-data/spark/spark-mappartitions/

Автор: KrazyGautam Размещён: 13.03.2017 12:09
Вопросы из категории :
32x32