Broadcast a dictionary to an RDD in PySpark


Solution 1

You forgot something important about Broadcast objects: they have a property called value, where the data is stored.

Therefore, you need to modify my_func to something like this:

my_dict_bc = sc.broadcast(my_dict)

def my_func(letter):
    return my_dict_bc.value[letter] 
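
For completeness, here is a minimal runnable sketch of the fixed program, reusing my_dict and my_list from the question (the printed output is shown as a comment):

from pyspark import SparkContext

sc = SparkContext('local[*]', 'pyspark')

my_dict = {"a": 1, "b": 2, "c": 3, "d": 4}
my_list = ["a", "d", "c", "b"]

# Broadcast the dictionary once from the driver; each executor
# caches a single copy instead of receiving one copy per task.
my_dict_bc = sc.broadcast(my_dict)

def my_func(letter):
    # .value exposes the underlying dict, which supports indexing
    return my_dict_bc.value[letter]

result = sc.parallelize(my_list).map(my_func).collect()
print(result)  # [1, 4, 3, 2]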

Solution 2

The proper way to do it depends on how the read-only shared variables (the dictionary in your case) will be accessed in the rest of the program. In the case you described, you don't need to use a broadcast variable. From the Spark programming guide section on broadcast variables:

Spark automatically broadcasts the common data needed by tasks within each stage. The data broadcasted this way is cached in serialized form and deserialized before running each task. This means that explicitly creating broadcast variables is only useful when tasks across multiple stages need the same data or when caching the data in deserialized form is important.

In your case, if the data is only needed in the single map stage, there is no need to explicitly broadcast the variable (it is not "useful"). However, if the same dictionary were to be used later in another stage, then you might wish to use broadcast to avoid serializing and deserializing the dictionary before each stage.
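
To make the multi-stage case concrete, here is a hedged sketch reusing sc and my_dict from the question; the duplicated letters, the reduceByKey step, and the second map are illustrative additions, not part of the original question. Because reduceByKey forces a shuffle, everything after it runs in a later stage, and both stages read the same broadcast value without the dictionary being serialized into each stage's task closures:

my_dict_bc = sc.broadcast(my_dict)

# Stage 1: look up each letter (duplicates included so the
# reduce has something to aggregate)
pairs = sc.parallelize(["a", "b", "a", "c", "b", "a"]) \
    .map(lambda letter: (letter, my_dict_bc.value[letter]))

# reduceByKey forces a shuffle, so the map below runs in a new stage
sums = pairs.reduceByKey(lambda x, y: x + y)

# Stage 2: the same broadcast value is reused without re-shipping
# the dictionary; dividing by the looked-up value recovers the
# number of occurrences of each letter
result = sums.map(
    lambda kv: (kv[0], kv[1] // my_dict_bc.value[kv[0]])).collect()

print(result)  # e.g. [('a', 3), ('b', 2), ('c', 1)]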



Comments

  • Akavall
    Akavall over 1 year

    I am just getting the hang of Spark, and I have a function that needs to be mapped to an RDD, but it uses a global dictionary:

    from pyspark import SparkContext
    
    sc = SparkContext('local[*]', 'pyspark')
    
    my_dict = {"a": 1, "b": 2, "c": 3, "d": 4} # at no point will be modified
    my_list = ["a", "d", "c", "b"]
    
    def my_func(letter):
        return my_dict[letter]
    
    my_list_rdd = sc.parallelize(my_list)
    
    result = my_list_rdd.map(lambda x: my_func(x)).collect()
    
    print(result)
    

    The above gives the expected result; however, I am really not sure about my use of the global variable my_dict. It seems that a copy of the dictionary is made for every partition, and that just does not feel right.

    It looked like broadcast was what I was looking for. However, when I tried to use it:

    my_dict_bc = sc.broadcast(my_dict)
    
    def my_func(letter):
        return my_dict_bc[letter] 
    

    I got the following error:

    TypeError: 'Broadcast' object has no attribute '__getitem__'
    

    This seems to imply that I cannot broadcast a dictionary.

    My question: If I have a function that uses a global dictionary and needs to be mapped to an RDD, what is the proper way to do it?

    My example is very simple, but in reality my_dict and my_list are much larger, and my_func is more complicated.

  • Akavall
    Akavall almost 8 years
    I see! So when I use my_dict_bc.value it works correctly. And broadcasting is the standard approach for working with shared objects, right?
  • Alberto Bonsanto
    Alberto Bonsanto almost 8 years
    Yes, it is good practice; however, if the dictionary isn't too large, you can use a global object without any problem.
  • Akavall
    Akavall almost 8 years
    Makes sense. Thank you.
  • Denis Kuzin
    Denis Kuzin over 6 years
    @AlbertoBonsanto, may I ask how to work with a large dictionary?
  • Alberto Bonsanto
    Alberto Bonsanto over 6 years
    @DenisKuzin How large? If it's huge, convert it to a DataFrame and use a join (see the sketch after these comments).
  • Denis Kuzin
    Denis Kuzin over 6 years
    @AlbertoBonsanto, the dictionary is about 30 GB. OK, thanks, good idea.
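
Following up on the join suggestion above, here is a minimal sketch of the DataFrame approach, assuming Spark 2.x or later with a SparkSession named spark; the column names and toy rows are illustrative stand-ins. For a lookup table of tens of gigabytes, a join keeps the data partitioned across the cluster instead of copying it to every executor:

from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("dict_join").getOrCreate()

# Toy rows standing in for a huge lookup table; in practice this
# would be read from storage, e.g. with spark.read
lookup_df = spark.createDataFrame(
    [("a", 1), ("b", 2), ("c", 3), ("d", 4)], ["letter", "value"])

letters_df = spark.createDataFrame(
    [("a",), ("d",), ("c",), ("b",)], ["letter"])

# The join executes in a distributed fashion, so the lookup table
# never has to fit in any single executor's memory
result = letters_df.join(lookup_df, on="letter", how="left")
result.show()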