Pyspark limitByKey на вложенных кортежах

python pyspark rdd reduce

590 просмотра

1 ответ

Мой вопрос похож на PySpark reduByKey для нескольких значений, но с каким-то критическим отличием. Я новичок в PySpark, поэтому я наверняка пропускаю что-то очевидное.

У меня есть RDD со следующей структурой:

(K0, ((k01,v01), (k02,v02), ...))
....
(Kn, ((kn1,vn1), (kn2,vn2), ...))

То, что я хочу в качестве вывода, что-то вроде

(K0, v01+v02+...)
...
(Kn, vn1+vn2+...)

Это похоже на идеальный случай для использования, reduceByKeyи сначала я подумал о чем-то вроде

rdd.reduceByKey(lambda x,y: x[1]+y[1])

Что дает мне именно тот RDD, с которого я начал. Я полагаю, что с индексированием что-то не так, поскольку есть вложенные кортежи, но я перепробовал каждую возможную комбинацию индексов, о которой мог подумать, и она продолжает возвращать мне исходное СДР.

Может быть, есть причина, по которой он не должен работать с вложенными кортежами или я делаю что-то не так?

Автор: Violetta Zoffoli Источник Размещён: 08.11.2019 11:20

Ответы (1)


1 плюс

Решение

Вы не должны использовать reduceByKeyздесь вообще. Требуется ассоциативная и коммутативная функция с сигнатурой. (T, T) => T, Должно быть очевидно, что это не применимо, когда вы используете List[Tuple[U, T]]в качестве входных данных и ожидаете Tв качестве выходных данных.

Поскольку не совсем ясно, являются ли ключи уникальными или нет, давайте рассмотрим общий пример, когда мы должны агрегировать как локально, так и глобально. Предположим, что v01, v02... vmпростые цифры:

from functools import reduce
from operator import add

def agg_(xs):
    # For numeric values sum would be more idiomatic
    # but lets make it more generic
    return reduce(add, (x[1] for x in xs), zero_value)

zero_value = 0
merge_op = add
def seq_op(acc, xs):
    return acc + agg_(xs)

rdd = sc.parallelize([
    ("K0", (("k01", 3), ("k02", 2))),
    ("K0", (("k03", 5), ("k04", 6))),
    ("K1", (("k11", 0), ("k12", -1)))])

rdd.aggregateByKey(0, seq_op, merge_op).take(2)
## [('K0', 16), ('K1', -1)]

Если ключи уже уникальны, то простого mapValuesбудет достаточно:

from itertools import chain

unique_keys = rdd.groupByKey().mapValues(lambda x: tuple(chain(*x)))
unique_keys.mapValues(agg_).take(2)
## [('K0', 16), ('K1', -1)]
Автор: zero323 Размещён: 20.08.2016 04:03
Вопросы из категории :
32x32