Spark Structured Streaming с использованием сокетов, установите SCHEMA, отобразите DATAFRAME в консоли

apache-spark pyspark apache-spark-sql pyspark-sql spark-structured-streaming

661 просмотра

1 ответ

Как я могу установить схему для потоковой передачи DataFrameв PySpark.

from pyspark.sql import SparkSession
from pyspark.sql.functions import explode
from pyspark.sql.functions import split
# Import data types
from pyspark.sql.types import *

spark = SparkSession\
    .builder\
    .appName("StructuredNetworkWordCount")\
    .getOrCreate()

# Create DataFrame representing the stream of input lines from connection to localhost:5560
lines = spark\
   .readStream\
   .format('socket')\
   .option('host', '192.168.0.113')\
   .option('port', 5560)\
   .load()

Например мне нужна таблица как:

Name,  lastName,   PhoneNumber    
Bob, Dylan, 123456    
Jack, Ma, 789456
....

Как я могу установить заголовок / схему на ['Name', 'lastName', 'PhoneNumber'] с их типами данных.

Кроме того, можно ли отображать эту таблицу непрерывно, или, скажем, верхние 20 строк DataFrame. Когда я попробовал это, я получаю ошибку

"pyspark.sql.utils.AnalysisException: 'полный режим вывода не поддерживается, если потоковые агрегаты отсутствуют при потоковой передаче данных. Фреймы данных / наборы данных ;; \ nProject"

Автор: user3698581 Источник Размещён: 12.11.2019 09:28

Ответы (1)


4 плюса

Решение

TextSocketSourceне предоставляет никаких встроенных параметров синтаксического анализа. Можно использовать только один из двух форматов:

  • метка времени и текст if includeTimestampустановлены trueсо следующей схемой:

    StructType([
        StructField("value", StringType()),
        StructField("timestamp", TimestampType())
    ])
    
  • только текст, если includeTimestampон установлен falseсо схемой, как показано ниже:

    StructType([StructField("value", StringType())]))
    

Если вы хотите изменить этот формат, вам придется преобразовать поток для извлечения интересующих полей, например, с помощью регулярных выражений:

from pyspark.sql.functions import regexp_extract
from functools import partial

fields = partial(
    regexp_extract, str="value", pattern="^(\w*)\s*,\s*(\w*)\s*,\s*([0-9]*)$"
)

lines.select(
    fields(idx=1).alias("name"),
    fields(idx=2).alias("last_name"), 
    fields(idx=3).alias("phone_number")
)
Автор: user6910411 Размещён: 29.12.2016 07:17
Вопросы из категории :
32x32