Airflow - got an unexpected keyword argument 'dag'
You do not need to pass self into your python_callable. Change your insert_bigquery function's signature from def insert_bigquery(self) to def insert_bigquery(ds, **kwargs).
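A minimal sketch of the fix, using the names from the question (the body of insert_bigquery stays the same; only the signature changes):

def insert_bigquery(ds, **kwargs):
    # With provide_context=True, Airflow 1.x passes the template context
    # ('ds' is the execution date, plus 'dag', 'task', etc.) to the
    # callable as keyword arguments, so the callable must accept them.
    # **kwargs absorbs everything beyond 'ds', including the 'dag' key
    # that caused the TypeError.
    ...

json_gcs_to_bq = PythonOperator(
    task_id='json_gcs_to_bq',
    python_callable=insert_bigquery,
    provide_context=True,
    dag=dag)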
Reference: https://airflow.apache.org/howto/operator/python.html
Felipe FB
Updated on June 07, 2022

Comments
-
Felipe FB, almost 2 years ago:
I know there is already an Airflow operator that moves a file from Cloud Storage to BigQuery, but instead I made the connection to GCP inside the script, the same way I would without Airflow, and used a PythonOperator to call the function that reads Cloud Storage and inserts the file's data into BigQuery. However, I get the error message: "got an unexpected keyword argument 'dag'".
It seems like a pretty simple thing to solve, but I really do not know what the message means, since I did put the DAG attributes inside the PythonOperator.
import json
import decimal
import airflow
from airflow import DAG
from airflow.contrib.operators.mysql_to_gcs import MySqlToGoogleCloudStorageOperator
from airflow.contrib.operators.gcs_to_bq import GoogleCloudStorageToBigQueryOperator
from airflow.contrib.operators.bigquery_operator import BigQueryOperator
from datetime import datetime, timedelta
from airflow.operators.bash_operator import BashOperator
from airflow.models import BaseOperator
from airflow.utils.decorators import apply_defaults
from airflow.contrib.hooks.gcs_hook import GoogleCloudStorageHook
from airflow.hooks.mssql_hook import MsSqlHook
from tempfile import NamedTemporaryFile
import pymssql
import logging
import os
# import cloudstorage as gcs
from google.cloud import bigquery
from oauth2client.client import GoogleCredentials
from airflow.operators.python_operator import PythonOperator

default_args = {
    'owner': 'airflow',
    'start_date': airflow.utils.dates.days_ago(2),
    'depends_on_past': False,
    'email': ['[email protected]'],
    'email_on_failure': False,
    'email_on_retry': False,
    'depends_on_past': False,
    # If a task fails, retry it once after waiting
    # at least 5 minutes
    'retries': 1,
    'retry_delay': timedelta(minutes=5),
}

dag = DAG(
    dag_id='test_tab1',
    default_args=default_args,
    schedule_interval=timedelta(days=1),
    dagrun_timeout=timedelta(minutes=60)
)

try:
    script_path = os.path.dirname(os.path.abspath(__file__)) + "/"
except:
    script_path = "/usr/local/airflow/key/key.json"

#Bigquery Credentials and settings
os.environ["GOOGLE_APPLICATION_CREDENTIALS"] = script_path

def insert_bigquery(self):
    bigquery_client = bigquery.Client(project="project-name")
    dataset_ref = bigquery_client.dataset('bucket-name')
    job_config = bigquery.LoadJobConfig()
    job_config.autodetect = True
    job_config.skip_leading_rows = 1
    job_config.source_format = bigquery.SourceFormat.CSV
    time_partitioning = bigquery.table.TimePartitioning()
    job_config.time_partitioning = time_partitioning
    job_config.clustering_fields = ["id"]
    #job_config.field_delimiter = ";"
    uri = "gs://bucket-name/"+filename
    load_job = bigquery_client.load_table_from_uri(
        uri,
        dataset_ref.table('tab1'),
        job_config=job_config
    )
    print('Starting job {}'.format(load_job.job_id))
    load_job.result()
    print('Job finished.')

json_gcs_to_bq = PythonOperator(
    task_id='json_gcs_to_bq',
    python_callable=insert_bigquery,
    provide_context=True,
    dag=dag)
Error Message:
[2019-06-21 15:45:40,732] {{models.py:1760}} ERROR - insert_bigquery() got an unexpected keyword argument 'dag'
Traceback (most recent call last):
  File "/usr/local/lib/python3.6/site-packages/airflow/models.py", line 1659, in _run_raw_task
    result = task_copy.execute(context=context)
  File "/usr/local/lib/python3.6/site-packages/airflow/operators/python_operator.py", line 95, in execute
    return_value = self.execute_callable()
  File "/usr/local/lib/python3.6/site-packages/airflow/operators/python_operator.py", line 100, in execute_callable
    return self.python_callable(*self.op_args, **self.op_kwargs)
TypeError: insert_bigquery() got an unexpected keyword argument 'dag'
-
DJ319, almost 4 years ago:
What is ds? Is it the execution date? Why does it need to be passed even if it's not being used?