Functions from custom module not working in PySpark, but they work when inputted in interactive mode

Solution 1

I had the same error and followed the stack trace.

In my case, I was building an Egg file and then passing it to Spark via the --py-files option.
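
For anyone not familiar with that option, the submit command looks roughly like this (the file names here are just placeholders):

spark-submit --py-files dist/mymodule-0.1-py2.7.egg driver.py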

Concerning the error, I think it boils down to this: when the module is imported, F.udf(str2num, t.IntegerType()) is evaluated before Spark is running, so the resulting UserDefinedFunction instance holds a None reference where it expects a SparkContext (call it sc). When the UDF is later used, sc._pickled_broadcast_vars is accessed on that None, which raises the AttributeError in your output.

My workaround is to avoid creating the UDF until Spark is running (and hence there is an active SparkContext). In your case, you could just change your definition of letConvNum so that the UDF is created inside the function:

def letConvNum(df):    # df is a PySpark DataFrame
    #Get a list of columns that I want to transform, using the metadata Pandas DataFrame
    chng_cols=metadta[(metadta.comments=='letter conversion to num')].col_name.tolist()

    str2numUDF = F.udf(str2num, t.IntegerType()) # create UDF on demand
    for curcol in chng_cols:
        df=df.withColumn(curcol, str2numUDF(df[curcol]))
    return df

Note: I haven't actually tested the code above, but the change in my own code was similar and everything worked fine.
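
If you call letConvNum many times and want to avoid building a new UserDefinedFunction on every call, a small lazy factory works too. This is an untested sketch that assumes the same str2num, F and t as in your module:

from pyspark.sql import functions as F
from pyspark.sql import types as t

_str2numUDF = None  # cached UserDefinedFunction, created lazily

def get_str2num_udf():
    # Build the UDF the first time it is needed (i.e. once Spark is running)
    # and reuse the same instance on subsequent calls.
    # Assumes str2num is defined in the same module.
    global _str2numUDF
    if _str2numUDF is None:
        _str2numUDF = F.udf(str2num, t.IntegerType())
    return _str2numUDF

# inside letConvNum:
#     df = df.withColumn(curcol, get_str2num_udf()(df[curcol]))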

Also, for the interested reader, see the Spark code for UserDefinedFunction

Solution 2

I think a cleaner solution would be to use the udf decorator to define your UDF:

from pyspark.sql import functions as F

@F.udf
def str2numUDF(text):
    if text is None or text=='' or text=='NULL' or text=='null':
        return 0
    elif len(text)==1:
        return ord(text)
    else:
        newnum=''
        for lettr in text:
            newnum=newnum+str(ord(lettr))
        return int(newnum)

With this solution, the UDF does not reference any other function, so it won't throw any errors at you.
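
Usage is then the same as with any other column expression (the column name here is just an example):

df = df.withColumn('some_col', str2numUDF(df['some_col']))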

For some older versions of Spark, the decorator doesn't support typed UDFs, so you might have to define a custom decorator as follows:

from pyspark.sql import functions as F
from pyspark.sql import types as t

# Custom udf decorator which accept return type
def udf_typed(returntype=t.StringType()):
    def _typed_udf_wrapper(func):
        return F.udf(func, returntype)
    return _typed_udf_wrapper

@udf_typed(t.IntegerType())
def my_udf(x):
    return int(x)
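
If I remember correctly, on Spark 2.1 and later you don't need the custom decorator at all, because udf accepts the return type directly when used as a decorator:

from pyspark.sql import functions as F
from pyspark.sql import types as t

@F.udf(returnType=t.IntegerType())
def my_udf(x):
    return int(x)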

Solution 3

I've been bashing my head against this problem for a solid 20 hours. Thank you for the solutions, guys!

Here is my variant, in case someone is interested in how I solved the same problem, though it's mostly derived from the code/responses above.

The purpose here is simply to convert the string columns to show their lengths, but you can do anything of course (I do datatype checking and error tracking in my main application).

What I'm actually using the UDF for is far more complicated, but this is what I did to test that my UDF was working.

This assumes your DataFrame columns are all StringType(); in my case I had 4 string columns.

Solution:

I made a separate .py file named myfunctions.py. Inside of it:

from pyspark.sql import functions as F
from pyspark.sql.types import IntegerType
import logging

def str2num(text):
    if text is None or text == '' or text == 'NULL' or text == 'null':
        return 0
    else:
        return len(text)


def letConvNum(df, columns):
    str2numUDF = F.udf(str2num, IntegerType())
    logging.info(columns)
    for curcol in columns:
        df = df.withColumn(curcol, str2numUDF(df[curcol]))
    return df
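
Before wiring it into Spark, you can sanity-check the plain Python function locally (no SparkContext needed):

import myfunctions as mf

print(mf.str2num('hello'))   # 5
print(mf.str2num(''))        # 0
print(mf.str2num(None))      # 0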

Then, inside my main application, add the new .py file to your SparkContext:

# My understanding is that this ensures your module is shipped to the Python workers on all nodes
sc.addPyFile("./myfunctions.py")

from pyspark.sql.types import StructType, StructField, StringType
import myfunctions as mf

# dynamically create headers based on config - simplified for this example
schemaString = "YearMonth,IMEI,IMSI,MSISDN"
fields = [StructField(field_name, StringType(), True) for field_name in schemaString.split(",")]
schema = StructType(fields)

df = sqlContext.read.format('com.databricks.spark.csv').options(header='false', inferschema='false', delimiter='|').load('/app/teacosy/invictus/kenya/SAF_QUALCOMM_IMEI_20170321.txt', schema=schema)

# Read and write the file to get Parquet. Note: this was to optimize massive files (50-200 GB)
df.write.parquet("data.parquet", mode='overwrite')
dataframe = sqlContext.read.parquet("data.parquet")

df2 = mf.letConvNum(dataframe, schemaString.split(","))
df2.show()

Input:

+---------+---------------+---------------+------------+
|YearMonth|           IMEI|           IMSI|      MSISDN|
+---------+---------------+---------------+------------+
|   201609|869859025975610|639021005869699|254724884336|
|   201609|359521062182040|639021025339132|254721224577|
|   201609|353121070662770|639021025339132|254721224577|
|   201609|868096015837410|639021025339132|254721224577|
|   201609|866204020015610|639021025339132|254721224577|
|   201609|356051060479107|639028040455896|254710404131|
|   201609|353071062803703|639027641207269|254725555262|
|   201609|356899067316490|639027841002602|254711955201|
|   201609|860357020164930|639028550063234|254715570856|
|   201609|862245026673900|639028940332785|254728412070|
|   201609|352441075290910|639029340152407|254714582871|
|   201609|862074027499277|639029340152407|254714582871|
|   201609|357036073532528|639028500408346|254715408346|
|   201609|356546060475230|639021011628783|254722841516|
|   201609|356546060475220|639021011628783|254722841516|
|   201609|866838023727117|639028840277749|254718492024|
|   201609|354210053950950|639029440054836|254729308302|
|   201609|866912020393040|639029870328080|254725528182|
|   201609|357921070054540|639028340694869|254710255083|
|   201609|357977056264767|639027141561199|254721977494|

Output:

+---------+----+----+------+
|YearMonth|IMEI|IMSI|MSISDN|
+---------+----+----+------+
|        6|  15|  15|    12|
|        6|  15|  15|    12|
|        6|  15|  15|    12|
|        6|  15|  15|    12|
|        6|  15|  15|    12|
|        6|  15|  15|    12|
|        6|  15|  15|    12|
|        6|  15|  15|    12|
|        6|  15|  15|    12|
|        6|  15|  15|    12|
|        6|  15|  15|    12|
|        6|  15|  15|    12|
|        6|  15|  15|    12|
|        6|  15|  15|    12|
|        6|  15|  15|    12|
|        6|  15|  15|    12|
|        6|  15|  15|    12|
|        6|  15|  15|    12|
|        6|  15|  15|    12|
|        6|  15|  15|    12|

I hope this helps anyone struggling with seeing their PySpark applications freeze or hang... so frustrating.

Comments

  • RKD314 almost 2 years

    I have a module that I've written containing functions that act on PySpark DataFrames. They do a transformation on columns in the DataFrame and then return a new DataFrame. Here is an example of the code, shortened to include only one of the functions:

    from pyspark.sql import functions as F
    from pyspark.sql import types as t
    
    import pandas as pd
    import numpy as np
    
    metadta=pd.DataFrame(pd.read_csv("metadata.csv"))  # this contains metadata on my dataset
    
    def str2num(text):
        if type(text)==None or text=='' or text=='NULL' or text=='null':
            return 0
        elif len(text)==1:
            return ord(text)
        else:
            newnum=''
            for lettr in text:
                newnum=newnum+str(ord(lettr))
            return int(newnum)
    
    str2numUDF = F.udf(lambda s: str2num(s), t.IntegerType())
    
    def letConvNum(df):    # df is a PySpark DataFrame
        #Get a list of columns that I want to transform, using the metadata Pandas DataFrame
        chng_cols=metadta[(metadta.comments=='letter conversion to num')].col_name.tolist()
        for curcol in chng_cols:
            df=df.withColumn(curcol, str2numUDF(df[curcol]))
        return df
    

    So that is my module, call it mymodule.py. If I start the PySpark shell, and I do the following:

    import mymodule as mm
    myf=sqlContext.sql("select * from tablename limit 10")
    

    I check myf (PySpark DataFrame) and it is ok. I check that I have actually imported mymodule, by trying to use the str2num function:

    mm.str2num('a')
    97
    

    So it actually is importing the module. Then if I try this:

    df2=mm.letConvNum(df)
    

    And do this to check that it worked:

    df2.show()
    

    It tries to perform the action, but then it crashes:

        16/03/10 16:10:44 ERROR Executor: Exception in task 0.0 in stage 1.0 (TID 365)
        org.apache.spark.api.python.PythonException: Traceback (most recent call last):
          File "/usr/hdp/2.3.4.0-3485/spark/python/lib/pyspark.zip/pyspark/worker.py", line 98, in main
            command = pickleSer._read_with_length(infile)
          File "/usr/hdp/2.3.4.0-3485/spark/python/lib/pyspark.zip/pyspark/serializers.py", line 164, in _read_with_length
            return self.loads(obj)
          File "/usr/hdp/2.3.4.0-3485/spark/python/lib/pyspark.zip/pyspark/serializers.py", line 422, in loads
            return pickle.loads(obj)
          File "test2.py", line 16, in <module>
            str2numUDF=F.udf(lambda s: str2num(s), t.IntegerType())
          File "/usr/hdp/2.3.4.0-3485/spark/python/lib/pyspark.zip/pyspark/sql/functions.py", line 1460, in udf
            return UserDefinedFunction(f, returnType)
          File "/usr/hdp/2.3.4.0-3485/spark/python/lib/pyspark.zip/pyspark/sql/functions.py", line 1422, in __init__
            self._judf = self._create_judf(name)
          File "/usr/hdp/2.3.4.0-3485/spark/python/lib/pyspark.zip/pyspark/sql/functions.py", line 1430, in _create_judf
            pickled_command, broadcast_vars, env, includes = _prepare_for_python_RDD(sc, command, self)
          File "/usr/hdp/2.3.4.0-3485/spark/python/lib/pyspark.zip/pyspark/rdd.py", line 2317, in _prepare_for_python_RDD
            [x._jbroadcast for x in sc._pickled_broadcast_vars],
        AttributeError: 'NoneType' object has no attribute '_pickled_broadcast_vars'
    
                at org.apache.spark.api.python.PythonRunner$$anon$1.read(PythonRDD.scala:166)
                at org.apache.spark.api.python.PythonRunner$$anon$1.<init>(PythonRDD.scala:207)
                at org.apache.spark.api.python.PythonRunner.compute(PythonRDD.scala:125)
                at org.apache.spark.sql.execution.BatchPythonEvaluation$$anonfun$doExecute$1.apply(python.scala:397)
                at org.apache.spark.sql.execution.BatchPythonEvaluation$$anonfun$doExecute$1.apply(python.scala:362)
                at org.apache.spark.rdd.RDD$$anonfun$mapPartitions$1$$anonfun$apply$17.apply(RDD.scala:710)
                at org.apache.spark.rdd.RDD$$anonfun$mapPartitions$1$$anonfun$apply$17.apply(RDD.scala:710)
                at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
                at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:300)
                at org.apache.spark.rdd.RDD.iterator(RDD.scala:264)
                at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
                at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:300)
                at org.apache.spark.rdd.RDD.iterator(RDD.scala:264)
                at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
                at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:300)
                at org.apache.spark.rdd.RDD.iterator(RDD.scala:264)
                at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
                at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:300)
                at org.apache.spark.rdd.RDD.iterator(RDD.scala:264)
                at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:66)
                at org.apache.spark.scheduler.Task.run(Task.scala:88)
                at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:214)
                at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
                at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
                at java.lang.Thread.run(Thread.java:745)
        16/03/10 16:10:44 ERROR TaskSetManager: Task 0 in stage 1.0 failed 1 times; aborting job
        Traceback (most recent call last):
          File "<stdin>", line 1, in <module>
          File "/usr/hdp/2.3.4.0-3485/spark/python/pyspark/sql/dataframe.py", line 256, in show
            print(self._jdf.showString(n, truncate))
          File "/usr/hdp/2.3.4.0-3485/spark/python/lib/py4j-0.8.2.1-src.zip/py4j/java_gateway.py", line 538, in __call__
          File "/usr/hdp/2.3.4.0-3485/spark/python/pyspark/sql/utils.py", line 36, in deco
            return f(*a, **kw)
          File "/usr/hdp/2.3.4.0-3485/spark/python/lib/py4j-0.8.2.1-src.zip/py4j/protocol.py", line 300, in get_return_value
        py4j.protocol.Py4JJavaError: An error occurred while calling o7299.showString.
        : org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 1.0 failed 1 times, most recent failure: Lost task 0.0 in stage 1.0 (TID 365, localhost): org.apache.spark.api.python.PythonException: Traceback (most recent call last):
          File "/usr/hdp/2.3.4.0-3485/spark/python/lib/pyspark.zip/pyspark/worker.py", line 98, in main
            command = pickleSer._read_with_length(infile)
          File "/usr/hdp/2.3.4.0-3485/spark/python/lib/pyspark.zip/pyspark/serializers.py", line 164, in _read_with_length
            return self.loads(obj)
          File "/usr/hdp/2.3.4.0-3485/spark/python/lib/pyspark.zip/pyspark/serializers.py", line 422, in loads
            return pickle.loads(obj)
          File "test2.py", line 16, in <module>
            str2numUDF=F.udf(lambda s: str2num(s), t.IntegerType())
          File "/usr/hdp/2.3.4.0-3485/spark/python/lib/pyspark.zip/pyspark/sql/functions.py", line 1460, in udf
            return UserDefinedFunction(f, returnType)
          File "/usr/hdp/2.3.4.0-3485/spark/python/lib/pyspark.zip/pyspark/sql/functions.py", line 1422, in __init__
            self._judf = self._create_judf(name)
          File "/usr/hdp/2.3.4.0-3485/spark/python/lib/pyspark.zip/pyspark/sql/functions.py", line 1430, in _create_judf
            pickled_command, broadcast_vars, env, includes = _prepare_for_python_RDD(sc, command, self)
          File "/usr/hdp/2.3.4.0-3485/spark/python/lib/pyspark.zip/pyspark/rdd.py", line 2317, in _prepare_for_python_RDD
            [x._jbroadcast for x in sc._pickled_broadcast_vars],
        AttributeError: 'NoneType' object has no attribute '_pickled_broadcast_vars'
    
                at org.apache.spark.api.python.PythonRunner$$anon$1.read(PythonRDD.scala:166)
                at org.apache.spark.api.python.PythonRunner$$anon$1.<init>(PythonRDD.scala:207)
                at org.apache.spark.api.python.PythonRunner.compute(PythonRDD.scala:125)
                at org.apache.spark.sql.execution.BatchPythonEvaluation$$anonfun$doExecute$1.apply(python.scala:397)
                at org.apache.spark.sql.execution.BatchPythonEvaluation$$anonfun$doExecute$1.apply(python.scala:362)
                at org.apache.spark.rdd.RDD$$anonfun$mapPartitions$1$$anonfun$apply$17.apply(RDD.scala:710)
                at org.apache.spark.rdd.RDD$$anonfun$mapPartitions$1$$anonfun$apply$17.apply(RDD.scala:710)
                at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
                at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:300)
                at org.apache.spark.rdd.RDD.iterator(RDD.scala:264)
                at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
                at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:300)
                at org.apache.spark.rdd.RDD.iterator(RDD.scala:264)
                at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
                at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:300)
                at org.apache.spark.rdd.RDD.iterator(RDD.scala:264)
                at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
                at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:300)
                at org.apache.spark.rdd.RDD.iterator(RDD.scala:264)
                at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:66)
                at org.apache.spark.scheduler.Task.run(Task.scala:88)
                at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:214)
                at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
                at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
                at java.lang.Thread.run(Thread.java:745)
    
        Driver stacktrace:
                at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1283)
                at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1271)
                at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1270)
                at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
                at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
                at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1270)
                at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:697)
                at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:697)
                at scala.Option.foreach(Option.scala:236)
                at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:697)
                at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:1496)
                at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1458)
                at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1447)
                at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)
                at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:567)
                at org.apache.spark.SparkContext.runJob(SparkContext.scala:1824)
                at org.apache.spark.SparkContext.runJob(SparkContext.scala:1837)
                at org.apache.spark.SparkContext.runJob(SparkContext.scala:1850)
                at org.apache.spark.sql.execution.SparkPlan.executeTake(SparkPlan.scala:215)
                at org.apache.spark.sql.execution.Limit.executeCollect(basicOperators.scala:207)
                at org.apache.spark.sql.DataFrame$$anonfun$collect$1.apply(DataFrame.scala:1385)
                at org.apache.spark.sql.DataFrame$$anonfun$collect$1.apply(DataFrame.scala:1385)
                at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:56)
                at org.apache.spark.sql.DataFrame.withNewExecutionId(DataFrame.scala:1903)
                at org.apache.spark.sql.DataFrame.collect(DataFrame.scala:1384)
                at org.apache.spark.sql.DataFrame.head(DataFrame.scala:1314)
                at org.apache.spark.sql.DataFrame.take(DataFrame.scala:1377)
                at org.apache.spark.sql.DataFrame.showString(DataFrame.scala:178)
                at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
                at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
                at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
                at java.lang.reflect.Method.invoke(Method.java:497)
                at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:231)
                at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:379)
                at py4j.Gateway.invoke(Gateway.java:259)
                at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:133)
                at py4j.commands.CallCommand.execute(CallCommand.java:79)
                at py4j.GatewayConnection.run(GatewayConnection.java:207)
                at java.lang.Thread.run(Thread.java:745)
        Caused by: org.apache.spark.api.python.PythonException: Traceback (most recent call last):
          File "/usr/hdp/2.3.4.0-3485/spark/python/lib/pyspark.zip/pyspark/worker.py", line 98, in main
            command = pickleSer._read_with_length(infile)
          File "/usr/hdp/2.3.4.0-3485/spark/python/lib/pyspark.zip/pyspark/serializers.py", line 164, in _read_with_length
            return self.loads(obj)
          File "/usr/hdp/2.3.4.0-3485/spark/python/lib/pyspark.zip/pyspark/serializers.py", line 422, in loads
            return pickle.loads(obj)
          File "test2.py", line 16, in <module>
            str2numUDF=F.udf(lambda s: str2num(s), t.IntegerType())
          File "/usr/hdp/2.3.4.0-3485/spark/python/lib/pyspark.zip/pyspark/sql/functions.py", line 1460, in udf
            return UserDefinedFunction(f, returnType)
          File "/usr/hdp/2.3.4.0-3485/spark/python/lib/pyspark.zip/pyspark/sql/functions.py", line 1422, in __init__
            self._judf = self._create_judf(name)
          File "/usr/hdp/2.3.4.0-3485/spark/python/lib/pyspark.zip/pyspark/sql/functions.py", line 1430, in _create_judf
            pickled_command, broadcast_vars, env, includes = _prepare_for_python_RDD(sc, command, self)
          File "/usr/hdp/2.3.4.0-3485/spark/python/lib/pyspark.zip/pyspark/rdd.py", line 2317, in _prepare_for_python_RDD
            [x._jbroadcast for x in sc._pickled_broadcast_vars],
        AttributeError: 'NoneType' object has no attribute '_pickled_broadcast_vars'
    
                at org.apache.spark.api.python.PythonRunner$$anon$1.read(PythonRDD.scala:166)
                at org.apache.spark.api.python.PythonRunner$$anon$1.<init>(PythonRDD.scala:207)
                at org.apache.spark.api.python.PythonRunner.compute(PythonRDD.scala:125)
                at org.apache.spark.sql.execution.BatchPythonEvaluation$$anonfun$doExecute$1.apply(python.scala:397)
                at org.apache.spark.sql.execution.BatchPythonEvaluation$$anonfun$doExecute$1.apply(python.scala:362)
                at org.apache.spark.rdd.RDD$$anonfun$mapPartitions$1$$anonfun$apply$17.apply(RDD.scala:710)
                at org.apache.spark.rdd.RDD$$anonfun$mapPartitions$1$$anonfun$apply$17.apply(RDD.scala:710)
                at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
                at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:300)
                at org.apache.spark.rdd.RDD.iterator(RDD.scala:264)
                at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
                at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:300)
                at org.apache.spark.rdd.RDD.iterator(RDD.scala:264)
                at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
                at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:300)
                at org.apache.spark.rdd.RDD.iterator(RDD.scala:264)
                at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
                at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:300)
                at org.apache.spark.rdd.RDD.iterator(RDD.scala:264)
                at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:66)
                at org.apache.spark.scheduler.Task.run(Task.scala:88)
                at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:214)
                at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
                at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
                ... 1 more
    

    As a check, I opened a clean shell and instead of importing the module, I just defined the str2num function and the UDF in the interactive shell. I then typed in the contents of the last function, and did the same final check:

    df2.show()
    

    This time, I get back the transformed DataFrame I was expecting.

    Why does it work when the functions are inputted interactively but not when they are read in from a module? I know it is reading the module, as the regular function str2num works.

  • RKD314 over 7 years
    Yes, that's true. It doesn't solve the problem though (I had tried the solution you propose). I'm no longer trying to solve this, but I was on Spark 1.6.
  • Dalibor Frivaldsky over 6 years
    Instead of creating a class, I've ended up writing a lambda for each of my UDFs - myfunc_udf = lambda arg: udf(f=myfunc, returnType=StringType())(arg)
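
    Spelled out a bit more fully, that lambda pattern looks like this (an untested sketch; myfunc here is just a placeholder):

    from pyspark.sql.functions import udf
    from pyspark.sql.types import StringType

    def myfunc(s):
        # placeholder transformation
        return s.upper() if s is not None else None

    # Wrapping the udf() call in a lambda defers creating the UserDefinedFunction
    # until the column expression is actually built, i.e. after Spark is running.
    myfunc_udf = lambda arg: udf(f=myfunc, returnType=StringType())(arg)

    # df = df.withColumn('some_col', myfunc_udf(df['some_col']))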