"""Airflow DAG that prepares car-sales tables in Snowflake.

Two tasks run in sequence once a day:

1. ``create_bronze_car_sales_table_if_exists`` creates the all-varchar
   ``car_sales`` table when it is missing.
2. ``insert_data_in_car_sales`` truncates ``bronze_car_sales`` and refills it
   from ``car_sales``, casting the numeric and date columns.
"""
import logging
from datetime import datetime

from airflow import DAG
from airflow.contrib.hooks.snowflake_hook import SnowflakeHook
from airflow.operators.python_operator import PythonOperator

logger = logging.getLogger(__name__)

# Airflow connection id shared by every task in this DAG.
_SNOWFLAKE_CONN_ID = 'snowflake'


def _run_statements(statements):
    """Execute SQL statements in order on one Snowflake connection, then commit.

    Args:
        statements: Iterable of SQL strings, executed in the given order on a
            single cursor.

    Raises:
        Exception: Any error raised while executing or committing is logged
            and re-raised so that Airflow marks the task as failed instead of
            silently reporting success.
    """
    hook = SnowflakeHook(snowflake_conn_id=_SNOWFLAKE_CONN_ID)
    connection = hook.get_conn()
    try:
        # The cursor is created inside the outer try so the connection is
        # still closed if cursor creation itself fails.
        cursor = connection.cursor()
        try:
            for statement in statements:
                cursor.execute(statement)
            connection.commit()
        except Exception:
            logger.exception('Snowflake statement failed')
            raise
        finally:
            cursor.close()
    finally:
        connection.close()


def create_bronze_car_sales_table_if_exists():
    """Create the ``car_sales`` table (every column ``varchar(50)``) if missing.

    NOTE(review): despite the function name, the statement creates
    ``car_sales``, not ``bronze_car_sales``. ``insert_data_in_car_sales``
    writes to ``bronze_car_sales``, which nothing in this file creates --
    confirm that table is provisioned elsewhere.

    Raises:
        Exception: Propagated from the Snowflake hook/connector on failure.
    """
    _run_statements([
        '''
        create table if not exists car_sales (
            year varchar(50),
            make varchar(50),
            model varchar(50),
            trim varchar(50),
            body varchar(50),
            transmission varchar(50),
            vin varchar(50),
            state varchar(50),
            condition varchar(50),
            odometer varchar(50),
            color varchar(50),
            interior varchar(50),
            seller varchar(50),
            mmr varchar(50),
            sellingprice varchar(50),
            saledate varchar(50)
        );
        ''',
    ])


def insert_data_in_car_sales():
    """Reload ``bronze_car_sales`` from ``car_sales`` with typed columns.

    The target table is truncated first, then refilled. Empty strings in
    ``condition``, ``odometer``, ``mmr`` and ``sellingprice`` become NULL
    before the cast to integer. Only rows whose ``saledate`` is exactly 39
    characters long are copied; the date is parsed from the 12 characters
    starting at position 4 (presumably values shaped like
    ``Tue Dec 16 2014 12:30:00 GMT-0800 (PST)`` -- verify against the data).

    Raises:
        Exception: Propagated from the Snowflake hook/connector on failure.
    """
    _run_statements([
        '''truncate table bronze_car_sales''',
        '''
        INSERT INTO bronze_car_sales
        SELECT
            year,
            make,
            model,
            trim,
            body,
            transmission,
            vin,
            state,
            nullif(condition,'')::integer as condition,
            nullif(odometer, '')::integer as odometer,
            color,
            interior,
            seller,
            nullif(mmr, '')::integer as mmr,
            nullif(sellingprice, '')::integer as sellingprice,
            TO_DATE(ltrim(substr(saledate,4,12))::varchar, 'MON DD YYYY') as saledate
        FROM car_sales
        where length(saledate) = 39
        ''',
    ])


# Define the DAG
with DAG(
    'pull_data_and_create_table_dag',
    start_date=datetime(2024, 3, 28),
    catchup=False,
    schedule_interval='@daily',
) as dag:
    # Task variables use their own names so they do not rebind (shadow) the
    # callables defined above; the task ids are unchanged.
    create_table_task = PythonOperator(
        task_id='create_bronze_car_sales_table_if_exists',
        python_callable=create_bronze_car_sales_table_if_exists,
    )

    insert_data_task = PythonOperator(
        task_id='insert_data_in_car_sales',
        python_callable=insert_data_in_car_sales,
    )

    # The source table must exist before the typed reload runs.
    create_table_task >> insert_data_task