Странное поведение clojure ref

concurrency clojure ref stm

201 просмотра

3 ответа

У меня есть 100 работников (агентов), которые совместно используют один, refкоторый содержит коллекцию задач. Хотя у этой коллекции есть задачи, каждый работник получает одну задачу из этой коллекции (в dosyncблоке), печатает ее и иногда помещает обратно в коллекцию (в dosyncблоке):

(defn have-tasks?
  [tasks]
  (not (empty? @tasks)))

(defn get-task
  [tasks]
  (dosync
    (let [task (first @tasks)]
      (alter tasks rest)
      task)))

(defn put-task
  [tasks task]
  (dosync (alter tasks conj task))
  nil)

(defn worker
  [& {:keys [tasks]}]
  (agent {:tasks tasks}))

(defn worker-loop
  [{:keys [tasks] :as state}]
  (while (have-tasks? tasks)
    (let [task (get-task tasks)]
      (println "Task: " task)
      (when (< (rand) 0.1)
        (put-task tasks task))))
  state)

(defn create-workers
  [count & options]
  (->> (range 0 count)
       (map (fn [_] (apply worker options)))
       (into [])))

(defn start-workers
  [workers]
  (doseq [worker workers] (send-off worker worker-loop)))

(def tasks (ref (range 1 10000000)))

(def workers (create-workers 100 :tasks tasks))

(start-workers workers)
(apply await workers)

Когда я запускаю этот код, последнее значение печатается агентами (после нескольких попыток): 435445, 4556294, 1322061, 3950017. Но никогда не 9999999то, что я ожидаю. И каждый раз, когда коллекция действительно пуста в конце. Что я делаю не так?

Редактировать:

Я переписал рабочий цикл как можно проще:

(defn worker-loop
  [{:keys [tasks] :as state}]
  (loop []
    (when-let [task (get-task tasks)]
      (println "Task: " task)
      (recur)))
  state)

Но проблема все еще там. Этот код ведет себя, как и ожидалось, при создании одного и только одного работника.

Автор: Sergei Koledov Источник Размещён: 08.11.2019 11:23

Ответы (3)


4 плюса

Решение

Проблема здесь не имеет ничего общего с агентами и едва ли связана с ленью. Вот несколько сокращенная версия исходного кода, которая все еще демонстрирует проблему:

(defn f [init]
  (let [state (ref init)
        task (fn []
               (loop [last-n nil]
                 (if-let [n (dosync
                              (let [n (first @state)]
                                (alter state rest)
                                n))]
                   (recur n)
                   (locking :out
                     (println "Last seen:" last-n)))))
        workers (->> (range 0 5)
                     (mapv (fn [_] (Thread. task))))]
    (doseq [w workers] (.start w))
    (doseq [w workers] (.join w))))

(defn r []
  (f (range 1 100000)))

(defn i [] (f (->> (iterate inc 1)
                   (take 100000))))

(defn t []
  (f (->> (range 1 100000)
          (take Integer/MAX_VALUE))))

Выполнение этого кода показывает, что оба iи tоба - ленивый - надежно работают, а rнадежно - нет. Проблема на самом деле заключается в ошибке параллелизма в классе, возвращаемом rangeвызовом. Действительно, эта ошибка задокументирована в этом билете Clojure и исправлена ​​в версии Clojure 1.9.0-alpha11.

Краткое описание ошибки в случае, если билет по какой-то причине недоступен: во внутреннем restвызове по результату rangeбыла небольшая возможность для условия гонки: « флаг », который говорит «следующее значение уже был вычислен "был установлен до самого фактического значения , что означало, что второй поток мог видеть этот флаг как истинный, даже если" следующее значение "все еще nil. Призыв к alterтогда установит это nilзначение в ссылке. Это было исправлено путем замены двух строк назначения .

В тех случаях, когда результат rangeбыл принудительно реализован в одном потоке или обернут в другой ленивый seq, эта ошибка не появлялась.

Автор: Gary Verhaegen Размещён: 28.08.2017 12:32

3 плюса

Я задал этот вопрос в Google Clojure Group, и он помог мне найти ответ.

Проблема в том, что я использовал ленивую последовательность в транзакции STM.

Когда я заменил этот код:

(def tasks (ref (range 1 10000000)))

этим:

(def tasks (ref (into [] (range 1 10000000))))

все заработало как положено!

В моем рабочем коде, где возникла проблема, я использовал инфраструктуру Korma, которая также возвращает ленивую коллекцию кортежей, как в моем примере.

Вывод: избегайте использования ленивых структур данных в транзакции STM.

Автор: Sergei Koledov Размещён: 25.08.2016 01:52

1 плюс

Когда достигается последний номер в диапазоне, рабочие остаются в руках. Некоторые из них будут возвращены в очередь для последующей обработки.

Чтобы лучше увидеть, что происходит, вы можете worker-loopраспечатать последнюю задачу, обработанную каждым работником:

(defn worker-loop
  [{:keys [tasks] :as state}]
  (loop [last-task nil]
    (if (have-tasks? tasks)
      (let [task (get-task tasks)]
        ;; (when (< (rand) 0.1)
        ;;   (put-task tasks task)
        (recur task))
      (when last-task
        (println "Last task:" last-task))))
  state)

Это также показывает состояние гонки в коде, когда have-tasks?часто просматриваемые задачи выполняются другими, когда get-taskвызывается в конце обработки задач.

Состояние гонки можно решить, удалив have-tasks?и взяв вместо этого возвращаемое значение nil get-taskв качестве сигнала о том, что больше нет задач (на данный момент).

Обновлено:

Как видно, условия этой гонки не объясняют проблему.

Также проблема не решается удалением возможного состояния гонки get-taskследующим образом:

(defn get-task [tasks]
  (dosync
   (first (alter tasks rest))))

Однако переход get-taskк использованию явной блокировки, похоже, решает проблему:

 (defn get-task [tasks]  
   (locking :lock
     (dosync
       (let [task (first @tasks)]
         (alter tasks rest)
         task))))
Автор: Terje D. Размещён: 20.08.2016 10:00
Вопросы из категории :
32x32