pipeline_penguin.data_premise.sql

Main data_premise package, contains high-level data_nodes for SQL-based data premises (BigQuery, MySQL, etc).

This package provides DataPremise constructors, which are pre-written validations to be executed against a DataNode. This package only convers SQL-based validations.

Location: pipeline_penguin/data_premise/sql

View Source
"""Main data_premise package, contains high-level data_nodes for SQL-based data premises (BigQuery,
MySQL, etc).

This package provides `DataPremise` constructors, which are pre-written validations to be executed
against a `DataNode`. This package only convers SQL-based validations.

Location: pipeline_penguin/data_premise/sql
"""
from .check_null import DataPremiseSQLCheckIsNull
from .check_distinct import DataPremiseSQLCheckDistinct
from .check_arithmetic import DataPremiseSQLCheckArithmeticOperationEqualsResult
from .check_between import DataPremiseSQLCheckValuesAreBetween
from .check_in import DataPremiseSQLCheckInArray
from .check_like import DataPremiseSQLCheckLikePattern
from .check_regexp import DataPremiseSQLCheckRegexpContains
from .check_comparison import DataPremiseSQLCheckLogicalComparisonWithValue

__all__ = [
    "DataPremiseSQLCheckIsNull",
    "DataPremiseSQLCheckDistinct",
    "DataPremiseSQLCheckArithmeticOperationEqualsResult",
    "DataPremiseSQLCheckValuesAreBetween",
    "DataPremiseSQLCheckInArray",
    "DataPremiseSQLCheckLikePattern",
    "DataPremiseSQLCheckRegexpContains",
    "DataPremiseSQLCheckLogicalComparisonWithValue",
]
View Source
class DataPremiseSQLCheckIsNull(DataPremiseSQL):
    """This DataPremise is responsible for validating if a given column does not have null values.

    Args:
        name: Name for the data premise.
        data_node: Reference to the DataNode used in the validation.
        column: Column to be read by the premise.
    Attributes:
        name: Name for the data premise.
        data_node: Reference to the DataNode used in the validation.
        type: Type indicator of the premise. It is always "SQL".
        column: Column to be read by the premise.
    """

    def __init__(self, name: str, data_node: DataNode, column: str):

        super().__init__(name, data_node, column)
        self.query_template = "SELECT count(*) as total FROM `{project}.{dataset}.{table}` WHERE {column} is null"

    def query_args(self):
        """Method for returning the arguments to be passed on the query template of this
        validation.

        Returns:
            A `dictionary` with the query parameters.
        """
        return {
            "project": self.data_node.project_id,
            "dataset": self.data_node.dataset_id,
            "table": self.data_node.table_id,
            "column": self.column,
        }

    def validate(self) -> PremiseOutput:
        """Method for executing the validation over the DataNode.

        Returns:
            PremiseOutput: Object storeing the results for this validation.
        """

        query = self.query_template.format(**self.query_args())
        connector = self.data_node.get_connector(self.type)
        data_frame = connector.run(query)

        failed_count = data_frame["total"][0]
        passed = failed_count == 0

        output = PremiseOutput(
            self, self.data_node, self.column, passed, failed_count, data_frame
        )
        return output

This DataPremise is responsible for validating if a given column does not have null values.

Args: name: Name for the data premise. data_node: Reference to the DataNode used in the validation. column: Column to be read by the premise. Attributes: name: Name for the data premise. data_node: Reference to the DataNode used in the validation. type: Type indicator of the premise. It is always "SQL". column: Column to be read by the premise.

#   DataPremiseSQLCheckIsNull( name: str, data_node: pipeline_penguin.core.data_node.data_node.DataNode, column: str )
View Source
    def __init__(self, name: str, data_node: DataNode, column: str):

        super().__init__(name, data_node, column)
        self.query_template = "SELECT count(*) as total FROM `{project}.{dataset}.{table}` WHERE {column} is null"
#   def query_args(self):
View Source
    def query_args(self):
        """Method for returning the arguments to be passed on the query template of this
        validation.

        Returns:
            A `dictionary` with the query parameters.
        """
        return {
            "project": self.data_node.project_id,
            "dataset": self.data_node.dataset_id,
            "table": self.data_node.table_id,
            "column": self.column,
        }

Method for returning the arguments to be passed on the query template of this validation.

Returns: A dictionary with the query parameters.

View Source
    def validate(self) -> PremiseOutput:
        """Method for executing the validation over the DataNode.

        Returns:
            PremiseOutput: Object storeing the results for this validation.
        """

        query = self.query_template.format(**self.query_args())
        connector = self.data_node.get_connector(self.type)
        data_frame = connector.run(query)

        failed_count = data_frame["total"][0]
        passed = failed_count == 0

        output = PremiseOutput(
            self, self.data_node, self.column, passed, failed_count, data_frame
        )
        return output

Method for executing the validation over the DataNode.

Returns: PremiseOutput: Object storeing the results for this validation.

#   class DataPremiseSQLCheckDistinct(pipeline_penguin.core.data_premise.sql.DataPremiseSQL):
View Source
class DataPremiseSQLCheckDistinct(DataPremiseSQL):
    """This DataPremise is responsible for validating if all values of a column are distinct.

    Args:
        name: Name for the data premise.
        data_node: Reference to the DataNode used in the validation.
        column: Column to be read by the premise.
    Attributes:
        name: Name for the data premise.
        data_node: Reference to the DataNode used in the validation.
        type: Type indicator of the premise. It is always "SQL".
        column: Column to be read by the premise.
    """

    def __init__(self, name: str, data_node: DataNode, column: str):
        super().__init__(name, data_node, column)
        self.query_template = "SELECT count(DISTINCT {column}) distinct, count({column}) total as total FROM `{project}.{dataset}.{table}`"

    def query_args(self):
        """Method for returning the arguments to be passed on the query template of this
        validation.

        Returns:
            A `dictionary` with the query parameters.
        """
        return {
            "project": self.data_node.project_id,
            "dataset": self.data_node.dataset_id,
            "table": self.data_node.table_id,
            "column": self.column,
        }

    def validate(self) -> PremiseOutput:
        """Method for executing the validation over the DataNode.

        Returns:
            PremiseOutput: Object storeing the results for this validation.
        """

        query = self.query_template.format(**self.query_args())
        connector = self.data_node.get_connector(self.type)
        data_frame = connector.run(query)

        passed = data_frame["result"][0] == data_frame["total"][0]
        failed_count = data_frame["total"][0] - data_frame["result"][0]

        output = PremiseOutput(
            self, self.data_node, self.column, passed, failed_count, data_frame
        )
        return output

This DataPremise is responsible for validating if all values of a column are distinct.

Args: name: Name for the data premise. data_node: Reference to the DataNode used in the validation. column: Column to be read by the premise. Attributes: name: Name for the data premise. data_node: Reference to the DataNode used in the validation. type: Type indicator of the premise. It is always "SQL". column: Column to be read by the premise.

#   DataPremiseSQLCheckDistinct( name: str, data_node: pipeline_penguin.core.data_node.data_node.DataNode, column: str )
View Source
    def __init__(self, name: str, data_node: DataNode, column: str):
        super().__init__(name, data_node, column)
        self.query_template = "SELECT count(DISTINCT {column}) distinct, count({column}) total as total FROM `{project}.{dataset}.{table}`"
#   def query_args(self):
View Source
    def query_args(self):
        """Method for returning the arguments to be passed on the query template of this
        validation.

        Returns:
            A `dictionary` with the query parameters.
        """
        return {
            "project": self.data_node.project_id,
            "dataset": self.data_node.dataset_id,
            "table": self.data_node.table_id,
            "column": self.column,
        }

Method for returning the arguments to be passed on the query template of this validation.

Returns: A dictionary with the query parameters.

View Source
    def validate(self) -> PremiseOutput:
        """Method for executing the validation over the DataNode.

        Returns:
            PremiseOutput: Object storeing the results for this validation.
        """

        query = self.query_template.format(**self.query_args())
        connector = self.data_node.get_connector(self.type)
        data_frame = connector.run(query)

        passed = data_frame["result"][0] == data_frame["total"][0]
        failed_count = data_frame["total"][0] - data_frame["result"][0]

        output = PremiseOutput(
            self, self.data_node, self.column, passed, failed_count, data_frame
        )
        return output

Method for executing the validation over the DataNode.

Returns: PremiseOutput: Object storeing the results for this validation.

#   class DataPremiseSQLCheckArithmeticOperationEqualsResult(pipeline_penguin.core.data_premise.sql.DataPremiseSQL):
View Source
class DataPremiseSQLCheckArithmeticOperationEqualsResult(DataPremiseSQL):
    """This DataPremise is responsible for validating if a arithmetic operation involving the given
    column and an given term returns an expected restult (i.e. validate if column + 20 = 40).

    Args:
        name: Name for the data premise.
        data_node: Reference to the DataNode used in the validation.
        column: Column to be read by the premise.
        operator: The arithmetic operator (+, -, *, /).
        second_term: Numeric value for the second term of the operation.
        expected_result: Expected numeric value for the result of the operation.
    Attributes:
        name: Name for the data premise.
        data_node: Reference to the DataNode used in the validation.
        type: Type indicator of the premise. It is always "SQL".
        column: Column to be read by the premise.
        operator: The arithmetic operator (+, -, *, /).
        second_term: Numeric value for the second term of the operation.
        expected_result: Expected numeric value for the result of the operation.
    Raises:
        WrongTypeReference: If the "operator" argument is not a supported character ["+", "-", "*",
        "/"]
    """

    def __init__(
        self,
        name: str,
        data_node: DataNode,
        column: str,
        operator: str,
        second_term: Union[int, float],
        expected_result: Union[int, float],
    ):
        super().__init__(name, data_node, column)
        supported_operators = ["+", "-", "*", "/"]
        if operator not in supported_operators:
            raise WrongTypeReference(
                f"Operator not supported, supported operators: {supported_operators}"
            )
        self.query_template = "SELECT {column} as result FROM `{project}.{dataset}.{table}` WHERE {column} {operator} {second_term} = {expected_result}"
        self.operator = operator
        self.second_term = second_term
        self.expected_result = expected_result

    def query_args(self):
        """Method for returning the arguments to be passed on the query template of this
        validation.

        Returns:
            A `dictionary` with the query parameters.
        """
        return {
            "project": self.data_node.project_id,
            "dataset": self.data_node.dataset_id,
            "table": self.data_node.table_id,
            "column": self.column,
            "operator": self.operator,
            "second_term": self.second_term,
            "expected_result": self.expected_result,
        }

    def validate(self) -> PremiseOutput:
        """Method for executing the validation over the DataNode.

        Returns:
            PremiseOutput: Object storeing the results for this validation.
        """

        query = self.query_template.format(**self.query_args())
        connector = self.data_node.get_connector(self.type)
        data_frame = connector.run(query)

        failed_count = len(data_frame["result"])
        passed = failed_count == 0

        output = PremiseOutput(
            self, self.data_node, self.column, passed, failed_count, data_frame
        )

        return output

This DataPremise is responsible for validating if a arithmetic operation involving the given column and an given term returns an expected restult (i.e. validate if column + 20 = 40).

Args: name: Name for the data premise. data_node: Reference to the DataNode used in the validation. column: Column to be read by the premise. operator: The arithmetic operator (+, -, , /). second_term: Numeric value for the second term of the operation. expected_result: Expected numeric value for the result of the operation. Attributes: name: Name for the data premise. data_node: Reference to the DataNode used in the validation. type: Type indicator of the premise. It is always "SQL". column: Column to be read by the premise. operator: The arithmetic operator (+, -, *, /). second_term: Numeric value for the second term of the operation. expected_result: Expected numeric value for the result of the operation. Raises: WrongTypeReference: If the "operator" argument is not a supported character ["+", "-", "", "/"]

#   DataPremiseSQLCheckArithmeticOperationEqualsResult( name: str, data_node: pipeline_penguin.core.data_node.data_node.DataNode, column: str, operator: str, second_term: Union[int, float], expected_result: Union[int, float] )
View Source
    def __init__(
        self,
        name: str,
        data_node: DataNode,
        column: str,
        operator: str,
        second_term: Union[int, float],
        expected_result: Union[int, float],
    ):
        super().__init__(name, data_node, column)
        supported_operators = ["+", "-", "*", "/"]
        if operator not in supported_operators:
            raise WrongTypeReference(
                f"Operator not supported, supported operators: {supported_operators}"
            )
        self.query_template = "SELECT {column} as result FROM `{project}.{dataset}.{table}` WHERE {column} {operator} {second_term} = {expected_result}"
        self.operator = operator
        self.second_term = second_term
        self.expected_result = expected_result
#   def query_args(self):
View Source
    def query_args(self):
        """Method for returning the arguments to be passed on the query template of this
        validation.

        Returns:
            A `dictionary` with the query parameters.
        """
        return {
            "project": self.data_node.project_id,
            "dataset": self.data_node.dataset_id,
            "table": self.data_node.table_id,
            "column": self.column,
            "operator": self.operator,
            "second_term": self.second_term,
            "expected_result": self.expected_result,
        }

Method for returning the arguments to be passed on the query template of this validation.

Returns: A dictionary with the query parameters.

View Source
    def validate(self) -> PremiseOutput:
        """Method for executing the validation over the DataNode.

        Returns:
            PremiseOutput: Object storeing the results for this validation.
        """

        query = self.query_template.format(**self.query_args())
        connector = self.data_node.get_connector(self.type)
        data_frame = connector.run(query)

        failed_count = len(data_frame["result"])
        passed = failed_count == 0

        output = PremiseOutput(
            self, self.data_node, self.column, passed, failed_count, data_frame
        )

        return output

Method for executing the validation over the DataNode.

Returns: PremiseOutput: Object storeing the results for this validation.

#   class DataPremiseSQLCheckValuesAreBetween(pipeline_penguin.core.data_premise.sql.DataPremiseSQL):
View Source
class DataPremiseSQLCheckValuesAreBetween(DataPremiseSQL):
    """This DataPremise is responsible for validating if a arithmetic operation involving the given
    column and an given term returns an expected restult (i.e. validate if column + 20 = 40).

    Args:
        name: Name for the data premise.
        data_node: Reference to the DataNode used in the validation.
        column: Column to be read by the premise.
        lower_bound: Minimum allowed value for the column
        upper_bound: Maximum allowed value for the column
    Attributes:
        name: Name for the data premise.
        data_node: Reference to the DataNode used in the validation.
        type: Type indicator of the premise. It is always "SQL".
        column: Column to be read by the premise.
        lower_bound: Minimum allowed value for the column
        upper_bound: Maximum allowed value for the column
    """

    def __init__(
        self,
        name: str,
        data_node: DataNode,
        column: str,
        lower_bound: str,
        upper_bound: str,
    ):

        self.query_template = "SELECT {column} as result FROM `{project}.{dataset}.{table}` WHERE  {column} BETWEEN {lower_bound} AND {upper_bound}"
        self.lower_bound = lower_bound
        self.upper_bound = upper_bound
        super().__init__(name, data_node, column)

    def query_args(self):
        """Method for returning the arguments to be passed on the query template of this
        validation.

        Returns:
            A `dictionary` with the query parameters.
        """
        return {
            "project": self.data_node.project_id,
            "dataset": self.data_node.dataset_id,
            "table": self.data_node.table_id,
            "column": self.column,
            "lower_bound": self.lower_bound,
            "upper_bound": self.upper_bound,
        }

    def validate(self) -> PremiseOutput:
        """Method for executing the validation over the DataNode.

        Returns:
            PremiseOutput: Object storeing the results for this validation.
        """

        query = self.query_template.format(**self.query_args())
        connector = self.data_node.get_connector(self.type)
        data_frame = connector.run(query)

        failed_count = len(data_frame["result"])
        passed = failed_count == 0

        output = PremiseOutput(
            self, self.data_node, self.column, passed, failed_count, data_frame
        )
        return output

This DataPremise is responsible for validating if a arithmetic operation involving the given column and an given term returns an expected restult (i.e. validate if column + 20 = 40).

Args: name: Name for the data premise. data_node: Reference to the DataNode used in the validation. column: Column to be read by the premise. lower_bound: Minimum allowed value for the column upper_bound: Maximum allowed value for the column Attributes: name: Name for the data premise. data_node: Reference to the DataNode used in the validation. type: Type indicator of the premise. It is always "SQL". column: Column to be read by the premise. lower_bound: Minimum allowed value for the column upper_bound: Maximum allowed value for the column

#   DataPremiseSQLCheckValuesAreBetween( name: str, data_node: pipeline_penguin.core.data_node.data_node.DataNode, column: str, lower_bound: str, upper_bound: str )
View Source
    def __init__(
        self,
        name: str,
        data_node: DataNode,
        column: str,
        lower_bound: str,
        upper_bound: str,
    ):

        self.query_template = "SELECT {column} as result FROM `{project}.{dataset}.{table}` WHERE  {column} BETWEEN {lower_bound} AND {upper_bound}"
        self.lower_bound = lower_bound
        self.upper_bound = upper_bound
        super().__init__(name, data_node, column)
#   def query_args(self):
View Source
    def query_args(self):
        """Method for returning the arguments to be passed on the query template of this
        validation.

        Returns:
            A `dictionary` with the query parameters.
        """
        return {
            "project": self.data_node.project_id,
            "dataset": self.data_node.dataset_id,
            "table": self.data_node.table_id,
            "column": self.column,
            "lower_bound": self.lower_bound,
            "upper_bound": self.upper_bound,
        }

Method for returning the arguments to be passed on the query template of this validation.

Returns: A dictionary with the query parameters.

View Source
    def validate(self) -> PremiseOutput:
        """Method for executing the validation over the DataNode.

        Returns:
            PremiseOutput: Object storeing the results for this validation.
        """

        query = self.query_template.format(**self.query_args())
        connector = self.data_node.get_connector(self.type)
        data_frame = connector.run(query)

        failed_count = len(data_frame["result"])
        passed = failed_count == 0

        output = PremiseOutput(
            self, self.data_node, self.column, passed, failed_count, data_frame
        )
        return output

Method for executing the validation over the DataNode.

Returns: PremiseOutput: Object storeing the results for this validation.

View Source
class DataPremiseSQLCheckInArray(DataPremiseSQL):
    """This DataPremise is responsible for validating if the values of a column matches any value of
    a given array. It currently supports arrays of Strings, Booleans and Numeric values.

    Args:
        name: Name for the data premise.
        data_node: Reference to the DataNode used in the validation.
        column: Column to be read by the premise.
        array: List of values to be matched against the column.
    Attributes:
        name: Name for the data premise.
        data_node: Reference to the DataNode used in the validation.
        type: Type indicator of the premise. It is always "SQL".
        column: Column to be read by the premise.
        array: List of values to be matched against the column.
    """

    def __init__(
        self,
        name: str,
        data_node: DataNode,
        column: str,
        array: Union[str, list, float, bool],
    ):
        if type(array) == "list":
            array = str(array)

        self.query_template = "SELECT {column} as result result FROM `{project}.{dataset}.{table}` WHERE {column} IN UNNEST({array})"
        self.array = array
        super().__init__(name, data_node, column)

    def query_args(self):
        """Method for returning the arguments to be passed on the query template of this
        validation.

        Returns:
            A `dictionary` with the query parameters.
        """
        return {
            "project": self.data_node.project_id,
            "dataset": self.data_node.dataset_id,
            "table": self.data_node.table_id,
            "column": self.column,
            "array": self.array,
        }

    def validate(self) -> PremiseOutput:
        """Method for executing the validation over the DataNode.

        Returns:
            PremiseOutput: Object storeing the results for this validation.
        """

        query = self.query_template.format(**self.query_args())
        connector = self.data_node.get_connector(self.type)
        data_frame = connector.run(query)

        failed_count = len(data_frame["result"])
        passed = failed_count == 0

        output = PremiseOutput(
            self, self.data_node, self.column, passed, failed_count, data_frame
        )
        return output

This DataPremise is responsible for validating if the values of a column matches any value of a given array. It currently supports arrays of Strings, Booleans and Numeric values.

Args: name: Name for the data premise. data_node: Reference to the DataNode used in the validation. column: Column to be read by the premise. array: List of values to be matched against the column. Attributes: name: Name for the data premise. data_node: Reference to the DataNode used in the validation. type: Type indicator of the premise. It is always "SQL". column: Column to be read by the premise. array: List of values to be matched against the column.

#   DataPremiseSQLCheckInArray( name: str, data_node: pipeline_penguin.core.data_node.data_node.DataNode, column: str, array: Union[str, list, float, bool] )
View Source
    def __init__(
        self,
        name: str,
        data_node: DataNode,
        column: str,
        array: Union[str, list, float, bool],
    ):
        if type(array) == "list":
            array = str(array)

        self.query_template = "SELECT {column} as result result FROM `{project}.{dataset}.{table}` WHERE {column} IN UNNEST({array})"
        self.array = array
        super().__init__(name, data_node, column)
#   def query_args(self):
View Source
    def query_args(self):
        """Method for returning the arguments to be passed on the query template of this
        validation.

        Returns:
            A `dictionary` with the query parameters.
        """
        return {
            "project": self.data_node.project_id,
            "dataset": self.data_node.dataset_id,
            "table": self.data_node.table_id,
            "column": self.column,
            "array": self.array,
        }

Method for returning the arguments to be passed on the query template of this validation.

Returns: A dictionary with the query parameters.

View Source
    def validate(self) -> PremiseOutput:
        """Method for executing the validation over the DataNode.

        Returns:
            PremiseOutput: Object storeing the results for this validation.
        """

        query = self.query_template.format(**self.query_args())
        connector = self.data_node.get_connector(self.type)
        data_frame = connector.run(query)

        failed_count = len(data_frame["result"])
        passed = failed_count == 0

        output = PremiseOutput(
            self, self.data_node, self.column, passed, failed_count, data_frame
        )
        return output

Method for executing the validation over the DataNode.

Returns: PremiseOutput: Object storeing the results for this validation.

#   class DataPremiseSQLCheckLikePattern(pipeline_penguin.core.data_premise.sql.DataPremiseSQL):
View Source
class DataPremiseSQLCheckLikePattern(DataPremiseSQL):
    """This DataPremise is responsible for validating if the values of a column matches a given
    string. It supports a wildcard operator (%).

    Args:
        name: Name for the data premise.
        data_node: Reference to the DataNode used in the validation.
        column: Column to be read by the premise.
        pattern: String to be matched against the column. Supports "%" caracter as a wildcard.
    Attributes:
        name: Name for the data premise.
        data_node: Reference to the DataNode used in the validation.
        type: Type indicator of the premise. It is always "SQL".
        column: Column to be read by the premise.
        pattern: String to be matched against the column. Supports "%" caracter as a wildcard.
    """

    def __init__(
        self,
        name: str,
        data_node: DataNode,
        column: str,
        pattern: str,
    ):

        self.query_template = "SELECT {column} as result FROM `{project}.{dataset}.{table}` WHERE {column} LIKE {pattern}"
        self.pattern = pattern
        super().__init__(name, data_node, column)

    def query_args(self):
        """Method for returning the arguments to be passed on the query template of this
        validation.

        Returns:
            A `dictionary` with the query parameters.
        """
        return {
            "project": self.data_node.project_id,
            "dataset": self.data_node.dataset_id,
            "table": self.data_node.table_id,
            "column": self.column,
            "pattern": self.pattern,
        }

    def validate(self) -> PremiseOutput:
        """Method for executing the validation over the DataNode.

        Returns:
            PremiseOutput: Object storeing the results for this validation.
        """

        query = self.query_template.format(**self.query_args())
        connector = self.data_node.get_connector(self.type)
        data_frame = connector.run(query)

        failed_count = len(data_frame["result"])
        passed = failed_count == 0

        output = PremiseOutput(
            self, self.data_node, self.column, passed, failed_count, data_frame
        )
        return output

This DataPremise is responsible for validating if the values of a column matches a given string. It supports a wildcard operator (%).

Args: name: Name for the data premise. data_node: Reference to the DataNode used in the validation. column: Column to be read by the premise. pattern: String to be matched against the column. Supports "%" caracter as a wildcard. Attributes: name: Name for the data premise. data_node: Reference to the DataNode used in the validation. type: Type indicator of the premise. It is always "SQL". column: Column to be read by the premise. pattern: String to be matched against the column. Supports "%" caracter as a wildcard.

#   DataPremiseSQLCheckLikePattern( name: str, data_node: pipeline_penguin.core.data_node.data_node.DataNode, column: str, pattern: str )
View Source
    def __init__(
        self,
        name: str,
        data_node: DataNode,
        column: str,
        pattern: str,
    ):

        self.query_template = "SELECT {column} as result FROM `{project}.{dataset}.{table}` WHERE {column} LIKE {pattern}"
        self.pattern = pattern
        super().__init__(name, data_node, column)
#   def query_args(self):
View Source
    def query_args(self):
        """Method for returning the arguments to be passed on the query template of this
        validation.

        Returns:
            A `dictionary` with the query parameters.
        """
        return {
            "project": self.data_node.project_id,
            "dataset": self.data_node.dataset_id,
            "table": self.data_node.table_id,
            "column": self.column,
            "pattern": self.pattern,
        }

Method for returning the arguments to be passed on the query template of this validation.

Returns: A dictionary with the query parameters.

View Source
    def validate(self) -> PremiseOutput:
        """Method for executing the validation over the DataNode.

        Returns:
            PremiseOutput: Object storeing the results for this validation.
        """

        query = self.query_template.format(**self.query_args())
        connector = self.data_node.get_connector(self.type)
        data_frame = connector.run(query)

        failed_count = len(data_frame["result"])
        passed = failed_count == 0

        output = PremiseOutput(
            self, self.data_node, self.column, passed, failed_count, data_frame
        )
        return output

Method for executing the validation over the DataNode.

Returns: PremiseOutput: Object storeing the results for this validation.

#   class DataPremiseSQLCheckRegexpContains(pipeline_penguin.core.data_premise.sql.DataPremiseSQL):
View Source
class DataPremiseSQLCheckRegexpContains(DataPremiseSQL):
    """This DataPremise is responsible for validating if the values of a column matches a given
    regexp pattern.

    Args:
        name: Name for the data premise.
        data_node: Reference to the DataNode used in the validation.
        column: Column to be read by the premise.
        pattern: String with regex pattern to be matched against the column. Supports golang regexp.
    Attributes:
        name: Name for the data premise.
        data_node: Reference to the DataNode used in the validation.
        type: Type indicator of the premise. It is always "SQL".
        column: Column to be read by the premise.
        pattern: String with regex pattern to be matched against the column. Supports golang regexp.
    """

    def __init__(
        self,
        name: str,
        data_node: DataNode,
        column: str,
        pattern: str,
    ):

        self.query_template = 'SELECT {column} result FROM `{project}.{dataset}.{table}` WHERE REGEXP_CONTAINS({column}, r"{pattern}")'
        self.pattern = pattern
        super().__init__(name, data_node, column)

    def query_args(self):
        """Method for returning the arguments to be passed on the query template of this
        validation.

        Returns:
            A `dictionary` with the query parameters.
        """
        return {
            "project": self.data_node.project_id,
            "dataset": self.data_node.dataset_id,
            "table": self.data_node.table_id,
            "column": self.column,
            "pattern": self.pattern,
        }

    def validate(self) -> PremiseOutput:
        """Method for executing the validation over the DataNode.

        Returns:
            PremiseOutput: Object storeing the results for this validation.
        """

        query = self.query_template.format(**self.query_args())
        connector = self.data_node.get_connector(self.type)
        data_frame = connector.run(query)

        failed_count = len(data_frame["result"])
        passed = failed_count == 0

        output = PremiseOutput(
            self, self.data_node, self.column, passed, failed_count, data_frame
        )
        return output

This DataPremise is responsible for validating if the values of a column matches a given regexp pattern.

Args: name: Name for the data premise. data_node: Reference to the DataNode used in the validation. column: Column to be read by the premise. pattern: String with regex pattern to be matched against the column. Supports golang regexp. Attributes: name: Name for the data premise. data_node: Reference to the DataNode used in the validation. type: Type indicator of the premise. It is always "SQL". column: Column to be read by the premise. pattern: String with regex pattern to be matched against the column. Supports golang regexp.

#   DataPremiseSQLCheckRegexpContains( name: str, data_node: pipeline_penguin.core.data_node.data_node.DataNode, column: str, pattern: str )
View Source
    def __init__(
        self,
        name: str,
        data_node: DataNode,
        column: str,
        pattern: str,
    ):

        self.query_template = 'SELECT {column} result FROM `{project}.{dataset}.{table}` WHERE REGEXP_CONTAINS({column}, r"{pattern}")'
        self.pattern = pattern
        super().__init__(name, data_node, column)
#   def query_args(self):
View Source
    def query_args(self):
        """Method for returning the arguments to be passed on the query template of this
        validation.

        Returns:
            A `dictionary` with the query parameters.
        """
        return {
            "project": self.data_node.project_id,
            "dataset": self.data_node.dataset_id,
            "table": self.data_node.table_id,
            "column": self.column,
            "pattern": self.pattern,
        }

Method for returning the arguments to be passed on the query template of this validation.

Returns: A dictionary with the query parameters.

View Source
    def validate(self) -> PremiseOutput:
        """Method for executing the validation over the DataNode.

        Returns:
            PremiseOutput: Object storeing the results for this validation.
        """

        query = self.query_template.format(**self.query_args())
        connector = self.data_node.get_connector(self.type)
        data_frame = connector.run(query)

        failed_count = len(data_frame["result"])
        passed = failed_count == 0

        output = PremiseOutput(
            self, self.data_node, self.column, passed, failed_count, data_frame
        )
        return output

Method for executing the validation over the DataNode.

Returns: PremiseOutput: Object storeing the results for this validation.

#   class DataPremiseSQLCheckLogicalComparisonWithValue(pipeline_penguin.core.data_premise.sql.DataPremiseSQL):
View Source
class DataPremiseSQLCheckLogicalComparisonWithValue(DataPremiseSQL):
    """This DataPremise is responsible for validating if a logical operation between a column and a
    provided value is true. (i.e. validate if column >= 20).

    Args:
        name: Name for the data premise.
        data_node: Reference to the DataNode used in the validation.
        column: Column to be read by the premise.
        operator: The logical operator (<,<=,=,=>,!=,<>).
        value: Value for the second term of the operation.
    Attributes:
        name: Name for the data premise.
        data_node: Reference to the DataNode used in the validation.
        type: Type indicator of the premise. It is always "SQL".
        column: Column to be read by the premise.
        operator: The logical operator (<,<=,=,=>,!=,<>).
        value: Value for the second term of the operation.
    Raises:
        WrongTypeReference: If the "operator" argument is not a supported character ["<","<=","=",
        "=>","!=","<>"]
    """

    def __init__(
        self,
        name: str,
        data_node: DataNode,
        column: str,
        operator: str,
        value: str,
    ):
        supported_operators = [
            "<",
            "<=",
            "=",
            "=>",
            "!=",
            "<>",
        ]
        if operator not in supported_operators:
            raise WrongTypeReference(
                f"Operator not supported, supported operators: {supported_operators}"
            )

        self.query_template = "SELECT {column} result FROM `{project}.{dataset}.{table}` WHERE {column} {operator} {value}"
        self.operator = operator
        self.value = value
        super().__init__(name, data_node, column)

    def query_args(self):
        """Method for returning the arguments to be passed on the query template of this
        validation.

        Returns:
            A `dictionary` with the query parameters.
        """
        return {
            "project": self.data_node.project_id,
            "dataset": self.data_node.dataset_id,
            "table": self.data_node.table_id,
            "column": self.column,
            "operator": self.operator,
            "value": self.value,
        }

    def validate(self) -> PremiseOutput:
        """Method for executing the validation over the DataNode.

        Returns:
            PremiseOutput: Object storeing the results for this validation.
        """

        query = self.query_template.format(**self.query_args())
        connector = self.data_node.get_connector(self.type)
        data_frame = connector.run(query)

        failed_count = len(data_frame["result"])
        passed = failed_count == 0

        output = PremiseOutput(
            self, self.data_node, self.column, passed, failed_count, data_frame
        )
        return output

This DataPremise is responsible for validating if a logical operation between a column and a provided value is true. (i.e. validate if column >= 20).

Args: name: Name for the data premise. data_node: Reference to the DataNode used in the validation. column: Column to be read by the premise. operator: The logical operator (<,<=,=,=>,!=,<>). value: Value for the second term of the operation. Attributes: name: Name for the data premise. data_node: Reference to the DataNode used in the validation. type: Type indicator of the premise. It is always "SQL". column: Column to be read by the premise. operator: The logical operator (<,<=,=,=>,!=,<>). value: Value for the second term of the operation. Raises: WrongTypeReference: If the "operator" argument is not a supported character ["<","<=","=", "=>","!=","<>"]

#   DataPremiseSQLCheckLogicalComparisonWithValue( name: str, data_node: pipeline_penguin.core.data_node.data_node.DataNode, column: str, operator: str, value: str )
View Source
    def __init__(
        self,
        name: str,
        data_node: DataNode,
        column: str,
        operator: str,
        value: str,
    ):
        supported_operators = [
            "<",
            "<=",
            "=",
            "=>",
            "!=",
            "<>",
        ]
        if operator not in supported_operators:
            raise WrongTypeReference(
                f"Operator not supported, supported operators: {supported_operators}"
            )

        self.query_template = "SELECT {column} result FROM `{project}.{dataset}.{table}` WHERE {column} {operator} {value}"
        self.operator = operator
        self.value = value
        super().__init__(name, data_node, column)
#   def query_args(self):
View Source
    def query_args(self):
        """Method for returning the arguments to be passed on the query template of this
        validation.

        Returns:
            A `dictionary` with the query parameters.
        """
        return {
            "project": self.data_node.project_id,
            "dataset": self.data_node.dataset_id,
            "table": self.data_node.table_id,
            "column": self.column,
            "operator": self.operator,
            "value": self.value,
        }

Method for returning the arguments to be passed on the query template of this validation.

Returns: A dictionary with the query parameters.

View Source
    def validate(self) -> PremiseOutput:
        """Method for executing the validation over the DataNode.

        Returns:
            PremiseOutput: Object storeing the results for this validation.
        """

        query = self.query_template.format(**self.query_args())
        connector = self.data_node.get_connector(self.type)
        data_frame = connector.run(query)

        failed_count = len(data_frame["result"])
        passed = failed_count == 0

        output = PremiseOutput(
            self, self.data_node, self.column, passed, failed_count, data_frame
        )
        return output

Method for executing the validation over the DataNode.

Returns: PremiseOutput: Object storeing the results for this validation.