Stream files to kafka using airflow


Probably best to use the PythonOperator to process the files line-by-line. I have a use case where I poll an SFTP server for files and, when I find some, process them line-by-line, writing the results out as JSON. I do things like parse dates into YYYY-MM-DD format, etc. Something like this might work for you:

import csv

def csv_file_to_kafka(**context):

    f = '/path/to/downloaded/csv_file.csv'
    with open(f, 'r') as csvfile:
        reader = csv.DictReader(csvfile)

        for row in reader:
            """
            Send the row to Kafka
            """
    return

csv_file_to_kafka = PythonOperator(
   task_id='csv_file_to_kafka',
   python_callable=csv_file_to_kafka,
   dag=dag
)
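
The "Send the row to Kafka" part is left open above. Here is a minimal, untested sketch of one way to fill it in using the kafka-python client; the library choice, the broker address, and the topic name are assumptions, not part of my actual setup:

import csv
import json

from kafka import KafkaProducer  # assumes the kafka-python package is installed

def csv_file_to_kafka(**context):
    # Placeholder broker address and topic name; adjust for your cluster
    producer = KafkaProducer(
        bootstrap_servers='localhost:9092',
        value_serializer=lambda v: json.dumps(v).encode('utf-8')
    )

    f = '/path/to/downloaded/csv_file.csv'
    with open(f, 'r') as csvfile:
        reader = csv.DictReader(csvfile)
        for row in reader:
            # Each CSV row (a dict from DictReader) becomes one JSON-encoded message
            producer.send('my_csv_topic', value=row)

    producer.flush()

Because the value_serializer handles the JSON encoding, the loop body stays a single send() call per row.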

Now it's really up to you how you get the files to download. In my case, I use the SSHHook and GoogleCloudStorageHook to get files from an SFTP server and then pass the names of the files to a task that parses and cleans the CSV files. I do this by pulling the files down from SFTP and putting them into Google Cloud Storage:

"""
HOOKS: Connections to external systems
"""
def sftp_connection():
    """
    Returns an SFTP connection created using the SSHHook
    """
    ssh_hook = SSHHook(ssh_conn_id='sftp_connection')
    ssh_client = ssh_hook.get_conn()
    return ssh_client.open_sftp()
def gcs_connection():
    """
    Returns an GCP connection created using the GoogleCloudStorageHook
    """
    return GoogleCloudStorageHook(google_cloud_storage_conn_id='my_gcs_connection')

"""
PYTHON CALLABLES: Called by PythonOperators
"""
def get_files(**context):
    """
    Looks at all files on the SFTP server and returns a list of file names.
    """
    sftp_client = sftp_connection()
    all_files = sftp_client.listdir('/path/to/files/')
    files = []

    for f in all_files:
        # Filter or transform file names here if needed
        files.append(f)

    return files

def save_files(**context):
    """
    Looks to see if a file already exists in GCS. If not, the file is
    downloaded from the SFTP server and uploaded to GCS. A list of the
    new files is returned.
    """
    files = context['task_instance'].xcom_pull(task_ids='get_files')

    sftp_client = sftp_connection()
    gcs = gcs_connection()

    new_files = process_sftp_files(files, gcs, sftp_client)

    return new_files

def csv_file_to_kafka(**context):
    """
    Untested sample: parse the CSV files and send each row to Kafka.
    """
    files = context['task_instance'].xcom_pull(task_ids='save_files')
    for f in files:
        with open(f, 'r') as csvfile:
            reader = csv.DictReader(csvfile)

            for row in reader:
                """
                Send the row to Kafka
                """
    return

# provide_context=True is needed on Airflow 1.x so the callables receive
# **context and can use XCom; on Airflow 2.x context is passed automatically
# and this argument can be dropped.
get_files = PythonOperator(
    task_id='get_files',
    python_callable=get_files,
    provide_context=True,
    dag=dag
)
save_files = PythonOperator(
    task_id='save_files',
    python_callable=save_files,
    provide_context=True,
    dag=dag
)
csv_file_to_kafka = PythonOperator(
    task_id='csv_file_to_kafka',
    python_callable=csv_file_to_kafka,
    provide_context=True,
    dag=dag
)

# Downstream tasks read XCom from their upstream task, so set the order
get_files >> save_files >> csv_file_to_kafka
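
process_sftp_files isn't shown here; a rough, untested sketch of what such a helper might look like is below. The bucket name, the /tmp staging path, and the exists/get/upload calls are assumptions about the setup rather than code from my DAG:

import os

def process_sftp_files(files, gcs, sftp_client, bucket='my-gcs-bucket'):
    """
    Copies any file not already in GCS from the SFTP server into the bucket
    and returns the list of local paths of the newly copied files, so the
    downstream task can open them directly.
    """
    new_files = []

    for f in files:
        # Skip files that were already uploaded on a previous run
        if gcs.exists(bucket, f):
            continue

        local_path = os.path.join('/tmp', f)
        sftp_client.get('/path/to/files/' + f, local_path)  # download from SFTP
        gcs.upload(bucket, f, local_path)                    # upload to GCS

        new_files.append(local_path)

    return new_files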

I know I could do this all in one big Python callable; that's how I'm refactoring the code now, so that a single function polls the SFTP server, pulls the latest files, and parses them according to my rules. I have heard that relying on XCom like this isn't ideal; Airflow tasks supposedly aren't meant to communicate with each other too much.
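
As a rough, untested sketch of that refactor, reusing the helpers above, the single callable might look something like this; it avoids passing file lists through XCom entirely:

def sftp_to_kafka(**context):
    """
    Poll SFTP, copy any new files to GCS, then stream every row to Kafka,
    all in one callable so nothing goes through XCom.
    """
    sftp_client = sftp_connection()
    gcs = gcs_connection()

    files = sftp_client.listdir('/path/to/files/')
    new_files = process_sftp_files(files, gcs, sftp_client)

    for f in new_files:
        with open(f, 'r') as csvfile:
            reader = csv.DictReader(csvfile)
            for row in reader:
                """
                Send the row to Kafka
                """
    return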

Depending on your use case, you might even want to explore something like Apache NiFi; I'm actually looking into that now too.



Comments

  • bsd
    bsd almost 2 years

    What is the best approach to stream CSV files to a Kafka topic using Airflow?

    Writing a custom Operator for Airflow?

    • Mike
      Mike over 6 years
      Are you really streaming the files or are you batching them? Airflow supports batching/micro-batching well, but for streaming my experience shows it's not so great; it basically works like nano-batching. I do a lot of polling for CSV files on remote hosts and pull them into BigQuery as batches.