microservices-expert
Included with Lifetime
$97 forever
Expert-level microservices architecture, patterns, service mesh, and distributed systems
apimicroservicesdistributed-systemsservice-mesharchitecture
What this skill does
# Microservices Expert
Expert guidance for microservices architecture, design patterns, service communication, and distributed system challenges.
## Core Concepts
### Microservices Principles
- Single responsibility per service
- Independently deployable
- Decentralized data management
- Infrastructure automation
- Design for failure
- Evolutionary design
### Architecture Patterns
- API Gateway
- Service Discovery
- Circuit Breaker
- Saga Pattern
- Event Sourcing
- CQRS
### Communication
- Synchronous (HTTP/REST, gRPC)
- Asynchronous (Message queues, Events)
- Service mesh
- API composition
- Backend for Frontend (BFF)
## Service Design
```python
from fastapi import FastAPI, HTTPException
from pydantic import BaseModel
import httpx
from typing import List, Optional
from circuitbreaker import circuit
import asyncio
# Individual Microservice
app = FastAPI(title="Order Service", version="1.0.0")
class Order(BaseModel):
id: str
user_id: str
items: List[dict]
total: float
status: str
class OrderService:
def __init__(self, inventory_url: str, payment_url: str):
self.inventory_url = inventory_url
self.payment_url = payment_url
self.client = httpx.AsyncClient()
@circuit(failure_threshold=5, recovery_timeout=60)
async def check_inventory(self, items: List[dict]) -> bool:
"""Check inventory availability with circuit breaker"""
try:
response = await self.client.post(
f"{self.inventory_url}/check",
json={"items": items},
timeout=5.0
)
return response.json()["available"]
except Exception as e:
print(f"Inventory service error: {e}")
raise
@circuit(failure_threshold=5, recovery_timeout=60)
async def process_payment(self, user_id: str, amount: float) -> dict:
"""Process payment with circuit breaker"""
try:
response = await self.client.post(
f"{self.payment_url}/charge",
json={"user_id": user_id, "amount": amount},
timeout=10.0
)
return response.json()
except Exception as e:
print(f"Payment service error: {e}")
raise
async def create_order(self, order: Order) -> Order:
"""Create order with coordination"""
# 1. Check inventory
inventory_available = await self.check_inventory(order.items)
if not inventory_available:
raise HTTPException(400, "Items not available")
# 2. Process payment
payment = await self.process_payment(order.user_id, order.total)
if payment["status"] != "success":
raise HTTPException(400, "Payment failed")
# 3. Reserve inventory
await self.reserve_inventory(order.items)
# 4. Create order record
order.status = "confirmed"
await self.save_order(order)
return order
@app.post("/orders", response_model=Order)
async def create_order(order: Order):
service = OrderService(
inventory_url="http://inventory-service",
payment_url="http://payment-service"
)
return await service.create_order(order)
```
## Saga Pattern (Distributed Transactions)
```python
from enum import Enum
from typing import List, Callable
import asyncio
class SagaStep:
def __init__(self, action: Callable, compensation: Callable):
self.action = action
self.compensation = compensation
class SagaOrchestrator:
"""Orchestrate distributed transactions using Saga pattern"""
def __init__(self):
self.steps: List[SagaStep] = []
self.completed_steps: List[SagaStep] = []
def add_step(self, action: Callable, compensation: Callable):
"""Add a step to the saga"""
self.steps.append(SagaStep(action, compensation))
async def execute(self) -> bool:
"""Execute saga with compensation on failure"""
try:
# Execute all steps
for step in self.steps:
result = await step.action()
self.completed_steps.append(step)
if not result:
await self.compensate()
return False
return True
except Exception as e:
print(f"Saga failed: {e}")
await self.compensate()
return False
async def compensate(self):
"""Rollback completed steps"""
# Execute compensations in reverse order
for step in reversed(self.completed_steps):
try:
await step.compensation()
except Exception as e:
print(f"Compensation failed: {e}")
# Example: Order Saga
class OrderSaga:
async def create_order_with_saga(self, order_data: dict):
saga = SagaOrchestrator()
# Step 1: Reserve inventory
saga.add_step(
action=lambda: self.reserve_inventory(order_data["items"]),
compensation=lambda: self.release_inventory(order_data["items"])
)
# Step 2: Process payment
saga.add_step(
action=lambda: self.charge_payment(order_data["user_id"], order_data["total"]),
compensation=lambda: self.refund_payment(order_data["user_id"], order_data["total"])
)
# Step 3: Create order
saga.add_step(
action=lambda: self.create_order_record(order_data),
compensation=lambda: self.delete_order_record(order_data["id"])
)
# Execute saga
success = await saga.execute()
if success:
await self.send_confirmation(order_data["user_id"])
return {"status": "success", "order_id": order_data["id"]}
else:
return {"status": "failed", "message": "Order creation failed"}
```
## Service Discovery
```python
import consul
from typing import List, Optional
import random
class ServiceRegistry:
"""Service discovery using Consul"""
def __init__(self, consul_host: str = "localhost", consul_port: int = 8500):
self.consul = consul.Consul(host=consul_host, port=consul_port)
def register_service(self, service_name: str, service_id: str,
host: str, port: int, tags: List[str] = None):
"""Register service with Consul"""
self.consul.agent.service.register(
name=service_name,
service_id=service_id,
address=host,
port=port,
tags=tags or [],
check=consul.Check.http(
f"http://{host}:{port}/health",
interval="10s",
timeout="5s"
)
)
def deregister_service(self, service_id: str):
"""Deregister service"""
self.consul.agent.service.deregister(service_id)
def discover_service(self, service_name: str) -> Optional[dict]:
"""Discover healthy service instance"""
_, services = self.consul.health.service(service_name, passing=True)
if not services:
return None
# Load balance: random selection
service = random.choice(services)
return {
"id": service["Service"]["ID"],
"address": service["Service"]["Address"],
"port": service["Service"]["Port"],
"tags": service["Service"]["Tags"]
}
def get_all_services(self, service_name: str) -> List[dict]:
"""Get all healthy instances of a service"""
_, services = self.consul.health.service(service_name, passing=True)
return [
{
"id": s["Service"]["ID"],
"address": s["Service"]["Address"],
"port": s["Service"]["Port"]
}
for s in services
]
```
## API Gateway
```python
from fastapi import FastAPI, Request, Response
import httpx
from typing import Dict
import jwt
app = FastAPI(title="API GatewRelated in api
graphql-expert
IncludedExpert-level GraphQL API development with schema design, resolvers, and subscriptions
api
api-design-expert
IncludedExpert-level API design principles, REST, GraphQL, versioning, and API best practices
api
openapi-expert
IncludedExpert-level OpenAPI/Swagger specification for API design, documentation, and code generation
api
grpc-expert
IncludedExpert-level gRPC, Protocol Buffers, microservices communication, and streaming
api
fastapi-expert
IncludedExpert-level FastAPI development for high-performance Python APIs with async support
api