Airflow - got an unexpected keyword argument 'dag'


You do not need to pass self into your python_callable. Change your insert_bigquery function's signature to def insert_bigquery(ds, **kwargs) instead of def insert_bigquery(self).

Reference: https://airflow.apache.org/howto/operator/python.html
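For example, a minimal sketch of the fix (Airflow 1.x): with provide_context=True, the PythonOperator calls python_callable(**context), so the callable must be able to accept the context keywords, including dag. Taking ds plus **kwargs absorbs all of them ("project-name" here is the placeholder from the question):

    # The template context (ds, dag, ti, ...) arrives as keyword
    # arguments; accept `ds` and let **kwargs absorb the rest.
    def insert_bigquery(ds, **kwargs):
        bigquery_client = bigquery.Client(project="project-name")
        # ... build and run the load job exactly as before ...

    json_gcs_to_bq = PythonOperator(
        task_id='json_gcs_to_bq',
        python_callable=insert_bigquery,
        provide_context=True,
        dag=dag)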


Comments

  • Felipe FB, almost 2 years

    I know there is already an Airflow operator that transfers a file from Cloud Storage to BigQuery. Instead, I made the connection to GCP inside the script, the same way I would without Airflow, and I use a PythonOperator to call the function that reads the file from Cloud Storage and inserts its data into BigQuery. However, I get the error message: "got an unexpected keyword argument 'dag'".

    It seems like a simple thing to solve, but I really do not know what the error means, since I put the DAG attributes inside the PythonOperator.

    import json
    import decimal
    import airflow
    from airflow import DAG
    from airflow.contrib.operators.mysql_to_gcs import MySqlToGoogleCloudStorageOperator
    from airflow.contrib.operators.gcs_to_bq import GoogleCloudStorageToBigQueryOperator
    from airflow.contrib.operators.bigquery_operator import BigQueryOperator
    from datetime import datetime, timedelta
    from airflow.operators.bash_operator import BashOperator
    from airflow.models import BaseOperator
    from airflow.utils.decorators import apply_defaults
    from airflow.contrib.hooks.gcs_hook import GoogleCloudStorageHook
    from airflow.hooks.mssql_hook import MsSqlHook
    from tempfile import NamedTemporaryFile
    import pymssql  
    import logging
    import os
    # import cloudstorage as gcs
    from google.cloud import bigquery
    from oauth2client.client import GoogleCredentials
    from airflow.operators.python_operator import PythonOperator
    
    default_args = {
        'owner': 'airflow',
        'start_date': airflow.utils.dates.days_ago(2),
        'depends_on_past': False,
        'email': ['[email protected]'],
        'email_on_failure': False,
        'email_on_retry': False,
        # If a task fails, retry it once after waiting
        # at least 5 minutes
        'retries': 1,
        'retry_delay': timedelta(minutes=5),
    }
    
    dag = DAG(
        dag_id='test_tab1',
        default_args=default_args,
        schedule_interval=timedelta(days=1),
        dagrun_timeout=timedelta(minutes=60)
    )
    
    try:
        script_path = os.path.dirname(os.path.abspath(__file__)) + "/"
    except:
        script_path = "/usr/local/airflow/key/key.json"
    
    #Bigquery Credentials and settings
    os.environ["GOOGLE_APPLICATION_CREDENTIALS"] = script_path 
    
    def insert_bigquery(self):
        bigquery_client = bigquery.Client(project="project-name")
        dataset_ref = bigquery_client.dataset('bucket-name')
        job_config = bigquery.LoadJobConfig()
        job_config.autodetect = True
        job_config.skip_leading_rows = 1
        job_config.source_format = bigquery.SourceFormat.CSV
        time_partitioning = bigquery.table.TimePartitioning()
        job_config.time_partitioning = time_partitioning
        job_config.clustering_fields = ["id"]
        #job_config.field_delimiter = ";"
        uri = "gs://bucket-name/"+filename
        load_job = bigquery_client.load_table_from_uri(
            uri,
            dataset_ref.table('tab1'),
            job_config=job_config
            )
        print('Starting job {}'.format(load_job.job_id))
        load_job.result()
        print('Job finished.')
    
    
    json_gcs_to_bq = PythonOperator(
        task_id='json_gcs_to_bq',
        python_callable=insert_bigquery,
        provide_context=True,
        dag=dag)
    

    Error Message:

    [2019-06-21 15:45:40,732] {{models.py:1760}} ERROR - insert_bigquery() got an unexpected keyword argument 'dag'
    Traceback (most recent call last):
      File "/usr/local/lib/python3.6/site-packages/airflow/models.py", line 1659, in _run_raw_task
        result = task_copy.execute(context=context)
      File "/usr/local/lib/python3.6/site-packages/airflow/operators/python_operator.py", line 95, in execute
        return_value = self.execute_callable()
      File "/usr/local/lib/python3.6/site-packages/airflow/operators/python_operator.py", line 100, in execute_callable
        return self.python_callable(*self.op_args, **self.op_kwargs)
    TypeError: insert_bigquery() got an unexpected keyword argument 'dag'
    
  • DJ319, almost 4 years
    What is ds? Is it the execution date? Why does it need to be passed even if it's not being used?