Unable to create array literal in spark/pyspark


Solution 1

First of all, you probably want structs, not arrays. Remember that Spark SQL doesn't support heterogeneous arrays, so array(1, 'a') is cast to array<string>.

So the query could look like this:

from pyspark.sql.functions import lit, struct

choices = [(100, 'A'), (200, 'B')]

target = [
    struct(
        lit(number).alias("number").cast("long"),
        lit(letter).alias("letter").cast("string"))
    for number, letter in choices]

query = struct("number", "letter").isin(target)

This seems to generate a valid expression:

query
Column<b'(named_struct(NamePlaceholder(), number, NamePlaceholder(), letter) IN (named_struct(col1, CAST(100 AS `number` AS BIGINT), col2, CAST(A AS `letter` AS STRING)), named_struct(col1, CAST(200 AS `number` AS BIGINT), col2, CAST(B AS `letter` AS STRING))))'>

But for some reason it fails in the analyzer:

df.where(~query)
AnalysisException                         Traceback (most recent call last)
...
AnalysisException: "cannot resolve '(named_struct('number', `number`, 'letter', `letter`) IN (named_struct('col1', CAST(100 AS BIGINT), 'col2', CAST('A' AS STRING)), named_struct('col1', CAST(200 AS BIGINT), 'col2', CAST('B' AS STRING))))' due to data type mismatch: Arguments must be same type;;\n'Filter NOT named_struct(number, number#0L, letter, letter#1) IN (named_struct(col1, cast(100 as bigint), col2, cast(A as string)),named_struct(col1, cast(200 as bigint), col2, cast(B as string)))\n+- LogicalRDD [number#0L, letter#1, id#2L]\n"

Strangely enough, the following SQL query fails as well:

df.createOrReplaceTempView("df")

spark.sql("SELECT * FROM df WHERE struct(letter, letter) IN (struct(CAST(1 AS bigint), 'a'))")
AnalysisException: "cannot resolve '(named_struct('letter', df.`letter`, 'letter', df.`letter`) IN (named_struct('col1', CAST(1 AS BIGINT), 'col2', 'a')))' due to data type mismatch: Arguments must be same type; line 1 pos 46;\n'Project [*]\n+- 'Filter named_struct(letter, letter#1, letter, letter#1) IN (named_struct(col1, cast(1 as bigint), col2, a))\n   +- SubqueryAlias df\n      +- LogicalRDD [number#0L, letter#1, id#2L]\n"

but when both sides are replaced with literals:

spark.sql("SELECT * FROM df WHERE struct(CAST(1 AS bigint), 'a') IN (struct(CAST(1 AS bigint), 'a'))")
DataFrame[number: bigint, letter: string, id: bigint]

works fine, so it looks like a bug.

That being said, a left anti join should work just fine here:

from pyspark.sql.functions import broadcast

df.join(
    broadcast(spark.createDataFrame(choices, ("number", "letter"))),
    ["number", "letter"],
    "leftanti"
).show()
+------+------+---+
|number|letter| id|
+------+------+---+
|   300|     C|306|
+------+------+---+
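
If a join is not convenient, another option is to spell the exclusion out as a plain SQL predicate. The following is only a minimal sketch, not part of the original answer: the helper names sql_literal and exclude_pairs_predicate are hypothetical, and it assumes the values are trusted integers or simple strings without quotes or backslashes. It only builds the predicate string, so it runs without a Spark session.

```python
def sql_literal(value):
    """Render an int or a simple string as a SQL literal (hypothetical helper)."""
    if isinstance(value, bool):
        raise TypeError("booleans are not handled in this sketch")
    if isinstance(value, int):
        return str(value)
    if isinstance(value, str):
        # Assumption: values are trusted and contain no quotes or backslashes,
        # so no escaping is attempted; anything else is rejected.
        if "'" in value or "\\" in value:
            raise ValueError(f"unsupported character in {value!r}")
        return f"'{value}'"
    raise TypeError(f"unsupported type: {type(value).__name__}")


def exclude_pairs_predicate(columns, pairs):
    """Build a SQL predicate that is true for rows matching none of the pairs."""
    if not pairs:
        raise ValueError("at least one pair is required")
    per_pair = [
        "(" + " AND ".join(
            f"{column} = {sql_literal(value)}"
            for column, value in zip(columns, pair)
        ) + ")"
        for pair in pairs
    ]
    return "NOT (" + " OR ".join(per_pair) + ")"


choices = [(100, 'A'), (200, 'B')]
predicate = exclude_pairs_predicate(("number", "letter"), choices)
print(predicate)
```

Since DataFrame.where also accepts a SQL expression string, df.where(predicate).show() should leave only the (300, 'C', 306) row for the dataframe in the question. Rows containing NULL in the compared columns may be treated differently than with the anti join, so this is worth checking on real data.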

Solution 2

To create an array literal in Spark, you need to build the array from a series of columns, where each column is created with the lit function:

scala> array(lit(100), lit("A"))
res1: org.apache.spark.sql.Column = array(100, A)

Authored by Mariusz

Updated on January 07, 2020

Comments

  • Mariusz over 4 years

    I'm having trouble removing rows from a dataframe based on a two-column list of items to filter. For example, for this dataframe:

    from pyspark.sql.functions import array, col

    df = spark.createDataFrame([(100, 'A', 304), (200, 'B', 305), (300, 'C', 306)], ['number', 'letter', 'id'])
    df.show()
    +------+------+---+
    |number|letter| id|
    +------+------+---+
    |   100|     A|304|
    |   200|     B|305|
    |   300|     C|306|
    +------+------+---+
    

    I can easily remove rows using isin on one column:

    df.where(~col('number').isin([100, 200])).show()
    +------+------+---+
    |number|letter| id|
    +------+------+---+
    |   300|     C|306|
    +------+------+---+
    

    But when I try to remove them by two columns I get an exception:

    df.where(~array('number', 'letter').isin([(100, 'A'), (200, 'B')])).show()
    
    Py4JJavaError: An error occurred while calling z:org.apache.spark.sql.functions.lit.
    : java.lang.RuntimeException: Unsupported literal type class java.util.ArrayList [100, A]
        at org.apache.spark.sql.catalyst.expressions.Literal$.apply(literals.scala:57)
        at org.apache.spark.sql.functions$.lit(functions.scala:101)
        at org.apache.spark.sql.functions.lit(functions.scala)
        at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
        at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.lang.reflect.Method.invoke(Method.java:498)
        at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:237)
        at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
        at py4j.Gateway.invoke(Gateway.java:280)
        at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
        at py4j.commands.CallCommand.execute(CallCommand.java:79)
        at py4j.GatewayConnection.run(GatewayConnection.java:214)
        at java.lang.Thread.run(Thread.java:745)
    

    After some investigation, I realized that the root cause of the problem is creating literals from non-primitive types. I tried the following code in pyspark:

    lit((100, 'A'))
    lit([100, 'A'])
    

    and the following in scala-spark:

    lit((100, "A"))
    lit(List(100, "A"))
    lit(Seq(100, "A"))
    lit(Array(100, "A"))
    

    but with no luck. Does anyone know a way to create an array literal in spark/pyspark? Or is there another method to filter a dataframe by two columns?

  • TayTay almost 6 years
    The question was about pyspark, not scala.
  • emeth about 5 years
    A hint in this answer! from pyspark.sql.functions import *; array(lit(100), lit("A")) works