Changing an ETL schedule to daily

This is a general guideline for changing an existing ETL schedule to daily. It should work for most scripts, but maybe not all.
Start with the example diff provided for each part; it should give you the gist of the necessary changes. If you need more detail, read the explanation that follows.

--

Part 1: Modifying the Spark Script (5 minutes)
Example diff

https://phabricator.noc.tvlk.cloud/D31805?vs=on&id=86882&whitespace=ignore-most

The goal is to make the script's time granularity actually controlled by the --time-granularity parameter, by ensuring that time windows and other time-related parameters are not hardcoded.

In general, you want to modify the parts of the code that are hardcoded but should actually be derived from time_granularity.
Look for parameters like this:

data = data.set_load_handler(
    SparkS3AvroLoadHandler(
        S3FileHook(
            event=table_name,
            file_ext='avro',
            label='final',
            version='v1',
            bucket=bucket_name,
            env=env,
            protocol='s3a',
            time_granularity="hour_1"  # CHANGE THIS TO time_granularity
        ), sc, sample_ratio=1, n_partition=1, mode='overwrite',
        dw_schema=schema
    ).with_time_window(
        load_time_window
    )
).load()
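
The time_granularity variable above should come from the script's command-line arguments rather than a literal. Here is a minimal sketch, assuming argparse (the real script's argument handling may differ; the flag names simply mirror the spark-submit example in Part 2):

import argparse

parser = argparse.ArgumentParser()
parser.add_argument('--time-granularity', dest='time_granularity',
                    default='hour_1',
                    help="e.g. hour_1, hour_6, day_1")
parser.add_argument('--duration', default='hour',
                    help="duration (granularity) of the source data")
args, _ = parser.parse_known_args()

# pass this down instead of the hardcoded "hour_1"
time_granularity = args.time_granularity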

For example, if the granularity is day_1 but the source data is only available at HOUR duration, we need to set the time window's n value to the number of hours in one day (24). To convert n units of one duration into another duration, you can use the helper function DatetimeHelper.convert_duration. For example:

time_window_unit, time_window_n = time_granularity.split('_')
time_window_n = int(time_window_n)

# convert the time_granularity into HOUR (because the source data duration is only available in HOUR)
time_window_n_in_hour = DatetimeHelper.convert_duration(
    time_window_unit, time_window_n, Duration.HOUR)

extract_time_window = FixedTimeWindow(
    time_window_n_in_hour, Duration.HOUR,
    direction='forward'
).from_datetime(
    dt
)
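
To make the arithmetic concrete, here is a hypothetical stand-in for the conversion into hours (the real logic lives in DatetimeHelper.convert_duration; this snippet only illustrates the values you should expect):

def hours_in(time_window_unit, time_window_n):
    # Hypothetical illustration only; use DatetimeHelper.convert_duration in real code.
    hours_per_unit = {'hour': 1, 'day': 24}
    return time_window_n * hours_per_unit[time_window_unit]

print(hours_in('day', 1))   # day_1  -> 24 hourly source windows
print(hours_in('hour', 6))  # hour_6 -> 6 hourly source windows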

Part 2: Testing the (Now More Generic) Spark Script (15-30 minutes)

This is to ensure that the --time-granularity parameter is indeed effective and correct. That is, specifying --time-granularity hour_6 really does make the script produce six-hourly data, specifying --time-granularity day_1 really does make it produce daily data, and so on. For example:

spark-submit.sh --master yarn-cluster --jars /home/ubuntu/spark-jars/com.amazonaws_aws-java-sdk-1.7.4.jar,/home/ubuntu/spark-jars/org.apache.hadoop_hadoop-aws-2.7.1.jar,/home/ubuntu/spark-jars/com.databricks_spark-avro_2.10-2.0.1.jar edw_fact_flight_price_accuracy.py --dt "2016-06-01 00:00:00" --env dev --task_id track.flight.searchAccuracy_to_avro --event-name track.flight.searchAccuracy --time-granularity hour_1 --duration hour --table-name edw.fact_flight_price_accuracy
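
To cover the daily case, you can re-run the same command with only --time-granularity changed to day_1 (keep --duration hour, since the source data is still hourly; adjust any script-specific flags as needed) and confirm that daily output is produced for the given --dt:

spark-submit.sh --master yarn-cluster --jars /home/ubuntu/spark-jars/com.amazonaws_aws-java-sdk-1.7.4.jar,/home/ubuntu/spark-jars/org.apache.hadoop_hadoop-aws-2.7.1.jar,/home/ubuntu/spark-jars/com.databricks_spark-avro_2.10-2.0.1.jar edw_fact_flight_price_accuracy.py --dt "2016-06-01 00:00:00" --env dev --task_id track.flight.searchAccuracy_to_avro --event-name track.flight.searchAccuracy --time-granularity day_1 --duration hour --table-name edw.fact_flight_price_accuracy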

Part 3: Creating the New (Daily) DAG Definition (5 minutes)
Example diff

https://phabricator.noc.tvlk.cloud/D30404

This should be a simple copy-and-paste of the original DAG definition with a few parameters modified.
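
If it helps, here is a minimal sketch of what the new daily DAG definition could look like, assuming the DAG is an Airflow DAG. The dag_id, owner, start_date, and task wiring below are hypothetical placeholders, and the --jars and a few other flags from Part 2 are omitted for brevity; copy the real values from the original (hourly) DAG, as in the example diff above.

from datetime import datetime, timedelta

from airflow import DAG
from airflow.operators.bash_operator import BashOperator

default_args = {
    'owner': 'data',                     # hypothetical; copy from the original DAG
    'start_date': datetime(2016, 6, 1),  # hypothetical
    'retries': 1,
    'retry_delay': timedelta(minutes=5),
}

dag = DAG(
    dag_id='edw_fact_flight_price_accuracy_daily',  # hypothetical name
    default_args=default_args,
    schedule_interval='@daily',          # the key difference from the hourly DAG
)

to_avro = BashOperator(
    task_id='searchAccuracy_to_avro',    # hypothetical; mirror the original task
    bash_command=(
        'spark-submit.sh --master yarn-cluster '
        'edw_fact_flight_price_accuracy.py '
        '--dt "{{ ds }} 00:00:00" --env prod '
        '--event-name track.flight.searchAccuracy '
        '--time-granularity day_1 --duration hour '
        '--table-name edw.fact_flight_price_accuracy'
    ),
    dag=dag,
)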

Part 4: Removing the Load to Redshift Task in Original DAG (1 minute)

!!! Important: do this only after Parts 1-3 have been completed, released, and confirmed stable.

Example diff

https://phabricator.noc.tvlk.cloud/D30822

--

Post Release Checks

After completing the migration, don't forget to verify that both the new daily DAG and the original (non-daily) DAG execute successfully.
If either fails, find out why and make sure the failure is not caused by the migration (sometimes a DAG fails for reasons unrelated to the daily schedule).