Ввод Spark в Mongo db занимает 10 часов для данных 60 Гб

mongodb apache-spark

1804 просмотра

1 ответ

11 Репутация автора

Я использую Spark: 1.6.2 и MongoDB: 3.2.8

У меня есть датафрейм с 8 столбцами и 1 миллиард строк. случайная запись для фрейма данных составляет 60 ГБ.

Я собираюсь вставить этот фрейм данных в mongodb, используя mongo-spark-conector (mongo-spark-connector_2.10).

MongoSpark.write(sourceValueDf).options(mongoDbOptions).mode(SaveMode.Append).save();

Для вставки требуется более 10 часов.

Как я могу увеличить производительность?

Автор: Swaroop P Источник Размещён: 18.07.2016 08:05

Ответы (1)


6 плюса

15194 Репутация автора

Там не так много, чтобы продолжить:

MongoSpark.write(sourceValueDf).options(mongoDbOptions).mode(SaveMode.Append).save()

Но независимо от mongoDBOptionsнастройки потребуется двоякое и узкие места производительности должны быть исправлены в Spark и MongoDB. Ключом к успеху является понимание того, что происходит, когда вы запускаете приведенный выше код, только тогда вы сможете определить лучший курс на повышение производительности.

искра

У меня есть датафрейм с 8 столбцами и 1 миллиард строк. случайная запись для фрейма данных составляет 60 ГБ.

Информации о ней нет, sourceValueDfно вам нужно будет настроить источник и понять, является ли это узким местом? См. Документацию Spark Monitoring, чтобы узнать больше о том, что происходит в ваших заданиях Spark.

В целом, ключевые моменты для настройки Spark: разбиение на разделы , кэширование , сериализация и работа в случайном порядке . За дополнительной информацией обращайтесь к этому замечательному сообщению в блоге от cloudera: Работа с Apache Spark: или Как я научился перестать беспокоиться и любить перемешивание . Существует множество потенциальных способов улучшить работу Spark.

MongoDB

Давайте посмотрим, что будет делать соединитель MongoDB Spark:

MongoSpark.write(sourceValueDf).options(mongoDbOptions).mode(SaveMode.Append).save()

Здесь соединитель принимает базовый RDD и сохраняет данные в виде документов в существующей базе данных, используя следующую логику:

rdd.foreachPartition(iter => if (iter.nonEmpty) {
    mongoConnector.withCollectionDo(writeConfig, { collection: MongoCollection[D] =>
    iter.grouped(DefaultMaxBatchSize).foreach(batch => collection.insertMany(batch.toList.asJava))
    })
})

Для каждого раздела он выполняет пакетную запись в insertMany, используя 512 документов на пакет (размер основной партии базового драйвера Java). Небольшое количество разделов sourceValueDfможет негативно повлиять на производительность сохранения. Увеличение количества разделов может повысить параллелизуемость этого метода среди рабочих Spark и, следовательно, увеличить полную производительность.

Существуют и другие общие методы улучшения производительности записи для массовых операций в MongoDB:

  • сеть

    Обеспечение того, что Spark Workers и экземпляры MongoDB расположены в одном месте или имеют как можно меньший сетевой скачок. Вы не можете победить физику.

  • Sharding

    Увеличьте распараллеливаемость записей, вставляя их в сегментированную коллекцию, особенно при вставке данных, предварительно отсортированных по ключу сегмента. Совместное размещение рабочих Spark с Sharded MongoD может обеспечить максимально быстрый сценарий для записи. Посмотрите, как я могу достигнуть раздела локальности данных в документации соединителя для получения дополнительной информации о параметрах конфигурации.

  • Индексы

    Удалите индексы перед вставкой данных и перестройте их после. При вставке больших объемов данных пользователи обнаружили улучшения производительности, удалив индексы в начале процесса, а затем построив их только один раз в конце. Например:

    val writeConfig = WriteConfig(mongoDbOptions)
    MongoConnector(writeConfig.asOptions).withCollectionDo(writeConfig, {
      coll: MongoCollection[Document] => coll.dropIndex("index")
    })
    
    MongoSpark.write(sourceValueDf)
              .options(writeConfig.asOptions)
              .mode(SaveMode.Append)
              .save()
    
    MongoConnector(writeConfig.asOptions).withCollectionDo(writeConfig, {  
      coll: MongoCollection[Document] => coll.createIndex(...)
    })
    
  • Написать беспокойство

    Запись только на основной узел и не ожидание репликации повышает скорость за счет избыточности. Это можно настроить через WriteConfig/ mongoDbOptions. См. Документацию конфигурации вывода .

Какова нагрузка на машины (ы) MongoDB, когда вы запускаете это задание? Это узкое место? Такие системы, как MongoDB Cloud Manager, обеспечивают полный мониторинг производительности и помогают вам понять, что происходит на уровне MongoDB.

Улучшение производительности MongoDB и Spark

Короче говоря, нет никакой панацеи или волшебной опции конфигурации, чтобы помочь улучшить производительность. Для этого потребуется отладка, понимание существующей проблемы и потенциально продуманная конфигурация кластеров Spark и MongoDB. Вместе они уже показали, что обеспечивают очень быстрое вычисление и хранение, но это зависит от использования и каждой системы, работающей вместе.

Первые шаги - использовать доступные инструменты мониторинга, чтобы понять, где находятся узкие места.

Автор: Ross Размещён: 18.07.2016 10:51
Вопросы из категории :
32x32