from typing import Any, Callable, Dict, Optional, Type, List from requests.auth import AuthBase, HTTPBasicAuth import logging import traceback from airflow.models import BaseOperator from airflow.providers.http.hooks.http import HttpHook from airflow.utils.operator_helpers import make_kwargs_callable from airflow.providers.google.cloud.hooks.bigquery import BigQueryHook from openlineage.common.provider.bigquery import BigQueryDatasetsProvider, BigQueryErrorRunFacet from openlineage.airflow.extractors.base import BaseExtractor, TaskMetadata from openlineage.airflow.utils import get_job_name class HttpToBigQueryOperator(BaseOperator): """ Calls an HTTP endpoint and loads the response into a BigQuery table :param http_conn_id: The :ref:`http connection` to run the operator against :type http_conn_id: str :param endpoint: The relative part of the full url. (templated) :type endpoint: str :param method: The HTTP method to use, default = "POST" :type method: str :param data: The data to pass. POST-data in POST/PUT and params in the URL for a GET request. (templated) :type data: For POST/PUT, depends on the content-type parameter, for GET a dictionary of key/value string pairs :param headers: The HTTP headers to be added to the GET request :type headers: a dictionary of string key/value pairs :param response_filter: A function allowing you to manipulate the response text. e.g response_filter=lambda response: json.loads(response.text). The callable takes the response object as the first positional argument and optionally any number of keyword arguments available in the context dictionary. :type response_filter: A lambda or defined function. :param extra_options: Extra options for the 'requests' library, see the 'requests' documentation (options to modify timeout, ssl, etc.) :type extra_options: A dictionary of options, where key is string and value depends on the option that's being modified. :param log_response: Log the response (default: False) :type log_response: bool :param auth_type: The auth type for the service :type auth_type: AuthBase of python requests lib :param project_id: The Google cloud project in which to look for the table. The connection supplied to the hook must provide access to the specified project. :type project_id: str :param dataset_id: The name of the dataset in which to find or create the table. :type dataset_id: str :param table_id: The name of the table to store the HTTP response body in. :type table_id: str :param gcp_conn_id: The connection ID used to connect to Google Cloud and interact with the BigQuery service. :type gcp_conn_id: str :param location: [Optional] The geographic location of the job. Required except for US and EU. See details at https://cloud.google.com/bigquery/docs/locations#specifying_your_location :type location: str """ template_fields = [ 'endpoint', 'data', 'headers', 'project_id', 'dataset_id', 'table_id', ] template_fields_renderers = {'headers': 'json', 'data': 'py'} template_ext = () ui_color = '#303378' def __init__( self, *, endpoint: Optional[str] = None, method: str = 'GET', data: Any = None, headers: Optional[Dict[str, str]] = None, response_filter: Optional[Callable[..., Any]] = None, extra_options: Optional[Dict[str, Any]] = None, http_conn_id: str = 'http_default', log_response: bool = False, auth_type: Type[AuthBase] = HTTPBasicAuth, project_id, dataset_id, table_id, gcp_conn_id='google_cloud_default', location=None, **kwargs: Any, ) -> None: super().__init__(**kwargs) self.http_conn_id = http_conn_id self.method = method self.endpoint = endpoint self.headers = headers or {} self.data = data or {} self.response_filter = response_filter self.extra_options = extra_options or {} self.log_response = log_response self.auth_type = auth_type self.project_id = project_id self.dataset_id = dataset_id self.table_id = table_id self.gcp_conn_id = gcp_conn_id self.location = location self.hook = None def execute(self, context: Dict[str, Any]) -> Any: self.log.info("Calling HTTP method") http = HttpHook(self.method, http_conn_id=self.http_conn_id, auth_type=self.auth_type) response = http.run(self.endpoint, self.data, self.headers, self.extra_options) if self.log_response: self.log.info(response.text) responseText = response.text if self.response_filter: kwargs_callable = make_kwargs_callable(self.response_filter) responseText = kwargs_callable(response.text, **context) if self.hook is None: self.log.info('Connecting to: %s', self.project_id) self.hook = BigQueryHook( gcp_conn_id=self.gcp_conn_id, use_legacy_sql=True, location=self.location, ) self.log.info('Validating table: %s', self.table_id) self.hook.run_table_upsert( dataset_id=self.dataset_id, project_id=self.project_id, table_resource={ "tableReference": { "projectId": self.project_id, "datasetId": self.dataset_id, "tableId": self.table_id }, "schema": { "fields": [ {"name": "timestamp", "type": "TIMESTAMP"}, {"name": "http_response", "type": "STRING"}, ] }, }, ) self.log.info('Insertng row into table %s', self.table_id,) # TODO: replace with insert_job() job_id = self.hook.run_query( sql=""" INSERT `{}.{}.{}` VALUES (CURRENT_TIMESTAMP(), @responseText) """.format(self.project_id, self.dataset_id, self.table_id), write_disposition='WRITE_APPEND', query_params=[{ 'name': 'responseText', 'parameterType': { 'type': 'STRING' }, 'parameterValue': { 'value': responseText } }], use_legacy_sql=False, ) context['task_instance'].xcom_push(key='job_id', value=job_id) def on_kill(self) -> None: super().on_kill() if self.hook is not None: self.log.info('Cancelling running query') self.hook.cancel_query() class HttpToBigQueryExtractor(BaseExtractor): def __init__(self, operator): super().__init__(operator) @classmethod def get_operator_classnames(cls) -> List[str]: return ['HttpToBigQueryOperator'] def extract(self) -> Optional[TaskMetadata]: return None def extract_on_complete(self, task_instance) -> Optional[TaskMetadata]: self.log.debug(f"extract_on_complete({task_instance})") try: bigquery_job_id = self._get_xcom_bigquery_job_id(task_instance) if bigquery_job_id is None: raise Exception("Xcom could not resolve BigQuery job id." + "Job may have failed.") except Exception as e: self.log.error(f"Cannot retrieve job details from BigQuery.Client. {e}", exc_info=True) return TaskMetadata( name=get_job_name(task=self.operator), run_facets={ "bigQuery_error": BigQueryErrorRunFacet( clientError=f"{e}: {traceback.format_exc()}" ) } ) stats = BigQueryDatasetsProvider().get_facets(bigquery_job_id) output = stats.output run_facets = stats.run_facets return TaskMetadata( name=get_job_name(task=self.operator), outputs=[output.to_openlineage_dataset()] if output else [], run_facets=run_facets ) def _get_xcom_bigquery_job_id(self, task_instance): bigquery_job_id = task_instance.xcom_pull( task_ids=task_instance.task_id, key='job_id') self.log.info(f"bigquery_job_id: {bigquery_job_id}") return bigquery_job_id