Can't connect to Mongo DB via Spark
Solution 1
Spark can't find the com.mongodb.spark.sql.DefaultSource class, hence the error message.
Everything else looks good; you just need to include the Mongo Spark package:
> $SPARK_HOME/bin/pyspark --packages org.mongodb.spark:mongo-spark-connector_2.11:2.2.0
Or ensure that the jar file is on the correct path.
Make sure you check the version of the Mongo-Spark package required for your version of Spark: https://spark-packages.org/package/mongodb/mongo-spark
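As a rough illustration of the version matching mentioned above: the suffix in the artifact name (`_2.11`) is the Scala binary version of your Spark build, and the last field is the connector release. The helper name `mongo_connector_coordinate` is hypothetical (it is not part of Spark or the connector), and the versions used are simply the ones quoted in this answer, not a recommendation.

```python
def mongo_connector_coordinate(scala_version: str, connector_version: str) -> str:
    """Build the Maven coordinate to pass to --packages.

    Hypothetical helper for illustration; not part of Spark or the connector.
    """
    return (
        "org.mongodb.spark:mongo-spark-connector_"
        f"{scala_version}:{connector_version}"
    )


# Versions quoted in this answer; check the package page for your own setup.
print(mongo_connector_coordinate("2.11", "2.2.0"))
```

The printed string is what goes after `--packages` on the command line, or into the `spark.jars.packages` setting.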
Solution 2
I am a pyspark user. Here is what my code looks like, and it works:
MongoDB connection configuration in pyspark
# For spark version < 3.0
from pyspark.sql import SparkSession
spark = SparkSession\
.builder\
.master('local')\
.config('spark.mongodb.input.uri', 'mongodb://user:<password>@<host>:27017/database01.data.coll')\
.config('spark.mongodb.output.uri', 'mongodb://user:<password>@<host>:27017/database01.data.coll')\
.config('spark.jars.packages', 'org.mongodb.spark:mongo-spark-connector_2.11:2.3.1')\
.getOrCreate()
# For spark version >= 3.0
from pyspark.sql import SparkSession
spark = SparkSession\
.builder\
.master('local')\
.config('spark.mongodb.input.uri', 'mongodb://user:<password>@<host>:27017/database01.coll')\
.config('spark.mongodb.output.uri', 'mongodb://user:<password>@<host>:27017/database01.coll')\
.config('spark.jars.packages', 'org.mongodb.spark:mongo-spark-connector_2.12:3.0.1')\
.getOrCreate()
Read from MongoDB:
df01 = spark.read\
.format("com.mongodb.spark.sql.DefaultSource")\
.option("database","database01")\
.option("collection", "collection01")\
.load()
Write to MongoDB:
df01.write.format("com.mongodb.spark.sql.DefaultSource")\
.mode("overwrite")\
.option("database","database01")\
.option("collection", "collection02")\
.save()
Solution 3
I had quite a hard time configuring the Spark connection to CosmosDB (MongoDB API), so I decided to post the code that worked for me as a contribution.
I used Spark 2.4.0 through a Databricks notebook.
from pyspark.sql import SparkSession
# Connect to CosmosDB to write on the collection
userName = "userName"
primaryKey = "myReadAndWritePrimaryKey"
host = "ipAddress"
port = "10255"
database = "dbName"
collection = "collectionName"
# Structure the connection
connectionString = "mongodb://{0}:{1}@{2}:{3}/{4}.{5}?ssl=true&replicaSet=globaldb".format(userName, primaryKey, host, port, database, collection)
spark = SparkSession\
.builder\
.config('spark.mongodb.input.uri', connectionString)\
.config('spark.mongodb.output.uri', connectionString)\
.config('spark.jars.packages', 'org.mongodb.spark:mongo-spark-connector_2.11:2.3.1')\
.getOrCreate()
# Reading from CosmosDB
df = spark.read\
.format("com.mongodb.spark.sql.DefaultSource")\
.option("uri", connectionString)\
.option("database", database)\
.option("collection", collection)\
.load()
# Writing on CosmosDB (Appending new information without replacing documents)
dfToAppendOnCosmosDB.write.format("com.mongodb.spark.sql.DefaultSource")\
.mode("append")\
.option("uri", connectionString)\
.option("replaceDocument", False)\
.option("maxBatchSize", 100)\
.option("database", database)\
.option("collection", collection)\
.save()
I found the options to configure the connector at the link.
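One thing worth guarding against when formatting the connection string by hand: MongoDB URIs expect reserved characters in the user name or password (for example `/`, `:` or `@`) to be percent-encoded. Below is a minimal sketch using only the standard library. The helper name `build_mongo_uri` is hypothetical and the key is a made-up placeholder, not a real CosmosDB key.

```python
from urllib.parse import quote_plus


def build_mongo_uri(user, key, host, port, database, collection, options=""):
    """Assemble a MongoDB URI with percent-encoded credentials.

    Hypothetical helper for illustration only.
    """
    credentials = f"{quote_plus(user)}:{quote_plus(key)}"
    uri = f"mongodb://{credentials}@{host}:{port}/{database}.{collection}"
    return f"{uri}?{options}" if options else uri


# Placeholder values; the key below is invented just to show the escaping.
connectionString = build_mongo_uri(
    "userName", "abc/def==", "ipAddress", "10255",
    "dbName", "collectionName", "ssl=true&replicaSet=globaldb",
)
print(connectionString)
```

The resulting string can be passed to `spark.mongodb.input.uri`, `spark.mongodb.output.uri` and the `uri` option in the same way as the hand-formatted one above.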
Ran P
Updated on June 04, 2022
Comments
-
Ran P almost 2 years
I'm trying to read data from Mongo DB through an Apache Spark master.
I'm using 3 machines for this:
- M1 - with a Mongo DB instance on it
- M2 - with a Spark Master, with Mongo connector, running on it
- M3 - with a python application that connects to M2's Spark master
The application (M3) gets a connection to the Spark master like this:
_sparkSession = SparkSession.builder.master(masterPath).appName(appName)\
.config("spark.mongodb.input.uri", "mongodb://10.0.3.150/db1.data.coll")\
.config("spark.mongodb.output.uri", "mongodb://10.0.3.150/db1.data.coll").getOrCreate()
The application (M3) then tries to read data from the DB:
sqlContext = SQLContext(_sparkSession.sparkContext)
df = sqlContext.read.format("com.mongodb.spark.sql.DefaultSource").option("uri","mongodb://user:<password>@<host>/db1.data?readPreference=primaryPreferred").load()
but fails with this exception:
py4j.protocol.Py4JJavaError: An error occurred while calling o56.load.
: java.lang.ClassNotFoundException: Failed to find data source: com.mongodb.spark.sql.DefaultSource. Please find packages at http://spark.apache.org/third-party-projects.html
    at org.apache.spark.sql.execution.datasources.DataSource$.lookupDataSource(DataSource.scala:594)
    at org.apache.spark.sql.execution.datasources.DataSource.providingClass$lzycompute(DataSource.scala:86)
    at org.apache.spark.sql.execution.datasources.DataSource.providingClass(DataSource.scala:86)
    at org.apache.spark.sql.execution.datasources.DataSource.resolveRelation(DataSource.scala:325)
    at org.apache.spark.sql.DataFrameReader.load(DataFrameReader.scala:152)
    at org.apache.spark.sql.DataFrameReader.load(DataFrameReader.scala:125)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
    at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
    at py4j.Gateway.invoke(Gateway.java:280)
    at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
    at py4j.commands.CallCommand.execute(CallCommand.java:79)
    at py4j.GatewayConnection.run(GatewayConnection.java:214)
    at java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.ClassNotFoundException: com.mongodb.spark.sql.DefaultSource.DefaultSource
    at java.net.URLClassLoader.findClass(URLClassLoader.java:381)
    at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
    at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
    at org.apache.spark.sql.execution.datasources.DataSource$$anonfun$25$$anonfun$apply$13.apply(DataSource.scala:579)
    at org.apache.spark.sql.execution.datasources.DataSource$$anonfun$25$$anonfun$apply$13.apply(DataSource.scala:579)
    at scala.util.Try$.apply(Try.scala:192)
    at org.apache.spark.sql.execution.datasources.DataSource$$anonfun$25.apply(DataSource.scala:579)
    at org.apache.spark.sql.execution.datasources.DataSource$$anonfun$25.apply(DataSource.scala:579)
    at scala.util.Try.orElse(Try.scala:84)
    at org.apache.spark.sql.execution.datasources.DataSource$.lookupDataSource(DataSource.scala:579)
    ... 16 more
-
Ran P almost 7 years
Thank you for your answer. I specified that I run the app through a remote Python application, not through the PySpark shell. So, as a noob Python developer, I ask again: how do I run my application with the connector package? Or do I need to run the Spark master with the package?
-
Ross almost 7 years
Please update the question with more information on how you submit your Spark jobs and I'll look to update my answer.
-
Ran P almost 7 years
I changed the way I use the Spark master. I start the Spark master and its slaves, then run spark-submit with the mongo-spark-connector package and the Python script. I guess that's the recommended way. Thanks, all.
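For anyone landing here, the spark-submit approach described in this comment could look roughly like the sketch below. The helper name `spark_submit_command`, the master URL and the script name are all placeholders chosen for illustration; `--master` and `--packages` are standard spark-submit options, and the connector version is the one quoted in Solution 1.

```python
import shlex


def spark_submit_command(master_url, package, script):
    """Return the spark-submit argument list (hypothetical helper)."""
    return ["spark-submit", "--master", master_url, "--packages", package, script]


cmd = spark_submit_command(
    "spark://master-host:7077",  # placeholder master URL
    "org.mongodb.spark:mongo-spark-connector_2.11:2.2.0",
    "my_app.py",  # placeholder script name
)
# Print the command line; pass `cmd` to subprocess.run to actually launch it.
print(shlex.join(cmd))
```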
-
Farhat Nawaz almost 6 years
@Ross I have the same issue and can't seem to resolve it. Any ideas?
-
Will almost 6 years
When I ran into this, I didn't have the mongodb-spark-connector_2.11-2.2.3.jar in my $SPARK_HOME/jars (e.g. /usr/local/spark-2.2.2-bin-hadoop2.7/jars).
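A quick way to check for the situation described in this comment is to look for a connector jar under `$SPARK_HOME/jars`. This is only a sketch: the helper name `find_connector_jars` and the glob pattern are assumptions, and the pattern is deliberately loose so that it matches both the `mongo-spark-connector` and `mongodb-spark-connector` spellings.

```python
import glob
import os


def find_connector_jars(spark_home):
    """List Mongo Spark connector jars under <spark_home>/jars.

    Hypothetical helper; the file name pattern is an assumption.
    """
    pattern = os.path.join(spark_home, "jars", "mongo*spark-connector*.jar")
    return sorted(glob.glob(pattern))


spark_home = os.environ.get("SPARK_HOME", "")
jars = find_connector_jars(spark_home) if spark_home else []
print(jars or "no connector jar found under $SPARK_HOME/jars")
```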
-
Gaboik1 about 5 years
This should be the accepted answer. The spark.jars.packages option is documented at spark.apache.org/docs/2.1.0/…