Процессы не присоединяются даже после очистки их очередей

python multithreading queue multiprocessing ipc

527 просмотра

1 ответ

Я написал программу с использованием модуля, multiprocessingкоторый в глобальном масштабе выполняется следующим образом:

  1. и a simulationи uiпроцессы запускаются.
  2. simulationпроцесс питает очередь с новыми состояниями моделирования. Если очередь заполнена, цикл моделирования не блокируется, поэтому он может обрабатывать возможные входящие сообщения.
  3. uiпроцесс потребляет очередь моделирования.
  4. примерно через 1 секунду времени выполнения uiпроцесс отправляет quitсобытие главному процессу, а затем выходит из цикла. При выходе он отправляет stoppedсобытие в основной процесс через _create_process()внутреннюю wrapper()функцию.
  5. основной процесс получает оба события в любом порядке. Это quitсобытие приводит к тому, что основной процесс отправляет stopсигналы всем дочерним процессам, в то время как stoppedсобытие увеличивает счетчик в главном цикле, что приведет к его выходу после получения столько stoppedсобытий, сколько существует процессов.
  6. simulationпроцесс получает stopсобытие и выходит из цикла, посылая в свою очередь, stoppedсобытие к основному процессу.
  7. в настоящее время главный процесс получил всего 2 stoppedсобытия и приходит к выводу, что все дочерние процессы на своем пути остановлены. В результате основной цикл завершается
  8. run()функция промывает очереди , которые были написаны в дочерних процессов.
  9. дочерние процессы объединяются.

Проблема в том, что довольно часто (но не всегда) программа зависает при попытке присоединиться к simulationпроцессу, как показано в журнале ниже.

[...]
[INFO/ui] process exiting with exitcode 0
[DEBUG/MainProcess] starting thread to feed data to pipe
[DEBUG/MainProcess] ... done self._thread.start()
[DEBUG/simulation] Queue._start_thread()
[DEBUG/simulation] doing self._thread.start()
[DEBUG/simulation] starting thread to feed data to pipe
[DEBUG/simulation] ... done self._thread.start()
[DEBUG/simulation] telling queue thread to quit
[DEBUG/MainProcess] all child processes (2) should have been stopped!
[INFO/simulation] process shutting down
[DEBUG/simulation] running all "atexit" finalizers with priority >= 0
[DEBUG/simulation] telling queue thread to quit
[DEBUG/simulation] running the remaining "atexit" finalizers
[DEBUG/simulation] joining queue thread
[DEBUG/MainProcess] joining process <Process(simulation, started)>
[DEBUG/simulation] feeder thread got sentinel -- exiting
[DEBUG/simulation] ... queue thread joined
[DEBUG/simulation] joining queue thread

Остановка выполнения через a Ctrl + Cв оболочке приводит к искаженным обратным вызовам:

Process simulation:
Traceback (most recent call last):
Traceback (most recent call last):
  File "./debug.py", line 224, in <module>
    run()
  File "/Library/Frameworks/Python.framework/Versions/3.4/lib/python3.4/multiprocessing/process.py", line 257, in _bootstrap
    util._exit_function()
  File "./debug.py", line 92, in run
    process.join()  #< This doesn't work.
  File "/Library/Frameworks/Python.framework/Versions/3.4/lib/python3.4/multiprocessing/util.py", line 312, in _exit_function
    _run_finalizers()
  File "/Library/Frameworks/Python.framework/Versions/3.4/lib/python3.4/multiprocessing/process.py", line 121, in join
  File "/Library/Frameworks/Python.framework/Versions/3.4/lib/python3.4/multiprocessing/util.py", line 252, in _run_finalizers
    finalizer()
  File "/Library/Frameworks/Python.framework/Versions/3.4/lib/python3.4/multiprocessing/util.py", line 185, in __call__
    res = self._callback(*self._args, **self._kwargs)
    res = self._popen.wait(timeout)
  File "/Library/Frameworks/Python.framework/Versions/3.4/lib/python3.4/multiprocessing/popen_fork.py", line 54, in wait
  File "/Library/Frameworks/Python.framework/Versions/3.4/lib/python3.4/multiprocessing/queues.py", line 196, in _finalize_join
    thread.join()
    return self.poll(os.WNOHANG if timeout == 0.0 else 0)
  File "/Library/Frameworks/Python.framework/Versions/3.4/lib/python3.4/multiprocessing/popen_fork.py", line 30, in poll
    pid, sts = os.waitpid(self.pid, flag)
KeyboardInterrupt
  File "/Library/Frameworks/Python.framework/Versions/3.4/lib/python3.4/threading.py", line 1060, in join
    self._wait_for_tstate_lock()
  File "/Library/Frameworks/Python.framework/Versions/3.4/lib/python3.4/threading.py", line 1076, in _wait_for_tstate_lock
    elif lock.acquire(block, timeout):
KeyboardInterrupt

Что касается кода, вот его урезанная версия (поэтому он часто кажется неполным):

#!/usr/bin/env python3

import logging
import multiprocessing
import pickle
import queue
import time

from collections import namedtuple

_LOGGER = multiprocessing.log_to_stderr()
_LOGGER.setLevel(logging.DEBUG)

_BUFFER_SIZE = 4
_DATA_LENGTH = 2 ** 12

_STATUS_SUCCESS = 0
_STATUS_FAILURE = 1

_EVENT_ERROR = 0
_EVENT_QUIT = 1
_EVENT_STOPPED = 2

_MESSAGE_STOP = 0
_MESSAGE_EVENT = 1
_MESSAGE_SIMULATION_UPDATE = 2

_Message = namedtuple('_Message', ('type', 'value',))
_StopMessage = namedtuple('_StopMessage', ())
_EventMessage = namedtuple('_EventMessage', ('type', 'value',))
_SimulationUpdateMessage = namedtuple('_SimulationUpdateMessage', ('state',))

_MESSAGE_STRUCTS = {
    _MESSAGE_STOP: _StopMessage,
    _MESSAGE_EVENT: _EventMessage,
    _MESSAGE_SIMULATION_UPDATE: _SimulationUpdateMessage
}

def run():
    # Messages from the main process to the child ones.
    downward_queue = multiprocessing.Queue()
    # Messages from the child processes to the main one.
    upward_queue = multiprocessing.Queue()
    # Messages from the simulation process to the UI one.
    simulation_to_ui_queue = multiprocessing.Queue(maxsize=_BUFFER_SIZE)

    # Regroup all the queues that can be written by child processes.
    child_process_queues = (upward_queue, simulation_to_ui_queue,)

    processes = (
        _create_process(
            _simulation,
            upward_queue,
            name='simulation',
            args=(
                simulation_to_ui_queue,
                downward_queue
            )
        ),
        _create_process(
            _ui,
            upward_queue,
            name='ui',
            args=(
                upward_queue,
                simulation_to_ui_queue,
                downward_queue
            )
        )
    )

    try:
        for process in processes:
            process.start()

        _main(downward_queue, upward_queue, len(processes))
    finally:
        # while True:
        #     alive_processes = tuple(process for process in processes
        #                             if process.is_alive())
        #     if not alive_processes:
        #         break

        #     _LOGGER.debug("processes still alive: %s" % (alive_processes,))

        for q in child_process_queues:
            _flush_queue(q)

        for process in processes:
            _LOGGER.debug("joining process %s" % process)
            # process.terminate()  #< This works!
            process.join()  #< This doesn't work.

def _main(downward_queue, upward_queue, process_count):
    try:
        stopped_count = 0
        while True:
            message = _receive_message(upward_queue, False)
            if message is not None and message.type == _MESSAGE_EVENT:
                event_type = message.value.type
                if event_type in (_EVENT_QUIT, _EVENT_ERROR):
                    break
                elif event_type == _EVENT_STOPPED:
                    stopped_count += 1
                    if stopped_count >= process_count:
                        break
    finally:
        # Whatever happens, make sure that all child processes have stopped.
        if stopped_count >= process_count:
            return

        # Send a 'stop' signal to all the child processes.
        for _ in range(process_count):
            _send_message(downward_queue, True, _MESSAGE_STOP)

        while True:
            message = _receive_message(upward_queue, False)
            if (message is not None
                    and message.type == _MESSAGE_EVENT
                    and message.value.type == _EVENT_STOPPED):
                stopped_count += 1
                if stopped_count >= process_count:
                    _LOGGER.debug(
                        "all child processes (%d) should have been stopped!"
                        % stopped_count
                    )
                    break

def _simulation(simulation_to_ui_queue, downward_queue):
    simulation_state = [i * 0.123 for i in range(_DATA_LENGTH)]

    # When the queue is full (possibly form reaching _BUFFER_SIZE), the next
    # solve is computed and kept around until the queue is being consumed.
    next_solve_message = None
    while True:
        message = _receive_message(downward_queue, False)
        if message is not None and message.type == _MESSAGE_STOP:
            break

        if next_solve_message is None:
            # _step(simulation_state)

            # Somehow the copy (pickle) seems to increase the chances for
            # the issue to happen.
            next_solve_message = _SimulationUpdateMessage(
                state=pickle.dumps(simulation_state)
            )

        status = _send_message(simulation_to_ui_queue, False,
                               _MESSAGE_SIMULATION_UPDATE,
                               **next_solve_message._asdict())
        if status == _STATUS_SUCCESS:
            next_solve_message = None

def _ui(upward_queue, simulation_to_ui_queue, downward_queue):
    time_start = -1.0
    previous_time = 0.0
    while True:
        message = _receive_message(downward_queue, False)
        if message is not None and message.type == _MESSAGE_STOP:
            break

        if time_start < 0:
            current_time = 0.0
            time_start = time.perf_counter()
        else:
            current_time = time.perf_counter() - time_start

        message = _receive_message(simulation_to_ui_queue, False)

        if current_time > 1.0:
            _LOGGER.debug("asking to quit")
            _send_message(upward_queue, True, _MESSAGE_EVENT,
                          type=_EVENT_QUIT, value=None)
            break

        previous_time = current_time

def _create_process(target, upward_queue, name='', args=None):
    def wrapper(function, upward_queue, *args, **kwargs):
        try:
            function(*args, **kwargs)
        except Exception:
            _send_message(upward_queue, True, _MESSAGE_EVENT,
                          type=_EVENT_ERROR, value=None)
        finally:
            _send_message(upward_queue, True, _MESSAGE_EVENT,
                          type=_EVENT_STOPPED, value=None)
            upward_queue.close()

    process = multiprocessing.Process(
        target=wrapper,
        name=name,
        args=(target, upward_queue) + args,
        kwargs={}
    )
    return process

def _receive_message(q, block):
    try:
        message = q.get(block=block)
    except queue.Empty:
        return None

    return message

def _send_message(q, block, message_type, **kwargs):
    message_value = _MESSAGE_STRUCTS[message_type](**kwargs)
    try:
        q.put(_Message(type=message_type, value=message_value), block=block)
    except queue.Full:
        return _STATUS_FAILURE

    return _STATUS_SUCCESS

def _flush_queue(q):
    try:
        while True:
            q.get(block=False)
    except queue.Empty:
        pass

if __name__ == '__main__':
    run()

Смежные вопросы о StackOverflow и подсказки в документации Python сводятся к необходимости очищать очереди перед присоединением к процессам, что, я полагаю, я пытался сделать здесь. Я понимаю, что очередь симуляции все еще может пытаться протолкнуть (потенциально большие) буферизованные данные в канал к тому моменту, когда программа попытается сбросить их при выходе и, таким образом, получить еще не пустые очереди. Вот почему я пытался убедиться, что все дочерние процессы были остановлены до достижения этой точки. Теперь, глядя на журнал выше и на дополнительный журнал, выводимый после раскомментирования while Trueцикла проверки на наличие активных процессов, кажется, что simulationпроцесс просто не хочет полностью завершать работу, даже если его целевая функция определенно завершена. Может ли это быть причиной моей проблемы?

Если так, то как мне полагаться, чтобы справиться с этим чисто? Иначе чего мне здесь не хватает?

Протестировано с Python 3.4 на Mac OS X 10.9.5.

PS: мне интересно, не может ли это быть связано с этой ошибкой ?

Автор: ChristopherC Источник Размещён: 08.11.2019 11:23

Ответы (1)


0 плюса

Решение

Похоже, что проблема была в действительности из-за некоторой задержки в передаче данных через очередь, из-за чего сбросы были неэффективными из-за слишком раннего запуска.

Простое, while process.is_alive(): flush_the_queues()кажется, делает свое дело!

Автор: ChristopherC Размещён: 21.08.2016 09:27
Вопросы из категории :
32x32