Cloud Integration (AWS)
SourceFlow.Cloud.AWS extends the framework with distributed command and event processing using Amazon SQS, SNS, and KMS. Multiple application instances communicate through cloud messaging while preserving the same CQRS and event-sourcing patterns used locally.
Overview
dotnet add package SourceFlow.Cloud.AWS
Prerequisites: SourceFlow ≥ 2.0.0, .NET 8.0+ / .NET 9.0+ / .NET 10.0+
Quick Start
using SourceFlow.Cloud.AWS;
using Amazon;
// Register SourceFlow core
services.UseSourceFlow(typeof(Program).Assembly);
// Configure AWS cloud messaging
services.UseSourceFlowAws(
options =>
{
options.Region = RegionEndpoint.USEast1;
options.MaxConcurrentCalls = 10;
},
bus => bus
.Send
.Command<CreateOrderCommand>(q => q.Queue("orders.fifo"))
.Command<ProcessPaymentCommand>(q => q.Queue("payments.fifo"))
.Raise
.Event<OrderCreatedEvent>(t => t.Topic("order-events"))
.Event<PaymentProcessedEvent>(t => t.Topic("payment-events"))
.Listen.To
.CommandQueue("orders.fifo")
.CommandQueue("payments.fifo")
.Subscribe.To
.Topic("order-events")
.Topic("payment-events"));
This registers AWS dispatchers, configures routing, starts SQS listeners, and automatically provisions queues/topics/subscriptions at startup.
Bus Configuration API
Send Commands to SQS Queues
.Send
.Command<CreateOrderCommand>(q => q.Queue("orders.fifo")) // FIFO queue
.Command<SendEmailCommand>(q => q.Queue("notifications")) // Standard queue
- FIFO queues (name ends with
.fifo) — Exactly-once processing, strict ordering, content-based deduplication - Standard queues — High throughput, at-least-once delivery, best-effort ordering
Raise Events to SNS Topics
.Raise
.Event<OrderCreatedEvent>(t => t.Topic("order-events"))
.Event<PaymentProcessedEvent>(t => t.Topic("payment-events"))
Listen to Command Queues & Subscribe to Event Topics
.Listen.To
.CommandQueue("orders.fifo")
.CommandQueue("payments.fifo")
.Subscribe.To
.Topic("order-events")
.Topic("payment-events")
Configuration Options
| Option | Type | Default | Description |
|---|---|---|---|
Region | RegionEndpoint | Required | AWS region for SQS/SNS/KMS |
EnableCommandRouting | bool | true | Enable command dispatching to SQS |
EnableEventRouting | bool | true | Enable event publishing to SNS |
EnableCommandListener | bool | true | Enable SQS command polling |
MaxConcurrentCalls | int | 10 | Max concurrent message processing |
EnableEncryption | bool | false | Enable KMS message encryption |
KmsKeyId | string | null | KMS key ID or alias |
KMS Encryption
Enable envelope encryption for sensitive message payloads:
services.UseSourceFlowAws(
options =>
{
options.Region = RegionEndpoint.USEast1;
options.EnableEncryption = true;
options.KmsKeyId = "alias/sourceflow-key";
},
bus => ...);
Encryption flow: Generate data key (KMS) → Encrypt message (data key) → Encrypt data key (KMS master key) → Store in SQS.
Decryption flow: Retrieve message → Decrypt data key (KMS) → Decrypt message (data key) → Plaintext.
Idempotency
In-Memory (Single Instance)
Automatically registered with UseSourceFlowAws(). Suitable for single-instance deployments.
SQL-Based (Multi-Instance / Production)
// Install: dotnet add package SourceFlow.Stores.EntityFramework
// Register SQL-backed idempotency (replaces in-memory)
services.AddSourceFlowIdempotency(
connectionString: configuration.GetConnectionString("IdempotencyStore"),
cleanupIntervalMinutes: 60);
// Then configure AWS
services.UseSourceFlowAws(options => ..., bus => ...);
Always use SQL-based idempotency for multi-instance deployments. In-memory is insufficient for distributed systems.
Automatic Resource Provisioning
The AwsBusBootstrapper runs as an IHostedService at startup and automatically creates:
- SQS queues — Standard and FIFO with dead letter queues
- SNS topics — With display names
- SNS-to-SQS subscriptions — Raw message delivery enabled
All operations are idempotent — safe to run on every startup.
Message Flows
Command Flow (SQS)
Aggregate.Send(command) → CommandBus (assigns sequence number) → AwsSqsCommandDispatcher (checks routing, encrypts if enabled) → SQS Queue (message persisted) → AwsSqsCommandListener (polls queue, decrypts, idempotency check) → CommandBus.Publish (local processing) → Saga handles command
Event Flow (SNS → SQS)
Saga.Raise(event) → EventQueue (enqueues event) → AwsSnsEventDispatcher (checks routing, publishes to SNS) → SNS Topic (fan-out to subscribers) → SQS Queue (subscribed to topic) → AwsSqsCommandListener (polls queue, idempotency check) → EventQueue.Enqueue (local processing) → Views/Aggregates handle event
LocalStack Development
# Quick start (recommended)
./tests/SourceFlow.Cloud.AWS.Tests/run-integration-tests.ps1 # Windows
./tests/SourceFlow.Cloud.AWS.Tests/run-integration-tests.sh # Linux/macOS
# Manual setup
docker run -d --name sourceflow-localstack \
-p 4566:4566 \
-e SERVICES=sqs,sns,kms \
-e EAGER_SERVICE_LOADING=1 \
localstack/localstack:3
# Environment variables
export AWS_ENDPOINT_URL=http://localhost:4566
export AWS_DEFAULT_REGION=us-east-1
Health Checks
services.AddHealthChecks()
.AddCheck<AwsHealthCheck>("aws");
Checks SQS connectivity, SNS connectivity, KMS access (if encryption enabled), and queue/topic existence.
Observability
Activity Source: SourceFlow.Cloud.AWS
| Category | Metric |
|---|---|
| Commands | sourceflow.aws.command.dispatched / dispatch_duration / dispatch_error |
| Events | sourceflow.aws.event.published / publish_duration / publish_error |
| Messages | sourceflow.aws.message.received / processed / processing_duration |
Trace context is propagated via SQS message attributes for end-to-end distributed tracing.
IAM Permissions
Development (broad access)
{
"Statement": [
{ "Action": ["sqs:*"], "Resource": "arn:aws:sqs:*:*:*", "Effect": "Allow" },
{ "Action": ["sns:*"], "Resource": "arn:aws:sns:*:*:*", "Effect": "Allow" },
{ "Action": ["sts:GetCallerIdentity"], "Resource": "*", "Effect": "Allow" }
]
}
Production (restricted)
{
"Statement": [
{
"Action": ["sqs:CreateQueue", "sqs:GetQueueUrl", "sqs:SendMessage",
"sqs:ReceiveMessage", "sqs:DeleteMessage"],
"Resource": ["arn:aws:sqs:us-east-1:123456789012:orders.fifo"],
"Effect": "Allow"
},
{
"Action": ["sns:CreateTopic", "sns:Subscribe", "sns:Publish"],
"Resource": ["arn:aws:sns:us-east-1:123456789012:order-events"],
"Effect": "Allow"
}
]
}
Best Practices
- Use FIFO queues for ordered operations — Commands that must be processed in sequence per entity
- Use standard queues for independent operations — Notifications, emails, analytics
- Group related commands to the same queue —
CreateOrder,UpdateOrder,CancelOrder→orders.fifo - Enable SQL-based idempotency in production — In-memory is insufficient for multi-instance
- Enable KMS encryption for sensitive data — PII, financial data, health records
- Use IaC for production resources — CloudFormation/Terraform; bootstrapper for dev only
- Monitor health checks and metrics — Alert on
processing_errorand circuit breaker state - Configure dead letter queues — Review failed messages regularly