How to convert a pyspark.rdd.PipelinedRDD to a DataFrame without using the collect() method in PySpark?


Solution 1

You want to do two things here: 1. flatten your data, and 2. put it into a DataFrame.

One way to do it is as follows:

First, let us flatten the dictionary:

rdd2 = Rdd1.flatMapValues(lambda x: [(k, x[k]) for k in x.keys()])

When collecting the data, you get something like this:

[(10, (3, 3.616726727464709)), (10, (4, 2.9996439803387602)), ...

Then we can format the data and turn it into a DataFrame:

rdd2.map(lambda x: (x[0], x[1][0], x[1][1]))\
    .toDF(["CId", "IID", "Score"])\
    .show()

which gives you this:

+---+---+-------------------+
|CId|IID|              Score|
+---+---+-------------------+
| 10|  3|  3.616726727464709|
| 10|  4| 2.9996439803387602|
| 10|  5| 1.6767412921625855|
|  1|  3|  2.016527311459324|
|  1|  4|-1.5271512313750577|
|  1|  5| 1.9665475696370045|
|  2|  3|  6.230272144805092|
|  2|  4|  4.033642544526678|
|  2|  5| 3.1517805604906313|
|  3|  3|-0.3924680103722977|
|  3|  4| 2.9757316477407443|
|  3|  5|-1.5689126834176417|
+---+---+-------------------+
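
As a side note, the two steps can be chained into a single expression. Here is a minimal sketch, assuming Rdd1 is the RDD from the question and a SparkSession is already active (dict.items() is equivalent to the list comprehension used above):

# flatten each dict into (IID, Score) pairs, reshape into flat 3-tuples, then build the DataFrame
df = Rdd1.flatMapValues(lambda x: x.items()) \
         .map(lambda x: (x[0], x[1][0], x[1][1])) \
         .toDF(["CId", "IID", "Score"])

df.show()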

Solution 2

There is an even easier and more elegant solution that avoids the Python lambda expressions in @oli's answer. It relies on Spark DataFrame's explode, which fits your requirement exactly. It should also be faster, because there is no need to apply Python lambdas twice. See below:

from pyspark.sql.functions import explode

# dummy data
data = [(10, {3: 3.616726727464709, 4: 2.9996439803387602, 5: 1.6767412921625855}),
        (1, {3: 2.016527311459324, 4: -1.5271512313750577, 5: 1.9665475696370045}),
        (2, {3: 6.230272144805092, 4: 4.033642544526678, 5: 3.1517805604906313}),
        (3, {3: -0.3924680103722977, 4: 2.9757316477407443, 5: -1.5689126834176417})]

# create your rdd
rdd = sc.parallelize(data)

# convert to spark data frame
df = rdd.toDF(["CId", "Values"])

# use explode
df.select("CId", explode("Values").alias("IID", "Score")).show()

+---+---+-------------------+
|CId|IID|              Score|
+---+---+-------------------+
| 10|  3|  3.616726727464709|
| 10|  4| 2.9996439803387602|
| 10|  5| 1.6767412921625855|
|  1|  3|  2.016527311459324|
|  1|  4|-1.5271512313750577|
|  1|  5| 1.9665475696370045|
|  2|  3|  6.230272144805092|
|  2|  4|  4.033642544526678|
|  2|  5| 3.1517805604906313|
|  3|  3|-0.3924680103722977|
|  3|  4| 2.9757316477407443|
|  3|  5|-1.5689126834176417|
+---+---+-------------------+
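
This works because toDF infers the Values column as a map type, and explode on a map column emits one row per key/value pair. If you want to verify the inferred schema before exploding (the types noted in the comment are what inference should give for this data, but it is worth checking):

# inspect the inferred schema; Values should come out as a map column,
# roughly: CId: long, Values: map<bigint,double>
df.printSchema()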

Solution 3

Ensure a Spark session is created first:

from pyspark import SparkContext
from pyspark.sql import SparkSession

sc = SparkContext()
spark = SparkSession(sc)

I found this answer when I was trying to solve this exact issue:
'PipelinedRDD' object has no attribute 'toDF' in PySpark
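
The reason this matters is that toDF() only becomes available on RDDs once a SparkSession exists. As a sketch of the more common setup (the app name here is arbitrary):

from pyspark.sql import SparkSession

# creating (or reusing) a SparkSession also makes rdd.toDF() available
spark = SparkSession.builder.appName("rdd-to-df").getOrCreate()
sc = spark.sparkContext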

Solution 4

This is how you can do it with Scala:

import spark.implicits._  // needed for .toDF on an RDD

val Rdd1 = spark.sparkContext.parallelize(Seq(
  (10, Map(3 -> 3.616726727464709, 4 -> 2.9996439803387602, 5 -> 1.6767412921625855)),
  (1, Map(3 -> 2.016527311459324, 4 -> -1.5271512313750577, 5 -> 1.9665475696370045)),
  (2, Map(3 -> 6.230272144805092, 4 -> 4.033642544526678, 5 -> 3.1517805604906313)),
  (3, Map(3 -> -0.3924680103722977, 4 -> 2.9757316477407443, 5 -> -1.5689126834176417))
))

// expand each map entry into a (CId, IId, score) row, then build the DataFrame
val x = Rdd1.flatMap(x => x._2.map(y => (x._1, y._1, y._2)))
  .toDF("CId", "IId", "score")

Output:

+---+---+-------------------+
|CId|IId|score              |
+---+---+-------------------+
|10 |3  |3.616726727464709  |
|10 |4  |2.9996439803387602 |
|10 |5  |1.6767412921625855 |
|1  |3  |2.016527311459324  |
|1  |4  |-1.5271512313750577|
|1  |5  |1.9665475696370045 |
|2  |3  |6.230272144805092  |
|2  |4  |4.033642544526678  |
|2  |5  |3.1517805604906313 |
|3  |3  |-0.3924680103722977|
|3  |4  |2.9757316477407443 |
|3  |5  |-1.5689126834176417|
+---+---+-------------------+ 

Hope you can convert it to PySpark.
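
For completeness, a direct PySpark translation of the same flatMap approach might look like the sketch below (assuming an active SparkSession named spark):

Rdd1 = spark.sparkContext.parallelize([
    (10, {3: 3.616726727464709, 4: 2.9996439803387602, 5: 1.6767412921625855}),
    (1, {3: 2.016527311459324, 4: -1.5271512313750577, 5: 1.9665475696370045}),
    (2, {3: 6.230272144805092, 4: 4.033642544526678, 5: 3.1517805604906313}),
    (3, {3: -0.3924680103722977, 4: 2.9757316477407443, 5: -1.5689126834176417})])

# expand each (CId, {IID: Score, ...}) record into one (CId, IId, score) row per map entry
df = Rdd1.flatMap(lambda x: [(x[0], k, v) for k, v in x[1].items()]) \
         .toDF(["CId", "IId", "score"])

df.show()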

Comments

  • Sai, over 2 years ago:

    I have a pyspark.rdd.PipelinedRDD (Rdd1). When I do Rdd1.collect(), it gives a result like the one below.

     [(10, {3: 3.616726727464709, 4: 2.9996439803387602, 5: 1.6767412921625855}),
     (1, {3: 2.016527311459324, 4: -1.5271512313750577, 5: 1.9665475696370045}),
     (2, {3: 6.230272144805092, 4: 4.033642544526678, 5: 3.1517805604906313}),
     (3, {3: -0.3924680103722977, 4: 2.9757316477407443, 5: -1.5689126834176417})]
    

    Now I want to convert the pyspark.rdd.PipelinedRDD to a DataFrame without using the collect() method.

    My final DataFrame should look like the one below; df.show() should give:

    +----------+-------+-------------------+
    |CId       |IID    |Score              |
    +----------+-------+-------------------+
    |10        |4      |2.9996439803387602 |
    |10        |5      |1.6767412921625855 |
    |10        |3      |3.616726727464709  |
    |1         |4      |-1.5271512313750577|
    |1         |5      |1.9665475696370045 |
    |1         |3      |2.016527311459324  |
    |2         |4      |4.033642544526678  |
    |2         |5      |3.1517805604906313 |
    |2         |3      |6.230272144805092  |
    |3         |4      |2.9757316477407443 |
    |3         |5      |-1.5689126834176417|
    |3         |3      |-0.3924680103722977|
    +----------+-------+-------------------+
    

    I can achieve this by applying collect() to the RDD, then iterating over the result and finally building a DataFrame.

    But now I want to convert the pyspark.rdd.PipelinedRDD to a DataFrame without using any collect() call.

    Please let me know how to achieve this.