Spark Structured Streaming с использованием сокетов, установите SCHEMA, отобразите DATAFRAME в консоли
661 просмотра
1 ответ
Как я могу установить схему для потоковой передачи DataFrame
в PySpark.
from pyspark.sql import SparkSession
from pyspark.sql.functions import explode
from pyspark.sql.functions import split
# Import data types
from pyspark.sql.types import *
spark = SparkSession\
.builder\
.appName("StructuredNetworkWordCount")\
.getOrCreate()
# Create DataFrame representing the stream of input lines from connection to localhost:5560
lines = spark\
.readStream\
.format('socket')\
.option('host', '192.168.0.113')\
.option('port', 5560)\
.load()
Например мне нужна таблица как:
Name, lastName, PhoneNumber
Bob, Dylan, 123456
Jack, Ma, 789456
....
Как я могу установить заголовок / схему на ['Name', 'lastName', 'PhoneNumber'] с их типами данных.
Кроме того, можно ли отображать эту таблицу непрерывно, или, скажем, верхние 20 строк DataFrame
. Когда я попробовал это, я получаю ошибку
Автор: user3698581 Источник Размещён: 12.11.2019 09:28"pyspark.sql.utils.AnalysisException: 'полный режим вывода не поддерживается, если потоковые агрегаты отсутствуют при потоковой передаче данных. Фреймы данных / наборы данных ;; \ nProject"
Ответы (1)
4 плюса
TextSocketSource
не предоставляет никаких встроенных параметров синтаксического анализа. Можно использовать только один из двух форматов:
метка времени и текст if
includeTimestamp
установленыtrue
со следующей схемой:StructType([ StructField("value", StringType()), StructField("timestamp", TimestampType()) ])
только текст, если
includeTimestamp
он установленfalse
со схемой, как показано ниже:StructType([StructField("value", StringType())]))
Если вы хотите изменить этот формат, вам придется преобразовать поток для извлечения интересующих полей, например, с помощью регулярных выражений:
from pyspark.sql.functions import regexp_extract
from functools import partial
fields = partial(
regexp_extract, str="value", pattern="^(\w*)\s*,\s*(\w*)\s*,\s*([0-9]*)$"
)
lines.select(
fields(idx=1).alias("name"),
fields(idx=2).alias("last_name"),
fields(idx=3).alias("phone_number")
)
Автор: user6910411
Размещён: 29.12.2016 07:17
Вопросы из категории :
- apache-spark Как сделать акулу / искру очистить кеш?
- apache-spark (PySpark) Вложенные списки после reduByKey
- apache-spark Spark java.lang.OutOfMemoryError: пространство кучи Java
- apache-spark Apache Spark: map vs mapPartitions?
- pyspark импорт pyspark в оболочку python
- pyspark Как отключить ведение журнала INFO в Spark?
- pyspark Преобразование простой строки строки в RDD в Spark
- pyspark Как создать Spark 1.2 с Maven (дает java.io.IOException: не удается запустить программу «javac»)?
- apache-spark-sql Конкатенация строк в SQL-запросе Spark
- apache-spark-sql динамически связывать переменную / параметр в Spark SQL?
- apache-spark-sql Schema from SchemaRDD?
- apache-spark-sql Вставка аналитических данных из Spark в Postgres
- pyspark-sql Как изменить имена столбцов данных в pyspark?
- pyspark-sql Как установить количество разделов / узлов при импорте данных в Spark
- pyspark-sql Усреднение по оконной функции приводит к StackOverflowError
- pyspark-sql UDF Pyspark DataFrame в текстовом столбце
- spark-structured-streaming Consume Kafka in Spark Streaming (Spark 2.0)
- spark-structured-streaming Почему transform создает побочные эффекты (println) только один раз в структурированном потоке?
- spark-structured-streaming Множественные агрегации в Spark Structured Streaming
- spark-structured-streaming Почему приложение Spark терпит неудачу с «ClassNotFoundException: не удалось найти источник данных: kafka» как uber-jar со сборкой sbt?