Cloud Integration (AWS)

SourceFlow.Cloud.AWS extends the framework with distributed command and event processing using Amazon SQS, SNS, and KMS. Multiple application instances communicate through cloud messaging while preserving the same CQRS and event-sourcing patterns used locally.

Overview

Terminal
dotnet add package SourceFlow.Cloud.AWS

Prerequisites: SourceFlow ≥ 2.0.0, .NET 8.0+ / .NET 9.0+ / .NET 10.0+

Quick Start

C#
using SourceFlow.Cloud.AWS;
using Amazon;

// Register SourceFlow core
services.UseSourceFlow(typeof(Program).Assembly);

// Configure AWS cloud messaging
services.UseSourceFlowAws(
    options =>
    {
        options.Region = RegionEndpoint.USEast1;
        options.MaxConcurrentCalls = 10;
    },
    bus => bus
        .Send
            .Command<CreateOrderCommand>(q => q.Queue("orders.fifo"))
            .Command<ProcessPaymentCommand>(q => q.Queue("payments.fifo"))
        .Raise
            .Event<OrderCreatedEvent>(t => t.Topic("order-events"))
            .Event<PaymentProcessedEvent>(t => t.Topic("payment-events"))
        .Listen.To
            .CommandQueue("orders.fifo")
            .CommandQueue("payments.fifo")
        .Subscribe.To
            .Topic("order-events")
            .Topic("payment-events"));

This registers AWS dispatchers, configures routing, starts SQS listeners, and automatically provisions queues/topics/subscriptions at startup.

Bus Configuration API

Send Commands to SQS Queues

C#
.Send
    .Command<CreateOrderCommand>(q => q.Queue("orders.fifo"))   // FIFO queue
    .Command<SendEmailCommand>(q => q.Queue("notifications"))  // Standard queue

  • FIFO queues (name ends with .fifo): exactly-once processing, strict ordering within a message group, content-based deduplication
  • Standard queues: high throughput, at-least-once delivery, best-effort ordering
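
To make content-based deduplication concrete: SQS derives the deduplication ID from a SHA-256 hash of the message body (message attributes are not included), so a retry with a byte-identical body inside the 5-minute deduplication interval is treated as the same message. The sketch below only reproduces that hashing idea locally. `DeduplicationId` is a hypothetical helper for illustration, not part of SourceFlow or the AWS SDK, and the JSON bodies are made-up sample data.

```csharp
using System;
using System.Security.Cryptography;
using System.Text;

// Hypothetical helper (not a SourceFlow or AWS SDK API): hashes the message
// body the way content-based deduplication does, using SHA-256.
static string DeduplicationId(string messageBody)
{
    byte[] hash = SHA256.HashData(Encoding.UTF8.GetBytes(messageBody));
    return Convert.ToHexString(hash).ToLowerInvariant();
}

string first = DeduplicationId("{\"orderId\":42,\"action\":\"create\"}");
string retry = DeduplicationId("{\"orderId\":42,\"action\":\"create\"}");
string other = DeduplicationId("{\"orderId\":43,\"action\":\"create\"}");

// Identical bodies share an ID, so the retry would be dropped as a duplicate.
Console.WriteLine(first == retry);
// A different body yields a different ID and is delivered as a new message.
Console.WriteLine(first == other);
```

One consequence of hashing the body: two commands that are logically distinct but serialize to the same bytes would be collapsed, so a command payload usually needs some unique field (an ID or timestamp) when sent to a FIFO queue.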

Raise Events to SNS Topics

C#
.Raise
    .Event<OrderCreatedEvent>(t => t.Topic("order-events"))
    .Event<PaymentProcessedEvent>(t => t.Topic("payment-events"))

Listen to Command Queues & Subscribe to Event Topics

C#
.Listen.To
    .CommandQueue("orders.fifo")
    .CommandQueue("payments.fifo")
.Subscribe.To
    .Topic("order-events")
    .Topic("payment-events")

Configuration Options

Option                  Type            Default    Description
Region                  RegionEndpoint  Required   AWS region for SQS/SNS/KMS
EnableCommandRouting    bool            true       Enable command dispatching to SQS
EnableEventRouting      bool            true       Enable event publishing to SNS
EnableCommandListener   bool            true       Enable SQS command polling
MaxConcurrentCalls      int             10         Max concurrent message processing
EnableEncryption        bool            false      Enable KMS message encryption
KmsKeyId                string          null       KMS key ID or alias

KMS Encryption

Enable envelope encryption for sensitive message payloads:

C#
services.UseSourceFlowAws(
    options =>
    {
        options.Region = RegionEndpoint.USEast1;
        options.EnableEncryption = true;
        options.KmsKeyId = "alias/sourceflow-key";
    },
    bus => ...);

Encryption flow: Generate data key (KMS) → Encrypt message (data key) → Encrypt data key (KMS master key) → Store in SQS.

Decryption flow: Retrieve message → Decrypt data key (KMS) → Decrypt message (data key) → Plaintext.
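
The two flows above can be sketched locally without AWS. This is a minimal illustration of envelope encryption in general, not SourceFlow's implementation: the `masterKey` byte array stands in for the KMS master key (which in AWS never leaves KMS), `Seal` and `Open` are hypothetical helpers, and AES-256-GCM is an assumed cipher choice. It needs .NET 8 or later for the `AesGcm` constructor that takes a tag size.

```csharp
using System;
using System.Linq;
using System.Security.Cryptography;
using System.Text;

// Hypothetical helper: AES-256-GCM encrypt, returning
// nonce (12 bytes) + tag (16 bytes) + ciphertext as one blob.
static byte[] Seal(byte[] key, byte[] plaintext)
{
    byte[] nonce = RandomNumberGenerator.GetBytes(12);
    byte[] tag = new byte[16];
    byte[] ciphertext = new byte[plaintext.Length];
    using var aes = new AesGcm(key, 16);
    aes.Encrypt(nonce, plaintext, ciphertext, tag);
    return nonce.Concat(tag).Concat(ciphertext).ToArray();
}

// Hypothetical helper: reverses Seal; throws if the key or blob is wrong.
static byte[] Open(byte[] key, byte[] blob)
{
    byte[] plaintext = new byte[blob.Length - 28];
    using var aes = new AesGcm(key, 16);
    aes.Decrypt(blob.AsSpan(0, 12), blob.AsSpan(28), blob.AsSpan(12, 16), plaintext);
    return plaintext;
}

// Local stand-in for the KMS master key (assumption for this sketch only).
byte[] masterKey = RandomNumberGenerator.GetBytes(32);

// Encryption flow: fresh data key, encrypt the message with it,
// then encrypt the data key with the master key. Both blobs travel together.
byte[] dataKey = RandomNumberGenerator.GetBytes(32);
byte[] encryptedBody = Seal(dataKey, Encoding.UTF8.GetBytes("order 42: card ending 1111"));
byte[] encryptedDataKey = Seal(masterKey, dataKey);

// Decryption flow: recover the data key first, then the message.
byte[] recoveredKey = Open(masterKey, encryptedDataKey);
string body = Encoding.UTF8.GetString(Open(recoveredKey, encryptedBody));
Console.WriteLine(body);
```

The point of the extra layer is that only the small data key is sent to the key service, while the message body is encrypted locally with a key that is unique per message.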

Idempotency

In-Memory (Single Instance)

An in-memory idempotency store is registered automatically by UseSourceFlowAws(). It is suitable only for single-instance deployments.

SQL-Based (Multi-Instance / Production)

C#
// Install: dotnet add package SourceFlow.Stores.EntityFramework

// Register SQL-backed idempotency (replaces in-memory)
services.AddSourceFlowIdempotency(
    connectionString: configuration.GetConnectionString("IdempotencyStore"),
    cleanupIntervalMinutes: 60);

// Then configure AWS
services.UseSourceFlowAws(options => ..., bus => ...);

⚠️ Production Requirement

Always use SQL-based idempotency for multi-instance deployments. The in-memory store is local to each process, so one instance cannot see which messages another instance has already handled.
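
A small sketch of why per-process state is not enough. It models two application instances as two separate dictionaries; `TryBegin` is a hypothetical helper, and neither it nor the dictionary layout is SourceFlow's actual idempotency store.

```csharp
using System;
using System.Collections.Concurrent;

// Hypothetical helper: returns true the first time a message ID is seen
// by the given store, false on every later attempt.
static bool TryBegin(ConcurrentDictionary<string, bool> store, string messageId)
    => store.TryAdd(messageId, true);

// Each dictionary plays the role of one instance's in-memory store.
var instanceA = new ConcurrentDictionary<string, bool>();
var instanceB = new ConcurrentDictionary<string, bool>();

bool firstOnA = TryBegin(instanceA, "msg-1");   // first delivery: processed
bool retryOnA = TryBegin(instanceA, "msg-1");   // redelivery to A: skipped
bool retryOnB = TryBegin(instanceB, "msg-1");   // redelivery to B: processed again

Console.WriteLine($"{firstOnA} {retryOnA} {retryOnB}"); // prints "True False True"
```

Instance B has no record of the message, so the redelivery is handled a second time. A shared store, such as the SQL-backed one registered above, gives every instance the same view.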

Automatic Resource Provisioning

The AwsBusBootstrapper runs as an IHostedService at startup and automatically creates:

  • SQS queues — Standard and FIFO with dead letter queues
  • SNS topics — With display names
  • SNS-to-SQS subscriptions — Raw message delivery enabled

All operations are idempotent — safe to run on every startup.

Message Flows

Command Flow (SQS)

Aggregate.Send(command)
  → CommandBus (assigns sequence number)
  → AwsSqsCommandDispatcher (checks routing, encrypts if enabled)
  → SQS Queue (message persisted)
  → AwsSqsCommandListener (polls queue, decrypts, idempotency check)
  → CommandBus.Publish (local processing)
  → Saga handles command

Event Flow (SNS → SQS)

Saga.Raise(event)
  → EventQueue (enqueues event)
  → AwsSnsEventDispatcher (checks routing, publishes to SNS)
  → SNS Topic (fan-out to subscribers)
  → SQS Queue (subscribed to topic)
  → AwsSqsCommandListener (polls queue, idempotency check)
  → EventQueue.Enqueue (local processing)
  → Views/Aggregates handle event

LocalStack Development

Terminal
# Quick start (recommended)
./tests/SourceFlow.Cloud.AWS.Tests/run-integration-tests.ps1  # Windows
./tests/SourceFlow.Cloud.AWS.Tests/run-integration-tests.sh   # Linux/macOS

# Manual setup
docker run -d --name sourceflow-localstack \
  -p 4566:4566 \
  -e SERVICES=sqs,sns,kms \
  -e EAGER_SERVICE_LOADING=1 \
  localstack/localstack:3

# Environment variables
export AWS_ENDPOINT_URL=http://localhost:4566
export AWS_DEFAULT_REGION=us-east-1

Health Checks

C#
services.AddHealthChecks()
    .AddCheck<AwsHealthCheck>("aws");

The check verifies SQS connectivity, SNS connectivity, KMS access (if encryption is enabled), and that the configured queues and topics exist.

Observability

Activity Source: SourceFlow.Cloud.AWS

Category   Metric
Commands   sourceflow.aws.command.dispatched / dispatch_duration / dispatch_error
Events     sourceflow.aws.event.published / publish_duration / publish_error
Messages   sourceflow.aws.message.received / processed / processing_duration

Trace context is propagated via SQS message attributes for end-to-end distributed tracing.
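
As an illustration of consuming the activity source named above, the sketch below attaches a plain `System.Diagnostics.ActivityListener` filtered by source name. Because the real activities only appear when messages flow through AWS, it creates a stand-in `ActivitySource` with the same name. The operation name `demo.dispatch` and the tag are hypothetical, and the message attribute name SourceFlow uses for trace context is not stated in this section.

```csharp
using System;
using System.Collections.Generic;
using System.Diagnostics;

var seen = new List<string>();

// Listen only to the activity source named in this section.
using var listener = new ActivityListener
{
    ShouldListenTo = source => source.Name == "SourceFlow.Cloud.AWS",
    Sample = (ref ActivityCreationOptions<ActivityContext> _) => ActivitySamplingResult.AllData,
    ActivityStopped = activity => seen.Add($"{activity.OperationName} {activity.Id}")
};
ActivitySource.AddActivityListener(listener);

// Stand-in source with the same name so the sketch runs without AWS.
using var demoSource = new ActivitySource("SourceFlow.Cloud.AWS");
using (var activity = demoSource.StartActivity("demo.dispatch"))
{
    // Hypothetical tag, added only to show where span data would be attached.
    activity?.SetTag("messaging.system", "aws_sqs");
}

// With the default W3C ID format, Activity.Id is a traceparent-style value
// ("00-<trace-id>-<span-id>-<flags>"), the kind of string that can be
// carried in a message attribute to continue a trace on the consumer side.
Console.WriteLine(string.Join(Environment.NewLine, seen));
```

In an application that already uses OpenTelemetry, registering the same source name with the tracer provider would serve the same purpose as this hand-written listener.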

IAM Permissions

Development (broad access)

JSON
{
  "Version": "2012-10-17",
  "Statement": [
    { "Action": ["sqs:*"], "Resource": "arn:aws:sqs:*:*:*", "Effect": "Allow" },
    { "Action": ["sns:*"], "Resource": "arn:aws:sns:*:*:*", "Effect": "Allow" },
    { "Action": ["sts:GetCallerIdentity"], "Resource": "*", "Effect": "Allow" }
  ]
}

Production (restricted)

JSON
{
  "Version": "2012-10-17",
  "Statement": [
    {
      "Action": ["sqs:CreateQueue", "sqs:GetQueueUrl", "sqs:SendMessage",
                  "sqs:ReceiveMessage", "sqs:DeleteMessage"],
      "Resource": ["arn:aws:sqs:us-east-1:123456789012:orders.fifo"],
      "Effect": "Allow"
    },
    {
      "Action": ["sns:CreateTopic", "sns:Subscribe", "sns:Publish"],
      "Resource": ["arn:aws:sns:us-east-1:123456789012:order-events"],
      "Effect": "Allow"
    }
  ]
}

Best Practices

  • Use FIFO queues for ordered operations — Commands that must be processed in sequence per entity
  • Use standard queues for independent operations — Notifications, emails, analytics
  • Group related commands to the same queue: CreateOrder, UpdateOrder, CancelOrder → orders.fifo
  • Enable SQL-based idempotency in production — In-memory is insufficient for multi-instance
  • Enable KMS encryption for sensitive data — PII, financial data, health records
  • Use IaC for production resources — CloudFormation/Terraform; bootstrapper for dev only
  • Monitor health checks and metrics — Alert on processing_error and circuit breaker state
  • Configure dead letter queues — Review failed messages regularly