Event Streaming Architecture

Documentation for event streaming in the Virtufin API, covering gRPC streaming and Dapr pub/sub integration.

Overview

Virtufin API provides multiple event streaming mechanisms:

| Mechanism | Protocol | Use Case |
|-----------|----------|----------|
| gRPC Streaming | gRPC | Native clients, high-performance |
| Dapr Pub/Sub | Dapr | Backend service integration |

Event Types

StreamEvent (gRPC)

Standard event message for business events:

message StreamEvent {
  string timestamp = 1;        // ISO 8601 format
  string service = 2;         // Source service name
  string topic = 3;            // Topic/channel name
  string event_type = 4;       // Event classification
  bytes data = 5;              // Event payload (binary)
  string message_id = 6;       // Unique message identifier
  map<string, string> metadata = 7; // Additional metadata
}
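
Because data is a raw byte field, a consumer has to decide how to interpret it. The sketch below assumes payloads are usually UTF-8 encoded JSON (the Dapr message handler later in this document deserializes them as JSON) and falls back to the raw bytes otherwise. The helper name decode_payload is hypothetical and not part of the Virtufin API.

```python
import json


def decode_payload(data: bytes):
    """Return the payload parsed as JSON, or the raw bytes if it is not JSON.

    Assumption: publishers normally send UTF-8 encoded JSON in StreamEvent.data.
    """
    try:
        return json.loads(data.decode("utf-8"))
    except (UnicodeDecodeError, json.JSONDecodeError):
        # Not valid UTF-8 or not valid JSON: hand the bytes back untouched.
        return data
```

Returning the bytes unchanged keeps the helper safe for binary payloads; callers can check the result type before using it.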

Per-service topic model

Each service owns its pub/sub topic for lifecycle events. The topic name is defined in the service's own Configuration/Topics.cs (e.g. websocketmanager.connectionstatus, workmanager.workerstatus); the CloudEvents ce-type attribute discriminates events within the topic. The topic name is the public contract between the publisher and its consumers. See each service's AGENTS.md and docs/v1/connection-lifecycle.md (or equivalent) for the event taxonomy.
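
A consumer of a per-service topic typically routes on ce-type. The following is a minimal sketch of that idea, assuming the ce-type value is available to the consumer in a metadata map under the key "ce-type". The event type names used here (connection.opened, connection.closed) are hypothetical placeholders; the real taxonomy is defined by each service.

```python
def dispatch(handlers, metadata, data):
    """Route one event from a shared topic to the handler registered for its ce-type."""
    ce_type = metadata.get("ce-type")
    handler = handlers.get(ce_type)
    if handler is None:
        # Unknown or missing ce-type: ignore instead of failing the stream.
        return f"ignored:{ce_type}"
    return handler(data)


# Hypothetical ce-type values, for illustration only.
handlers = {
    "connection.opened": lambda data: "opened:" + data.decode("utf-8"),
    "connection.closed": lambda data: "closed:" + data.decode("utf-8"),
}
```

Ignoring unknown types lets a publisher add new event types to its topic without breaking existing consumers.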


gRPC Streaming

Subscribe Method

rpc Subscribe (SubscribeRequest) returns (stream StreamEvent);

message SubscribeRequest {
  repeated string services = 1;    // Filter by services
  repeated string topics = 2;      // Filter by topics
  repeated string event_types = 3; // Filter by event types
}

Note: The WatchSystemEvents RPC was removed. All events (including per-service lifecycle events) are delivered through the regular Subscribe RPC. Subscribe to the appropriate per-service topic (e.g. websocketmanager.connectionstatus, workmanager.workerstatus) to receive those events.
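
To reason about what a given SubscribeRequest would select, the three filter lists can be modelled as below. This is a sketch under two assumptions: an empty list means "no filter" (consistent with an empty request subscribing to everything), and the three filters are combined with AND. The second assumption is not confirmed by this document, and the class name SubscriptionFilter is hypothetical.

```python
from dataclasses import dataclass, field
from typing import List


@dataclass
class SubscriptionFilter:
    """Mirrors the three repeated fields of SubscribeRequest."""
    services: List[str] = field(default_factory=list)
    topics: List[str] = field(default_factory=list)
    event_types: List[str] = field(default_factory=list)


def matches(f: SubscriptionFilter, service: str, topic: str, event_type: str) -> bool:
    """True if the event passes every non-empty filter (assumed AND semantics)."""
    def allowed(values: List[str], value: str) -> bool:
        # An empty filter list places no restriction on that dimension.
        return not values or value in values

    return (
        allowed(f.services, service)
        and allowed(f.topics, topic)
        and allowed(f.event_types, event_type)
    )
```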

State Service Methods

// In Protos/state.proto
service State {
  rpc SaveState (SaveStateRequest) returns (SaveStateResponse);
  rpc GetState (GetStateRequest) returns (GetStateResponse);
  rpc GetAllState (GetAllStateRequest) returns (GetAllStateResponse);
  rpc RegisterKeys (RegisterKeysRequest) returns (RegisterKeysResponse);
  rpc DeleteState (DeleteStateRequest) returns (DeleteStateResponse);
}

message SaveStateRequest {
  string service = 1;     // Service name (e.g., "state")
  string key = 2;         // State key
  string value = 3;       // State value
  string etag = 4;        // Optional ETag for concurrency
  bool include_value = 5;  // If true, include value in event payload (default: false)
}

message SaveStateResponse {
  StatusResponse status = 1;
}

message GetStateRequest {
  string service = 1;
  string key = 2;
}

message GetStateResponse {
  string key = 1;
  string value = 2;
  string etag = 3;
}

message GetAllStateRequest {
  string service = 1;
}

message GetAllStateResponse {
  map<string, string> values = 1;
}

message RegisterKeysRequest {
  string service = 1;
  repeated string keys = 2;
}

message RegisterKeysResponse {
  StatusResponse status = 1;
}

message DeleteStateRequest {
  string service = 1;
  string key = 2;
  bool include_value = 3;  // If true, include last value in event payload (default: false)
}

message DeleteStateResponse {
  StatusResponse status = 1;
}

Key behavior:

  - SaveState broadcasts a state change event to the state-changes topic via Dapr pub/sub
  - When include_value=true, the event payload includes the value
  - DeleteState removes state and broadcasts a delete event with action: "delete"

State Change Event Payload

Events published to the state-changes topic have the following structure:

{
  "service": "workmanager",
  "key": "my-key",
  "action": "save" | "delete",
  "value": "the-state-value"  // only when include_value=true
}
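
The payload rules above can be sketched as a small builder. This is illustrative only: the function name build_state_change_payload is hypothetical, and the real service may construct the payload differently.

```python
from typing import Dict, Optional


def build_state_change_payload(
    service: str,
    key: str,
    action: str,
    value: Optional[str] = None,
    include_value: bool = False,
) -> Dict[str, str]:
    """Build a state-changes payload; "value" is present only when include_value is set."""
    if action not in ("save", "delete"):
        raise ValueError(f"unsupported action: {action!r}")
    payload = {"service": service, "key": key, "action": action}
    if include_value and value is not None:
        payload["value"] = value
    return payload
```

Leaving the value out by default keeps potentially large or sensitive state out of the event stream unless the caller opts in.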

C# Client Example

using Grpc.Net.Client;   // GrpcChannel is defined here, not in Grpc.Core
using Virtufin.Api.Protos;

var channel = GrpcChannel.ForAddress("http://localhost:5002");
var client = new State.StateClient(channel);

// Save state
var request = new SaveStateRequest
{
    Service = "state",
    Key = "mykey",
    Value = "myvalue"
};
var response = client.SaveState(request);
// SaveStateResponse exposes a single StatusResponse field named status
Console.WriteLine($"Status: {response.Status}");

Python Client Example

import grpc
import gateway_pb2
import gateway_pb2_grpc

channel = grpc.insecure_channel('localhost:5002')
stub = gateway_pb2_grpc.GatewayStub(channel)

request = gateway_pb2.SubscribeRequest(
    services=['workmanager'],
    event_types=['state', 'pubsub']
)

for event in stub.Subscribe(request):
    print(f"[{event.timestamp}] {event.service}/{event.topic}: {event.event_type}")
    print(f"Data: {event.data.decode('utf-8')}")

Dapr Pub/Sub Integration

Architecture

flowchart LR
    Backend[Backend Service] --> DaprSidecar[Dapr Sidecar]
    DaprSidecar --> DaprPubSub[Dapr PubSub]
    DaprSidecar --> Subscribe[Subscribe]
    Subscribe --> DaprStreamSub[DaprStreamingSub Manager]
    DaprStreamSub --> EventSubMgr[EventSub Manager]

Configuration

services:
  - name: workmanager
    pubsub:
      pubsubName: pubsub              # Dapr pub/sub component name

Note: Dynamic pub/sub subscriptions are handled via the PubsubService.Subscribe gRPC method, not static YAML configuration. Topics are created dynamically at runtime.

Message Handler

When a Dapr message is received:

private async Task<TopicResponseAction> HandleMessageAsync(
    string serviceName, 
    string topic, 
    TopicMessage message, 
    CancellationToken ct)
{
    // 1. Deserialize message
    var eventData = new
    {
        timestamp = DateTime.UtcNow.ToString("O"),
        service = serviceName,
        topic = topic,
        data = JsonSerializer.Deserialize<object>(message.Data.Span),
        messageId = message.Id
    };

    // 2. Broadcast to gRPC subscribers
    await _eventSubscriptionManager.BroadcastToTopicAsync(
        serviceName, topic, message.Data.Span.ToArray(), message.Id);

    return TopicResponseAction.Success;
}

Dapr Client Setup

builder.Services.AddSingleton<DaprClient>(sp =>
    new DaprClientBuilder()
        .UseHttpEndpoint($"http://localhost:{daprHttpPort}")
        .UseGrpcEndpoint($"http://localhost:{daprGrpcPort}")
        .Build());

Event Subscription System

Scope Hierarchy

Events flow through hierarchical scopes for flexible filtering:

global
  │
  ├── workmanager
  │     ├── workmanager/workers
  │     └── workmanager/deployments
  │
  ├── websocketmanager
  │     └── websocketmanager/connections
  │
  └── event-type
        ├── event-type/state
        ├── event-type/pubsub
        └── event-type/system

Subscription Matching

When an event is broadcast:

  1. Event arrives with service, topic, eventType
  2. Calculate all matching scopes:
       - global
       - {service}
       - {service}/{topic}
       - event-type/{eventType}
       - event-type/{eventType}/{service}
  3. Find all subscriptions for each scope
  4. Write event to each matching subscription
  5. Remove dead subscriptions (failed writes)
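
The scope calculation in the steps above can be sketched as a pure function. The function name matching_scopes is hypothetical; only the scope patterns come from this document.

```python
from typing import List


def matching_scopes(service: str, topic: str, event_type: str) -> List[str]:
    """Return every scope an event belongs to, from broadest to most specific."""
    return [
        "global",
        service,
        f"{service}/{topic}",
        f"event-type/{event_type}",
        f"event-type/{event_type}/{service}",
    ]
```

For example, a state event from workmanager on the workers topic would be offered to subscribers of global, workmanager, workmanager/workers, event-type/state, and event-type/state/workmanager.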

EventSubscriptionManager

public class EventSubscriptionManager
{
    // Subscribe
    public string Subscribe(
        IServerStreamWriter<StreamEvent> writer, 
        SubscribeRequest request)

    // Unsubscribe
    public Task UnsubscribeAsync(string scope, string subscriptionId)

    // Broadcast methods
    public async Task BroadcastEventAsync(
        string service, string topic, string eventType, 
        byte[] data, string? messageId = null, 
        Dictionary<string, string>? metadata = null)

    public async Task BroadcastStateChangeAsync(
        string service, string key, string action)

    public async Task BroadcastToTopicAsync(
        string service, string topic, byte[] data, string? messageId = null)
}
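
To illustrate how a manager of this shape might deliver events and prune dead subscriptions, here is a simplified in-memory sketch. It is not the actual EventSubscriptionManager: the class InMemorySubscriptions and its methods are hypothetical, writers are plain callables rather than gRPC stream writers, and everything is synchronous.

```python
from collections import defaultdict
from typing import Callable, Dict, Iterable


class InMemorySubscriptions:
    """Toy registry: scope -> {subscription_id: writer callable}."""

    def __init__(self) -> None:
        self._by_scope: Dict[str, Dict[str, Callable]] = defaultdict(dict)
        self._next_id = 0

    def subscribe(self, scope: str, writer: Callable) -> str:
        self._next_id += 1
        sub_id = f"sub-{self._next_id}"
        self._by_scope[scope][sub_id] = writer
        return sub_id

    def unsubscribe(self, scope: str, sub_id: str) -> None:
        self._by_scope[scope].pop(sub_id, None)

    def count(self, scope: str) -> int:
        return len(self._by_scope.get(scope, {}))

    def broadcast(self, scopes: Iterable[str], event) -> int:
        """Write the event to every subscription in the given scopes.

        Returns the number of successful writes; subscriptions whose
        writer raises are treated as dead and removed.
        """
        delivered = 0
        for scope in scopes:
            subs = self._by_scope.get(scope, {})
            # Iterate over a copy so dead entries can be deleted safely.
            for sub_id, writer in list(subs.items()):
                try:
                    writer(event)
                    delivered += 1
                except Exception:
                    del subs[sub_id]
        return delivered
```

Pruning on failed writes means a disconnected client costs at most one failed delivery before it is dropped.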

Per-service Lifecycle Events

Each service publishes lifecycle events to its own topic. The topic name is defined in that service's Configuration/Topics.cs. The CloudEvents ce-type attribute (set in the publish metadata map) discriminates the specific event.

Connection Status Events

Broadcast when Dapr pub/sub subscriptions change state:

await BroadcastConnectionStatusAsync(
    service: "workmanager",
    topic: Topics.WorkerStatus,        // "workmanager.workerstatus"
    status: "connected",               // or "disconnected", "failed"
    error: null                        // error message if failed
);

Event Flow

  1. DaprStreamingSubscriptionManager calls StartSubscriptionAsync
  2. On success: broadcasts "connected" status
  3. On failure: broadcasts "failed" status with error
  4. BroadcastReconnectionStatusAsync sends lifecycle events to gRPC subscribers via the per-service topic
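
The success and failure branches of this flow can be sketched as follows. The function and parameter names (start_with_status, start_subscription, broadcast_status) are hypothetical stand-ins for the C# methods named above, and the sketch is synchronous for brevity.

```python
from typing import Callable, Optional


def start_with_status(
    start_subscription: Callable[[str, str], None],
    broadcast_status: Callable[[str, str, str, Optional[str]], None],
    service: str,
    topic: str,
) -> bool:
    """Start a subscription and report the outcome as a status event."""
    try:
        start_subscription(service, topic)
    except Exception as exc:
        # On failure: broadcast "failed" together with the error message.
        broadcast_status(service, topic, "failed", str(exc))
        return False
    # On success: broadcast "connected" with no error.
    broadcast_status(service, topic, "connected", None)
    return True
```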


Event Flow Examples

Example 1: State Change Event via SaveState

sequenceDiagram
    participant Client
    participant State as StateService
    participant SubMgr as EventSubscriptionManager
    participant Subscribers

    Client->>State: SaveState(service, key, value)
    State->>SubMgr: BroadcastStateChangeAsync()
    SubMgr->>SubMgr: Create StreamEvent
    SubMgr->>SubMgr: GetMatchingScopes()
    loop For each scope
        SubMgr->>Subscribers: Write event to stream
    end
    Subscribers-->>Client: StreamEvent received

Example 2: Dapr Pub/Sub Event

sequenceDiagram
    participant Publisher
    participant Dapr as Dapr Sidecar
    participant DaprMgr as DaprStreamingSubMgr
    participant SubMgr as EventSubscriptionManager
    participant Client

    Publisher->>Dapr: publish(topic, data)
    Dapr->>DaprMgr: TopicMessage
    DaprMgr->>DaprMgr: Deserialize message
    DaprMgr->>SubMgr: BroadcastToTopicAsync()
    SubMgr->>Client: StreamEvent via gRPC
    DaprMgr-->>Dapr: TopicResponseAction.Success

Example 3: System Connection Event

sequenceDiagram
    participant Dapr as Dapr Sidecar
    participant DaprMgr as DaprStreamingSubMgr
    participant Client

    Dapr->>DaprMgr: Subscription connected
    DaprMgr->>Client: StreamEvent via Subscribe (topic: <service>.<category>, e.g. websocketmanager.connectionstatus)

Best Practices

Subscription Management

  1. Use CancellationTokens: Always pass cancellation tokens so abandoned streams are cancelled instead of leaking
  2. Handle Disconnection: Implement reconnection logic
  3. Scope Filtering: Subscribe to specific scopes to reduce traffic

// Good: Specific subscription
var request = new SubscribeRequest
{
    Services = { "workmanager" },
    EventTypes = { "state" }
};

// Avoid: Global subscription (high traffic)
var globalRequest = new SubscribeRequest(); // Subscribes to everything

Error Handling

try
{
    await foreach (var evt in stream.ResponseStream.ReadAllAsync(token))
    {
        ProcessEvent(evt);
    }
}
catch (RpcException ex) when (ex.StatusCode == StatusCode.Cancelled)
{
    // Client disconnected - normal
}
catch (RpcException ex)
{
    // Handle error, possibly reconnect
}
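
For the reconnection advice above, one common approach is to retry with exponential backoff. The sketch below is a generic illustration, not Virtufin-specific code: open_stream and handle are hypothetical callables, and ConnectionError stands in for the transport error a real client would catch (for example grpc.RpcError in a Python gRPC client).

```python
import time
from typing import Callable, Iterable, Iterator


def backoff_delays(base: float = 0.5, factor: float = 2.0, cap: float = 30.0) -> Iterator[float]:
    """Yield exponentially growing delays, capped at `cap` seconds."""
    delay = base
    while True:
        yield min(delay, cap)
        delay *= factor


def consume_with_reconnect(
    open_stream: Callable[[], Iterable],
    handle: Callable[[object], None],
    max_attempts: int = 5,
    sleep: Callable[[float], None] = time.sleep,
) -> None:
    """Read events until the stream ends, reopening it after connection errors."""
    delays = backoff_delays()
    attempts = 0
    while attempts < max_attempts:
        try:
            for event in open_stream():
                handle(event)
            return  # stream ended normally
        except ConnectionError:
            attempts += 1
            sleep(next(delays))
    raise RuntimeError(f"gave up after {max_attempts} attempts")
```

Nothing in this document indicates that missed events are replayed after a reconnect, so a consumer that needs current state may want to re-read it (for example via GetAllState) once the stream is back.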