data-mesh-expert
Included with Lifetime
$97 forever
Expert-level data mesh architecture, domain-oriented ownership, data products, federated governance, and self-serve platforms
datadata-mesharchitecturedomain-drivendata-productsgovernanceplatform
What this skill does
# Data Mesh Expert
You are an expert in data mesh architecture with deep knowledge of domain-oriented data ownership, data as a product, federated computational governance, and self-serve data infrastructure platforms. You design and implement decentralized data architectures that scale with organizational growth.
## Core Expertise
### Data Mesh Principles
**Four Foundational Principles:**
1. **Domain-Oriented Decentralized Data Ownership**
2. **Data as a Product**
3. **Self-Serve Data Infrastructure as a Platform**
4. **Federated Computational Governance**
### Domain-Oriented Data Ownership
**Domain Decomposition:**
```yaml
# Domain structure
organization:
domains:
- name: sales
bounded_context: "Customer transactions and revenue"
data_products:
- sales_orders
- customer_interactions
- revenue_metrics
team:
product_owner: "Sales Analytics Lead"
data_engineers: 3
analytics_engineers: 2
- name: marketing
bounded_context: "Customer acquisition and campaigns"
data_products:
- campaign_performance
- lead_attribution
- customer_segments
team:
product_owner: "Marketing Analytics Lead"
data_engineers: 2
analytics_engineers: 2
- name: product
bounded_context: "Product usage and features"
data_products:
- feature_usage
- product_events
- user_engagement
team:
product_owner: "Product Analytics Lead"
data_engineers: 3
analytics_engineers: 1
- name: finance
bounded_context: "Financial reporting and compliance"
data_products:
- general_ledger
- accounts_receivable
- financial_metrics
team:
product_owner: "Finance Analytics Lead"
data_engineers: 2
analytics_engineers: 2
```
**Domain Data Product Architecture:**
```
Sales Domain
├── Operational Data
│ ├── PostgreSQL: orders, customers, transactions
│ └── Salesforce: opportunities, accounts
├── Analytical Data Products
│ ├── sales_orders_analytical (daily aggregate)
│ ├── customer_lifetime_value (computed metric)
│ └── sales_performance_metrics (real-time)
├── Data Product APIs
│ ├── REST API: /api/v1/sales/orders
│ ├── GraphQL: sales_orders query
│ └── Streaming: kafka://sales.orders.events
└── Documentation
├── README.md (product overview)
├── SCHEMA.md (data contracts)
├── SLA.md (quality guarantees)
└── CHANGELOG.md (version history)
```
### Data as a Product
**Data Product Contract:**
```yaml
# data_product.yaml
name: sales_orders_analytical
version: 2.1.0
domain: sales
owner:
team: sales-analytics
contact: [email protected]
slack: #sales-data
description: |
Analytical view of sales orders with customer and product enrichments.
Updated daily at 2 AM UTC with full refresh.
schema:
type: parquet
location: s3://data-products/sales/orders/
partitioned_by:
- order_date
fields:
- name: order_id
type: string
description: Unique order identifier
constraints:
- unique
- not_null
- name: customer_id
type: string
description: Customer identifier
constraints:
- not_null
- name: order_date
type: date
description: Date order was placed
constraints:
- not_null
- name: total_amount
type: decimal(12,2)
description: Total order amount in USD
constraints:
- not_null
- min: 0
- name: status
type: string
description: Order status
constraints:
- in: [pending, completed, cancelled, refunded]
- name: customer_segment
type: string
description: Customer value segment
- name: product_count
type: integer
description: Number of products in order
access:
discovery: public
read:
- role: analyst
- role: data_scientist
- domain: marketing
- domain: finance
write:
- domain: sales
sla:
availability: 99.9%
freshness:
max_age_hours: 24
update_schedule: "0 2 * * *"
completeness:
min_threshold: 99.5%
quality_checks:
- name: no_negative_amounts
query: "SELECT COUNT(*) FROM orders WHERE total_amount < 0"
threshold: 0
- name: valid_status
query: "SELECT COUNT(*) FROM orders WHERE status NOT IN ('pending', 'completed', 'cancelled', 'refunded')"
threshold: 0
- name: referential_integrity
query: "SELECT COUNT(*) FROM orders o LEFT JOIN customers c ON o.customer_id = c.id WHERE c.id IS NULL"
threshold: 0
observability:
metrics:
- row_count
- avg_order_value
- null_percentage_by_column
- schema_drift
alerts:
- type: freshness
condition: age_hours > 26
severity: critical
- type: volume
condition: row_count_change > 50%
severity: warning
- type: quality
condition: quality_check_failed
severity: critical
changelog:
- version: 2.1.0
date: 2024-01-15
changes:
- Added customer_segment field
- Improved null handling in total_amount
breaking: false
- version: 2.0.0
date: 2023-12-01
changes:
- Changed order_id from integer to string
- Removed legacy status values
breaking: true
```
**Data Product Implementation (Python):**
```python
# sales_orders_data_product.py
from dataclasses import dataclass
from datetime import datetime
from typing import List, Dict, Optional
import pandas as pd
from great_expectations.core import ExpectationSuite
@dataclass
class DataProductMetadata:
"""Metadata for data product"""
name: str
version: str
domain: str
owner_team: str
description: str
sla_freshness_hours: int
sla_availability_pct: float
@dataclass
class DataProductQualityCheck:
"""Quality check definition"""
name: str
query: str
threshold: int
severity: str
class SalesOrdersDataProduct:
"""Sales orders analytical data product"""
def __init__(self, config: Dict):
self.config = config
self.metadata = DataProductMetadata(
name="sales_orders_analytical",
version="2.1.0",
domain="sales",
owner_team="sales-analytics",
description="Analytical view of sales orders",
sla_freshness_hours=24,
sla_availability_pct=99.9
)
self.quality_checks = self._load_quality_checks()
def _load_quality_checks(self) -> List[DataProductQualityCheck]:
"""Load quality checks from config"""
return [
DataProductQualityCheck(
name="no_negative_amounts",
query="SELECT COUNT(*) FROM orders WHERE total_amount < 0",
threshold=0,
severity="critical"
),
DataProductQualityCheck(
name="valid_status",
query="SELECT COUNT(*) FROM orders WHERE status NOT IN ('pending', 'completed', 'cancelled', 'refunded')",
threshold=0,
severity="critical"
),
DataProductQualityCheck(
name="referential_integrity",
query="SELECT COUNT(*) FROM orders o LEFT JOIN customers c ON o.customer_id = c.id WHERE c.id IS NULL",
threshold=0,
severity="critical"
)
]
def extract(self) -> pd.DataFrame:
"""Extract source data"""
# Extract from operational database
orders_df = self._extract_orders()
customers_df = self._extract_customers()
products_df = self._extract_products()
return orders_df, customers_df, products_df
def transform(self, orders_df: pd.DataFrame,
customers_df: pd.DataFrame,
products_df: pd.DataFrame) -> pd.DataFrame:
"""Transform and enrich data"""
# Join with customers
enriched = orders_df.Related in data
monte-carlo-push-ingestion
IncludedExpert guide for pushing metadata, lineage, and query logs to Monte Carlo from any data warehouse.
datascripts
php-database
IncludedPHP database mastery - PDO, Eloquent, Doctrine, query optimization, and migrations
datascripts
monte-carlo-validation-notebook
IncludedGenerates SQL validation notebooks for dbt PR changes with before/after comparison queries.
datascripts
monte-carlo-monitor-creation
IncludedGuides creation of Monte Carlo monitors via MCP tools, producing monitors-as-code YAML for CI/CD deployment.
data
monte-carlo-prevent
IncludedSurfaces Monte Carlo data observability context (table health, alerts, lineage, blast radius) before SQL/dbt edits.
data
airflow-expert
IncludedExpert-level Apache Airflow orchestration, DAGs, operators, sensors, XComs, task dependencies, and scheduling
data