Services Documentation
Detailed documentation for each service component in the Virtufin API.
Table of Contents
- GatewayService
- GrpcReflectionService
- GrpcChannelPool
- PubsubService
- PubsubSubscriptionManager
- StateService
- ConfigService
- DaprPubSubBridgeService
GatewayService
File: Services/GatewayService.cs
The central gRPC service providing gateway functionality including service discovery, method invocation, and event streaming.
gRPC Service Definition
service Gateway {
rpc Invoke (InvokeRequest) returns (InvokeResponse);
rpc ListServices (ListServicesRequest) returns (ListServicesResponse);
rpc ListMethods (ListMethodsRequest) returns (ListMethodsResponse);
rpc Subscribe (SubscribeRequest) returns (stream StreamEvent);
}
Methods
Invoke
Invokes a method on a registered backend service.
public override async Task<InvokeResponse> Invoke(
InvokeRequest request,
ServerCallContext context)
Request:
- service - Target service name (e.g., "workmanager")
- method - Method name to invoke
- requestData - JSON payload for the request
Response:
- success - Boolean indicating success
- status - gRPC status code string
- message - Error message if failed
- responseData - JSON response payload
ListServices
Lists all services registered in the gateway configuration.
public override async Task<ListServicesResponse> ListServices(
ListServicesRequest request,
ServerCallContext context)
Response:
- services - List of service name strings
ListMethods
Lists all methods available on a specific service.
public override async Task<ListMethodsResponse> ListMethods(
ListMethodsRequest request,
ServerCallContext context)
Request:
- service - Service name to query
Response:
- methods - List of MethodInfo objects with name, input/output types, streaming flags
Special Handling:
For virtufin.Gateway or Gateway, returns the built-in gateway methods directly.
Subscribe
Subscribes to events matching the request criteria. Returns a streaming response.
public override async Task Subscribe(
SubscribeRequest request,
IServerStreamWriter<StreamEvent> responseStream,
ServerCallContext context)
Request:
- services - Filter by service names (empty = all)
- topics - Filter by topic names (empty = all)
- eventTypes - Filter by event types (empty = all)
Behavior:
- Creates a subscription with a unique ID
- Waits indefinitely (uses Task.Delay(Infinite))
- Cleans up subscription on cancellation
- Unsubscribes using scope built from request filters
Subscribe (per-service topics)
Note: The
WatchSystemEventsRPC was removed. All events (including per-service lifecycle events) are delivered through the regularSubscribeRPC. Subscribe to the appropriate per-service topic (e.g.websocketmanager.connectionstatus,workmanager.workerstatus) to receive those events.
Behavior: - Creates a subscription for the given topic - Broadcasts events on that topic - Cleans up on client disconnect
GrpcReflectionService
File: Services/GrpcReflectionService.cs
Provides dynamic gRPC service discovery and invocation using the gRPC Reflection protocol.
Key Classes
| Class | Purpose |
|---|---|
GrpcReflectionService |
Public API, manages per-service caches |
ServiceDescriptorCache |
Per-service descriptor caching and loading |
ServiceMethodInfo |
Method metadata storage |
MethodSchema |
Request/response schema |
FieldSchema |
Field definition |
GrpcCallResult |
Invocation result wrapper |
Public API
GetMethodsAsync
public async Task<List<ServiceMethodInfo>> GetMethodsAsync(
string serviceName,
CancellationToken cancellationToken = default)
Returns all methods for a service discovered via reflection.
GetMethodSchemaAsync
public async Task<MethodSchema?> GetMethodSchemaAsync(
string serviceName,
string methodName,
string? type = null,
CancellationToken cancellationToken = default)
Returns the schema for a method's input or output type.
Parameters:
- serviceName - Service to query
- methodName - Method name
- type - "input" or "output" (default: input)
ExecuteCallAsync
public async Task<GrpcCallResult> ExecuteCallAsync(
string serviceName,
string methodName,
string requestJson,
Dictionary<string, string>? metadata = null,
CancellationToken cancellationToken = default)
Executes a gRPC call by name using reflection data.
Returns: GrpcCallResult with:
- Success - Boolean
- StatusCode - gRPC status
- Message - Error detail
- Data - Response JSON
GetFileDescriptorProtosAsync
public async Task<List<ByteString>> GetFileDescriptorProtosAsync(
string serviceName,
CancellationToken cancellationToken = default)
Returns raw protocol buffer descriptors for the service.
Internal Processing
ServiceDescriptorCache
Manages the cached state for a single service:
- Lazy Loading: Descriptors loaded on first access
- Thread-Safe: Uses locking for initialization
- Descriptor Building: Parses FileDescriptorProtos and builds indexes
JSON↔Protobuf Conversion
The service handles conversion between JSON and protobuf:
Request (JSON → Protobuf):
1. Try JsonParser.Default.Parse() (preferred)
2. Fall back to field-by-field parsing
Response (Protobuf → JSON):
1. Try msgDesc.Parser.ParseFrom()
2. Fall back to field-by-field binary reading
Well-Known Descriptors
Includes descriptors for:
- google.protobuf.FileDescriptor (for proto schema)
- google.api.Http (for REST annotations)
- google.api.Annotations (for HTTP rules)
These are needed because reflection servers don't return transitive dependencies.
GrpcChannelPool
File: Services/GrpcChannelPool.cs
Thread-safe pool of gRPC channels organized by service name.
Public API
public GrpcChannel GetChannel(string serviceName)
Gets or creates a gRPC channel for the specified service.
Parameters:
- serviceName - Service name from configuration
Returns: GrpcChannel for the service
Throws: ArgumentException if service not found in configuration
Channel Management
- Lazy Creation: Channels created on first access
- Reuse: Same channel returned for repeated calls to same service
- Lifecycle: All channels disposed on
Dispose()
Configuration Integration
Channels are created using service configuration:
services:
- name: workmanager
protocol: grpc
daprAppId: workmanager
grpc:
host: localhost
port: 5002
pubsub:
pubsubName: pubsub
state:
storeName: statestore
jobs:
crons: []
Hard requirement: Every service in the configuration must implement gRPC reflection. When a gRPC channel is created for a service, the API Gateway probes for reflection support. If reflection is not available, an
InvalidOperationExceptionis thrown — the channel is not created and the service cannot be proxied. Ensure all backend services register gRPC reflection viaGrpc.AspNetCore.Server.Reflection.
Channel address: http://{host}:{port}
PubsubService
File: Services/PubSubService.cs
Provides pub/sub messaging operations using Dapr pub/sub. Implements the Pubsub gRPC service defined in Protos/pubsub.proto.
gRPC Service Definition
service Pubsub {
rpc PublishEvent (PublishRequest) returns (PublishResponse);
rpc Subscribe (PubsubSubscribeRequest) returns (stream PubsubSubscribeResponse);
rpc Unsubscribe (UnsubscribeRequest) returns (UnsubscribeResponse);
}
Methods
PublishEvent
Publishes an event to a Dapr topic.
public override async Task<PublishResponse> PublishEvent(
PublishRequest request,
ServerCallContext context)
Request:
- topic - The topic name to publish to
- data - The event data as bytes
- metadata - Optional key-value metadata
Response:
- success - Boolean indicating success
- status - Status string ("OK" or "ERROR")
- message - Success/error message
Subscribe
Subscribes to a topic and streams events as they arrive. Topics are created dynamically (no configuration needed).
public override async Task Subscribe(
PubsubSubscribeRequest request,
IServerStreamWriter<PubsubSubscribeResponse> responseStream,
ServerCallContext context)
Request:
- topic - The topic name to subscribe to
Response: Server-streaming of PubsubSubscribeResponse messages containing:
- topic - The topic name
- data - The event data as bytes
- metadata - Message metadata
- message_id - Unique message identifier
- timestamp - ISO 8601 timestamp
Unsubscribe
Stops receiving events for a subscription.
public override async Task<UnsubscribeResponse> Unsubscribe(
UnsubscribeRequest request,
ServerCallContext context)
Request:
- subscription_id - The subscription ID returned from Subscribe
Response:
- success - Boolean indicating success
- status - Status string
- message - Status message
PubsubSubscriptionManager
File: Services/PubSubService.cs (inner class PubsubSubscriptionManager)
Manages pub/sub subscriptions and broadcasts messages to subscribers. Inherits from SubscriptionManagerBase.
Public API
// Subscribe to a topic
string Subscribe(string topic, IServerStreamWriter<PubsubSubscribeResponse> writer, CancellationToken cancellationToken)
// Unsubscribe by ID
void UnsubscribeById(string subscriptionId)
// Broadcast to topic
Task BroadcastToTopicAsync(string topic, byte[] data, string? messageId = null, Dictionary<string, string>? metadata = null)
Key Features
- Inherits from
SubscriptionManagerBase<PubsubSubscribeResponse, PubsubSubscription> - Uses scope system: scopes are
pubsub/{topic} - 5-minute idle timeout per subscription
- Automatic cleanup on cancellation
StateService
File: Services/StateService.cs
Provides state management operations using Dapr state store. Implements the State gRPC service defined in Protos/state.proto.
gRPC Service Definition
service State {
rpc SaveState (SaveStateRequest) returns (SaveStateResponse);
rpc GetState (GetStateRequest) returns (GetStateResponse);
rpc GetAllState (GetAllStateRequest) returns (GetAllStateResponse);
rpc RegisterKeys (RegisterKeysRequest) returns (RegisterKeysResponse);
rpc DeleteState (DeleteStateRequest) returns (DeleteStateResponse);
}
message SaveStateRequest {
string service = 1;
string key = 2;
string value = 3;
string etag = 4;
bool include_value = 5;
}
message StatusResponse {
bool success = 1;
string status = 2;
string message = 3;
}
message SaveStateResponse {
StatusResponse status = 1;
}
message GetStateRequest {
string service = 1;
string key = 2;
}
message GetStateResponse {
string key = 1;
string value = 2;
string etag = 3;
}
message GetAllStateRequest {
string service = 1;
}
message GetAllStateResponse {
map<string, string> values = 1;
}
message RegisterKeysRequest {
string service = 1;
repeated string keys = 2;
}
message RegisterKeysResponse {
StatusResponse status = 1;
}
message DeleteStateRequest {
string service = 1;
string key = 2;
bool include_value = 3;
}
message DeleteStateResponse {
StatusResponse status = 1;
}
REST Endpoints
State operations are available at /v1/state/:
- POST /v1/state/save-state - Save state
- GET /v1/state/{service}/{key} - Get state for a key
- GET /v1/state/{service} - Get all registered state for a service
- POST /v1/state/register-keys - Register keys for tracking
- DELETE /v1/state/{service}/{key} - Delete state
ETag and Optimistic Concurrency
The state store supports optimistic concurrency control via ETags:
| Scenario | Behavior |
|---|---|
| No ETag provided | Last-write-wins. The write always succeeds. |
| Matching ETag | Write succeeds. State is updated. |
| Mismatched ETag | Write fails with StatusCode.Aborted. The caller should re-read the current state and retry. |
ETag conflict detection applies to both SaveState and DeleteState. The Dapr state store's native ConcurrencyMode.FirstWrite provides an additional atomicity layer.
// Safe update with ETag
var current = await client.GetStateAsync("my-service", "my-key");
await client.SaveStateAsync("my-service", "my-key", newValue, etag: current.Etag);
// Throws RpcException with StatusCode.Aborted if ETag doesn't match
State Change Broadcasting
When state changes via SaveState or DeleteState, the service publishes a state change event to Dapr pub/sub topic state-changes.
When include_value=true:
- Event payload includes the value for both save and delete operations
- Subscribers can access the value directly from the event rather than calling GetState
When include_value=false (default):
- Event payload contains only service, key, and action
- Subscribers should use GetState to retrieve the current value if needed
Dynamic Key Tracking
State keys are no longer statically configured in YAML. Instead, keys are tracked dynamically:
- Automatic tracking: Keys are registered automatically on first use via
SaveStateorGetState - Explicit registration: Use
RegisterKeysto pre-register keys before callingGetAllState
Note:
GetStateon a non-existent key will register that key in the tracking table. Over time, this can inflateGetAllStateresponses with empty keys. UseRegisterKeyson startup to register only the keys your service actually manages.
GetAllState
Returns all state for a service's registered keys.
public override async Task<GetAllStateResponse> GetAllState(
GetAllStateRequest request,
ServerCallContext context)
Request:
- service - Service name
Response:
- values - Map of key-value pairs for all registered keys
Note: Only returns state for keys that have been registered via RegisterKeys or previously used via SaveState/GetState.
RegisterKeys
Explicitly registers keys for a service, enabling GetAllState to return them.
public override async Task<RegisterKeysResponse> RegisterKeys(
RegisterKeysRequest request,
ServerCallContext context)
Request:
- service - Service name
- keys - List of key names to register
Response:
- success - Boolean indicating success
- status - Status string ("OK" or "ERROR")
- message - Status message
DaprPubSubBridgeService
File: Services/DaprPubSubBridgeService.cs
Bridges Dapr pub/sub subscriptions to gRPC streaming for event forwarding.
Public API
// Subscribe to a Dapr topic and forward events via gRPC stream
Task SubscribeToTopicAsync(
IServerStreamWriter<TopicEventRequest> clientStream,
string topic,
CancellationToken cancellationToken)
Key Features
- Uses
DaprPublishSubscribeClientfor topic subscription - 30-second message handling timeout with retry on failure
- Forwards events to gRPC clients via
TopicEventRequestprotobuf - Returns
TopicResponseAction.Successon successful handling