azure-eventhub-dotnet
Azure Event Hubs SDK for .NET. Use for high-throughput event streaming: sending events (EventHubProducerClient, EventHubBufferedProducerClient), receiving events (EventProcessorClient with checkpointing), partition management, and real-time data ingestion. Triggers: "Event Hubs", "event streaming", "EventHubProducerClient", "EventProcessorClient", "send events", "receive events", "checkpointing", "partition".
# Azure.Messaging.EventHubs (.NET)
High-throughput event streaming SDK for sending and receiving events via Azure Event Hubs.
## Installation
```bash
# Core package (sending and simple receiving)
dotnet add package Azure.Messaging.EventHubs
# Processor package (production receiving with checkpointing)
dotnet add package Azure.Messaging.EventHubs.Processor
# Authentication
dotnet add package Azure.Identity
# For checkpointing (required by EventProcessorClient)
dotnet add package Azure.Storage.Blobs
```
**Current Versions**: Azure.Messaging.EventHubs v5.12.2, Azure.Messaging.EventHubs.Processor v5.12.2
## Environment Variables
```bash
EVENTHUB_FULLY_QUALIFIED_NAMESPACE=<namespace>.servicebus.windows.net
EVENTHUB_NAME=<event-hub-name>
# For checkpointing (EventProcessorClient)
BLOB_STORAGE_CONNECTION_STRING=<storage-connection-string>
BLOB_CONTAINER_NAME=<checkpoint-container>
# Alternative: Connection string auth (not recommended for production)
EVENTHUB_CONNECTION_STRING=Endpoint=sb://<namespace>.servicebus.windows.net/;SharedAccessKeyName=...
```
## Authentication
```csharp
using Azure.Identity;
using Azure.Messaging.EventHubs;
using Azure.Messaging.EventHubs.Producer;
// Always use DefaultAzureCredential for production
var credential = new DefaultAzureCredential();
var fullyQualifiedNamespace = Environment.GetEnvironmentVariable("EVENTHUB_FULLY_QUALIFIED_NAMESPACE");
var eventHubName = Environment.GetEnvironmentVariable("EVENTHUB_NAME");
var producer = new EventHubProducerClient(
    fullyQualifiedNamespace,
    eventHubName,
    credential);
```
**Required RBAC Roles**:
- **Sending**: `Azure Event Hubs Data Sender`
- **Receiving**: `Azure Event Hubs Data Receiver`
- **Both**: `Azure Event Hubs Data Owner`
## Client Types
| Client | Purpose | When to Use |
|--------|---------|-------------|
| `EventHubProducerClient` | Send events immediately in batches | Real-time sending, full control over batching |
| `EventHubBufferedProducerClient` | Automatic batching with background sending | High-volume, fire-and-forget scenarios |
| `EventHubConsumerClient` | Simple event reading | Prototyping only, NOT for production |
| `EventProcessorClient` | Production event processing | **Always use this for receiving in production** |
## Core Workflow
### 1. Send Events (Batch)
```csharp
using Azure.Identity;
using Azure.Messaging.EventHubs;
using Azure.Messaging.EventHubs.Producer;
await using var producer = new EventHubProducerClient(
    fullyQualifiedNamespace,
    eventHubName,
    new DefaultAzureCredential());

// Create a batch (respects size limits automatically).
// Declared without `using` because the variable is reassigned when a batch fills up.
EventDataBatch batch = await producer.CreateBatchAsync();

try
{
    // Add events to batch
    var events = new[]
    {
        new EventData(BinaryData.FromString("{\"id\": 1, \"message\": \"Hello\"}")),
        new EventData(BinaryData.FromString("{\"id\": 2, \"message\": \"World\"}"))
    };

    foreach (var eventData in events)
    {
        if (!batch.TryAdd(eventData))
        {
            // Batch is full - send it, release it, and create a new one
            await producer.SendAsync(batch);
            batch.Dispose();
            batch = await producer.CreateBatchAsync();

            if (!batch.TryAdd(eventData))
            {
                throw new InvalidOperationException("Event too large for empty batch");
            }
        }
    }

    // Send remaining events
    if (batch.Count > 0)
    {
        await producer.SendAsync(batch);
    }
}
finally
{
    // Release the last batch whether or not sending succeeded
    batch.Dispose();
}
```
### 2. Send Events (Buffered - High Volume)
```csharp
using Azure.Identity;
using Azure.Messaging.EventHubs;
using Azure.Messaging.EventHubs.Producer;

var options = new EventHubBufferedProducerClientOptions
{
    MaximumWaitTime = TimeSpan.FromSeconds(1)
};

await using var producer = new EventHubBufferedProducerClient(
    fullyQualifiedNamespace,
    eventHubName,
    new DefaultAzureCredential(),
    options);

// Handle send success/failure (the failure handler must be registered before enqueuing)
producer.SendEventBatchSucceededAsync += args =>
{
    Console.WriteLine($"Batch sent: {args.EventBatch.Count} events");
    return Task.CompletedTask;
};

producer.SendEventBatchFailedAsync += args =>
{
    Console.WriteLine($"Batch failed: {args.Exception.Message}");
    return Task.CompletedTask;
};

// Enqueue events (sent automatically in background)
for (int i = 0; i < 1000; i++)
{
    await producer.EnqueueEventAsync(new EventData($"Event {i}"));
}

// Flush remaining events before disposing
await producer.FlushAsync();
```
### 3. Receive Events (Production - EventProcessorClient)
```csharp
using Azure.Identity;
using Azure.Messaging.EventHubs;
using Azure.Messaging.EventHubs.Consumer;
using Azure.Messaging.EventHubs.Processor;
using Azure.Storage.Blobs;
// Blob container for checkpointing
var blobClient = new BlobContainerClient(
    Environment.GetEnvironmentVariable("BLOB_STORAGE_CONNECTION_STRING"),
    Environment.GetEnvironmentVariable("BLOB_CONTAINER_NAME"));

await blobClient.CreateIfNotExistsAsync();

// Create processor
var processor = new EventProcessorClient(
    blobClient,
    EventHubConsumerClient.DefaultConsumerGroupName,
    fullyQualifiedNamespace,
    eventHubName,
    new DefaultAzureCredential());

// Handle events
processor.ProcessEventAsync += async args =>
{
    Console.WriteLine($"Partition: {args.Partition.PartitionId}");
    Console.WriteLine($"Data: {args.Data.EventBody}");

    // Checkpoint after processing (or batch checkpoints)
    await args.UpdateCheckpointAsync();
};

// Handle errors
processor.ProcessErrorAsync += args =>
{
    Console.WriteLine($"Error: {args.Exception.Message}");
    Console.WriteLine($"Partition: {args.PartitionId}");
    return Task.CompletedTask;
};

// Start processing
await processor.StartProcessingAsync();

try
{
    // Run until cancelled; `cancellationToken` is supplied by the host application
    await Task.Delay(Timeout.Infinite, cancellationToken);
}
catch (OperationCanceledException)
{
    // Expected when cancellation is requested
}
finally
{
    // Stop gracefully
    await processor.StopProcessingAsync();
}
```
### 4. Partition Operations
```csharp
// Get partition IDs
string[] partitionIds = await producer.GetPartitionIdsAsync();
// Send to specific partition (use sparingly)
var options = new SendEventOptions
{
    PartitionId = "0"
};
await producer.SendAsync(events, options);

// Use partition key (recommended for ordering)
var batchOptions = new CreateBatchOptions
{
    PartitionKey = "customer-123" // Events with same key go to same partition
};
using var batch = await producer.CreateBatchAsync(batchOptions);
```
## EventPosition Options
Control where to start reading:
```csharp
// Start from beginning
EventPosition.Earliest
// Start from end (new events only)
EventPosition.Latest
// Start from specific offset
EventPosition.FromOffset(12345)
// Start from specific sequence number
EventPosition.FromSequenceNumber(100)
// Start from specific time
EventPosition.FromEnqueuedTime(DateTimeOffset.UtcNow.AddHours(-1))
```
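
With `EventProcessorClient`, one place these positions are typically applied is the `PartitionInitializingAsync` handler, which sets where reading starts for a partition that has no checkpoint yet. The sketch below assumes an already constructed processor whose handlers are registered before `StartProcessingAsync` is called. The helper names `ChooseStartingPosition` and `ConfigureStartingPosition` are hypothetical and not part of the SDK.

```csharp
using System;
using System.Threading.Tasks;
using Azure.Messaging.EventHubs;
using Azure.Messaging.EventHubs.Consumer;

// Hypothetical helper: replay from a point in time when one is given,
// otherwise read only events that arrive after the processor starts.
EventPosition ChooseStartingPosition(DateTimeOffset? replayFrom) =>
    replayFrom.HasValue
        ? EventPosition.FromEnqueuedTime(replayFrom.Value)
        : EventPosition.Latest;

// Hypothetical helper: registers the starting position on an existing processor.
// The position is only used for partitions that do not have a checkpoint.
void ConfigureStartingPosition(EventProcessorClient processor, DateTimeOffset? replayFrom)
{
    processor.PartitionInitializingAsync += args =>
    {
        args.DefaultStartingPosition = ChooseStartingPosition(replayFrom);
        return Task.CompletedTask;
    };
}
```

Once a checkpoint exists for a partition, the processor resumes from the checkpoint rather than from this default.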
## ASP.NET Core Integration
```csharp
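// Program.cs (requires the Microsoft.Extensions.Azure package)
using Azure.Identity;
using Azure.Messaging.EventHubs;
using Azure.Messaging.EventHubs.Producer;
using Microsoft.Extensions.Azure;

var builder = WebApplication.CreateBuilder(args);

builder.Services.AddAzureClients(clientBuilder =>
{
    // The WithNamespace variant takes a fully qualified namespace;
    // AddEventHubProducerClient(string, string) expects a connection string instead.
    clientBuilder.AddEventHubProducerClientWithNamespace(
        builder.Configuration["EventHub:FullyQualifiedNamespace"],
        builder.Configuration["EventHub:Name"]);

    clientBuilder.UseCredential(new DefaultAzureCredential());
});

// Inject in controller/service
public class EventService
{
    private readonly EventHubProducerClient _producer;

    public EventService(EventHubProducerClient producer)
    {
        _producer = producer;
    }

    public async Task SendAsync(string message)
    {
        using var batch = await _producer.CreateBatchAsync();

        // TryAdd returns false when the event does not fit in the batch
        if (!batch.TryAdd(new EventData(message)))
        {
            throw new InvalidOperationException("Event too large for an empty batch");
        }

        await _producer.SendAsync(batch);
    }
}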
```
## Best Practices
1. **Use `EventProcessorClient` for receiving** — Never use `EventHubConsumerClient` in production
2. **Checkpoint strategically** — After N events or time interval, not every event
3. **Use partition keys** — For ordering guarantees within a partition
4. **Reuse clients** — Create once, use as singleton (thread-safe)
5. **Use `await using`** — Ensures proper disposal
6. **Handle `ProcessErrorAsync`** — Always register error handler
7. **Batch events**: build batches with `CreateBatchAsync()` and `TryAdd` so each send stays within the size limit
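
The "checkpoint strategically" practice can be sketched with a small per-partition counter. This is a minimal illustration rather than an SDK feature: the `CheckpointInterval` value and the `ShouldCheckpoint` helper are hypothetical names. It relies on the processor delivering events for a single partition one at a time, so the counter for a given partition is never updated concurrently. The loop at the end simulates 250 events on one partition to show how often a checkpoint would be written.

```csharp
using System;
using System.Collections.Concurrent;

// Hypothetical threshold: write a checkpoint once per 100 events on a partition.
const int CheckpointInterval = 100;

// Events seen since the last checkpoint, keyed by partition ID.
var eventsSinceCheckpoint = new ConcurrentDictionary<string, int>();

// Returns true when the partition reaches the threshold, then resets its counter.
bool ShouldCheckpoint(string partitionId)
{
    int count = eventsSinceCheckpoint.AddOrUpdate(partitionId, 1, (_, current) => current + 1);
    if (count < CheckpointInterval)
    {
        return false;
    }

    eventsSinceCheckpoint[partitionId] = 0;
    return true;
}

// Simulate 250 events arriving on partition "0".
int checkpoints = 0;
for (int i = 0; i < 250; i++)
{
    if (ShouldCheckpoint("0"))
    {
        checkpoints++;
    }
}

Console.WriteLine(checkpoints); // prints 2 (after the 100th and 200th events)
```

Inside a `ProcessEventAsync` handler, the same check would guard the call: `if (ShouldCheckpoint(args.Partition.PartitionId)) await args.UpdateCheckpointAsync();`. Events processed after the last checkpoint are delivered again after a restart, so the handler should tolerate reprocessing up to one interval of events.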