Вопрос:

PySpark - добавление столбца из списка значений с использованием UDF

python list pyspark apache-spark-sql user-defined-functions

6103 просмотра

5 ответа

865 Репутация автора

Я должен добавить столбец в фрейм данных PySpark на основе списка значений.

a= spark.createDataFrame([("Dog", "Cat"), ("Cat", "Dog"), ("Mouse", "Cat")],["Animal", "Enemy"])

У меня есть список, который называется рейтинг, который является рейтингом каждого питомца.

rating = [5,4,1]

Мне нужно добавить фрейм данных в столбец с именем Rating, чтобы

+------+-----+------+
|Animal|Enemy|Rating|
+------+-----+------+
|   Dog|  Cat|     5|
|   Cat|  Dog|     4|
| Mouse|  Cat|     1|
+------+-----+------+

Я сделал следующее, однако он возвращает только первое значение в списке в столбце рейтинга.

def add_labels():
    return rating.pop(0)

labels_udf = udf(add_labels, IntegerType())

new_df = a.withColumn('Rating', labels_udf()).cache()

из:

+------+-----+------+
|Animal|Enemy|Rating|
+------+-----+------+
|   Dog|  Cat|     5|
|   Cat|  Dog|     5|
| Mouse|  Cat|     5|
+------+-----+------+
Автор: Bryce Ramgovind Источник Размещён: 09.01.2018 08:39

Ответы (5)


0 плюса

923 Репутация автора

То, что вы пытаетесь сделать, не работает, потому что ratingсписок находится в памяти вашего драйвера, в то время как aфрейм данных находится в памяти исполнителя (udf работает и на исполнителях).

Что вам нужно сделать, это добавить ключи в ratingsсписок, например так:

ratings = [('Dog', 5), ('Cat', 4), ('Mouse', 1)]

Затем вы создаете ratingsфрейм данных из списка и присоединяетесь к обоим, чтобы добавить новый столбец:

ratings_df = spark.createDataFrame(ratings, ['Animal', 'Rating'])
new_df = a.join(ratings_df, 'Animal')
Автор: Tw UxTLi51Nus Размещён: 09.01.2018 09:06

2 плюса

1631 Репутация автора

Как уже упоминалось @Tw UxTLi51Nus, если вы можете заказать DataFrame, скажем, по Animal, без изменения ваших результатов, вы можете сделать следующее:

def add_labels(indx):
    return rating[indx-1] # since row num begins from 1
labels_udf = udf(add_labels, IntegerType())

a = spark.createDataFrame([("Dog", "Cat"), ("Cat", "Dog"), ("Mouse", "Cat")],["Animal", "Enemy"])
a.createOrReplaceTempView('a')
a = spark.sql('select row_number() over (order by "Animal") as num, * from a')

a.show()


+---+------+-----+
|num|Animal|Enemy|
+---+------+-----+
|  1|   Dog|  Cat|
|  2|   Cat|  Dog|
|  3| Mouse|  Cat|
+---+------+-----+

new_df = a.withColumn('Rating', labels_udf('num'))
new_df.show()
+---+------+-----+------+
|num|Animal|Enemy|Rating|
+---+------+-----+------+
|  1|   Dog|  Cat|     5|
|  2|   Cat|  Dog|     4|
|  3| Mouse|  Cat|     1|
+---+------+-----+------+

А затем опустите numстолбец:

new_df.drop('num').show()
+------+-----+------+
|Animal|Enemy|Rating|
+------+-----+------+
|   Dog|  Cat|     5|
|   Cat|  Dog|     4|
| Mouse|  Cat|     1|
+------+-----+------+

Редактировать:

Другой, но, возможно, уродливый и немного неэффективный способ, если вы не можете отсортировать по столбцу, это вернуться к rdd и сделать следующее:

a = spark.createDataFrame([("Dog", "Cat"), ("Cat", "Dog"), ("Mouse", "Cat")],["Animal", "Enemy"])

# or create the rdd from the start:
# a = spark.sparkContext.parallelize([("Dog", "Cat"), ("Cat", "Dog"), ("Mouse", "Cat")])

a = a.rdd.zipWithIndex()
a = a.toDF()
a.show()

+-----------+---+
|         _1| _2|
+-----------+---+
|  [Dog,Cat]|  0|
|  [Cat,Dog]|  1|
|[Mouse,Cat]|  2|
+-----------+---+

a = a.select(bb._1.getItem('Animal').alias('Animal'), bb._1.getItem('Enemy').alias('Enemy'), bb._2.alias('num'))

def add_labels(indx):
    return rating[indx] # indx here will start from zero

labels_udf = udf(add_labels, IntegerType())

new_df = a.withColumn('Rating', labels_udf('num'))

new_df.show()

+---------+--------+---+------+
|Animal   |   Enemy|num|Rating|
+---------+--------+---+------+
|      Dog|     Cat|  0|     5|
|      Cat|     Dog|  1|     4|
|    Mouse|     Cat|  2|     1|
+---------+--------+---+------+

(Я не рекомендовал бы это, если у вас есть много данных)

Надеюсь это поможет. Удачи!

Автор: mkaran Размещён: 09.01.2018 10:01

1 плюс

29376 Репутация автора

Вы можете конвертировать свой рейтинг в rdd

rating = [5,4,1]
ratingrdd = sc.parallelize(rating)

А затем конвертируйте dataframeв rdd, прикрепите каждое значение ratingrddк rdd-кадру с помощью zipи снова конвертируйте сжатый rdd вdataframe

sqlContext.createDataFrame(a.rdd.zip(ratingrdd).map(lambda x: (x[0][0], x[0][1], x[1])), ["Animal", "Enemy", "Rating"]).show()

Это должно дать вам

+------+-----+------+
|Animal|Enemy|Rating|
+------+-----+------+
|   Dog|  Cat|     5|
|   Cat|  Dog|     4|
| Mouse|  Cat|     1|
+------+-----+------+
Автор: Ramesh Maharjan Размещён: 09.01.2018 05:21

4 плюса

9452 Репутация автора

Решение

Надеюсь это поможет!

from pyspark.sql.functions import monotonically_increasing_id

#sample data
a= sqlContext.createDataFrame([("Dog", "Cat"), ("Cat", "Dog"), ("Mouse", "Cat")],
                               ["Animal", "Enemy"])
a.show()

#convert list to a dataframe
rating = [5,4,1]
b = sqlContext.createDataFrame([(l,) for l in rating], ['Rating'])

#join both dataframe to get the final result
a = a.withColumn("row_idx", monotonically_increasing_id())
b = b.withColumn("row_idx", monotonically_increasing_id())
final_df = a.join(b, a.row_idx == b.row_idx).\
             drop("row_idx")
final_df.show()

Входные данные:

+------+-----+
|Animal|Enemy|
+------+-----+
|   Dog|  Cat|
|   Cat|  Dog|
| Mouse|  Cat|
+------+-----+

Выход:

+------+-----+------+
|Animal|Enemy|Rating|
+------+-----+------+
|   Cat|  Dog|     4|
|   Dog|  Cat|     5|
| Mouse|  Cat|     1|
+------+-----+------+
Автор: Prem Размещён: 09.01.2018 06:33

0 плюса

41 Репутация автора

Я могу ошибаться, но я верю, что принятый ответ не сработает. monotonically_increasing_idтолько гарантирует, что идентификаторы будут уникальными и растущими, а не последовательными. Следовательно, использование его на двух разных фреймах данных, скорее всего, создаст два совершенно разных столбца, и соединение в основном вернет пустое.

черпая вдохновение из этого ответа https://stackoverflow.com/a/48211877/7225303 на аналогичный вопрос, мы можем изменить неправильный ответ на:

from pyspark.sql.window import Window as W
from pyspark.sql import functions as F

a= sqlContext.createDataFrame([("Dog", "Cat"), ("Cat", "Dog"), ("Mouse", "Cat")],
                               ["Animal", "Enemy"])

a.show()

+------+-----+
|Animal|Enemy|
+------+-----+
|   Dog|  Cat|
|   Cat|  Dog|
| Mouse|  Cat|
+------+-----+



#convert list to a dataframe
rating = [5,4,1]
b = sqlContext.createDataFrame([(l,) for l in rating], ['Rating'])
b.show()

+------+
|Rating|
+------+
|     5|
|     4|
|     1|
+------+


a = a.withColumn("idx", F.monotonically_increasing_id())
b = b.withColumn("idx", F.monotonically_increasing_id())

windowSpec = W.orderBy("idx")
a = a.withColumn("idx", F.row_number().over(windowSpec))
b = b.withColumn("idx", F.row_number().over(windowSpec))

a.show()
+------+-----+---+
|Animal|Enemy|idx|
+------+-----+---+
|   Dog|  Cat|  1|
|   Cat|  Dog|  2|
| Mouse|  Cat|  3|
+------+-----+---+

b.show()
+------+---+
|Rating|idx|
+------+---+
|     5|  1|
|     4|  2|
|     1|  3|
+------+---+

final_df = a.join(b, a.idx == b.idx).drop("idx")

+------+-----+------+
|Animal|Enemy|Rating|
+------+-----+------+
|   Dog|  Cat|     5|
|   Cat|  Dog|     4|
| Mouse|  Cat|     1|
+------+-----+------+
Автор: Biggus Размещён: 05.05.2019 05:16
Вопросы из категории :
32x32