Модифицировать коллекцию внутри каждого Spark RDD

scala apache-spark rdd

14365 просмотра

1 ответ

Я пытаюсь добавить элементы на карту во время перебора элементов СДР. Я не получаю никаких ошибок, но изменений не происходит.

Все работает нормально, добавляя напрямую или перебирая другие коллекции:

scala> val myMap = new collection.mutable.HashMap[String,String]
myMap: scala.collection.mutable.HashMap[String,String] = Map()

scala> myMap("test1")="test1"

scala> myMap
res44: scala.collection.mutable.HashMap[String,String] = Map(test1 -> test1)

scala> List("test2", "test3").foreach(w => myMap(w) = w)

scala> myMap
res46: scala.collection.mutable.HashMap[String,String] = Map(test2 -> test2, test1 -> test1, test3 -> test3)

Но когда я пытаюсь сделать то же самое с RDD:

scala> val fromFile = sc.textFile("tests.txt")
...
scala> fromFile.take(3)
...
res48: Array[String] = Array(test4, test5, test6)

scala> fromFile.foreach(w => myMap(w) = w)
scala> myMap
res50: scala.collection.mutable.HashMap[String,String] = Map(test2 -> test2, test1 -> test1, test3 -> test3)

Я попытался распечатать содержимое карты, как это было до foreach, чтобы убедиться, что переменная такая же, и она печатает правильно:

fromFile.foreach(w => println(myMap("test1")))
...
test1
test1
test1
...

Я также напечатал измененный элемент карты внутри кода foreach, и он печатает как измененный, но когда операция завершена, карта кажется неизмененной.

scala> fromFile.foreach({w => myMap(w) = w; println(myMap(w))})
...
test4
test5
test6
...
scala> myMap
res55: scala.collection.mutable.HashMap[String,String] = Map(test2 -> test2, test1 -> test1, test3 -> test3)

Преобразование RDD в массив (collect) также работает нормально:

fromFile.collect.foreach(w => myMap(w) = w)
scala> myMap
res89: scala.collection.mutable.HashMap[String,String] = Map(test2 -> test2, test5 -> test5, test1 -> test1, test4 -> test4, test6 -> test6, test3 -> test3)

Это проблема контекста? Получаю ли я доступ к копии данных, которые изменяются где-то еще?

Автор: palako Источник Размещён: 22.10.2019 05:11

Ответы (1)


33 плюса

Решение

Это становится понятнее при работе на кластере Spark (а не на одной машине). СДР теперь распространяется на несколько машин. Когда вы звоните foreach, вы говорите каждой машине, что делать с частью RDD, которая у нее есть. Если вы ссылаетесь на любые локальные переменные (например myMap), они сериализуются и отправляются на машины, чтобы они могли их использовать. Но ничего не возвращается. Таким образом, ваша оригинальная копия myMapостается неизменной.

Я думаю, что это отвечает на ваш вопрос, но, очевидно, вы пытаетесь достичь чего-то, и вы не сможете попасть туда таким образом. Не стесняйтесь объяснить здесь или в отдельном вопросе, что вы пытаетесь сделать, и я постараюсь помочь.

Автор: Daniel Darabos Размещён: 30.04.2014 07:29
Вопросы из категории :
32x32