Broadcast a dictionary to rdd in PySpark
Solution 1
You forgot something important about `Broadcast` objects: they have a property called `value` where the data is stored. You therefore need to modify `my_func` to something like this:

```python
my_dict_bc = sc.broadcast(my_dict)

def my_func(letter):
    return my_dict_bc.value[letter]
```
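To see why indexing the broadcast object itself fails while indexing `.value` works, here is a minimal sketch. `FakeBroadcast` is a hypothetical stand-in for `pyspark.Broadcast`, for illustration only: like the real class, it keeps the payload behind a `value` property rather than acting as a dictionary itself.

```python
# FakeBroadcast is a hypothetical stand-in for pyspark.Broadcast
# (illustration only, not part of PySpark).
class FakeBroadcast:
    def __init__(self, data):
        self._data = data

    @property
    def value(self):
        return self._data

my_dict = {"a": 1, "b": 2, "c": 3, "d": 4}
my_dict_bc = FakeBroadcast(my_dict)

# my_dict_bc["a"] would raise a TypeError, because the wrapper itself is not
# subscriptable -- this mirrors the error from the question.
def my_func(letter):
    return my_dict_bc.value[letter]  # reach the dict through .value

print([my_func(x) for x in ["a", "d", "c", "b"]])  # [1, 4, 3, 2]
```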
Solution 2
The proper way to do it depends on how the read-only shared variables (the dictionary in your case) will be accessed in the rest of the program. In the case you described, you don't need to use a broadcast variable. From the Spark programming guide section on broadcast variables:
Spark automatically broadcasts the common data needed by tasks within each stage. The data broadcasted this way is cached in serialized form and deserialized before running each task. This means that explicitly creating broadcast variables is only useful when tasks across multiple stages need the same data or when caching the data in deserialized form is important.
In your case, if the data is only needed in the single map stage, there is no need to explicitly broadcast the variable (it is not "useful"). However, if the same dictionary were to be used later in another stage, then you might wish to use broadcast to avoid serializing and deserializing the dictionary before each stage.
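As a rough illustration of the trade-off: a dictionary captured in a task closure gets serialized into every task that ships it, whereas a broadcast payload is serialized once and fetched once per executor. Here is a back-of-the-envelope sketch using `pickle` (the `num_tasks` figure is an assumption; this models shipping cost only, not Spark's actual wire protocol):

```python
import pickle

my_dict = {"a": 1, "b": 2, "c": 3, "d": 4}
num_tasks = 100  # assumed number of tasks that reference the dictionary

# Back-of-the-envelope cost model, not Spark's actual wire protocol:
# a closure-captured dict is re-serialized into every task,
# while a broadcast payload is serialized once.
payload = len(pickle.dumps(my_dict))
closure_cost = num_tasks * payload   # dict shipped inside each task closure
broadcast_cost = payload             # shipped once (per executor, in practice)

print(closure_cost, broadcast_cost)
```

With a tiny dictionary the difference is negligible, which is why a plain global works fine in the question's example; the gap grows with the payload size and the number of tasks and stages.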
Akavall

Updated on June 04, 2022

Comments
-

  Akavall over 1 year

  I am just getting the hang of Spark, and I have a function that needs to be mapped to an `rdd`, but it uses a global dictionary:

  ```python
  from pyspark import SparkContext

  sc = SparkContext('local[*]', 'pyspark')

  my_dict = {"a": 1, "b": 2, "c": 3, "d": 4}  # at no point will be modified
  my_list = ["a", "d", "c", "b"]

  def my_func(letter):
      return my_dict[letter]

  my_list_rdd = sc.parallelize(my_list)

  result = my_list_rdd.map(lambda x: my_func(x)).collect()
  print(result)
  ```

  The above gives the expected result; however, I am really not sure about my use of the global variable `my_dict`. It seems that a copy of the dictionary is made for every partition, and that just does not feel right. It looked like broadcast is what I was looking for. However, when I try to use it:

  ```python
  my_dict_bc = sc.broadcast(my_dict)

  def my_func(letter):
      return my_dict_bc[letter]
  ```

  I get the following error:

  `TypeError: 'Broadcast' object has no attribute '__getitem__'`

  This seems to imply that I cannot broadcast a dictionary.

  My question: if I have a function that uses a global dictionary and needs to be mapped to an `rdd`, what is the proper way to do it?

  My example is very simple, but in reality `my_dict` and `my_list` are much larger, and `my_func` is more complicated.

-
  Akavall almost 8 years

  I see! So when I add `my_dict_bc.value`, it works correctly. And broadcasting is the standard approach to working with shared objects, right?

-
  Alberto Bonsanto almost 8 years

  Yes, it is good practice; however, if the dictionary isn't too large, you could use a global object without any problem.
-
  Akavall almost 8 years

  Makes sense. Thank you.
-
  Denis Kuzin over 6 years

  @AlbertoBonsanto, may I ask how to work with a large dictionary?
-
  Alberto Bonsanto over 6 years

  @DenisKuzin How large? If it's huge, convert it to a DataFrame and use a join.
-
  Denis Kuzin over 6 years

  @AlbertoBonsanto, the dictionary size is about 30 GB. OK, thanks, good idea.