Setting up Airflow with the BigQuery operator


Solution 1

It took me a while to finally find this, as it's not documented very clearly. In the Airflow UI, go to Admin -> Connections. The connection id defined there is what the operator's bigquery_conn_id parameter references. In the "Extra" field you must add a JSON object that defines a key/value pair of "project" : "<your project id>".

You must also add keys for "service_account" and "key_path" if you have not explicitly authorized an account on the box running Airflow (e.g. via gcloud auth).
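For reference, the Extra field might look like the following in the older plain-key extras format this version of the hook reads; the project id, service account email, and key path are placeholders to replace with your own values:

{
  "project": "<your-project-id>",
  "service_account": "<your-service-account-email>",
  "key_path": "/path/to/keyfile.json"
}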

Solution 2

If you need to do this programmatically, I use this as an entrypoint in our stack to create the connection if it doesn't already exist:

from airflow.models import Connection
from airflow.settings import Session

session = Session()
# The project id lives in the JSON "extra" field of the connection.
gcp_conn = Connection(
    conn_id='bigquery',
    conn_type='google_cloud_platform',
    extra='{"extra__google_cloud_platform__project":"<YOUR PROJECT HERE>"}')
# Create the connection only if one with this conn_id doesn't already exist.
if not session.query(Connection).filter(
        Connection.conn_id == gcp_conn.conn_id).first():
    session.add(gcp_conn)
    session.commit()
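If the box isn't authorized via gcloud, you can supply a key file through extras in the same way. A minimal sketch, assuming the extra__google_cloud_platform__key_path extras key supported by the google_cloud_platform connection type; the path is a placeholder:

import json
from airflow.models import Connection

gcp_conn = Connection(
    conn_id='bigquery',
    conn_type='google_cloud_platform',
    extra=json.dumps({
        'extra__google_cloud_platform__project': '<YOUR PROJECT HERE>',
        # Placeholder path; point at your service account JSON key file.
        'extra__google_cloud_platform__key_path': '/path/to/keyfile.json',
    }))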

Solution 3

Recently I fixed a similar problem by specifying both bigquery_conn_id and google_cloud_storage_conn_id like this:

t1 = BigQueryOperator(
    task_id='bigquery_test',
    bql='SELECT COUNT(userId) FROM [events:EVENTS_20160501]',
    destination_dataset_table=False,
    bigquery_conn_id='bigquery_default',              # Need these both
    google_cloud_storage_conn_id='bigquery_default',  # because of inheritance
    delegate_to=False,
    udf_config=False,
    dag=dag,
)

See more in this answer: https://stackoverflow.com/a/45664830/634627


Comments

  • Jean-Christophe Rodrigue over 1 year

    I am experimenting with Airflow for data pipelines. Unfortunately, I cannot get it to work with the BigQuery operator so far. I have searched for a solution to the best of my ability but I am still stuck. I am using the sequential executor running locally.

    Here is my code:

    from airflow import DAG
    from airflow.contrib.operators.bigquery_operator import BigQueryOperator
    from datetime import datetime, timedelta
    
    default_args = {
        'owner': 'airflow',
        'depends_on_past': False,
        'start_date': datetime(2015, 6, 1),
        'email': ['[email protected]'],
        'email_on_failure': False,
        'email_on_retry': False,
        'retries': 1,
        'retry_delay': timedelta(minutes=5),
        # 'queue': 'bash_queue',
        # 'pool': 'backfill',
        # 'priority_weight': 10,
        # 'end_date': datetime(2016, 1, 1),
    }
    
    dag = DAG(
        dag_id='bigQueryPipeline', 
        default_args=default_args, 
        schedule_interval=timedelta(1)
    )
    
    t1 = BigQueryOperator(
        task_id='bigquery_test',
        bql='SELECT COUNT(userId) FROM [events:EVENTS_20160501]',
        destination_dataset_table=False,
        bigquery_conn_id='bigquery_default',
        delegate_to=False,
        udf_config=False,
        dag=dag,
    )
    

    The error message:

    [2016-08-27 00:13:14,665] {models.py:1327} ERROR - 'project'
    Traceback (most recent call last):
      File "/Users/jean.rodrigue/anaconda/bin/airflow", line 15, in <module>
        args.func(args)
      File "/Users/jean.rodrigue/anaconda/lib/python2.7/site-packages/airflow/bin/cli.py", line 352, in test
        ti.run(force=True, ignore_dependencies=True, test_mode=True)
      File "/Users/jean.rodrigue/anaconda/lib/python2.7/site-packages/airflow/utils/db.py", line 53, in wrapper
        result = func(*args, **kwargs)
      File "/Users/jean.rodrigue/anaconda/lib/python2.7/site-packages/airflow/models.py", line 1245, in run
        result = task_copy.execute(context=context)
      File "/Users/jean.rodrigue/anaconda/lib/python2.7/site-packages/airflow/contrib/operators/bigquery_operator.py", line 57, in execute
        conn = hook.get_conn()
      File "/Users/jean.rodrigue/anaconda/lib/python2.7/site-packages/airflow/contrib/hooks/bigquery_hook.py", line 54, in get_conn
        project = connection_extras['project']
    
  • Ted over 7 years
    The BigQuery operator is broken in the current release; I have configured it with all the necessary "extras" and it is unable to connect. They should have this fixed in the next release, but I have no idea when that is coming out.
  • J.Fratzke over 7 years
    I'm running v1.7.1.3 and it works fine for me. I had some trouble because Google upgraded the oauth2 client to no longer include SignedJwtAssertionCredentials, which I fixed by downgrading my oauth2client version. The new version switches to using ServiceAccountCredentials.
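    A minimal sketch of that downgrade, assuming pip is the installer: pip install 'oauth2client<2.0' (SignedJwtAssertionCredentials was removed in oauth2client 2.0).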
  • Mike over 6 years
    Where would this even go?
  • Mike over 6 years
    I got a few deprecation warnings when I used this solution... something about not using **kwargs
  • J.Fratzke over 6 years
    The format changed in Airflow 1.8; I believe you don't need to specify extras anymore, as there's a Google Cloud Platform connection type to select from the drop-down list.