"""Emit a fixed sequence of OpenLineage run events to a local HTTP endpoint.

The script builds three schema-bearing datasets (dataset_a -> dataset_b ->
dataset_c) and posts START/COMPLETE pairs for a "read", a "transform" and a
"final" phase. It then posts a START/COMPLETE pair whose output carries a
column-lineage facet. All events go to ``http://localhost:5000`` through
``OpenLineageClient.emit``.
"""
import copy
import uuid

from openlineage.client.client import OpenLineageClient
from openlineage.client import facet, run
from openlineage.client.run import (
    SCHEMA_URL,
    Dataset,
    DatasetEvent,
    Job,
    JobEvent,
    Run,
    RunEvent,
    RunState,
)
from openlineage.client.facet import (
    ColumnLineageDatasetFacet,
    ColumnLineageDatasetFacetFieldsAdditional,
    ColumnLineageDatasetFacetFieldsAdditionalInputFields,
    DatasetVersionDatasetFacet,
    LifecycleStateChange,
    LifecycleStateChangeDatasetFacet,
    LifecycleStateChangeDatasetFacetPreviousIdentifier,
    OwnershipDatasetFacet,
    OwnershipDatasetFacetOwners,
    OwnershipJobFacet,
    OwnershipJobFacetOwners,
    StorageDatasetFacet,
    SymlinksDatasetFacet,
    SymlinksDatasetFacetIdentifiers,
)

NAMESPACE = "openlineage8"
JOB_NAME = "job"
# NOTE(review): this timestamp has no UTC offset, and every event shares it.
# The spec's eventTime is a JSON-schema "date-time", which expects an offset.
# It is kept unchanged in case identical timestamps are what this script is exercising.
EVENT_TIME = "2021-11-03T10:53:52.427343"
# The spec requires `producer` to be a URI. The original events sent "" or
# "some-producer"; reuse the producer URI already stamped on the schema facets below.
PRODUCER = "https://github.com/OpenLineage/OpenLineage/tree/0.0.1/client/python"
SCHEMA_FACET_URL = (
    "https://raw.githubusercontent.com/OpenLineage/OpenLineage/main/spec/"
    "OpenLineage.json#/definitions/SchemaDatasetFacet"
)


def _schema_dataset(name, field_names):
    """Return a raw dataset dict in NAMESPACE with a "schema" facet.

    Every field listed in *field_names* is declared with type "string".
    """
    return {
        "namespace": NAMESPACE,
        "name": name,
        "facets": {
            "schema": {
                "_producer": PRODUCER,
                "_schemaURL": SCHEMA_FACET_URL,
                "fields": [{"name": field, "type": "string"} for field in field_names],
            }
        },
    }


def _run_event(state, event_run, event_inputs, event_outputs):
    """Build a RunEvent for JOB_NAME in NAMESPACE at EVENT_TIME.

    Args:
        state: the RunState to report (e.g. RunState.START).
        event_run: the Run the event belongs to.
        event_inputs: list of input datasets (raw dicts or Dataset objects).
        event_outputs: list of output datasets (raw dicts or Dataset objects).
    """
    return RunEvent(
        eventType=state,
        eventTime=EVENT_TIME,
        run=event_run,
        job=Job(namespace=NAMESPACE, name=JOB_NAME),
        inputs=event_inputs,
        outputs=event_outputs,
        producer=PRODUCER,
        # The original passed schemaURL="", which is not a valid URI. Use the
        # spec URL the client exports.
        schemaURL=SCHEMA_URL,
    )


def _column_lineage_event(state, event_run):
    """Build a RunEvent with no inputs whose single output carries column lineage.

    NOTE(review): the facet maps "output-field" of dataset_a back onto field
    "a" of dataset_a itself, and "output-field" is not declared in any schema
    above. This mirrors the original; confirm it is intentional.
    """
    lineage = ColumnLineageDatasetFacet(
        {
            "output-field": ColumnLineageDatasetFacetFieldsAdditional(
                transformationDescription="some-transformation",
                transformationType="some-transformation-type",
                inputFields=[
                    ColumnLineageDatasetFacetFieldsAdditionalInputFields(
                        namespace=NAMESPACE,
                        name="dataset_a",
                        field="a",
                    ),
                ],
            ),
        },
    )
    output = Dataset(
        namespace=NAMESPACE,
        name="dataset_a",
        facets={"columnLineage": lineage},
    )
    return _run_event(state, event_run, [], [output])


inputs = [_schema_dataset("dataset_a", ["a", "b"])]
transform_outputs = [_schema_dataset("dataset_b", ["a", "b", "c"])]
final_outputs = [_schema_dataset("dataset_c", ["a", "b", "c", "d"])]

client = OpenLineageClient("http://localhost:5000")
# NOTE(review): this rebinds the `run` module imported above. Every phase
# also reuses this single run ID, so the server sees START/COMPLETE three
# times for the same run. Kept as-is; confirm that is intended.
run = Run(runId=str(uuid.uuid4()))

event_read_start = _run_event(RunState.START, run, inputs, [])
event_read_complete = _run_event(RunState.COMPLETE, run, inputs, [])
event_transform_start = _run_event(RunState.START, run, inputs, transform_outputs)
event_transform_complete = _run_event(RunState.COMPLETE, run, inputs, transform_outputs)
event_final_start = _run_event(RunState.START, run, transform_outputs, final_outputs)
event_final_complete = _run_event(RunState.COMPLETE, run, transform_outputs, final_outputs)

# Emission order matters to the receiving server; keep it explicit.
for _event in (
    event_read_start,
    event_read_complete,
    event_transform_start,
    event_transform_complete,
    event_final_start,
    event_final_complete,
    _column_lineage_event(RunState.START, run),
    _column_lineage_event(RunState.COMPLETE, run),
):
    client.emit(_event)