Custom Deequ Results: Correct/Incorrect Counts
Hey guys! So, you're diving into the world of data quality with Deequ and want to get super specific about the results you're seeing? Awesome! You want to customize the returned results and, more importantly, get the correct and incorrect counts for each rule you're checking. This is totally doable, and I'm here to walk you through it with some examples. Let's break it down.
Understanding Deequ's Check Results
Before we get into the nitty-gritty of customization, let's quickly recap how Deequ usually reports its findings. By default, Deequ runs a series of checks against your data and gives you a summary. This summary tells you whether each check passed or failed. While this is helpful, it doesn't give you the granular detail of how many records passed or failed a specific rule. That's where custom result sets come into play. You need to understand how to interpret the CheckResult object and how to access the underlying metrics.
When Deequ runs a verification, it produces a VerificationResult that maps each Check to a CheckResult. A CheckResult holds the overall status of the check plus a list of ConstraintResult objects, one per constraint; each ConstraintResult carries a status (Success or Failure), a message describing the outcome, and, where available, the metric the constraint was evaluated against. To get the correct and incorrect counts, you need to get at those metrics.
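In PyDeequ (the Python API used in the examples below), the most convenient way to look at these per-constraint outcomes is to flatten them into a DataFrame. Here is a minimal sketch, assuming you already have a SparkSession named spark and a verification_result returned by VerificationSuite(...).run():

from pydeequ.verification import VerificationResult

# Sketch: flatten per-constraint outcomes into a DataFrame
# (assumes `spark` and a `verification_result` from VerificationSuite(...).run())
results_df = VerificationResult.checkResultsAsDataFrame(spark, verification_result)
results_df.select("check", "constraint", "constraint_status", "constraint_message").show(truncate=False)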
The catch is that not every constraint produces a metric that maps directly onto row counts. An isComplete check, for example, is backed by a completeness metric, which is a ratio of non-null rows rather than a raw count, and the constraint itself only asserts that the ratio equals 1.0. You can, however, lean on constraints and analyzers whose metrics let you derive the counts: evaluate a predicate over the rows to get a compliance-style ratio (or run your own aggregation), then combine that ratio with the total row count.
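Because those metrics come back as ratios, turning them into counts is just arithmetic. A quick sketch, where the value 0.8 is an assumed example ratio reported by a constraint and df is the DataFrame under test:

# Assumed values for illustration: `df` is the DataFrame under test,
# 0.8 is the completeness/compliance ratio reported by a constraint's metric
total_rows = df.count()
ratio = 0.8
passing_rows = int(round(ratio * total_rows))
failing_rows = total_rows - passing_rows
print(f"passed: {passing_rows}, failed: {failing_rows}")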
Implementing Custom Return Results
Okay, let's get practical. To build your custom result output and get those sweet correct/incorrect counts, you'll need to craft your Deequ checks with a bit more detail. Here’s a step-by-step approach:
- Define Your Checks with Metrics: Instead of just using standard checks, define checks (or analyzers) that produce metrics that give you the counts you need. This often means using aggregations and predicate-based calculations alongside your checks.
- Run the Checks: Execute your Deequ checks against your data.
- Extract the Results: After the checks are run, extract the CheckResult and parse the metrics to get the correct and incorrect counts.
- Format Your Output: Finally, format the output to display the results in a way that's easy to understand.
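One practical note before the examples: they use PyDeequ, the Python wrapper for Deequ, so the package needs to be installed and (in recent releases, as far as I know) told which Spark version you run. This is my assumed setup; adjust the version to your cluster:

# Setup sketch (assumptions: PyDeequ from PyPI, a local Spark installation)
#   pip install pydeequ
#   export SPARK_VERSION=3.3   # recent PyDeequ releases read this to resolve the matching Deequ jar
import os
os.environ.setdefault("SPARK_VERSION", "3.3")  # alternative to setting it in the shell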
Example Scenario
Let’s say you have a dataset of customer orders, and you want to ensure that the order_amount is always greater than zero. You also want to know how many orders violate this rule.
Here's how you can do it:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col

import pydeequ
from pydeequ.checks import Check, CheckLevel
from pydeequ.verification import VerificationSuite, VerificationResult

# Initialize Spark with the Deequ jar on the classpath
spark = (SparkSession.builder
         .appName("DeequCustomResults")
         .config("spark.jars.packages", pydeequ.deequ_maven_coord)
         .config("spark.jars.excludes", pydeequ.f2j_maven_coord)
         .getOrCreate())
# Sample Data
data = [
(1, "Alice", 100.0),
(2, "Bob", 50.0),
(3, "Charlie", -10.0),
(4, "David", 200.0),
(5, "Eve", -5.0)
]
df = spark.createDataFrame(data, ["order_id", "customer_name", "order_amount"])
# Define the Check
# Define the Check
check = Check(spark, CheckLevel.Warning, "Order Amount Check") \
    .isNonNegative("order_amount")

# Verification Suite
verification_result = VerificationSuite(spark) \
    .onData(df) \
    .addCheck(check) \
    .run()

# Process the Verification Result: flatten it into one row per constraint
check_results_df = VerificationResult.checkResultsAsDataFrame(spark, verification_result)
check_results_df.show(truncate=False)

failed_constraints = check_results_df.filter(col("constraint_status") != "Success").collect()
if not failed_constraints:
    print("\nVerification was successful")
else:
    print("\nVerification failed!")
    for row in failed_constraints:
        print(f"- {row['constraint']}: {row['constraint_message']}")
# Custom Metric to Count Valid and Invalid Orders
valid_orders_count = df.filter(col("order_amount") > 0).count()
invalid_orders_count = df.filter(col("order_amount") <= 0).count()
print(f"\nNumber of Valid Orders: {valid_orders_count}")
print(f"Number of Invalid Orders: {invalid_orders_count}")
spark.stop()
In this example, instead of relying solely on Deequ's built-in checks, we also compute custom metrics (valid_orders_count and invalid_orders_count) by filtering the DataFrame and counting the rows that satisfy and violate our condition. This gives us the exact counts we need.
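If you prefer a single pass over the data instead of two separate filter-and-count jobs, the same counts can come from one conditional aggregation. This is standard PySpark, nothing Deequ-specific:

from pyspark.sql.functions import col, sum as spark_sum, when

# Single pass: count valid and invalid orders with conditional aggregation
counts = df.agg(
    spark_sum(when(col("order_amount") > 0, 1).otherwise(0)).alias("valid_orders"),
    spark_sum(when(col("order_amount") <= 0, 1).otherwise(0)).alias("invalid_orders")
).first()
print(f"Valid: {counts['valid_orders']}, Invalid: {counts['invalid_orders']}")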
Diving Deeper: Custom Metrics within Deequ
While the above example uses plain Spark filtering and counting, you can also keep the metric computation inside Deequ by using its built-in analyzers, for example Size (total row count) and Compliance (fraction of rows matching a predicate), and reading the values back from the AnalyzerContext. This approach is more integrated and can sit alongside a larger Deequ verification suite.
from pyspark.sql import SparkSession

import pydeequ
from pydeequ.analyzers import AnalysisRunner, AnalyzerContext, Compliance, Size

# Initialize Spark with the Deequ jar on the classpath
spark = (SparkSession.builder
         .appName("DeequCustomMetrics")
         .config("spark.jars.packages", pydeequ.deequ_maven_coord)
         .config("spark.jars.excludes", pydeequ.f2j_maven_coord)
         .getOrCreate())
# Sample Data
data = [
(1, "Alice", 100.0),
(2, "Bob", 50.0),
(3, "Charlie", -10.0),
(4, "David", 200.0),
(5, "Eve", -5.0)
]
df = spark.createDataFrame(data, ["order_id", "customer_name", "order_amount"])
# Run the analyzers: Size counts all rows, Compliance measures the
# fraction of rows satisfying the predicate
analysis_result = (AnalysisRunner(spark)
                   .onData(df)
                   .addAnalyzer(Size())
                   .addAnalyzer(Compliance("valid order amount", "order_amount > 0"))
                   .run())

# Extract and Print Results
metrics_df = AnalyzerContext.successMetricsAsDataFrame(spark, analysis_result)
metrics_df.show(truncate=False)

total_rows = metrics_df.filter("name = 'Size'").first()["value"]
compliance_ratio = metrics_df.filter("name = 'Compliance'").first()["value"]

valid_orders_count = int(round(compliance_ratio * total_rows))
invalid_orders_count = int(total_rows) - valid_orders_count

print(f"Number of Valid Orders: {valid_orders_count}")
print(f"Number of Invalid Orders: {invalid_orders_count}")
spark.stop()
Explanation
- Analyzers: We add two built-in analyzers: Size counts every row, and Compliance("valid order amount", "order_amount > 0") computes the fraction of rows satisfying the predicate. (A single-suite alternative using a satisfies constraint is sketched just below.)
- Analysis Runner: We run both analyzers over the data with AnalysisRunner.
- Extract Metrics: After the run, AnalyzerContext.successMetricsAsDataFrame exposes the metric values; multiplying the compliance ratio by the size gives the valid count, and the remainder is the invalid count.
- Print Results: We then print the counts of valid and invalid orders.
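If you would rather keep everything inside a single VerificationSuite run instead of a separate AnalysisRunner, a compliance-style constraint gets you the same ratio. A sketch, assuming PyDeequ's satisfies constraint and reusing spark and df from above (the constraint name "valid order amount" is just a label I chose):

from pydeequ.checks import Check, CheckLevel
from pydeequ.verification import VerificationSuite, VerificationResult

# Sketch: a compliance-style constraint; the suite records the underlying
# compliance metric (fraction of rows matching the predicate) alongside pass/fail
check = Check(spark, CheckLevel.Warning, "Order Amount Check") \
    .satisfies("order_amount > 0", "valid order amount", lambda ratio: ratio == 1.0)

result = VerificationSuite(spark).onData(df).addCheck(check).run()

# successMetricsAsDataFrame exposes the metrics the constraints were evaluated against
metrics_df = VerificationResult.successMetricsAsDataFrame(spark, result)
metrics_df.show(truncate=False)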
Advanced Customization and Reporting
For more advanced customization, you might want to create a custom reporting function that takes the check results (here, the DataFrame produced by checkResultsAsDataFrame) together with the custom metrics you've calculated and formats them into a human-readable report. This could involve generating HTML reports, sending notifications, or storing the results in a database.
import json

def custom_report(check_results_df, valid_count, invalid_count):
    # Build a plain dict from the Deequ check-result rows plus our custom counts
    rows = check_results_df.collect()
    overall_status = "Success" if all(r["constraint_status"] == "Success" for r in rows) else "Error"
    report = {
        "status": overall_status,
        "checks": [
            {
                "check": r["check"],
                "check_status": r["check_status"],
                "constraint": r["constraint"],
                "constraint_status": r["constraint_status"],
                "message": r["constraint_message"]
            }
            for r in rows
        ],
        "custom_metrics": {
            "valid_orders": valid_count,
            "invalid_orders": invalid_count
        }
    }
    return report

# Usage: check_results_df comes from VerificationResult.checkResultsAsDataFrame (first example)
report = custom_report(check_results_df, valid_orders_count, invalid_orders_count)
print(json.dumps(report, indent=4))
Key Takeaways
- Use Custom Metrics: Calculate valid and invalid counts using Spark's aggregation functions or Deequ analyzers such as Size and Compliance.
- Extract Results Carefully: Navigate the check results (for example via checkResultsAsDataFrame) and the AnalyzerContext to access the metrics you need.
- Format for Readability: Create custom reporting functions to present the results in a clear and actionable format.
By combining Deequ's powerful data quality checks with custom metric calculations and reporting, you can get a highly detailed and customized view of your data quality. This level of detail allows you to pinpoint specific issues, track progress over time, and ensure that your data meets the standards your organization requires. So, go forth and customize! Happy data wrangling!