Adding a unique consecutive row number to a DataFrame in PySpark

Solution 1

I found a simple solution. My DataFrame has no column that holds the same value in every row, so row_number with a partitionBy clause restarts the numbering in each partition and does not produce row numbers that are unique across the whole DataFrame.

Let's add a new column to the existing DataFrame with a constant default value in it.

from pyspark.sql.functions import lit
emp_df = emp_df.withColumn("new_column", lit("ABC"))

Then create a window specification with partitionBy on that column, "new_column":

from pyspark.sql import Window
from pyspark.sql.functions import lit, row_number
w = Window.partitionBy("new_column").orderBy(lit("A"))
df = emp_df.withColumn("row_num", row_number().over(w)).drop("new_column")

You will get the desired result:

+------+--------------------+--------+----------+-------+
|emp_id|            emp_name|emp_city|emp_salary|row_num|
+------+--------------------+--------+----------+-------+
|     1|VIKRANT SINGH RAN...|NOIDA   |     10000|      1|
|     2|RAGHVENDRA KUMAR ...|GURGAON |     50000|      2|
|     7|AJAY SHARMA      ...|NOIDA   |     70000|      3|
|     9|ROBERT           ...|GURGAON |     70000|      4|
|     4|ABHIJAN SINHA    ...|SAKET   |     65000|      5|
|     8|SIDDHARTH BASU   ...|SAKET   |     72000|      6|
|     5|SUPER DEVELOPER  ...|USA     |     50000|      7|
|     3|GOVIND NIMBHAL   ...|DWARKA  |     92000|      8|
|     6|RAJAT TYAGI      ...|UP      |     65000|      9|
+------+--------------------+--------+----------+-------+

Solution 2

Using Spark SQL:

df = spark.sql("""
SELECT 
    row_number() OVER (
        PARTITION BY '' 
        ORDER BY '' 
    ) as id,
    *
FROM 
    VALUES 
    ('Bob  ', 20),
    ('Alice', 21),
    ('Gary ', 21),
    ('Kent ', 25),
    ('Gary ', 35)
""")

Output:

>>> df.printSchema()
root
 |-- id: integer (nullable = true)
 |-- col1: string (nullable = false)
 |-- col2: integer (nullable = false)

>>> df.show()
+---+-----+----+
| id| col1|col2|
+---+-----+----+
|  1|Bob  |  20|
|  2|Alice|  21|
|  3|Gary |  21|
|  4|Kent |  25|
|  5|Gary |  35|
+---+-----+----+

Author: vikrant rana

Updated on June 04, 2022

Comments

  • vikrant rana
    vikrant rana almost 2 years

    I want to add a unique row number to my DataFrame in PySpark without using the monotonicallyIncreasingId or partitionBy methods. This question may be a duplicate of similar questions asked earlier, but I am still looking for advice on whether I am doing this the right way. Here is a snippet of my code. I have a CSV file with the following input records:

    1,VIKRANT SINGH RANA    ,NOIDA   ,10000
    3,GOVIND NIMBHAL        ,DWARKA  ,92000
    2,RAGHVENDRA KUMAR GUPTA,GURGAON ,50000
    4,ABHIJAN SINHA         ,SAKET   ,65000
    5,SUPER DEVELOPER       ,USA     ,50000
    6,RAJAT TYAGI           ,UP      ,65000
    7,AJAY SHARMA           ,NOIDA   ,70000
    8,SIDDHARTH BASU        ,SAKET   ,72000
    9,ROBERT                ,GURGAON ,70000
    

    I loaded this CSV file into a DataFrame:

    PATH_TO_FILE="file:///u/user/vikrant/testdata/EMP_FILE.csv"
    
    emp_df = spark.read.format("com.databricks.spark.csv") \
      .option("mode", "DROPMALFORMED") \
      .option("header", "true") \
      .option("inferschema", "true") \
      .option("delimiter", ",").load(PATH_TO_FILE)
    
    +------+--------------------+--------+----------+
    |emp_id|            emp_name|emp_city|emp_salary|
    +------+--------------------+--------+----------+
    |     1|VIKRANT SINGH RAN...|NOIDA   |     10000|
    |     3|GOVIND NIMBHAL   ...|DWARKA  |     92000|
    |     2|RAGHVENDRA KUMAR ...|GURGAON |     50000|
    |     4|ABHIJAN SINHA    ...|SAKET   |     65000|
    |     5|SUPER DEVELOPER  ...|USA     |     50000|
    |     6|RAJAT TYAGI      ...|UP      |     65000|
    |     7|AJAY SHARMA      ...|NOIDA   |     70000|
    |     8|SIDDHARTH BASU   ...|SAKET   |     72000|
    |     9|ROBERT           ...|GURGAON |     70000|
    +------+--------------------+--------+----------+
    
    empRDD = emp_df.rdd.zipWithIndex()
    newRDD = empRDD.map(lambda x: list(x[0]) + [x[1]])
    newRDD.take(2)
    [[1, u'VIKRANT SINGH RANA    ', u'NOIDA   ', 10000, 0], [3, u'GOVIND NIMBHAL        ', u'DWARKA  ', 92000, 1]]
    

    When I appended the index value to each row's list, I lost the DataFrame schema.

    newdf = newRDD.toDF(['emp_id', 'emp_name', 'emp_city', 'emp_salary', 'row_id'])
    newdf.show()
    
    +------+--------------------+--------+----------+------+
    |emp_id|            emp_name|emp_city|emp_salary|row_id|
    +------+--------------------+--------+----------+------+
    |     1|VIKRANT SINGH RAN...|NOIDA   |     10000|     0|
    |     3|GOVIND NIMBHAL   ...|DWARKA  |     92000|     1|
    |     2|RAGHVENDRA KUMAR ...|GURGAON |     50000|     2|
    |     4|ABHIJAN SINHA    ...|SAKET   |     65000|     3|
    |     5|SUPER DEVELOPER  ...|USA     |     50000|     4|
    |     6|RAJAT TYAGI      ...|UP      |     65000|     5|
    |     7|AJAY SHARMA      ...|NOIDA   |     70000|     6|
    |     8|SIDDHARTH BASU   ...|SAKET   |     72000|     7|
    |     9|ROBERT           ...|GURGAON |     70000|     8|
    +------+--------------------+--------+----------+------+
    

    Am I doing this the right way, or is there a better way to add the row number and preserve the schema of the DataFrame in PySpark?

    Is it feasible to use the zipWithIndex method to add unique consecutive row numbers to a large DataFrame as well? Can this row_id be used to repartition the DataFrame so that the data is distributed uniformly across the partitions?
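
To make the feasibility question more concrete, here is a plain-Python sketch (no Spark involved) of how zipWithIndex assigns indices: it counts the elements in each partition, turns the counts into a starting offset per partition, and then lets every partition number its own elements. The function zip_with_index and the three-way split of emp_id values are hypothetical and only model the behaviour.

```python
from itertools import accumulate


def zip_with_index(partitions):
    """Pair each element with a global index, partition by partition."""
    # Step 1: count the elements in every partition. Spark runs an extra job
    # for this when the RDD has more than one partition.
    counts = [len(part) for part in partitions]
    # Step 2: a partition's start index is the total size of all earlier partitions.
    starts = [0] + list(accumulate(counts))[:-1]
    # Step 3: each partition numbers its own elements from its start index,
    # so no data has to move between partitions.
    return [
        [(value, start + i) for i, value in enumerate(part)]
        for part, start in zip(partitions, starts)
    ]


# emp_id values from the question, split over three hypothetical partitions.
partitions = [[1, 3, 2], [4, 5], [6, 7, 8, 9]]
indexed = zip_with_index(partitions)
print(indexed)
```

Only the per-partition counts are gathered in one place. The rows stay in their partitions, unlike with a window that has no real partitioning key.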

    • Willy
      Willy over 5 years
      What exactly do you mean by lost schema? Did it start as not integer columns but then moved to strings? Also why do you not want to use monotonically increasing ids?
    • vikrant rana
      vikrant rana over 5 years
      Monotonically increasing ID does not add a consecutive, incrementing ID; it just adds a random unique number to my DataFrame. And partitionBy with a window function brings the data from n partitions into one partition.
    • vikrant rana
      vikrant rana over 5 years
      By lost schema I meant that when converting the RDD to a DataFrame, I have to specify the column names as well. Is there any way to preserve the schema when converting a DataFrame to an RDD and then from the RDD back to a DataFrame?
    • pvy4917
      pvy4917 over 5 years
      Do a row_number over a partition set and then orderBy your choice. Why do you want to use RDDs? Just because DataFrames/Datasets are ultimately broken down into RDDs does not mean you need to use them. Avoid RDDs at any cost. And are you losing the order or the schema? I am sure it is the order, not the schema.
    • vikrant rana
      vikrant rana over 5 years
      I think I can use a partitionBy clause with the window function instead of only using orderBy; this way the data will not move to a single partition. I will give this a try.
    • mkaran
      mkaran almost 3 years
      Hi, you can check all the ways to do this, along with their dangers, in this answer here
  • zhao yufei
    zhao yufei over 4 years
    simpler way: withColumn("index", F.row_number().over(Window.orderBy(monotonically_increasing_id())) - 1)
  • Piko Monde
    Piko Monde almost 4 years
    @zhaoyufei why do you need to add -1 in the last part?
  • zhao yufei
    zhao yufei almost 4 years
    @PikoMonde In my use case I generate an index that ranges from 0 to some number, so I add -1. If your use case starts from 1, you can just remove the -1 part.
  • MiloMinderbinder
    MiloMinderbinder about 2 years
    These solutions move the dataset to a single partition, which is usually something you really want to avoid.
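
As a side note on the comments above, here is a plain-Python sketch (no Spark involved) of why monotonically_increasing_id values are unique but not consecutive, and why ranking them with row_number recovers a consecutive index. It follows the layout described in the Spark documentation for the current implementation (partition ID in the upper 31 bits, record number within the partition in the lower 33 bits). The helper monotonic_id and the two-partition layout are hypothetical.

```python
def monotonic_id(partition_index, row_in_partition):
    """Mimic the documented bit layout of monotonically_increasing_id."""
    # Partition ID goes in the upper bits, the row's position in the lower 33 bits.
    return (partition_index << 33) + row_in_partition


# Hypothetical layout: two partitions holding 3 and 2 rows.
rows_per_partition = [3, 2]
ids = [
    monotonic_id(p, r)
    for p, n in enumerate(rows_per_partition)
    for r in range(n)
]
# ids == [0, 1, 2, 8589934592, 8589934593]: increasing and unique, with a gap.

# Ranking the ids, as row_number() over an ordering by the id does,
# then subtracting 1 gives a consecutive 0-based index.
index = [rank - 1 for rank, _ in enumerate(sorted(ids), start=1)]
print(ids, index)
```

The ranking step is the part that needs a global ordering, which is why the window-based variants end up processing all rows in one partition.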