Unable to create array literal in spark/pyspark
Solution 1
First of all you probably want struct
not arrays
. Remember that Spark SQL doesn't support heterogeneous arrays so array(1, 'a')
is casted to array<string>
.
So query could look like this:
choices = [(100, 'A'), (200, 'B')]
target = [
struct(
lit(number).alias("number").cast("long"),
lit(letter).alias("letter").cast("string"))
for number, letter in choices]
query = struct("number", "letter").isin(target)
This seems to generate valid expression:
query
Column<b'(named_struct(NamePlaceholder(), number, NamePlaceholder(), letter) IN (named_struct(col1, CAST(100 AS `number` AS BIGINT), col2, CAST(A AS `letter` AS STRING)), named_struct(col1, CAST(200 AS `number` AS BIGINT), col2, CAST(B AS `letter` AS STRING))))'>
But for some reason fails on analyzer:
df.where(~query)
AnalysisException Traceback (most recent call last)
...
AnalysisException: "cannot resolve '(named_struct('number', `number`, 'letter', `letter`) IN (named_struct('col1', CAST(100 AS BIGINT), 'col2', CAST('A' AS STRING)), named_struct('col1', CAST(200 AS BIGINT), 'col2', CAST('B' AS STRING))))' due to data type mismatch: Arguments must be same type;;\n'Filter NOT named_struct(number, number#0L, letter, letter#1) IN (named_struct(col1, cast(100 as bigint), col2, cast(A as string)),named_struct(col1, cast(200 as bigint), col2, cast(B as string)))\n+- LogicalRDD [number#0L, letter#1, id#2L]\n"
Strangely enough with SQL following fails as well:
df.createOrReplaceTempView("df")
spark.sql("SELECT * FROM df WHERE struct(letter, letter) IN (struct(CAST(1 AS bigint), 'a'))")
AnalysisException: "cannot resolve '(named_struct('letter', df.`letter`, 'letter', df.`letter`) IN (named_struct('col1', CAST(1 AS BIGINT), 'col2', 'a')))' due to data type mismatch: Arguments must be same type; line 1 pos 46;\n'Project [*]\n+- 'Filter named_struct(letter, letter#1, letter, letter#1) IN (named_struct(col1, cast(1 as bigint), col2, a))\n +- SubqueryAlias df\n +- LogicalRDD [number#0L, letter#1, id#2L]\n"
but when replaced with literals on both sides:
spark.sql("SELECT * FROM df WHERE struct(CAST(1 AS bigint), 'a') IN (struct(CAST(1 AS bigint), 'a'))")
DataFrame[number: bigint, letter: string, id: bigint]
works fine so it looks like a bug.
That being said left anti join should work just fine here:
from pyspark.sql.functions import broadcast
df.join(
broadcast(spark.createDataFrame(choices, ("number", "letter"))),
["number", "letter"],
"leftanti"
)
+------+------+---+
|number|letter| id|
+------+------+---+
| 300| C|306|
+------+------+---+
Solution 2
To create an array literal in spark you need to create an array from a series of columns, where a column is created from the lit
function:
scala> array(lit(100), lit("A"))
res1: org.apache.spark.sql.Column = array(100, A)
Mariusz
Updated on January 07, 2020Comments
-
Mariusz over 4 years
I'm in trouble trying to remove rows from dataframe based on two-column list of items to filter. For example for this dataframe:
df = spark.createDataFrame([(100, 'A', 304), (200, 'B', 305), (300, 'C', 306)], ['number', 'letter', 'id']) df.show() +------+------+---+ |number|letter| id| +------+------+---+ | 100| A|304| | 200| B|305| | 300| C|306| +------+------+---+
I can easily remove rows using
isin
on one column:df.where(~col('number').isin([100, 200])).show() +------+------+---+ |number|letter| id| +------+------+---+ | 300| C|306| +------+------+---+
But when I try to remove them by two columns I get an exception:
df.where(~array('number', 'letter').isin([(100, 'A'), (200, 'B')])).show() Py4JJavaError: An error occurred while calling z:org.apache.spark.sql.functions.lit. : java.lang.RuntimeException: Unsupported literal type class java.util.ArrayList [100, A] at org.apache.spark.sql.catalyst.expressions.Literal$.apply(literals.scala:57) at org.apache.spark.sql.functions$.lit(functions.scala:101) at org.apache.spark.sql.functions.lit(functions.scala) at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) at java.lang.reflect.Method.invoke(Method.java:498) at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:237) at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357) at py4j.Gateway.invoke(Gateway.java:280) at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132) at py4j.commands.CallCommand.execute(CallCommand.java:79) at py4j.GatewayConnection.run(GatewayConnection.java:214) at java.lang.Thread.run(Thread.java:745)
After some investigation I realized that root cause of the problem is creating literals from non-primitive types. I tried the following code in pyspark:
lit((100, 'A')) lit([100, 'A'])
and the following in scala-spark:
lit((100, "A")) lit(List(100, "A")) lit(Seq(100, "A")) lit(Array(100, "A"))
but with no luck... Does anyone know the way to create array literal in spark/pyspark? Or is there another method to filter dataframe by two columns?
-
TayTay almost 6 yearsThe question was about pyspark, not scala.
-
emeth about 5 yearsA hint in this answer!
from pyspark.sql.functions import *; array(lit(100), lit("A"))
works