Широкий алгоритм в потоках Java 8

java algorithm java-stream

128 просмотра

1 ответ

4904 Репутация автора

У меня есть программа, которая по своей сути обрабатывает отсортированные списки (один средний databaseи один большой список queries) интервалов, оба списка сортируются. Каждый интервал базы данных должен соответствовать всем перекрывающимся запросам, а затем список запросов должен быть записан в том же порядке, в котором он был прочитан.

Это похоже на семейство алгоритмов стреловидной линии (поправьте меня, пожалуйста, здесь лучше, если возможно).

Чтобы программа работала с действительно большими входами, я бы хотел (1) работать как можно более локально и (2) записывать данные как можно скорее (то есть, если запрос больше не требуется, его следует записать вне).

Целая задача немного неуклюжа для реализации, но MWE выглядит примерно так, как показано внизу. На самом деле, база данных не такая большая и может быть загружена в память в дерево интервалов или около того. Однако проблема обработки запросов остается.

Теперь у меня вопрос: существует ли элегантное решение, использующее потоки Java 8, чтобы я мог извлечь выгоду из параллелизма (обработка БД с несколькими запросами несколько дорогая)?

Я понимаю, что одной из проблем является группирование каждой записи запроса с более чем одной записью базы данных. Другой проблемой является локальное объединение результатов, как только запрос будет полностью обработан, и никто не сможет помешать следующему выписать запрос.

Спасибо!

package mwe;

import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;

class MWE {

    // Half-open interval [begin, end)
    public static class Interval {
        String name;
        int begin;
        int end;

        Interval(String name, int begin, int end) {
            this.name = name;
            this.begin = begin;
            this.end = end;
        }

        boolean overlaps(Interval that) {
            return (that.begin < this.end) && (this.begin < that.end);
        }

        @Override
        public String toString() {
            return "Interval [name=" + name + ", begin=" + begin + ", end=" + end + "]";
        }

        @Override
        public int hashCode() {
            final int prime = 31;
            int result = 1;
            result = prime * result + begin;
            result = prime * result + end;
            result = prime * result + ((name == null) ? 0 : name.hashCode());
            return result;
        }

        @Override
        public boolean equals(Object obj) {
            if (this == obj)
                return true;
            if (obj == null)
                return false;
            if (getClass() != obj.getClass())
                return false;
            Interval other = (Interval) obj;
            if (begin != other.begin)
                return false;
            if (end != other.end)
                return false;
            if (name == null) {
                if (other.name != null)
                    return false;
            } else if (!name.equals(other.name))
                return false;
            return true;
        }

    }

    // One counter for an interval
    static class IntervalCounter {
        int counter;
        Interval itv;

        IntervalCounter(Interval itv) {
            this.counter = 0;
            this.itv = itv;
        }

        @Override
        public String toString() {
            return "IntervalCounter [counter=" + counter + ", itv=" + itv + "]";
        }

    }

    // DB intervals to come, sorted by begin position
    static List<Interval> inactiveIntervals = new ArrayList<>();
    // Currently active DB intervals, sorted by begin position
    static List<Interval> activeIntervals = new ArrayList<>();
    // Mapping from database to query interval
    static HashMap<Interval, ArrayList<Interval>> dbToQueries = new HashMap<>();
    // Mapping from interval to point into list of outgoing intervals
    static HashMap<Interval, IntervalCounter> itvToCounter = new HashMap<>();
    // List of outgoing qry intervals
    static ArrayList<IntervalCounter> outgoingIntervals = new ArrayList<>();

    static void process(List<Interval> db, List<Interval> qry) {
        inactiveIntervals.addAll(db); // put all into queue

        // Process each query interval
        for (Interval q : qry) {
            assignToIntervals(q);
            processDone(q);
        }

        assignToIntervals(null);
        processDone(null);
    }

    /**
     * Given the current Interval q, process all database intervals for which no more overlap can come
     */
    private static void processDone(Interval q) {
        // Count number of database intervals that are done when q has been processed completely
        int popCount = 0; // number of intervals to pop from front
        for (Interval db : activeIntervals) {
            if (q == null || q.begin >= db.end) {
                System.err.println("Processing in DB " + db.name);
                for (Interval itv : dbToQueries.get(db))
                    System.err.println("  " + itv.name);
                popCount += 1;
            } else {
                break; // cannot guarantee done for next
            }
        }

        // Remove them from the DB list and reduce counters of contained queries
        while (popCount > 0) {
            System.err.println("popping " + activeIntervals.get(0).name);

            final Interval db = activeIntervals.get(0);
            for (IntervalCounter counter : outgoingIntervals) {
                if (counter.itv.overlaps(db))
                    counter.counter -= 1;
            }

            dbToQueries.remove(db);
            activeIntervals.remove(0);
            popCount--;
        }

        // Write out all queries that are marked as done
        while (!outgoingIntervals.isEmpty() && outgoingIntervals.get(0).counter == 0) {
            System.err.println("Writing out query " + outgoingIntervals.get(0).itv.name);
            outgoingIntervals.remove(0);
        }
    }

    private static void assignToIntervals(Interval q) {
        // Activate new DB intervals
        int popCount = 0;
        for (Interval db : inactiveIntervals) {
            if (q == null || q.end > db.begin) { // could overlap
                activeIntervals.add(db);
                dbToQueries.put(db, new ArrayList<>());
                if (q != null) {
                    outgoingIntervals.add(new IntervalCounter(q));
                    itvToCounter.put(q, outgoingIntervals.get(outgoingIntervals.size() - 1));
                }
                popCount++;
            } else {
                break; // cannot pull in more
            }
        }
        // Activate intervals
        while (popCount > 0) {
            inactiveIntervals.remove(0);
            popCount--;
        }
        // Assign to active DB intervals
        if (q == null)
            return;
        for (Interval db : activeIntervals) {
            if (q.overlaps(db)) {
                dbToQueries.get(db).add(q);
                itvToCounter.get(q).counter += 1;
            }
        }
    }

    public static void main(String[] args) throws java.lang.Exception {
        ArrayList<Interval> db = new ArrayList<>();
        db.add(new Interval("db1", 1, 100));
        db.add(new Interval("db2", 95, 190));
        db.add(new Interval("db3", 200, 300));

        ArrayList<Interval> qry = new ArrayList<>();
        qry.add(new Interval("q1", 1, 20));
        qry.add(new Interval("q2", 99, 100));
        qry.add(new Interval("q3", 250, 251));

        // Guarantee: db and qry will always be sorted by begin

        process(db, qry);
    }
}

Вывод при запуске программы выше выглядит следующим образом

Processing in DB db1
  q1
  q2
Processing in DB db2
  q2
popping db1
popping db2
Writing out query q1
Writing out query q2
Processing in DB db3
  q3
popping db3
Writing out query q3
Автор: Manuel Источник Размещён: 18.07.2016 02:35

Ответы (1)


0 плюса

193 Репутация автора

Эта проблема выглядит аналогично проблеме внешнего объединения потоков, описанной в разделе « Как выполнить внешнее соединение двух или более потоков ». Реализация соединения использует две очереди блокировки. Отдельные потоки заполняют каждую очередь и помещают маркер конца потока в очередь, когда у них заканчиваются данные. Spliterator реализация на основе AbstractSpliterator потребляет значения в каждой очереди по мере необходимости.

В вашем случае требуется другая реализация tryAdvance. Предлагаемая реализация внутреннего потока приведена ниже. См. Вопрос выше для блокирования частей заполнения очереди. Распараллеливание потока затруднено из-за пересекающейся природы записей и запросов БД. Тем не менее, предлагаемая реализация поддерживает ограниченное распараллеливание (см. Реализацию trySplit в AbstractSpliterator ).

Я проверил концепции, используемые ниже, но этот код не проверен.

import java.util.ArrayDeque;
import java.util.Deque;
import java.util.Spliterator;
import java.util.Spliterators.AbstractSpliterator;
import java.util.concurrent.BlockingQueue;
import java.util.function.Consumer;
import java.util.stream.Stream;
import java.util.stream.StreamSupport;

import org.apache.commons.lang3.tuple.Pair;

public class QueryProcessor {
    private static class SpliteratorImpl extends AbstractSpliterator<Pair<DbEntry, Query>> {
        private final Query queryEos;
        private final DbEntry dbEntryEos;
        private final BlockingQueue<DbEntry> dbQueue;
        private final BlockingQueue<Query> queryQueue;
        private final Deque<Query> activeQueries = new ArrayDeque<>();
        private final Deque<Pair<DbEntry, Query>> working = new ArrayDeque<>();
        private boolean queriesExhausted = false;

        public SpliteratorImpl(long est, int additionalCharacteristics, BlockingQueue<DbEntry> dbQueue, BlockingQueue<Query> queryQueue, DbEntry dbEntryEos, Query queryEos) {
            super(est, additionalCharacteristics);
            this.dbQueue = dbQueue;
            this.queryQueue = queryQueue;
            this.queryEos = queryEos;
            this.dbEntryEos = dbEntryEos;
        }

        @Override
        public boolean tryAdvance(Consumer<? super Pair<DbEntry, Query>> action) {
            try {
                DbEntry entry = null;
                // tryAdvance produces DbEntry-Query pairs. It begins by
                // draining a working queue of DbEntry-Query pairs, one pair
                // for each tryAdvance call. If the working queue is empty,
                // tryAdvance takes an entry from the entryQueue. For each
                // of these entries, tryAdvance extends its active query queue
                // with queries potentially overlapping the entry and purges
                // queries at the front of the queue that do not overlap the
                // entry. Next, tryAdvance pairs the current entry with each
                // active query that it overlaps.
                //
                // If there are no more DB entries or no active queries, the
                // loop terminates and returns false, signaling that the stream
                // as reached its end.

                for (;;) {
                    // If the working queue is not empty, consume the first pair
                    // in the queue and return true. If it is empty move on to
                    // the next step.
                    if (!working.isEmpty()) {
                        Pair<DbEntry, Query> p = working.pop();
                        action.accept(p);
                        return true;
                    }

                    // Take the next entry form the DB entry queue.
                    entry = dbQueue.take();
                    if (entry == dbEntryEos) {
                        // Encountered end-of-stream in DB entries -- we're
                        // done.
                        return false;
                    }

                    // Extend the the active query queue with any queries that
                    // potentially overlap the current DB entry. If the end of
                    // the last query proceeds the end of the DB entry by more
                    // than one unit, it is possible that the next query will
                    // overlap with this last uncovered bit of the entry.
                    while (!queriesExhausted
                        && (activeQueries.peekLast() == null || activeQueries.peekLast().end() + 1 < entry.end())) {
                        Query q = queryQueue.take();
                        if (q == queryEos) {
                            queriesExhausted = true;
                        } else if (q.end() > entry.begin()) {
                            // if the end of this q follows the beginning of the
                            // entry, keep the query; otherwise, discard it (it
                            // won't overlap any of the following entries
                            // either).
                            activeQueries.add(q);
                        } else {
                            // Discard queries whose end proceed the beginning
                            // of the entry interval.
                        }
                    }

                    // Pop any queries in the working queue that proceed the
                    // current entry (there was overlap between them and prior
                    // entries).
                    for (;;) {
                        if (activeQueries.peekFirst() != null && activeQueries.peekFirst().end() <= entry.begin()) {
                            activeQueries.pop();
                        } else {
                            break;
                        }
                    }

                    // If the active query queue is empty, there is nothing left
                    // that I can do.
                    if (!activeQueries.isEmpty()) {
                        // Pair each query in the active query queue with the
                        // current entry if it overlaps the current entry.
                        for (Query q : activeQueries) {
                            if (isOverlapping(entry, q)) {
                                working.add(Pair.of(entry, q));
                            }
                        }
                        continue;
                    }
                    // The active query queue is empty; so, return false to
                    // signal the end of the stream.
                    return false;
                }
            } catch (InterruptedException e) {
                return false;
            }
        }

        private static boolean isOverlapping(DbEntry entry, Query q) {
            return (q.begin() <= entry.begin() && entry.begin() < q.end())
                || q.begin() <= entry.end() && entry.end() < q.end();
        }
    }

    private QueryProcessor() {
    }

    public static Stream<Pair<DbEntry, Query>> stream(long est, int additionalCharacteristics,
            BlockingQueue<DbEntry> dbQueue, BlockingQueue<Query> queryQueue, DbEntry dbEntryEos, Query queryEos) {
        Spliterator<Pair<DbEntry, Query>> spliterator =
            new SpliteratorImpl(est, additionalCharacteristics, dbQueue, queryQueue, dbEntryEos, queryEos);

        return StreamSupport.stream(spliterator, false);
    }
}
Автор: John Morris Размещён: 08.09.2016 05:09
Вопросы из категории :
32x32