pipeline_penguin.data_premise.sql
Main data_premise package, contains high-level data_nodes for SQL-based data premises (BigQuery, MySQL, etc).
This package provides DataPremise
constructors, which are pre-written validations to be executed
against a DataNode
. This package only convers SQL-based validations.
Location: pipeline_penguin/data_premise/sql
View Source
"""Main data_premise package, contains high-level data_nodes for SQL-based data premises (BigQuery, MySQL, etc). This package provides `DataPremise` constructors, which are pre-written validations to be executed against a `DataNode`. This package only convers SQL-based validations. Location: pipeline_penguin/data_premise/sql """ from .check_null import DataPremiseSQLCheckIsNull from .check_distinct import DataPremiseSQLCheckDistinct from .check_arithmetic import DataPremiseSQLCheckArithmeticOperationEqualsResult from .check_between import DataPremiseSQLCheckValuesAreBetween from .check_in import DataPremiseSQLCheckInArray from .check_like import DataPremiseSQLCheckLikePattern from .check_regexp import DataPremiseSQLCheckRegexpContains from .check_comparison import DataPremiseSQLCheckLogicalComparisonWithValue __all__ = [ "DataPremiseSQLCheckIsNull", "DataPremiseSQLCheckDistinct", "DataPremiseSQLCheckArithmeticOperationEqualsResult", "DataPremiseSQLCheckValuesAreBetween", "DataPremiseSQLCheckInArray", "DataPremiseSQLCheckLikePattern", "DataPremiseSQLCheckRegexpContains", "DataPremiseSQLCheckLogicalComparisonWithValue", ]
View Source
class DataPremiseSQLCheckIsNull(DataPremiseSQL): """This DataPremise is responsible for validating if a given column does not have null values. Args: name: Name for the data premise. data_node: Reference to the DataNode used in the validation. column: Column to be read by the premise. Attributes: name: Name for the data premise. data_node: Reference to the DataNode used in the validation. type: Type indicator of the premise. It is always "SQL". column: Column to be read by the premise. """ def __init__(self, name: str, data_node: DataNode, column: str): super().__init__(name, data_node, column) self.query_template = "SELECT count(*) as total FROM `{project}.{dataset}.{table}` WHERE {column} is null" def query_args(self): """Method for returning the arguments to be passed on the query template of this validation. Returns: A `dictionary` with the query parameters. """ return { "project": self.data_node.project_id, "dataset": self.data_node.dataset_id, "table": self.data_node.table_id, "column": self.column, } def validate(self) -> PremiseOutput: """Method for executing the validation over the DataNode. Returns: PremiseOutput: Object storeing the results for this validation. """ query = self.query_template.format(**self.query_args()) connector = self.data_node.get_connector(self.type) data_frame = connector.run(query) failed_count = data_frame["total"][0] passed = failed_count == 0 output = PremiseOutput( self, self.data_node, self.column, passed, failed_count, data_frame ) return output
This DataPremise is responsible for validating if a given column does not have null values.
Args: name: Name for the data premise. data_node: Reference to the DataNode used in the validation. column: Column to be read by the premise. Attributes: name: Name for the data premise. data_node: Reference to the DataNode used in the validation. type: Type indicator of the premise. It is always "SQL". column: Column to be read by the premise.
View Source
def __init__(self, name: str, data_node: DataNode, column: str): super().__init__(name, data_node, column) self.query_template = "SELECT count(*) as total FROM `{project}.{dataset}.{table}` WHERE {column} is null"
View Source
def query_args(self): """Method for returning the arguments to be passed on the query template of this validation. Returns: A `dictionary` with the query parameters. """ return { "project": self.data_node.project_id, "dataset": self.data_node.dataset_id, "table": self.data_node.table_id, "column": self.column, }
Method for returning the arguments to be passed on the query template of this validation.
Returns:
A dictionary
with the query parameters.
View Source
def validate(self) -> PremiseOutput: """Method for executing the validation over the DataNode. Returns: PremiseOutput: Object storeing the results for this validation. """ query = self.query_template.format(**self.query_args()) connector = self.data_node.get_connector(self.type) data_frame = connector.run(query) failed_count = data_frame["total"][0] passed = failed_count == 0 output = PremiseOutput( self, self.data_node, self.column, passed, failed_count, data_frame ) return output
Method for executing the validation over the DataNode.
Returns: PremiseOutput: Object storeing the results for this validation.
Inherited Members
View Source
class DataPremiseSQLCheckDistinct(DataPremiseSQL): """This DataPremise is responsible for validating if all values of a column are distinct. Args: name: Name for the data premise. data_node: Reference to the DataNode used in the validation. column: Column to be read by the premise. Attributes: name: Name for the data premise. data_node: Reference to the DataNode used in the validation. type: Type indicator of the premise. It is always "SQL". column: Column to be read by the premise. """ def __init__(self, name: str, data_node: DataNode, column: str): super().__init__(name, data_node, column) self.query_template = "SELECT count(DISTINCT {column}) distinct, count({column}) total as total FROM `{project}.{dataset}.{table}`" def query_args(self): """Method for returning the arguments to be passed on the query template of this validation. Returns: A `dictionary` with the query parameters. """ return { "project": self.data_node.project_id, "dataset": self.data_node.dataset_id, "table": self.data_node.table_id, "column": self.column, } def validate(self) -> PremiseOutput: """Method for executing the validation over the DataNode. Returns: PremiseOutput: Object storeing the results for this validation. """ query = self.query_template.format(**self.query_args()) connector = self.data_node.get_connector(self.type) data_frame = connector.run(query) passed = data_frame["result"][0] == data_frame["total"][0] failed_count = data_frame["total"][0] - data_frame["result"][0] output = PremiseOutput( self, self.data_node, self.column, passed, failed_count, data_frame ) return output
This DataPremise is responsible for validating if all values of a column are distinct.
Args: name: Name for the data premise. data_node: Reference to the DataNode used in the validation. column: Column to be read by the premise. Attributes: name: Name for the data premise. data_node: Reference to the DataNode used in the validation. type: Type indicator of the premise. It is always "SQL". column: Column to be read by the premise.
View Source
def __init__(self, name: str, data_node: DataNode, column: str): super().__init__(name, data_node, column) self.query_template = "SELECT count(DISTINCT {column}) distinct, count({column}) total as total FROM `{project}.{dataset}.{table}`"
View Source
def query_args(self): """Method for returning the arguments to be passed on the query template of this validation. Returns: A `dictionary` with the query parameters. """ return { "project": self.data_node.project_id, "dataset": self.data_node.dataset_id, "table": self.data_node.table_id, "column": self.column, }
Method for returning the arguments to be passed on the query template of this validation.
Returns:
A dictionary
with the query parameters.
View Source
def validate(self) -> PremiseOutput: """Method for executing the validation over the DataNode. Returns: PremiseOutput: Object storeing the results for this validation. """ query = self.query_template.format(**self.query_args()) connector = self.data_node.get_connector(self.type) data_frame = connector.run(query) passed = data_frame["result"][0] == data_frame["total"][0] failed_count = data_frame["total"][0] - data_frame["result"][0] output = PremiseOutput( self, self.data_node, self.column, passed, failed_count, data_frame ) return output
Method for executing the validation over the DataNode.
Returns: PremiseOutput: Object storeing the results for this validation.
Inherited Members
View Source
class DataPremiseSQLCheckArithmeticOperationEqualsResult(DataPremiseSQL): """This DataPremise is responsible for validating if a arithmetic operation involving the given column and an given term returns an expected restult (i.e. validate if column + 20 = 40). Args: name: Name for the data premise. data_node: Reference to the DataNode used in the validation. column: Column to be read by the premise. operator: The arithmetic operator (+, -, *, /). second_term: Numeric value for the second term of the operation. expected_result: Expected numeric value for the result of the operation. Attributes: name: Name for the data premise. data_node: Reference to the DataNode used in the validation. type: Type indicator of the premise. It is always "SQL". column: Column to be read by the premise. operator: The arithmetic operator (+, -, *, /). second_term: Numeric value for the second term of the operation. expected_result: Expected numeric value for the result of the operation. Raises: WrongTypeReference: If the "operator" argument is not a supported character ["+", "-", "*", "/"] """ def __init__( self, name: str, data_node: DataNode, column: str, operator: str, second_term: Union[int, float], expected_result: Union[int, float], ): super().__init__(name, data_node, column) supported_operators = ["+", "-", "*", "/"] if operator not in supported_operators: raise WrongTypeReference( f"Operator not supported, supported operators: {supported_operators}" ) self.query_template = "SELECT {column} as result FROM `{project}.{dataset}.{table}` WHERE {column} {operator} {second_term} = {expected_result}" self.operator = operator self.second_term = second_term self.expected_result = expected_result def query_args(self): """Method for returning the arguments to be passed on the query template of this validation. Returns: A `dictionary` with the query parameters. """ return { "project": self.data_node.project_id, "dataset": self.data_node.dataset_id, "table": self.data_node.table_id, "column": self.column, "operator": self.operator, "second_term": self.second_term, "expected_result": self.expected_result, } def validate(self) -> PremiseOutput: """Method for executing the validation over the DataNode. Returns: PremiseOutput: Object storeing the results for this validation. """ query = self.query_template.format(**self.query_args()) connector = self.data_node.get_connector(self.type) data_frame = connector.run(query) failed_count = len(data_frame["result"]) passed = failed_count == 0 output = PremiseOutput( self, self.data_node, self.column, passed, failed_count, data_frame ) return output
This DataPremise is responsible for validating if a arithmetic operation involving the given column and an given term returns an expected restult (i.e. validate if column + 20 = 40).
Args: name: Name for the data premise. data_node: Reference to the DataNode used in the validation. column: Column to be read by the premise. operator: The arithmetic operator (+, -, , /). second_term: Numeric value for the second term of the operation. expected_result: Expected numeric value for the result of the operation. Attributes: name: Name for the data premise. data_node: Reference to the DataNode used in the validation. type: Type indicator of the premise. It is always "SQL". column: Column to be read by the premise. operator: The arithmetic operator (+, -, *, /). second_term: Numeric value for the second term of the operation. expected_result: Expected numeric value for the result of the operation. Raises: WrongTypeReference: If the "operator" argument is not a supported character ["+", "-", "", "/"]
View Source
def __init__( self, name: str, data_node: DataNode, column: str, operator: str, second_term: Union[int, float], expected_result: Union[int, float], ): super().__init__(name, data_node, column) supported_operators = ["+", "-", "*", "/"] if operator not in supported_operators: raise WrongTypeReference( f"Operator not supported, supported operators: {supported_operators}" ) self.query_template = "SELECT {column} as result FROM `{project}.{dataset}.{table}` WHERE {column} {operator} {second_term} = {expected_result}" self.operator = operator self.second_term = second_term self.expected_result = expected_result
View Source
def query_args(self): """Method for returning the arguments to be passed on the query template of this validation. Returns: A `dictionary` with the query parameters. """ return { "project": self.data_node.project_id, "dataset": self.data_node.dataset_id, "table": self.data_node.table_id, "column": self.column, "operator": self.operator, "second_term": self.second_term, "expected_result": self.expected_result, }
Method for returning the arguments to be passed on the query template of this validation.
Returns:
A dictionary
with the query parameters.
View Source
def validate(self) -> PremiseOutput: """Method for executing the validation over the DataNode. Returns: PremiseOutput: Object storeing the results for this validation. """ query = self.query_template.format(**self.query_args()) connector = self.data_node.get_connector(self.type) data_frame = connector.run(query) failed_count = len(data_frame["result"]) passed = failed_count == 0 output = PremiseOutput( self, self.data_node, self.column, passed, failed_count, data_frame ) return output
Method for executing the validation over the DataNode.
Returns: PremiseOutput: Object storeing the results for this validation.
Inherited Members
View Source
class DataPremiseSQLCheckValuesAreBetween(DataPremiseSQL): """This DataPremise is responsible for validating if a arithmetic operation involving the given column and an given term returns an expected restult (i.e. validate if column + 20 = 40). Args: name: Name for the data premise. data_node: Reference to the DataNode used in the validation. column: Column to be read by the premise. lower_bound: Minimum allowed value for the column upper_bound: Maximum allowed value for the column Attributes: name: Name for the data premise. data_node: Reference to the DataNode used in the validation. type: Type indicator of the premise. It is always "SQL". column: Column to be read by the premise. lower_bound: Minimum allowed value for the column upper_bound: Maximum allowed value for the column """ def __init__( self, name: str, data_node: DataNode, column: str, lower_bound: str, upper_bound: str, ): self.query_template = "SELECT {column} as result FROM `{project}.{dataset}.{table}` WHERE {column} BETWEEN {lower_bound} AND {upper_bound}" self.lower_bound = lower_bound self.upper_bound = upper_bound super().__init__(name, data_node, column) def query_args(self): """Method for returning the arguments to be passed on the query template of this validation. Returns: A `dictionary` with the query parameters. """ return { "project": self.data_node.project_id, "dataset": self.data_node.dataset_id, "table": self.data_node.table_id, "column": self.column, "lower_bound": self.lower_bound, "upper_bound": self.upper_bound, } def validate(self) -> PremiseOutput: """Method for executing the validation over the DataNode. Returns: PremiseOutput: Object storeing the results for this validation. """ query = self.query_template.format(**self.query_args()) connector = self.data_node.get_connector(self.type) data_frame = connector.run(query) failed_count = len(data_frame["result"]) passed = failed_count == 0 output = PremiseOutput( self, self.data_node, self.column, passed, failed_count, data_frame ) return output
This DataPremise is responsible for validating if a arithmetic operation involving the given column and an given term returns an expected restult (i.e. validate if column + 20 = 40).
Args: name: Name for the data premise. data_node: Reference to the DataNode used in the validation. column: Column to be read by the premise. lower_bound: Minimum allowed value for the column upper_bound: Maximum allowed value for the column Attributes: name: Name for the data premise. data_node: Reference to the DataNode used in the validation. type: Type indicator of the premise. It is always "SQL". column: Column to be read by the premise. lower_bound: Minimum allowed value for the column upper_bound: Maximum allowed value for the column
View Source
def __init__( self, name: str, data_node: DataNode, column: str, lower_bound: str, upper_bound: str, ): self.query_template = "SELECT {column} as result FROM `{project}.{dataset}.{table}` WHERE {column} BETWEEN {lower_bound} AND {upper_bound}" self.lower_bound = lower_bound self.upper_bound = upper_bound super().__init__(name, data_node, column)
View Source
def query_args(self): """Method for returning the arguments to be passed on the query template of this validation. Returns: A `dictionary` with the query parameters. """ return { "project": self.data_node.project_id, "dataset": self.data_node.dataset_id, "table": self.data_node.table_id, "column": self.column, "lower_bound": self.lower_bound, "upper_bound": self.upper_bound, }
Method for returning the arguments to be passed on the query template of this validation.
Returns:
A dictionary
with the query parameters.
View Source
def validate(self) -> PremiseOutput: """Method for executing the validation over the DataNode. Returns: PremiseOutput: Object storeing the results for this validation. """ query = self.query_template.format(**self.query_args()) connector = self.data_node.get_connector(self.type) data_frame = connector.run(query) failed_count = len(data_frame["result"]) passed = failed_count == 0 output = PremiseOutput( self, self.data_node, self.column, passed, failed_count, data_frame ) return output
Method for executing the validation over the DataNode.
Returns: PremiseOutput: Object storeing the results for this validation.
Inherited Members
View Source
class DataPremiseSQLCheckInArray(DataPremiseSQL): """This DataPremise is responsible for validating if the values of a column matches any value of a given array. It currently supports arrays of Strings, Booleans and Numeric values. Args: name: Name for the data premise. data_node: Reference to the DataNode used in the validation. column: Column to be read by the premise. array: List of values to be matched against the column. Attributes: name: Name for the data premise. data_node: Reference to the DataNode used in the validation. type: Type indicator of the premise. It is always "SQL". column: Column to be read by the premise. array: List of values to be matched against the column. """ def __init__( self, name: str, data_node: DataNode, column: str, array: Union[str, list, float, bool], ): if type(array) == "list": array = str(array) self.query_template = "SELECT {column} as result result FROM `{project}.{dataset}.{table}` WHERE {column} IN UNNEST({array})" self.array = array super().__init__(name, data_node, column) def query_args(self): """Method for returning the arguments to be passed on the query template of this validation. Returns: A `dictionary` with the query parameters. """ return { "project": self.data_node.project_id, "dataset": self.data_node.dataset_id, "table": self.data_node.table_id, "column": self.column, "array": self.array, } def validate(self) -> PremiseOutput: """Method for executing the validation over the DataNode. Returns: PremiseOutput: Object storeing the results for this validation. """ query = self.query_template.format(**self.query_args()) connector = self.data_node.get_connector(self.type) data_frame = connector.run(query) failed_count = len(data_frame["result"]) passed = failed_count == 0 output = PremiseOutput( self, self.data_node, self.column, passed, failed_count, data_frame ) return output
This DataPremise is responsible for validating if the values of a column matches any value of a given array. It currently supports arrays of Strings, Booleans and Numeric values.
Args: name: Name for the data premise. data_node: Reference to the DataNode used in the validation. column: Column to be read by the premise. array: List of values to be matched against the column. Attributes: name: Name for the data premise. data_node: Reference to the DataNode used in the validation. type: Type indicator of the premise. It is always "SQL". column: Column to be read by the premise. array: List of values to be matched against the column.
View Source
def __init__( self, name: str, data_node: DataNode, column: str, array: Union[str, list, float, bool], ): if type(array) == "list": array = str(array) self.query_template = "SELECT {column} as result result FROM `{project}.{dataset}.{table}` WHERE {column} IN UNNEST({array})" self.array = array super().__init__(name, data_node, column)
View Source
def query_args(self): """Method for returning the arguments to be passed on the query template of this validation. Returns: A `dictionary` with the query parameters. """ return { "project": self.data_node.project_id, "dataset": self.data_node.dataset_id, "table": self.data_node.table_id, "column": self.column, "array": self.array, }
Method for returning the arguments to be passed on the query template of this validation.
Returns:
A dictionary
with the query parameters.
View Source
def validate(self) -> PremiseOutput: """Method for executing the validation over the DataNode. Returns: PremiseOutput: Object storeing the results for this validation. """ query = self.query_template.format(**self.query_args()) connector = self.data_node.get_connector(self.type) data_frame = connector.run(query) failed_count = len(data_frame["result"]) passed = failed_count == 0 output = PremiseOutput( self, self.data_node, self.column, passed, failed_count, data_frame ) return output
Method for executing the validation over the DataNode.
Returns: PremiseOutput: Object storeing the results for this validation.
Inherited Members
View Source
class DataPremiseSQLCheckLikePattern(DataPremiseSQL): """This DataPremise is responsible for validating if the values of a column matches a given string. It supports a wildcard operator (%). Args: name: Name for the data premise. data_node: Reference to the DataNode used in the validation. column: Column to be read by the premise. pattern: String to be matched against the column. Supports "%" caracter as a wildcard. Attributes: name: Name for the data premise. data_node: Reference to the DataNode used in the validation. type: Type indicator of the premise. It is always "SQL". column: Column to be read by the premise. pattern: String to be matched against the column. Supports "%" caracter as a wildcard. """ def __init__( self, name: str, data_node: DataNode, column: str, pattern: str, ): self.query_template = "SELECT {column} as result FROM `{project}.{dataset}.{table}` WHERE {column} LIKE {pattern}" self.pattern = pattern super().__init__(name, data_node, column) def query_args(self): """Method for returning the arguments to be passed on the query template of this validation. Returns: A `dictionary` with the query parameters. """ return { "project": self.data_node.project_id, "dataset": self.data_node.dataset_id, "table": self.data_node.table_id, "column": self.column, "pattern": self.pattern, } def validate(self) -> PremiseOutput: """Method for executing the validation over the DataNode. Returns: PremiseOutput: Object storeing the results for this validation. """ query = self.query_template.format(**self.query_args()) connector = self.data_node.get_connector(self.type) data_frame = connector.run(query) failed_count = len(data_frame["result"]) passed = failed_count == 0 output = PremiseOutput( self, self.data_node, self.column, passed, failed_count, data_frame ) return output
This DataPremise is responsible for validating if the values of a column matches a given string. It supports a wildcard operator (%).
Args: name: Name for the data premise. data_node: Reference to the DataNode used in the validation. column: Column to be read by the premise. pattern: String to be matched against the column. Supports "%" caracter as a wildcard. Attributes: name: Name for the data premise. data_node: Reference to the DataNode used in the validation. type: Type indicator of the premise. It is always "SQL". column: Column to be read by the premise. pattern: String to be matched against the column. Supports "%" caracter as a wildcard.
View Source
def __init__( self, name: str, data_node: DataNode, column: str, pattern: str, ): self.query_template = "SELECT {column} as result FROM `{project}.{dataset}.{table}` WHERE {column} LIKE {pattern}" self.pattern = pattern super().__init__(name, data_node, column)
View Source
def query_args(self): """Method for returning the arguments to be passed on the query template of this validation. Returns: A `dictionary` with the query parameters. """ return { "project": self.data_node.project_id, "dataset": self.data_node.dataset_id, "table": self.data_node.table_id, "column": self.column, "pattern": self.pattern, }
Method for returning the arguments to be passed on the query template of this validation.
Returns:
A dictionary
with the query parameters.
View Source
def validate(self) -> PremiseOutput: """Method for executing the validation over the DataNode. Returns: PremiseOutput: Object storeing the results for this validation. """ query = self.query_template.format(**self.query_args()) connector = self.data_node.get_connector(self.type) data_frame = connector.run(query) failed_count = len(data_frame["result"]) passed = failed_count == 0 output = PremiseOutput( self, self.data_node, self.column, passed, failed_count, data_frame ) return output
Method for executing the validation over the DataNode.
Returns: PremiseOutput: Object storeing the results for this validation.
Inherited Members
View Source
class DataPremiseSQLCheckRegexpContains(DataPremiseSQL): """This DataPremise is responsible for validating if the values of a column matches a given regexp pattern. Args: name: Name for the data premise. data_node: Reference to the DataNode used in the validation. column: Column to be read by the premise. pattern: String with regex pattern to be matched against the column. Supports golang regexp. Attributes: name: Name for the data premise. data_node: Reference to the DataNode used in the validation. type: Type indicator of the premise. It is always "SQL". column: Column to be read by the premise. pattern: String with regex pattern to be matched against the column. Supports golang regexp. """ def __init__( self, name: str, data_node: DataNode, column: str, pattern: str, ): self.query_template = 'SELECT {column} result FROM `{project}.{dataset}.{table}` WHERE REGEXP_CONTAINS({column}, r"{pattern}")' self.pattern = pattern super().__init__(name, data_node, column) def query_args(self): """Method for returning the arguments to be passed on the query template of this validation. Returns: A `dictionary` with the query parameters. """ return { "project": self.data_node.project_id, "dataset": self.data_node.dataset_id, "table": self.data_node.table_id, "column": self.column, "pattern": self.pattern, } def validate(self) -> PremiseOutput: """Method for executing the validation over the DataNode. Returns: PremiseOutput: Object storeing the results for this validation. """ query = self.query_template.format(**self.query_args()) connector = self.data_node.get_connector(self.type) data_frame = connector.run(query) failed_count = len(data_frame["result"]) passed = failed_count == 0 output = PremiseOutput( self, self.data_node, self.column, passed, failed_count, data_frame ) return output
This DataPremise is responsible for validating if the values of a column matches a given regexp pattern.
Args: name: Name for the data premise. data_node: Reference to the DataNode used in the validation. column: Column to be read by the premise. pattern: String with regex pattern to be matched against the column. Supports golang regexp. Attributes: name: Name for the data premise. data_node: Reference to the DataNode used in the validation. type: Type indicator of the premise. It is always "SQL". column: Column to be read by the premise. pattern: String with regex pattern to be matched against the column. Supports golang regexp.
View Source
def __init__( self, name: str, data_node: DataNode, column: str, pattern: str, ): self.query_template = 'SELECT {column} result FROM `{project}.{dataset}.{table}` WHERE REGEXP_CONTAINS({column}, r"{pattern}")' self.pattern = pattern super().__init__(name, data_node, column)
View Source
def query_args(self): """Method for returning the arguments to be passed on the query template of this validation. Returns: A `dictionary` with the query parameters. """ return { "project": self.data_node.project_id, "dataset": self.data_node.dataset_id, "table": self.data_node.table_id, "column": self.column, "pattern": self.pattern, }
Method for returning the arguments to be passed on the query template of this validation.
Returns:
A dictionary
with the query parameters.
View Source
def validate(self) -> PremiseOutput: """Method for executing the validation over the DataNode. Returns: PremiseOutput: Object storeing the results for this validation. """ query = self.query_template.format(**self.query_args()) connector = self.data_node.get_connector(self.type) data_frame = connector.run(query) failed_count = len(data_frame["result"]) passed = failed_count == 0 output = PremiseOutput( self, self.data_node, self.column, passed, failed_count, data_frame ) return output
Method for executing the validation over the DataNode.
Returns: PremiseOutput: Object storeing the results for this validation.
Inherited Members
View Source
class DataPremiseSQLCheckLogicalComparisonWithValue(DataPremiseSQL): """This DataPremise is responsible for validating if a logical operation between a column and a provided value is true. (i.e. validate if column >= 20). Args: name: Name for the data premise. data_node: Reference to the DataNode used in the validation. column: Column to be read by the premise. operator: The logical operator (<,<=,=,=>,!=,<>). value: Value for the second term of the operation. Attributes: name: Name for the data premise. data_node: Reference to the DataNode used in the validation. type: Type indicator of the premise. It is always "SQL". column: Column to be read by the premise. operator: The logical operator (<,<=,=,=>,!=,<>). value: Value for the second term of the operation. Raises: WrongTypeReference: If the "operator" argument is not a supported character ["<","<=","=", "=>","!=","<>"] """ def __init__( self, name: str, data_node: DataNode, column: str, operator: str, value: str, ): supported_operators = [ "<", "<=", "=", "=>", "!=", "<>", ] if operator not in supported_operators: raise WrongTypeReference( f"Operator not supported, supported operators: {supported_operators}" ) self.query_template = "SELECT {column} result FROM `{project}.{dataset}.{table}` WHERE {column} {operator} {value}" self.operator = operator self.value = value super().__init__(name, data_node, column) def query_args(self): """Method for returning the arguments to be passed on the query template of this validation. Returns: A `dictionary` with the query parameters. """ return { "project": self.data_node.project_id, "dataset": self.data_node.dataset_id, "table": self.data_node.table_id, "column": self.column, "operator": self.operator, "value": self.value, } def validate(self) -> PremiseOutput: """Method for executing the validation over the DataNode. Returns: PremiseOutput: Object storeing the results for this validation. """ query = self.query_template.format(**self.query_args()) connector = self.data_node.get_connector(self.type) data_frame = connector.run(query) failed_count = len(data_frame["result"]) passed = failed_count == 0 output = PremiseOutput( self, self.data_node, self.column, passed, failed_count, data_frame ) return output
This DataPremise is responsible for validating if a logical operation between a column and a provided value is true. (i.e. validate if column >= 20).
Args: name: Name for the data premise. data_node: Reference to the DataNode used in the validation. column: Column to be read by the premise. operator: The logical operator (<,<=,=,=>,!=,<>). value: Value for the second term of the operation. Attributes: name: Name for the data premise. data_node: Reference to the DataNode used in the validation. type: Type indicator of the premise. It is always "SQL". column: Column to be read by the premise. operator: The logical operator (<,<=,=,=>,!=,<>). value: Value for the second term of the operation. Raises: WrongTypeReference: If the "operator" argument is not a supported character ["<","<=","=", "=>","!=","<>"]
View Source
def __init__( self, name: str, data_node: DataNode, column: str, operator: str, value: str, ): supported_operators = [ "<", "<=", "=", "=>", "!=", "<>", ] if operator not in supported_operators: raise WrongTypeReference( f"Operator not supported, supported operators: {supported_operators}" ) self.query_template = "SELECT {column} result FROM `{project}.{dataset}.{table}` WHERE {column} {operator} {value}" self.operator = operator self.value = value super().__init__(name, data_node, column)
View Source
def query_args(self): """Method for returning the arguments to be passed on the query template of this validation. Returns: A `dictionary` with the query parameters. """ return { "project": self.data_node.project_id, "dataset": self.data_node.dataset_id, "table": self.data_node.table_id, "column": self.column, "operator": self.operator, "value": self.value, }
Method for returning the arguments to be passed on the query template of this validation.
Returns:
A dictionary
with the query parameters.
View Source
def validate(self) -> PremiseOutput: """Method for executing the validation over the DataNode. Returns: PremiseOutput: Object storeing the results for this validation. """ query = self.query_template.format(**self.query_args()) connector = self.data_node.get_connector(self.type) data_frame = connector.run(query) failed_count = len(data_frame["result"]) passed = failed_count == 0 output = PremiseOutput( self, self.data_node, self.column, passed, failed_count, data_frame ) return output
Method for executing the validation over the DataNode.
Returns: PremiseOutput: Object storeing the results for this validation.