Testing Spark with pytest - cannot run Spark in local mode
I did some research and finally found the solution. I use Spark 1.6.
First, I added these two lines to my .bashrc file so that Python can find pyspark and py4j:
export SPARK_HOME=/usr/hdp/2.5.0.0-1245/spark
export PYTHONPATH=$SPARK_HOME/python/:$SPARK_HOME/python/lib/py4j-0.9-src.zip:$PYTHONPATH
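If you would rather not edit .bashrc, the same two paths can be added from Python, for example at the very top of conftest.py before anything from pyspark is imported. This is a minimal sketch under assumptions, not part of the original setup: add_spark_to_path is a hypothetical helper name, and it globs for python/lib/py4j-*-src.zip instead of hard-coding py4j-0.9, because the bundled py4j version changes between Spark releases.

```python
import glob
import os
import sys


def add_spark_to_path(spark_home=None):
    """Hypothetical helper: put $SPARK_HOME/python and the bundled py4j zip on
    sys.path, mirroring the PYTHONPATH line above. Call it before importing pyspark."""
    spark_home = spark_home or os.environ.get("SPARK_HOME")
    if not spark_home:
        raise RuntimeError("SPARK_HOME is not set")
    python_dir = os.path.join(spark_home, "python")
    # Match whatever py4j version ships with this Spark build.
    py4j_zips = sorted(glob.glob(os.path.join(python_dir, "lib", "py4j-*-src.zip")))
    wanted = [python_dir] + py4j_zips
    # Put the entries at the front of sys.path, skipping ones already present.
    sys.path[:0] = [p for p in wanted if p not in sys.path]
    return wanted


# Usage at the top of conftest.py, before "from pyspark import ...":
# add_spark_to_path()
```

The environment-variable approach in .bashrc still works; this variant only keeps the path setup next to the tests.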
Then I created a file named conftest.py. The file name matters: pytest automatically loads fixtures from files called conftest.py, so if you rename it, tests that request spark_context fail with "fixture 'spark_context' not found". If you use Spark in local mode and do not use YARN, conftest.py should look like this:
import logging

import pytest
from pyspark import SparkConf
from pyspark import SparkContext
from pyspark.sql import HiveContext
from pyspark.streaming import StreamingContext


def quiet_py4j():
    """Turn down py4j logging for the test context."""
    logger = logging.getLogger('py4j')
    logger.setLevel(logging.WARN)


@pytest.fixture(scope="session")
def spark_context(request):
    """One local-mode SparkContext (2 threads) shared by the whole test session."""
    conf = (SparkConf()
            .setMaster("local[2]")
            .setAppName("pytest-pyspark-local-testing"))
    sc = SparkContext(conf=conf)
    # Stop Spark once, after the last test of the session has finished.
    request.addfinalizer(sc.stop)
    quiet_py4j()
    return sc


@pytest.fixture(scope="session")
def hive_context(spark_context):
    return HiveContext(spark_context)


@pytest.fixture(scope="session")
def streaming_context(spark_context):
    # Batch interval of 1 second.
    return StreamingContext(spark_context, 1)
Now you can run the tests with the plain pytest command. Pytest starts Spark once for the whole test session and stops it after all tests have finished.
If you use YARN, you can change conftest.py to the following version, which supports both local and YARN mode:
import logging

import pytest
from pyspark import SparkConf
from pyspark import SparkContext
from pyspark.sql import HiveContext
from pyspark.streaming import StreamingContext


def quiet_py4j():
    """ turn down spark logging for the test context """
    logger = logging.getLogger('py4j')
    logger.setLevel(logging.WARN)


# Tests using spark_context run once per param; the marks let "-m" pick one mode.
# pytest.param needs pytest >= 3.1; older releases wrote pytest.mark.spark_local('local'),
# a form that pytest 4.0 removed.
@pytest.fixture(scope="session",
                params=[pytest.param('local', marks=pytest.mark.spark_local),
                        pytest.param('yarn', marks=pytest.mark.spark_yarn)])
def spark_context(request):
    if request.param == 'local':
        conf = (SparkConf()
                .setMaster("local[2]")
                .setAppName("pytest-pyspark-local-testing"))
    elif request.param == 'yarn':
        conf = (SparkConf()
                .setMaster("yarn-client")
                .setAppName("pytest-pyspark-yarn-testing")
                .set("spark.executor.memory", "1g")
                .set("spark.executor.instances", "2"))
    sc = SparkContext(conf=conf)
    # Stop Spark after the last test that uses this param.
    request.addfinalizer(sc.stop)
    quiet_py4j()
    return sc


@pytest.fixture(scope="session")
def hive_context(spark_context):
    return HiveContext(spark_context)


@pytest.fixture(scope="session")
def streaming_context(spark_context):
    return StreamingContext(spark_context, 1)
Now you can run the tests in local mode with py.test -m spark_local and in YARN mode with py.test -m spark_yarn.
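Newer pytest versions warn about marks they do not know (PytestUnknownMarkWarning), and running with --strict-markers turns that into an error. A minimal sketch for registering spark_local and spark_yarn is to add pytest's standard pytest_configure hook to the same conftest.py; the marker descriptions are my own wording.

```python
# Addition to conftest.py: register the custom markers used by spark_context.
def pytest_configure(config):
    """pytest hook, called once at startup; each line becomes a known marker."""
    config.addinivalue_line(
        "markers", "spark_local: run against a local[2] SparkContext")
    config.addinivalue_line(
        "markers", "spark_yarn: run against a yarn-client SparkContext")
```

Registered markers also show up in the output of pytest --markers.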
Wordcount example
In the same folder, create three files: conftest.py (above), wordcount.py and wordcount_test.py. First, wordcount.py:
def do_word_counts(lines):
    """Count how often each whitespace-separated word occurs in an RDD of lines."""
    counts = (lines.flatMap(lambda x: x.split())
                   .map(lambda x: (x, 1))
                   .reduceByKey(lambda x, y: x + y))
    results = {word: count for word, count in counts.collect()}
    return results
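Starting a SparkContext means starting a JVM, which is comparatively slow, so it can be handy to check the transformation logic of do_word_counts against a tiny in-memory stand-in as well. The sketch below assumes a hypothetical FakeRDD class that implements only the four RDD methods do_word_counts calls. It does not exercise partitioning, serialization or anything else Spark actually does, so it complements the Spark-based test in wordcount_test.py rather than replacing it.

```python
class FakeRDD:
    """Hypothetical in-memory stand-in implementing only the RDD methods
    that do_word_counts uses: flatMap, map, reduceByKey and collect."""

    def __init__(self, items):
        self._items = list(items)

    def flatMap(self, f):
        return FakeRDD(x for item in self._items for x in f(item))

    def map(self, f):
        return FakeRDD(f(item) for item in self._items)

    def reduceByKey(self, f):
        # Combine values that share a key, like Spark's reduceByKey.
        acc = {}
        for key, value in self._items:
            acc[key] = f(acc[key], value) if key in acc else value
        return FakeRDD(acc.items())

    def collect(self):
        return list(self._items)


def do_word_counts(lines):
    # Same logic as wordcount.py above.
    counts = (lines.flatMap(lambda x: x.split())
                   .map(lambda x: (x, 1))
                   .reduceByKey(lambda x, y: x + y))
    return {word: count for word, count in counts.collect()}


result = do_word_counts(FakeRDD([' hello spark ', ' hello again spark spark']))
# result == {'hello': 2, 'spark': 3, 'again': 1}
```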
And wordcount_test.py:
import pytest

import wordcount

# Request the spark_context fixture for every test in this module.
pytestmark = pytest.mark.usefixtures("spark_context")


def test_do_word_counts(spark_context):
    test_input = [
        ' hello spark ',
        ' hello again spark spark'
    ]
    input_rdd = spark_context.parallelize(test_input, 1)
    results = wordcount.do_word_counts(input_rdd)

    expected_results = {'hello': 2, 'spark': 3, 'again': 1}
    assert results == expected_results
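expected_results in the test above is written by hand. As an optional cross-check, the expected counts can also be computed in plain Python with collections.Counter, splitting each line with str.split() exactly as do_word_counts does. expected_word_counts is a hypothetical helper, not part of the original example; because it re-implements the same splitting rule, it complements the hand-written expectation rather than replacing it.

```python
from collections import Counter


def expected_word_counts(lines):
    """Hypothetical helper: count whitespace-separated words in plain Python,
    using str.split() just like do_word_counts does."""
    return dict(Counter(word for line in lines for word in line.split()))


test_input = [' hello spark ', ' hello again spark spark']
expected = expected_word_counts(test_input)
# In test_do_word_counts: assert results == expected_word_counts(test_input)
```

Dictionary equality ignores key order, so the comparison works no matter in which order Spark's collect() returns the pairs.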
Now you can run the tests by calling pytest.
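When scanning a directory, pytest picks test files by name: with the default python_files setting it collects test_*.py and *_test.py, which is why wordcount_test.py is found while conftest.py is only loaded for its fixtures. Inside a collected file, only functions whose names start with test are run, so a file that defines nothing but a fixture (like spark_context_fixture.py in the question) reports "collected 0 items". The helper below is a rough, hypothetical model of the name-matching rule using fnmatch; files passed explicitly on the command line are collected regardless of their name.

```python
from fnmatch import fnmatch

# pytest's default value of the "python_files" ini option.
DEFAULT_PYTHON_FILES = ("test_*.py", "*_test.py")


def is_discovered_test_file(filename, patterns=DEFAULT_PYTHON_FILES):
    """Hypothetical helper: rough model of how pytest picks test modules while
    scanning a directory, by matching the file's basename against python_files."""
    return any(fnmatch(filename, pattern) for pattern in patterns)


for name in ["wordcount_test.py", "wordcount.py", "conftest.py", "spark_context_fixture.py"]:
    print(name, is_discovered_test_file(name))
```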
Mrgr8m4
Updated on September 15, 2022
Comments
-
Mrgr8m4 about 1 year
I am trying to run the wordcount test with pytest, following the article "Unit testing Apache Spark with py.test". The problem is that I cannot start the Spark context. This is the code I use to create the SparkContext:
@pytest.fixture(scope="session")
def spark_context(request):
    """ fixture for creating a spark context
    Args:
        request: pytest.FixtureRequest object
    """
    conf = (SparkConf().setMaster("local[2]").setAppName("pytest-pyspark-local-testing"))
    sc = SparkContext(conf=conf)
    request.addfinalizer(lambda: sc.stop())
    quiet_py4j()
    return sc
I run this code with either of these commands:
# first way
pytest spark_context_fixture.py
# second way
python spark_context_fixture.py
The output:
platform linux2 -- Python 2.7.5, pytest-3.0.4, py-1.4.31, pluggy-0.4.0
rootdir: /home/mgr/test, inifile:
collected 0 items
Then I try to run the wordcount test with pytest:
pytestmark = pytest.mark.usefixtures("spark_context")

def test_do_word_counts(spark_context):
    """ test word counting
    Args:
        spark_context: test fixture SparkContext
    """
    test_input = [
        ' hello spark ',
        ' hello again spark spark'
    ]
    input_rdd = spark_context.parallelize(test_input, 1)
    results = wordcount.do_word_counts(input_rdd)
    expected_results = {'hello':2, 'spark':3, 'again':1}
    assert results == expected_results
But the output is:
________ ERROR at setup of test_do_word_counts _________
file /home/mgrabowski/test/wordcount_test.py, line 5
  def test_do_word_counts(spark_context):
E       fixture 'spark_context' not found
>       available fixtures: cache, capfd, capsys, doctest_namespace, monkeypatch, pytestconfig, record_xml_property, recwarn, tmpdir, tmpdir_factory
>       use 'pytest --fixtures [testpath]' for help on them.
Does anyone know what causes this issue?
-
AntiPawn79 over 6 years
This is fantastic, thanks. One question: what if I have a larger project and want to organize my Spark tests in several folders? How do I manage conftest.py then, since it seems important to have these files in the same folder?
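On the larger-project question: conftest.py does not need to sit next to every test file. Fixtures defined in a conftest.py are available to all tests in that directory and in its subdirectories, so a single conftest.py at the top of the test tree can serve tests spread over several folders. The helper below is a simplified, hypothetical model of that lookup, walking from the test file's directory up to the rootdir; it ignores details such as pytest's --confcutdir option.

```python
from pathlib import Path


def applicable_conftests(test_file, rootdir):
    """Hypothetical helper: return the conftest.py files whose fixtures a test
    file can see, ordered from rootdir down to the test's own directory.
    Simplified model that ignores options such as --confcutdir."""
    rootdir = Path(rootdir).resolve()
    test_dir = Path(test_file).resolve().parent
    chain = []
    # Walk upwards from the test's directory until rootdir is reached.
    for directory in [test_dir, *test_dir.parents]:
        chain.append(directory)
        if directory == rootdir:
            break
    else:
        raise ValueError("test_file is not inside rootdir")
    chain.reverse()
    return [d / "conftest.py" for d in chain if (d / "conftest.py").is_file()]
```

For example, with the spark_context fixture in tests/conftest.py, both tests/sql/test_hive.py and tests/streaming/test_stream.py (hypothetical file names) can request spark_context, and a folder can still add its own conftest.py for fixtures that only its tests need.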