How to display a streaming DataFrame (as show fails with AnalysisException)?


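A streaming DataFrame doesn't support the show() method. Calling writeStream...start() does not turn readDF into a batch DataFrame. start() launches a separate streaming query that runs in a background thread and streams the input data to the sink. readDF.show() still tries to run readDF as a batch query, and Spark rejects that with this AnalysisException. Because you are using the console sink, the started query already prints the data to the console, so you don't need show().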

Remove readDF.show() and add a sleep after start() so the query has time to process data before the script exits. You should then see the data in the console, for example:

import time

query = readDF.writeStream.format("console").start()
time.sleep(10)  # let the query run for 10 seconds, printing each micro-batch
query.stop()

You also need to set startingOffsets to earliest. Otherwise, the Kafka source starts from the latest offset and, in your case, fetches nothing, because all of the messages were produced before the query started:

readDF = spark \
    .readStream \
    .format("kafka") \
    .option("kafka.bootstrap.servers", kafka_broker) \
    .option("startingOffsets", "earliest") \
    .option("subscribe", topic_name) \
    .load()
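To see why the default returns nothing here, the following is a toy model in plain Python, not the Kafka or Spark API. It treats one partition as a list whose index is the offset and assumes nothing has been removed by retention, so "earliest" means offset 0. The function name messages_seen is hypothetical.

```python
from typing import List, Sequence


def messages_seen(produced_before: Sequence[str],
                  produced_after: Sequence[str],
                  starting_offsets: str) -> List[str]:
    """Toy model: messages a newly started query reads from one partition.

    Hypothetical helper, not a Kafka/Spark API. Offset == list index, and we
    assume no retention deletes, so "earliest" is offset 0.
    """
    log = list(produced_before)
    if starting_offsets == "earliest":
        start = 0            # begin at the oldest retained message
    elif starting_offsets == "latest":
        start = len(log)     # begin after the last message present at query start
    else:
        raise ValueError("expected 'earliest' or 'latest'")
    log.extend(produced_after)  # messages produced while the query is running
    return log[start:]


# The question's scenario: 30 messages are produced before readStream starts.
before = [f"msg-{i}" for i in range(30)]
print(len(messages_seen(before, [], "latest")))    # 0
print(len(messages_seen(before, [], "earliest")))  # 30
```

In the question, the producer loop finishes before the query is created, so with "latest" the start position is already past every message.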
Authored by user2361174, updated on June 20, 2022

Comments

  • user2361174 (almost 2 years ago)

    I have some data that I'm streaming into a Kafka topic. I'm reading this streaming data into a DataFrame and want to display the DataFrame's contents:

    import os
    from kafka import KafkaProducer
    from pyspark.sql import SparkSession, DataFrame
    import time
    from datetime import datetime, timedelta
    
    os.environ['PYSPARK_SUBMIT_ARGS'] = '--packages org.apache.spark:spark-sql-kafka-0-10_2.11:2.2.0,org.apache.spark:spark-streaming-kafka-0-8_2.11:2.2.0 pyspark-shell'
    
    topic_name = "my-topic"
    kafka_broker = "localhost:9092"
    
    producer = KafkaProducer(bootstrap_servers = kafka_broker)
    spark = SparkSession.builder.getOrCreate()
    terminate = datetime.now() + timedelta(seconds=30)
    
    while datetime.now() < terminate:
        producer.send(topic = topic_name, value = str(datetime.now()).encode('utf-8'))
        time.sleep(1)
    
    readDF = spark \
        .readStream \
        .format("kafka") \
        .option("kafka.bootstrap.servers", kafka_broker) \
        .option("subscribe", topic_name) \
        .load()
    readDF = readDF.selectExpr("CAST(key AS STRING)","CAST(value AS STRING)")
    
    readDF.writeStream.format("console").start()
    readDF.show()
    
    producer.close()
    

    However, I keep getting this error:

    During handling of the above exception, another exception occurred:
    
    Traceback (most recent call last):
      File "/home/spark/spark/python/pyspark/sql/utils.py", line 63, in deco
        return f(*a, **kw)
      File "/home/spark/spark/python/lib/py4j-0.10.4-src.zip/py4j/protocol.py", line 319, in get_return_value
    py4j.protocol.Py4JJavaError: An error occurred while calling o30.showString.
    : org.apache.spark.sql.AnalysisException: Queries with streaming sources must be executed with writeStream.start();;
    kafka
        at org.apache.spark.sql.catalyst.analysis.UnsupportedOperationChecker$.org$apache$spark$sql$catalyst$analysis$UnsupportedOperationChecker$$throwError(UnsupportedOperationChecker.scala:297)
        at org.apache.spark.sql.catalyst.analysis.UnsupportedOperationChecker$$anonfun$checkForBatch$1.apply(UnsupportedOperationChecker.scala:36)
        at org.apache.spark.sql.catalyst.analysis.UnsupportedOperationChecker$$anonfun$checkForBatch$1.apply(UnsupportedOperationChecker.scala:34)
        at org.apache.spark.sql.catalyst.trees.TreeNode.foreachUp(TreeNode.scala:127)
    ...
    Traceback (most recent call last):
      File "test2.py", line 30, in <module>
        readDF.show()
      File "/home/spark/spark/python/pyspark/sql/dataframe.py", line 336, in show
        print(self._jdf.showString(n, 20))
      File "/home/spark/spark/python/lib/py4j-0.10.4-src.zip/py4j/java_gateway.py", line 1133, in __call__
      File "/home/spark/spark/python/pyspark/sql/utils.py", line 69, in deco
        raise AnalysisException(s.split(': ', 1)[1], stackTrace)
    pyspark.sql.utils.AnalysisException: 'Queries with streaming sources must be executed with writeStream.start();;\nkafka'
    

    I don't understand why the exception is happening, since I'm calling writeStream.start() right before show(). I tried removing selectExpr(), but that made no difference. Does anyone know how to display a DataFrame sourced from a stream? I'm using Python 3.6.1, Kafka 0.10.2.1, and Spark 2.2.0.