pipeline_penguin.data_node.sql

Main data_node package, containing high-level data_nodes for SQL-based data sources (BigQuery, MySQL, etc.).

This package provides SQL-based `DataNode` constructors: data structures responsible for abstracting a single data source in an existing data pipeline.

Location: pipeline_penguin/data_node/sql/

__all__ = ["bigquery"]
"""Contains the `DataNodeBigQuery` constructor, which represents a given table or view from a
BigQuery dataset.

The DataNode provides the information needed for accessing the data on the GCP project and holds the
validations which will be executed against that data. It may also have a custom connector, separate
from the default connectors configured in the ConnectorManager.

Location: pipeline_penguin/data_node/sql

Example usage:

```python
data_node = DataNodeBigQuery(
    "name_test",
    "project_test",
    "dataset_test",
    "table_test",
)

data_node.insert_premise(
    "check_nulls", DataPremiseSQLCheckIsNull, column="test_column"
)
result = data_node.run_premises()
```
"""

from pipeline_penguin.core.data_node import DataNode, NodeType
from pipeline_penguin.core.data_premise import PremiseType
from pipeline_penguin.connector.connector_manager import ConnectorManager
from pipeline_penguin.exceptions import WrongTypeReference
from pipeline_penguin.connector.sql.bigquery import ConnectorSQLBigQuery


class DataNodeBigQuery(DataNode):
    """Constructor for the DataNodeBigQuery, representing a single table or view from a BigQuery
    dataset.

    Args:
        name: Name for this datanode.
        project_id: GCP project where the data is stored.
        dataset_id: BigQuery's dataset where the data is stored.
        table_id: BigQuery's table containing the data.
    Attributes:
        name: Name for this datanode.
        project_id: GCP project where the data is stored.
        dataset_id: BigQuery's dataset where the data is stored.
        table_id: BigQuery's table containing the data.
        premises: Dictionary holding every data_premise inserted.
        supported_premise_types: Array of premise types allowed to be inserted on the data_node.
        source: Type of data source, it is always "BigQuery".
    """

    def __init__(self, name, project_id, dataset_id, table_id):
        """Initialize the constructor."""
        super().__init__(name, NodeType.BIG_QUERY)
        self.project_id = project_id
        self.dataset_id = dataset_id
        self.table_id = table_id
        self.supported_premise_types = [PremiseType.SQL]

    def get_connector(self, premise_type: str) -> ConnectorSQLBigQuery:
        """Method for retrieving the Connector to be used while querying data from
        this DataNode.

        Calls for a default connector if there's no Connector of the given type inside
        the **self.connectors** dictionary.

        Args:
            premise_type (str): Type of Premise for identifying the Connector.
        Raises:
            WrongTypeReference: If a Connector of the provided type was not found.
        Returns:
            The `Connector` instance retrieved.
        """
        key = f"{premise_type}{self.source}"

        if key in self.connectors:
            return self.connectors[key]

        connector = ConnectorManager().get_default(ConnectorSQLBigQuery)
        if connector is None:
            raise WrongTypeReference(
                f"Could not find {key} as a custom or default connector"
            )

        return connector

    def to_serializeble_dict(self) -> dict:
        """Returns a dictionary representation of the current DataNode using only built-in data
        types.

        Returns:
            dict: Dictionary containing the attributes of this DataNode.
        """
        result = {
            "name": self.name,
            "source": self.source,
            "supported_premise_types": self.supported_premise_types,
            "project_id": self.project_id,
            "dataset_id": self.dataset_id,
            "table_id": self.table_id,
        }

        return result
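
Because `to_serializeble_dict` returns only built-in data types, its output can be handed straight to `json.dumps`. The sketch below illustrates that pattern with a minimal stand-in class (`ExampleBigQueryNode` is hypothetical, not the library's class; it only mirrors the attributes the real method exports):

```python
import json


class ExampleBigQueryNode:
    """Illustrative stand-in for DataNodeBigQuery (not the real class)."""

    def __init__(self, name, project_id, dataset_id, table_id):
        self.name = name
        self.source = "BigQuery"
        self.supported_premise_types = ["SQL"]
        self.project_id = project_id
        self.dataset_id = dataset_id
        self.table_id = table_id

    def to_serializeble_dict(self):
        # Same shape as the real method: only built-in types, so the
        # result can go straight into json.dumps.
        return {
            "name": self.name,
            "source": self.source,
            "supported_premise_types": self.supported_premise_types,
            "project_id": self.project_id,
            "dataset_id": self.dataset_id,
            "table_id": self.table_id,
        }


node = ExampleBigQueryNode("sales_node", "my-project", "analytics", "orders")
payload = json.dumps(node.to_serializeble_dict())
```

Keeping serialization limited to built-in types is what lets the node's configuration be persisted or logged without any custom encoder.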

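
The lookup-then-fallback logic in `get_connector` (check the node's own connectors by a `premise_type + source` key, else fall back to a shared default, else raise) can be sketched independently of the library. This is a simplified illustration, not the real implementation: here defaults live in a plain dict keyed by the same string, whereas the library asks `ConnectorManager` for a default by connector class.

```python
class WrongTypeReference(Exception):
    """Raised when no suitable connector can be found (mirrors the library's exception name)."""


# Hypothetical manager-level defaults; a plain dict stands in for ConnectorManager.
DEFAULT_CONNECTORS = {"SQLBigQuery": "default_bigquery_connector"}


def get_connector(custom_connectors, premise_type, source):
    key = f"{premise_type}{source}"
    # 1) A connector registered directly on the node wins.
    if key in custom_connectors:
        return custom_connectors[key]
    # 2) Otherwise fall back to the shared default, if any.
    connector = DEFAULT_CONNECTORS.get(key)
    if connector is None:
        raise WrongTypeReference(
            f"Could not find {key} as a custom or default connector"
        )
    return connector
```

The two-level lookup lets a single node override its connector (e.g. different credentials for one table) while every other node keeps using the shared default.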