Setting up Airflow with the BigQuery operator
Solution 1
It took me a while to find this because it is not documented very clearly. In the Airflow UI, go to Admin -> Connections. The connection ID listed there is what the bigquery_conn_id parameter refers to. In the "Extra" field you must add a JSON object that defines a key/value pair of "project" : ""
You must also add keys for "service_account" and "key_path" if you have not explicitly authorized an account (with gcloud auth) on the machine where Airflow runs.
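As a rough illustration, the JSON for the Extra field in this older connection format could be generated like this. The three key names come from the answer above; the project ID, service account address, and key file path are placeholder values, not real ones.

```python
import json

# Placeholder values (assumptions): substitute your own project,
# service account, and key file location.
extras = {
    "project": "my-gcp-project",
    "service_account": "airflow@my-gcp-project.iam.gserviceaccount.com",
    "key_path": "/path/to/service-account-key",
}

# Paste the printed JSON into the connection's Extra field.
extras_json = json.dumps(extras, indent=2)
print(extras_json)
```

Generating the string with json.dumps avoids quoting mistakes that are easy to make when typing JSON by hand into the form.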
Solution 2
If you need to do this programmatically, I use this as an entrypoint in our stack to create the connection if it doesn't already exist:
from airflow.models import Connection
from airflow.settings import Session

session = Session()

# Describe the Google Cloud Platform connection to register.
gcp_conn = Connection(
    conn_id='bigquery',
    conn_type='google_cloud_platform',
    extra='{"extra__google_cloud_platform__project":"<YOUR PROJECT HERE>"}')

# Only add the connection if no connection with this ID exists yet.
if not session.query(Connection).filter(
        Connection.conn_id == gcp_conn.conn_id).first():
    session.add(gcp_conn)
    session.commit()
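If the extra string gets longer than one key, it may be easier to build it than to write the JSON by hand. The sketch below shows one way to do that; build_gcp_extra is a hypothetical helper name, and the only assumption about Airflow is the key prefix already visible in the snippet above.

```python
import json

# Prefix taken from the key used in the connection's extra string.
PREFIX = "extra__google_cloud_platform__"


def build_gcp_extra(**fields):
    """Return a JSON string whose keys carry the GCP connection prefix."""
    prefixed = {PREFIX + name: value for name, value in fields.items()}
    return json.dumps(prefixed, sort_keys=True)


# "my-gcp-project" is a placeholder project ID.
extra = build_gcp_extra(project="my-gcp-project")
print(extra)
```

The resulting string can be passed as the extra argument when constructing the Connection.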
Solution 3
Recently I fixed a similar problem by specifying both bigquery_conn_id and google_cloud_storage_conn_id, like this:
t1 = BigQueryOperator(
    task_id='bigquery_test',
    bql='SELECT COUNT(userId) FROM [events:EVENTS_20160501]',
    destination_dataset_table=False,
    # Both connection IDs are needed because of inheritance.
    bigquery_conn_id='bigquery_default',
    google_cloud_storage_conn_id='bigquery_default',
    delegate_to=False,
    udf_config=False,
    dag=dag,
)
See more in this answer: https://stackoverflow.com/a/45664830/634627
Jean-Christophe Rodrigue
Updated on September 29, 2022
Comments
-
Jean-Christophe Rodrigue over 1 year
I am experimenting with Airflow for data pipelines. Unfortunately, I have not been able to get it to work with the BigQuery operator so far. I have searched for a solution to the best of my ability but I am still stuck. I am using the sequential executor, running locally.
Here is my code:
from airflow import DAG
from airflow.contrib.operators.bigquery_operator import BigQueryOperator
from datetime import datetime, timedelta

default_args = {
    'owner': 'airflow',
    'depends_on_past': False,
    'start_date': datetime(2015, 6, 1),
    'email': ['[email protected]'],
    'email_on_failure': False,
    'email_on_retry': False,
    'retries': 1,
    'retry_delay': timedelta(minutes=5),
    # 'queue': 'bash_queue',
    # 'pool': 'backfill',
    # 'priority_weight': 10,
    # 'end_date': datetime(2016, 1, 1),
}

dag = DAG(
    dag_id='bigQueryPipeline',
    default_args=default_args,
    schedule_interval=timedelta(1)
)

t1 = BigQueryOperator(
    task_id='bigquery_test',
    bql='SELECT COUNT(userId) FROM [events:EVENTS_20160501]',
    destination_dataset_table=False,
    bigquery_conn_id='bigquery_default',
    delegate_to=False,
    udf_config=False,
    dag=dag,
)
The error message:
[2016-08-27 00:13:14,665] {models.py:1327} ERROR - 'project'
Traceback (most recent call last):
  File "/Users/jean.rodrigue/anaconda/bin/airflow", line 15, in <module>
    args.func(args)
  File "/Users/jean.rodrigue/anaconda/lib/python2.7/site-packages/airflow/bin/cli.py", line 352, in test
    ti.run(force=True, ignore_dependencies=True, test_mode=True)
  File "/Users/jean.rodrigue/anaconda/lib/python2.7/site-packages/airflow/utils/db.py", line 53, in wrapper
    result = func(*args, **kwargs)
  File "/Users/jean.rodrigue/anaconda/lib/python2.7/site-packages/airflow/models.py", line 1245, in run
    result = task_copy.execute(context=context)
  File "/Users/jean.rodrigue/anaconda/lib/python2.7/site-packages/airflow/contrib/operators/bigquery_operator.py", line 57, in execute
    conn = hook.get_conn()
  File "/Users/jean.rodrigue/anaconda/lib/python2.7/site-packages/airflow/contrib/hooks/bigquery_hook.py", line 54, in get_conn
    project = connection_extras['project']
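The traceback stops at a lookup of connection_extras['project'], and the logged error is just 'project', which is consistent with that key being absent from the connection's extras. A minimal sketch of a check for this, using only the standard library, might look like the following; missing_extras is a hypothetical helper, not part of Airflow, and the project ID is a placeholder.

```python
import json


def missing_extras(extra_json, required=("project",)):
    """Return the required keys that are absent from an Extra JSON string."""
    # An empty Extra field is treated as an empty JSON object.
    extras = json.loads(extra_json) if extra_json else {}
    return [key for key in required if key not in extras]


# An empty Extra field lacks the key the hook looks up.
print(missing_extras(""))
# With the key present, nothing is reported as missing.
print(missing_extras('{"project": "my-gcp-project"}'))
```

Running a check like this against the Extra value stored for bigquery_default would show whether the connection is missing the key before a task is run.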
-
Ted over 7 years
The BigQuery operator is broken in the current release. I have configured it with all the necessary "extras" and it is unable to connect. They should have this fixed in the next release, but I have no idea when that is coming out.
-
J.Fratzke over 7 years
I'm running v1.7.1.3 and it works fine for me. I had some trouble because Google upgraded the oauth2 client so that it no longer includes SignedJwtAssertionCredentials, which I fixed by downgrading my oauth version. The new version switches to using ServiceAccountCredentials.
-
Mike over 6 years
Where would this even go?
-
Mike over 6 years
I got a few deprecation warnings when I used this solution... something about not using **kwargs
-
J.Fratzke over 6 years
The format changed in Airflow 1.8. I believe you don't need to specify extras anymore, as there's a Google Cloud Platform connection type to select from the drop-down list.