How to display a streaming DataFrame (as show fails with AnalysisException)?
A streaming DataFrame doesn't support the show() method. When you call the start() method, it starts a background thread that streams the input data to the sink, and since you are using the console sink, it writes the data to the console. You don't need to call show().
Remove readDF.show() and add a sleep after starting the query so the background thread has time to process data before the script exits. You should then see the data in the console, for example:
query = readDF.writeStream.format("console").start()
import time
time.sleep(10) # sleep 10 seconds
query.stop()
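As an alternative to a fixed sleep, the query handle returned by start() can be asked to block for a bounded time. The sketch below assumes the `awaitTermination(timeout)` and `stop()` methods of PySpark's `StreamingQuery`; the helper name `run_for` is hypothetical and only there for illustration.

```python
def run_for(query, seconds):
    """Block until the streaming query ends or `seconds` elapse, then stop it.

    `query` is expected to behave like a PySpark StreamingQuery, i.e. to
    provide awaitTermination(timeout) and stop(). Returns whatever
    awaitTermination returned (True if the query ended within the timeout).
    """
    try:
        finished = query.awaitTermination(seconds)
    finally:
        # Always stop the query so the background thread does not keep running.
        query.stop()
    return finished

# Intended usage with the DataFrame from this answer (not executed here):
# query = readDF.writeStream.format("console").start()
# run_for(query, 10)
```

Unlike time.sleep(10), this returns early if the query terminates on its own within the timeout.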
You also need to set startingOffsets to earliest; otherwise, the Kafka source will just start from the latest offset and fetch nothing in your case.
readDF = spark \
    .readStream \
    .format("kafka") \
    .option("kafka.bootstrap.servers", kafka_broker) \
    .option("startingOffsets", "earliest") \
    .option("subscribe", topic_name) \
    .load()
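To see why the default fetches nothing here, it may help to model the topic as a plain list. The following is a toy model in pure Python, not the Kafka or Spark API; `resolve_start` and `read_from` are hypothetical names. It mimics the situation in the question, where every message is produced before the stream starts.

```python
def resolve_start(log, starting_offsets):
    """Return the position in `log` where a newly started reader begins."""
    if starting_offsets == "earliest":
        return 0          # begin at the oldest record still in the log
    if starting_offsets == "latest":
        return len(log)   # begin after everything written so far
    raise ValueError("unsupported value: %r" % (starting_offsets,))

def read_from(log, start):
    """Return every record at or after position `start`."""
    return log[start:]

# All messages are produced BEFORE the reader starts, as in the question.
log = ["msg-0", "msg-1", "msg-2"]

latest_start = resolve_start(log, "latest")
earliest_start = resolve_start(log, "earliest")

print(read_from(log, latest_start))    # [] : nothing new has arrived yet
print(read_from(log, earliest_start))  # ['msg-0', 'msg-1', 'msg-2']

# Only records produced after the reader started are visible with "latest".
log.append("msg-3")
print(read_from(log, latest_start))    # ['msg-3']
```

In this model, a reader starting at "latest" only ever sees records appended after it started, which is why the producer loop in the question leaves nothing for the stream to show.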
user2361174
Updated on June 20, 2022
I have some data that I'm streaming into a Kafka topic. I'm taking this streaming data and placing it into a DataFrame. I want to display the data inside the DataFrame:
import os
from kafka import KafkaProducer
from pyspark.sql import SparkSession, DataFrame
import time
from datetime import datetime, timedelta
os.environ['PYSPARK_SUBMIT_ARGS'] = '--packages org.apache.spark:spark-sql-kafka-0-10_2.11:2.2.0,org.apache.spark:spark-streaming-kafka-0-8_2.11:2.2.0 pyspark-shell'
topic_name = "my-topic"
kafka_broker = "localhost:9092"
producer = KafkaProducer(bootstrap_servers = kafka_broker)
spark = SparkSession.builder.getOrCreate()
terminate = datetime.now() + timedelta(seconds=30)
while datetime.now() < terminate:
    producer.send(topic = topic_name, value = str(datetime.now()).encode('utf-8'))
    time.sleep(1)
readDF = spark \
    .readStream \
    .format("kafka") \
    .option("kafka.bootstrap.servers", kafka_broker) \
    .option("subscribe", topic_name) \
    .load()
readDF = readDF.selectExpr("CAST(key AS STRING)","CAST(value AS STRING)")
readDF.writeStream.format("console").start()
readDF.show()
producer.close()
However, I keep getting this error:
During handling of the above exception, another exception occurred:
Traceback (most recent call last):
  File "/home/spark/spark/python/pyspark/sql/utils.py", line 63, in deco
    return f(*a, **kw)
  File "/home/spark/spark/python/lib/py4j-0.10.4-src.zip/py4j/protocol.py", line 319, in get_return_value
py4j.protocol.Py4JJavaError: An error occurred while calling o30.showString.
: org.apache.spark.sql.AnalysisException: Queries with streaming sources must be executed with writeStream.start();;
kafka
    at org.apache.spark.sql.catalyst.analysis.UnsupportedOperationChecker$.org$apache$spark$sql$catalyst$analysis$UnsupportedOperationChecker$$throwError(UnsupportedOperationChecker.scala:297)
    at org.apache.spark.sql.catalyst.analysis.UnsupportedOperationChecker$$anonfun$checkForBatch$1.apply(UnsupportedOperationChecker.scala:36)
    at org.apache.spark.sql.catalyst.analysis.UnsupportedOperationChecker$$anonfun$checkForBatch$1.apply(UnsupportedOperationChecker.scala:34)
    at org.apache.spark.sql.catalyst.trees.TreeNode.foreachUp(TreeNode.scala:127)
    ...
Traceback (most recent call last):
  File "test2.py", line 30, in <module>
    readDF.show()
  File "/home/spark/spark/python/pyspark/sql/dataframe.py", line 336, in show
    print(self._jdf.showString(n, 20))
  File "/home/spark/spark/python/lib/py4j-0.10.4-src.zip/py4j/java_gateway.py", line 1133, in __call__
  File "/home/spark/spark/python/pyspark/sql/utils.py", line 69, in deco
    raise AnalysisException(s.split(': ', 1)[1], stackTrace)
pyspark.sql.utils.AnalysisException: 'Queries with streaming sources must be executed with writeStream.start();;\nkafka'
I don't understand why the exception is happening; I'm calling writeStream.start() right before show(). I tried getting rid of selectExpr(), but that made no difference. Does anyone know how to display a stream-sourced DataFrame? I'm using Python 3.6.1, Kafka 0.10.2.1, and Spark 2.2.0.