# Databricks notebook source
from delta.tables import DeltaTable

# COMMAND ----------

# Column names for the person_details_dim dimension table.
columns = [
    'BusinessEntityID',
    'PersonType',
    'FullName',
    'EmailAddress',
    'PhoneNumber',
    'EmailPromotion',
    'AdditionalContactInfo',
    'Demographics',
]

# A single sample record to upsert; column types are inferred by createDataFrame.
rows = [[1, 'EM', 'RahulYadav', 'rahul@gmail.com', '9999-9999-90', 1, 'NA', 'NA']]
updates_df = spark.createDataFrame(rows, columns)
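
# Optional sanity check: confirm the inferred schema and row contents before
# merging. An explicit StructType schema would be safer than inference if this
# ran in production.
updates_df.printSchema()
updates_df.show(truncate=False)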

# COMMAND ----------

table_path = "dbfs:/mnt/dlzones/warehouse/openlineage_poc/person_details_dim"

if DeltaTable.isDeltaTable(spark, table_path):
    person_details_tbl = DeltaTable.forPath(spark, table_path)

    # Upsert: update rows that match on BusinessEntityID, insert the rest.
    person_details_tbl.alias('pdt') \
        .merge(updates_df.alias('updates'), 'pdt.BusinessEntityID = updates.BusinessEntityID') \
        .whenMatchedUpdateAll() \
        .whenNotMatchedInsertAll() \
        .execute()
else:
    print(f"No Delta table found at {table_path}; merge skipped")

# COMMAND ----------
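
# If the table did not exist, the merge above was skipped. A minimal bootstrap
# sketch, assuming the mount path is writable: write the updates DataFrame out
# as the initial Delta table.
if not DeltaTable.isDeltaTable(spark, table_path):
    updates_df.write.format("delta").save(table_path)

# COMMAND ----------

# Optional: confirm the most recent operation (the MERGE, or the initial write)
# recorded in the Delta transaction log.
DeltaTable.forPath(spark, table_path) \
    .history(1) \
    .select("version", "operation") \
    .show(truncate=False)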