pipeline_penguin.connector.sql
SQL connectors package; contains the high-level SQL-source connectors.
This package stores the high-level connector modules used to communicate with SQL data sources when data premises are executed.
Location: pipeline_penguin/connector/sql
"""SQL connectors package, contains high-level SQL-source connectors. This package stores high-level connector modules used for communication with the SQL data sources upon execution of the data premises. Location: pipeline_penguin/connector/sql """ __all__ = ["bigquery"]
pipeline_penguin.connector.sql.bigquery
"""Contains the `ConnectorSQLBigQuery` object responsible for interfacing the communication with the bigquery GCP service. The BigQuery connector uses the pandas GBQ library for running queries against a BigQuery database. It requires the provision of a service account key file in json format. Location: pipeline_penguin/connector/sql Example usage: ```python bq_connector = ConnectorSQLBigQuery(credentials_path="credentials.json") query_results = bq_connector.run("SELECT * FROM `my_project.my_dataset.my_table`", max_results=500) ``` """ from os import path import pandas as pd from google.oauth2.service_account import Credentials import google.auth from pipeline_penguin.core.data_node import NodeType from pipeline_penguin.core.connector.sql import ConnectorSQL class ConnectorSQLBigQuery(ConnectorSQL): """Object responsible for interfacing the communication with BigQuery data sources. Args: credentials_path: Path to a service account JSON file. max_results: Default maximum row count for the resulting pandas dataframe (default: 1000). Attributes: type: Base connector type, derived from the ConnectorSQL parent class (constant: "SQL"). source: Source type (constant: "BigQuery"). credentials_path: Path to a service account JSON file. max_results: Default maximum row count for the resulting pandas dataframe (default: 1000). Raises: FileNotFoundError: If the file located in the provided credentials_path is invalid or cannot be accessed. """ source = NodeType.BIG_QUERY def __init__(self, credentials_path: str = "default", max_results: int = 1000): super().__init__() print(path.isfile) if credentials_path == "default": print("Using google.auth.default credentials") self.credentials, self.project_id = google.auth.default() elif path.isfile(credentials_path): self.credentials = Credentials.from_service_account_file(credentials_path) self.project_id = None else: raise FileNotFoundError(f"{credentials_path} does not exist") self.max_results = max_results def run(self, query: str, max_results: int = None): """Method for executing a query and retrieving its results. Args: query: SQL code in BigQuery's standard format. Reference: https://cloud.google.com/bigquery/docs/reference/standard-sql/query-syntax max_results: Max row count for the resulting pandas dataframe. Uses the default when not provided. Returns: A pandas `DataFrame` object with the results of the provided query up to the maximum number of rows allowed. """ # Using default max_results max_results = max_results if max_results else self.max_results df = pd.read_gbq( query=query, credentials=self.credentials, max_results=max_results, project_id=self.project_id, ) return df
Contains the `ConnectorSQLBigQuery` object responsible for interfacing the communication with the BigQuery GCP service.
The BigQuery connector uses the pandas-gbq library to run queries against a BigQuery database. It requires a service account key file in JSON format.
Location: pipeline_penguin/connector/sql
Example usage:
```python
bq_connector = ConnectorSQLBigQuery(credentials_path="credentials.json")
query_results = bq_connector.run(
    "SELECT * FROM `my_project.my_dataset.my_table`", max_results=500
)
```
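The constructor also supports running without an explicit key file: when `credentials_path` is omitted (it defaults to `"default"`), the connector falls back to `google.auth.default()` and picks up Application Default Credentials from the environment. A minimal sketch of that path (the project and table names are placeholders):

```python
# A minimal sketch: relying on Application Default Credentials instead of a
# service account key file. The table name below is a placeholder.
bq_connector = ConnectorSQLBigQuery()  # credentials_path defaults to "default"
row_count = bq_connector.run(
    "SELECT COUNT(*) AS n FROM `my_project.my_dataset.my_table`"
)  # no max_results given, so the connector's default of 1000 applies
```

If a concrete path is passed and no file exists there, the constructor raises `FileNotFoundError` rather than falling back to default credentials.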