Event Streaming Architecture
Documentation for event streaming in the Virtufin API, covering gRPC streaming and Dapr pub/sub integration.
Overview
Virtufin API provides multiple event streaming mechanisms:
| Mechanism | Protocol | Use Case |
|---|---|---|
| gRPC Streaming | gRPC | Native clients, high-performance |
| Dapr Pub/Sub | Dapr | Backend service integration |
Event Types
StreamEvent (gRPC)
Standard event message for business events:
message StreamEvent {
string timestamp = 1; // ISO 8601 format
string service = 2; // Source service name
string topic = 3; // Topic/channel name
string event_type = 4; // Event classification
bytes data = 5; // Event payload (binary)
string message_id = 6; // Unique message identifier
map<string, string> metadata = 7; // Additional metadata
}
Per-service topic model
Each service owns its pubsub topic for lifecycle events. The topic name is defined in the service's own Configuration/Topics.cs (e.g. websocketmanager.connectionstatus, workmanager.workerstatus); ce-type discriminates events within the topic. The topic name is the public contract between the publisher and its consumers — see each service's AGENTS.md and docs/v1/connection-lifecycle.md (or equivalent) for the event taxonomy.
gRPC Streaming
Subscribe Method
rpc Subscribe (SubscribeRequest) returns (stream StreamEvent);
message SubscribeRequest {
repeated string services = 1; // Filter by services
repeated string topics = 2; // Filter by topics
repeated string event_types = 3; // Filter by event types
}
Note: The
WatchSystemEventsRPC was removed. All events (including per-service lifecycle events) are delivered through the regularSubscribeRPC. Subscribe to the appropriate per-service topic (e.g.websocketmanager.connectionstatus,workmanager.workerstatus) to receive those events.
State Service Methods
// In Protos/state.proto
service State {
rpc SaveState (SaveStateRequest) returns (SaveStateResponse);
rpc GetState (GetStateRequest) returns (GetStateResponse);
rpc GetAllState (GetAllStateRequest) returns (GetAllStateResponse);
rpc RegisterKeys (RegisterKeysRequest) returns (RegisterKeysResponse);
rpc DeleteState (DeleteStateRequest) returns (DeleteStateResponse);
}
message SaveStateRequest {
string service = 1; // Service name (e.g., "state")
string key = 2; // State key
string value = 3; // State value
string etag = 4; // Optional ETag for concurrency
bool include_value = 5; // If true, include value in event payload (default: false)
}
message SaveStateResponse {
StatusResponse status = 1;
}
message GetStateRequest {
string service = 1;
string key = 2;
}
message GetStateResponse {
string key = 1;
string value = 2;
string etag = 3;
}
message GetAllStateRequest {
string service = 1;
}
message GetAllStateResponse {
map<string, string> values = 1;
}
message RegisterKeysRequest {
string service = 1;
repeated string keys = 2;
}
message RegisterKeysResponse {
StatusResponse status = 1;
}
message DeleteStateRequest {
string service = 1;
string key = 2;
bool include_value = 3; // If true, include last value in event payload (default: false)
}
message DeleteStateResponse {
StatusResponse status = 1;
}
Key behavior:
- SaveState broadcasts a state change event to the state-changes topic via Dapr pub/sub
- When include_value=true, the event payload includes the value
- DeleteState removes state and broadcasts a delete event with action: "delete"
State Change Event Payload
Events published to state-changes topic have the following structure:
{
"service": "workmanager",
"key": "my-key",
"action": "save" | "delete",
"value": "the-state-value" // only when include_value=true
}
C# Client Example
using Grpc.Core;
using Virtufin.Api.Protos;
var channel = GrpcChannel.ForAddress("http://localhost:5002");
var client = new State.StateClient(channel);
// Save state
var request = new SaveStateRequest
{
Service = "state",
Key = "mykey",
Value = "myvalue"
};
var response = client.SaveState(request);
Console.WriteLine($"Success: {response.Success}, Status: {response.Status}");
Python Client Example
import grpc
import gateway_pb2
import gateway_pb2_grpc
channel = grpc.insecure_channel('localhost:5002')
stub = gateway_pb2_grpc.GatewayStub(channel)
request = gateway_pb2.SubscribeRequest(
services=['workmanager'],
event_types=['state', 'pubsub']
)
for event in stub.Subscribe(request):
print(f"[{event.timestamp}] {event.service}/{event.topic}: {event.event_type}")
print(f"Data: {event.data.decode('utf-8')}")
Dapr Pub/Sub Integration
Architecture
flowchart LR
Backend[Backend Service] --> DaprSidecar[Dapr Sidecar]
DaprSidecar --> DaprPubSub[Dapr PubSub]
DaprSidecar --> Subscribe[Subscribe]
Subscribe --> DaprStreamSub[DaprStreamingSub Manager]
DaprStreamSub --> EventSubMgr[EventSub Manager]
Configuration
services:
- name: workmanager
pubsub:
pubsubName: pubsub # Dapr pub/sub component name
Note: Dynamic pub/sub subscriptions are handled via the
PubsubService.SubscribegRPC method, not static YAML configuration. Topics are created dynamically at runtime.
Message Handler
When a Dapr message is received:
private async Task<TopicResponseAction> HandleMessageAsync(
string serviceName,
string topic,
TopicMessage message,
CancellationToken ct)
{
// 1. Deserialize message
var eventData = new
{
timestamp = DateTime.UtcNow.ToString("O"),
service = serviceName,
topic = topic,
data = JsonSerializer.Deserialize<object>(message.Data.Span),
messageId = message.Id
};
// 2. Broadcast to gRPC subscribers
await _eventSubscriptionManager.BroadcastToTopicAsync(
serviceName, topic, message.Data.Span.ToArray(), message.Id);
return TopicResponseAction.Success;
}
Dapr Client Setup
builder.Services.AddSingleton<DaprClient>(sp =>
new DaprClientBuilder()
.UseHttpEndpoint($"http://localhost:{daprHttpPort}")
.UseGrpcEndpoint($"http://localhost:{daprGrpcPort}")
.Build());
Event Subscription System
Scope Hierarchy
Events flow through hierarchical scopes for flexible filtering:
global
│
├── workmanager
│ ├── workmanager/workers
│ └── workmanager/deployments
│
├── websocketmanager
│ └── websocketmanager/connections
│
└── event-type
├── event-type/state
├── event-type/pubsub
└── event-type/system
Subscription Matching
When an event is broadcast:
- Event arrives with service, topic, eventType
- Calculate all matching scopes:
global{service}{service}/{topic}event-type/{eventType}event-type/{eventType}/{service}- Find all subscriptions for each scope
- Write event to each matching subscription
- Remove dead subscriptions (failed writes)
EventSubscriptionManager
public class EventSubscriptionManager
{
// Subscribe
public string Subscribe(
IServerStreamWriter<StreamEvent> writer,
SubscribeRequest request)
// Unsubscribe
public Task UnsubscribeAsync(string scope, string subscriptionId)
// Broadcast methods
public async Task BroadcastEventAsync(
string service, string topic, string eventType,
byte[] data, string? messageId = null,
Dictionary<string, string>? metadata = null)
public async Task BroadcastStateChangeAsync(
string service, string key, string action)
public async Task BroadcastToTopicAsync(
string service, string topic, byte[] data, string? messageId = null)
}
Per-service Lifecycle Events
Each service publishes lifecycle events to its own topic. The topic name is defined in that service's Configuration/Topics.cs. The CloudEvents ce-type attribute (set in the publish metadata map) discriminates the specific event.
Connection Status Events
Broadcast when Dapr pub/sub subscriptions change state:
await BroadcastConnectionStatusAsync(
service: "workmanager",
topic: Topics.WorkerStatus, // "workmanager.workerstatus"
status: "connected" // or "disconnected", "failed"
error: null // error message if failed
);
Event Flow
DaprStreamingSubscriptionManagercallsStartSubscriptionAsync- On success: broadcasts
"connected"status -
On failure: broadcasts
"failed"status with error -
BroadcastReconnectionStatusAsyncsends lifecycle events to gRPC subscribers via the per-service topic
Event Flow Examples
Example 1: State Change Event via SaveState
sequenceDiagram
participant Client
participant State as StateService
participant SubMgr as EventSubscriptionManager
participant Subscribers
Client->>State: SaveState(service, key, value)
State->>SubMgr: BroadcastStateChangeAsync()
SubMgr->>SubMgr: Create StreamEvent
SubMgr->>SubMgr: GetMatchingScopes()
loop For each scope
SubMgr->>Subscribers: Write event to stream
end
Subscribers-->>Client: StreamEvent received
Example 2: Dapr Pub/Sub Event
sequenceDiagram
participant Publisher
participant Dapr as Dapr Sidecar
participant DaprMgr as DaprStreamingSubMgr
participant SubMgr as EventSubscriptionManager
participant Client
Publisher->>Dapr: publish(topic, data)
Dapr->>DaprMgr: TopicMessage
DaprMgr->>DaprMgr: Deserialize message
DaprMgr->>SubMgr: BroadcastToTopicAsync()
SubMgr->>Client: StreamEvent via gRPC
DaprMgr-->>Dapr: TopicResponseAction.Success
Example 3: System Connection Event
sequenceDiagram
participant Dapr as Dapr Sidecar
participant DaprMgr as DaprStreamingSubMgr
participant Client
Dapr->>DaprMgr: Subscription connected
DaprMgr->>Client: TopicEvent via Subscribe (topic: <service>.<category>, e.g. websocketmanager.connectionstatus)
Best Practices
Subscription Management
- Use CancellationTokens: Always pass cancellation tokens to prevent leaks
- Handle Disconnection: Implement reconnection logic
- Scope Filtering: Subscribe to specific scopes to reduce traffic
// Good: Specific subscription
var request = new SubscribeRequest
{
Services = { "workmanager" },
EventTypes = { "state" }
};
// Avoids: Global subscription (high traffic)
var request = new SubscribeRequest(); // Subscribes to everything
Error Handling
try
{
await foreach (var evt in stream.ResponseStream.ReadAllAsync(token))
{
ProcessEvent(evt);
}
}
catch (RpcException ex) when (ex.StatusCode == StatusCode.Cancelled)
{
// Client disconnected - normal
}
catch (RpcException ex)
{
// Handle error, possibly reconnect
}