This is a general guideline you can follow to change an existing ETL schedule to daily. It should work for most scripts, though maybe not all.
First, check out the example diff provided; it should give you the gist of what changes are necessary. If you need more detail, read the explanation that follows.
--
https://phabricator.noc.tvlk.cloud/D31805?vs=on&id=86882&whitespace=ignore-most
The goal is to make the script's time granularity actually controllable through the --time-granularity parameter, by ensuring that time windows and other time-related parameters are not hardcoded. You generally want to modify any part of the code that is hardcoded but should actually be relative to time_granularity.
You can look for these parameters:
- time_granularity, supplied to S3FileHook when loading. In some scripts you'll find that the value is hardcoded; in that case, change it to the time_granularity variable. For example:
data = data.set_load_handler(
    SparkS3AvroLoadHandler(
        S3FileHook(
            event=table_name,
            file_ext='avro',
            label='final',
            version='v1',
            bucket=bucket_name,
            env=env,
            protocol='s3a',
            time_granularity="hour_1"  # CHANGE THIS to the time_granularity variable
        ), sc, sample_ratio=1, n_partition=1, mode='overwrite',
        dw_schema=schema
    ).with_time_window(
        load_time_window
    )
).load()
- The time window, whose size should be derived from whatever the time_granularity value is (hour_1, hour_6, day_1, ...). For example, if the granularity is day_1 but the source data is only available in HOUR duration, we need to set the time window's n value to the number of hours in one day (24). To convert n units of one time unit into another, you can use the helper function DatetimeHelper.convert_duration (a concrete arithmetic sketch follows this list). For example:
time_window_unit, time_window_n = time_granularity.split('_')
time_window_n = int(time_window_n)
# Convert the time_granularity into HOUR (because the source data duration
# is only available in HOUR)
time_window_n_in_hour = DatetimeHelper.convert_duration(
    time_window_unit, time_window_n, Duration.HOUR)
extract_time_window = FixedTimeWindow(
    time_window_n_in_hour, Duration.HOUR,
    direction='forward'
).from_datetime(
    dt
)
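To make the conversion concrete, here is a standalone sketch of the arithmetic the snippet above relies on. The hours-per-unit table is a stand-in for illustration only, not the real DatetimeHelper implementation:

# Hypothetical stand-in for DatetimeHelper.convert_duration, to show the math.
HOURS_PER_UNIT = {'hour': 1, 'day': 24}

def to_hours(unit, n):
    # How many HOUR-sized windows fit into n units of `unit`.
    return n * HOURS_PER_UNIT[unit]

for granularity in ('hour_1', 'hour_6', 'day_1'):
    unit, n = granularity.split('_')
    print(granularity, '->', to_hours(unit, int(n)), 'hours')
# hour_1 -> 1, hour_6 -> 6, day_1 -> 24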
This is to ensure that the --time-granularity parameter is indeed effective and correct. That is, simply specifying --time-granularity hour_6 does make the script produce six-hourly data, specifying --time-granularity day_1 does make the script produce daily data, and so on.
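If a script does not already read the parameter, the plumbing is plain argument parsing. Here is a minimal sketch assuming argparse; the real scripts may wire this up through their own option parser, and only the flag name is taken from the sample spark-submit command below:

import argparse

# Minimal sketch: expose --time-granularity so nothing needs to be hardcoded.
parser = argparse.ArgumentParser()
parser.add_argument('--time-granularity', default='hour_1',
                    help='hour_1, hour_6, day_1, ...')
# parse_known_args tolerates the script's other flags (--dt, --env, ...)
args, _ = parser.parse_known_args()

time_granularity = args.time_granularity  # use this instead of a hardcoded "hour_1"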
To test your changes, SSH into data-airflow-worker-01. Go to /data/to_daily and paste your modified script there.
Run the spark-submit.sh command with both the original script and the modified script. You can find a sample command in the Airflow log of a to_avro task. For example:
spark-submit.sh --master yarn-cluster --jars /home/ubuntu/spark-jars/com.amazonaws_aws-java-sdk-1.7.4.jar,/home/ubuntu/spark-jars/org.apache.hadoop_hadoop-aws-2.7.1.jar,/home/ubuntu/spark-jars/com.databricks_spark-avro_2.10-2.0.1.jar edw_fact_flight_price_accuracy.py --dt "2016-06-01 00:00:00" --env dev --task_id track.flight.searchAccuracy_to_avro --event-name track.flight.searchAccuracy --time-granularity hour_1 --duration hour --table-name edw.fact_flight_price_accuracy
For the modified script, change --time-granularity to day_1.
--
https://phabricator.noc.tvlk.cloud/D30404
This should be a simple copy+paste with some parameters modified (a sketch of the resulting DAG header follows the list below):
- Name the new DAG with _daily at the end of it too.
- Set start_date to a recent date (no more than 3 days before the expected release).
- Set the schedule to 0 17 * * *.
- Change the --time-granularity spark-submit parameter to day_1.
- Change time_granularity to day_1.
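Put together, the new DAG's header would change roughly like this. This is an illustrative sketch using plain Airflow constructs; the real DAGs may use internal wrappers, and the dag id and dates below are placeholders:

from datetime import datetime
from airflow import DAG

dag = DAG(
    dag_id='my_etl_daily',            # original DAG id with _daily appended
    start_date=datetime(2016, 6, 1),  # a recent date, <= 3 days before release
    schedule_interval='0 17 * * *',   # run daily at 17:00
)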
--
!!! Important: do this only after Parts 1-3 have been completed, released, and stable.
https://phabricator.noc.tvlk.cloud/D30822
See the example diff; the relevant task dependency is declared with load.set_upstream(transform).
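For context, set_upstream is how Airflow declares task ordering. Continuing the hypothetical dag sketched above, with dummy transform and load tasks standing in for the real ones:

from airflow.operators.dummy_operator import DummyOperator

transform = DummyOperator(task_id='transform', dag=dag)
load = DummyOperator(task_id='load', dag=dag)

# load runs only after transform has succeeded.
load.set_upstream(transform)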
After completing the migration, don't forget to verify that both the new daily DAG and the original (non-daily) DAG still execute successfully.
If either fails, find out why and make sure the failure is not caused by the migration (sometimes a run fails for reasons unrelated to the daily schedule).