Вопрос:

Разное поведение при вызове функции и встроенного кода

python apache-beam

41 просмотра

1 ответ

6343 Репутация автора

Я пытаюсь выяснить, возможно ли отправить элементы PCollection в родительский процесс при использовании DirectRunnerиз Python Apache Beam SDK.

Однако я столкнулся со странной ошибкой, когда все, кажется, работает нормально, когда создается очередь, и конвейер вызывается внутри __main__секции скрипта, но не тогда, когда тот же код вызывается внутри подфункции. Я предполагаю, что это происходит из-за какого-то травления / укропа, который происходит за кулисами, но было бы желательно получить более конкретное объяснение.

/tmp/inputs/winterstale.txtФайл , используемый ниже , можно скачать с сайта : https://storage.googleapis.com/apache-beam-samples/shakespeare/winterstale.txt

from __future__ import print_function

import atexit
import queue
import tempfile
import time
import unittest

import apache_beam as beam
from apache_beam.io.filesystems import FileSystems
from apache_beam.runners.direct.direct_runner import BundleBasedDirectRunner
from apache_beam.runners.interactive.cache_manager import FileBasedCacheManager
from apache_beam.runners.interactive.cache_manager import ReadCache
from apache_beam.runners.interactive.cache_manager import WriteCache


def add_to_queue(element, queue):
  queue.put(element)


def write_to_queue():
  q = queue.Queue()

  with beam.Pipeline(runner=BundleBasedDirectRunner()) as p:
    _ = (
        p
        | "Read" >> beam.io.ReadFromText("/tmp/inputs/winterstale.txt")
        | "Remove whitespace" >> beam.Map(lambda element: element.strip("\n\t|"))
        | "Remove empty lines" >> beam.FlatMap(lambda element: [element] if element else [])
        | "Write" >> beam.Map(lambda element: add_to_queue(element, queue=q))
    )

  return list(q.queue)


if __name__ == "__main__":
  cache_location = tempfile.mkdtemp()
  atexit.register(FileSystems.delete, [cache_location])

  # Using a function call
  cache_manager = FileBasedCacheManager(cache_dir=cache_location)

  result1 = write_to_queue()
  print(len(result1))  # >>> prints "0" <<<

  # Copy-pasing the code from "write_to_queue()"
  q = queue.Queue()

  with beam.Pipeline(runner=BundleBasedDirectRunner()) as p:
    _ = (
        p
        | "Read" >> beam.io.ReadFromText("/tmp/inputs/winterstale.txt")
        | "Remove whitespace" >> beam.Map(lambda element: element.strip("\n\t|"))
        | "Remove empty lines" >> beam.FlatMap(lambda element: [element] if element else [])
        | "Write" >> beam.Map(lambda element: add_to_queue(element, queue=q))
    )

  result2 = list(q.queue)  # >>> prints "3561" <<<
  print(len(result2))
Автор: ostrokach Источник Размещён: 12.06.2019 12:53

Ответы (1)


0 плюса

554 Репутация автора

В общем, все мариновано перед отправкой бегуну. В этом случае сам объект очереди обычно будет замаринован, а ваши элементы будут добавлены к необрабатываемой копии во время выполнения (отсюда возвращаемое значение 0). Я думаю, что здесь происходит то, что BundleBasedDirectRunner непостоянен в отношении того, что он обрабатывает (например, в зависимости от того, были ли ранее ошибки травления, из-за включения закрытия из основного сеанса он может отказаться от всех попыток засолки и продолжить с исходными объектами).

Возможно, стоит попробовать с каким-нибудь другим бегуном, в этом случае поведение должно быть согласованным (вероятно, всегда равным нулю), и если есть ошибка травления, она будет информативно повышена, а не подавлена.

Автор: robertwb Размещён: 12.06.2019 02:55
Вопросы из категории :
32x32