Testing Spark with pytest - cannot run Spark in local mode

After some research I finally found the solution. I am using Spark 1.6.

First of all, I added two lines to my .bashrc file:

export SPARK_HOME=/usr/hdp/2.5.0.0-1245/spark
export PYTHONPATH=$SPARK_HOME/python/:$SPARK_HOME/python/lib/py4j-0.9-src.zip:$PYTHONPATH
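
To verify that both variables are picked up, open a new shell (so the .bashrc changes take effect) and run a quick sanity check like the following; check_env.py is just a hypothetical name for illustration:

# check_env.py - quick sanity check that PySpark is importable
import os

print("SPARK_HOME = " + str(os.environ.get("SPARK_HOME")))

import pyspark  # raises ImportError if PYTHONPATH is not set correctly
print("pyspark imported from " + pyspark.__file__)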

Then I created a file named "conftest.py". The filename really matters: pytest discovers fixtures from conftest.py automatically, so if you rename it you will get the "fixture 'spark_context' not found" error. If you use Spark in local mode and do not use YARN, conftest.py should look like this:

import logging
import pytest

from pyspark import SparkConf
from pyspark import SparkContext
from pyspark.sql import HiveContext
from pyspark.streaming import StreamingContext

def quiet_py4j():
    """Turn down py4j logging so it does not flood the test output."""
    logger = logging.getLogger('py4j')
    logger.setLevel(logging.WARN)

@pytest.fixture(scope="session")
def spark_context(request):
    """Session-wide SparkContext in local mode, stopped when the session ends."""
    conf = (SparkConf().setMaster("local[2]").setAppName("pytest-pyspark-local-testing"))
    sc = SparkContext(conf=conf)
    # Register the finalizer after the context exists, so it never
    # references an undefined sc if SparkContext creation fails.
    request.addfinalizer(lambda: sc.stop())

    quiet_py4j()
    return sc

@pytest.fixture(scope="session")
def hive_context(spark_context):
    return HiveContext(spark_context)

@pytest.fixture(scope="session")
def streaming_context(spark_context):
    return StreamingContext(spark_context, 1)

Now you can run tests with a plain pytest command. pytest will start the Spark context once per session and stop it after all tests have finished.
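
As a quick check that the fixture wiring works, a minimal test file like this should pass (smoke_test.py is a hypothetical name, just for illustration):

# smoke_test.py - verifies the spark_context fixture from conftest.py
def test_spark_context_fixture(spark_context):
    rdd = spark_context.parallelize([1, 2, 3])
    assert rdd.count() == 3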

If you use YARN, you can change conftest.py to:

    import logging
    import pytest

    from pyspark import SparkConf
    from pyspark import SparkContext
    from pyspark.sql import HiveContext
    from pyspark.streaming import StreamingContext

    def quiet_py4j():
        """ turn down spark logging for the test context """
        logger = logging.getLogger('py4j')
        logger.setLevel(logging.WARN)

    # Putting marks directly in params works on the pytest 3.x used here;
    # pytest >= 3.1 prefers pytest.param('local', marks=pytest.mark.spark_local).
    @pytest.fixture(scope="session",
                    params=[pytest.mark.spark_local('local'),
                            pytest.mark.spark_yarn('yarn')])
    def spark_context(request):
        if request.param == 'local':
            conf = (SparkConf()
                    .setMaster("local[2]")
                    .setAppName("pytest-pyspark-local-testing")
                    )
        elif request.param == 'yarn':
            conf = (SparkConf()
                    .setMaster("yarn-client")
                    .setAppName("pytest-pyspark-yarn-testing")
                    .set("spark.executor.memory", "1g")
                    .set("spark.executor.instances", "2")
                    )
        sc = SparkContext(conf=conf)
        request.addfinalizer(lambda: sc.stop())

        quiet_py4j()
        return sc

    @pytest.fixture(scope="session")
    def hive_context(spark_context):
        return HiveContext(spark_context)

    @pytest.fixture(scope="session")
    def streaming_context(spark_context):
        return StreamingContext(spark_context, 1)

Now you can run tests in local mode by calling py.test -m spark_local and in YARN mode by calling py.test -m spark_yarn.
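
Since spark_local and spark_yarn are custom marks, it is worth registering them so that pytest --markers documents them (and newer pytest versions do not warn about unknown marks). A minimal pytest.ini next to conftest.py could look like this; the descriptions are placeholders:

[pytest]
markers =
    spark_local: tests that run against a local Spark context
    spark_yarn: tests that run against a YARN-backed Spark context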

Wordcount example

In the same folder, create three files: conftest.py (shown above), wordcount.py:

def do_word_counts(lines):
    """Count how often each word occurs in an RDD of text lines."""
    counts = (lines.flatMap(lambda x: x.split())
                   .map(lambda x: (x, 1))
                   .reduceByKey(lambda x, y: x + y)
              )
    results = {word: count for word, count in counts.collect()}
    return results

And wordcount_test.py:

import pytest
import wordcount

pytestmark = pytest.mark.usefixtures("spark_context")

def test_do_word_counts(spark_context):
    test_input = [
        ' hello spark ',
        ' hello again spark spark'
    ]

    input_rdd = spark_context.parallelize(test_input, 1)
    results = wordcount.do_word_counts(input_rdd)

    expected_results = {'hello': 2, 'spark': 3, 'again': 1}
    assert results == expected_results

Now you can run tests by calling pytest.
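
The hive_context and streaming_context fixtures can be used the same way. For example, here is a minimal sketch of a hypothetical hivecontext_test.py, assuming your Spark build includes Hive support (this is the Spark 1.6 DataFrame API):

# hivecontext_test.py - minimal sketch using the hive_context fixture
def test_create_dataframe(hive_context):
    df = hive_context.createDataFrame([('spark', 1), ('pytest', 2)],
                                      ['word', 'count'])
    assert df.count() == 2
    assert sorted(df.columns) == ['count', 'word']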


Comments

  • Mrgr8m4 (about 1 year ago)

    I am trying to run the wordcount test with pytest from this site - Unit testing Apache Spark with py.test. The problem is that I cannot start the Spark context. The code I use to create the Spark context:

    @pytest.fixture(scope="session")
    def spark_context(request):
        """ fixture for creating a spark context
        Args:
            request: pytest.FixtureRequest object
        """
        conf = (SparkConf().setMaster("local[2]").setAppName("pytest-pyspark-local-testing"))
        sc = SparkContext(conf=conf)
        request.addfinalizer(lambda: sc.stop())
    
        quiet_py4j()
        return sc
    

    I execute this code using the commands:

    #first way
    pytest spark_context_fixture.py
    
    #second way
    python spark_context_fixture.py
    

    The output:

    platform linux2 -- Python 2.7.5, pytest-3.0.4, py-1.4.31, pluggy-0.4.0
    rootdir: /home/mgr/test, inifile:
    collected 0 items
    

    Then I want to run the wordcount test using pytest.

    pytestmark = pytest.mark.usefixtures("spark_context")
    
    def test_do_word_counts(spark_context):
        """ test word couting
        Args:
            spark_context: test fixture SparkContext
        """
        test_input = [
            ' hello spark ',
            ' hello again spark spark'
        ]
    
        input_rdd = spark_context.parallelize(test_input, 1)
        results = wordcount.do_word_counts(input_rdd)
    
        expected_results = {'hello':2, 'spark':3, 'again':1}  
        assert results == expected_results
    

    But the output is:

    ________ ERROR at setup of test_do_word_counts _________
    file /home/mgrabowski/test/wordcount_test.py, line 5
      def test_do_word_counts(spark_context):
    E       fixture 'spark_context' not found
    >       available fixtures: cache, capfd, capsys, doctest_namespace, monkeypatch, pytestconfig, record_xml_property, recwarn, tmpdir, tmpdir_factory
    >       use 'pytest --fixtures [testpath]' for help on them.
    

    Does anyone know the reason for this issue?

  • AntiPawn79 (over 6 years ago)
    This is fantastic, thanks. One question: now what if I have a larger project and I want to organize my Spark tests in several folders? How do I manage the conftest.py then, since it seems having it in the same folder was important?