Skip to content

Services Documentation

Detailed documentation for each service component in the Virtufin API.

Table of Contents


GatewayService

File: Services/GatewayService.cs

The central gRPC service providing gateway functionality including service discovery, method invocation, and event streaming.

gRPC Service Definition

service Gateway {
  rpc Invoke (InvokeRequest) returns (InvokeResponse);
  rpc ListServices (ListServicesRequest) returns (ListServicesResponse);
  rpc ListMethods (ListMethodsRequest) returns (ListMethodsResponse);
  rpc Subscribe (SubscribeRequest) returns (stream StreamEvent);
}

Methods

Invoke

Invokes a method on a registered backend service.

public override async Task<InvokeResponse> Invoke(
    InvokeRequest request, 
    ServerCallContext context)

Request: - service - Target service name (e.g., "workmanager") - method - Method name to invoke - requestData - JSON payload for the request

Response: - success - Boolean indicating success - status - gRPC status code string - message - Error message if failed - responseData - JSON response payload

ListServices

Lists all services registered in the gateway configuration.

public override async Task<ListServicesResponse> ListServices(
    ListServicesRequest request, 
    ServerCallContext context)

Response: - services - List of service name strings

ListMethods

Lists all methods available on a specific service.

public override async Task<ListMethodsResponse> ListMethods(
    ListMethodsRequest request, 
    ServerCallContext context)

Request: - service - Service name to query

Response: - methods - List of MethodInfo objects with name, input/output types, streaming flags

Special Handling: For virtufin.Gateway or Gateway, returns the built-in gateway methods directly.

Subscribe

Subscribes to events matching the request criteria. Returns a streaming response.

public override async Task Subscribe(
    SubscribeRequest request,
    IServerStreamWriter<StreamEvent> responseStream,
    ServerCallContext context)

Request: - services - Filter by service names (empty = all) - topics - Filter by topic names (empty = all) - eventTypes - Filter by event types (empty = all)

Behavior: - Creates a subscription with a unique ID - Waits indefinitely (uses Task.Delay(Infinite)) - Cleans up subscription on cancellation - Unsubscribes using scope built from request filters

Subscribe (per-service topics)

Note: The WatchSystemEvents RPC was removed. All events (including per-service lifecycle events) are delivered through the regular Subscribe RPC. Subscribe to the appropriate per-service topic (e.g. websocketmanager.connectionstatus, workmanager.workerstatus) to receive those events.

Behavior: - Creates a subscription for the given topic - Broadcasts events on that topic - Cleans up on client disconnect


GrpcReflectionService

File: Services/GrpcReflectionService.cs

Provides dynamic gRPC service discovery and invocation using the gRPC Reflection protocol.

Key Classes

Class Purpose
GrpcReflectionService Public API, manages per-service caches
ServiceDescriptorCache Per-service descriptor caching and loading
ServiceMethodInfo Method metadata storage
MethodSchema Request/response schema
FieldSchema Field definition
GrpcCallResult Invocation result wrapper

Public API

GetMethodsAsync

public async Task<List<ServiceMethodInfo>> GetMethodsAsync(
    string serviceName, 
    CancellationToken cancellationToken = default)

Returns all methods for a service discovered via reflection.

GetMethodSchemaAsync

public async Task<MethodSchema?> GetMethodSchemaAsync(
    string serviceName, 
    string methodName, 
    string? type = null, 
    CancellationToken cancellationToken = default)

Returns the schema for a method's input or output type.

Parameters: - serviceName - Service to query - methodName - Method name - type - "input" or "output" (default: input)

ExecuteCallAsync

public async Task<GrpcCallResult> ExecuteCallAsync(
    string serviceName,
    string methodName,
    string requestJson,
    Dictionary<string, string>? metadata = null,
    CancellationToken cancellationToken = default)

Executes a gRPC call by name using reflection data.

Returns: GrpcCallResult with: - Success - Boolean - StatusCode - gRPC status - Message - Error detail - Data - Response JSON

GetFileDescriptorProtosAsync

public async Task<List<ByteString>> GetFileDescriptorProtosAsync(
    string serviceName, 
    CancellationToken cancellationToken = default)

Returns raw protocol buffer descriptors for the service.

Internal Processing

ServiceDescriptorCache

Manages the cached state for a single service:

  • Lazy Loading: Descriptors loaded on first access
  • Thread-Safe: Uses locking for initialization
  • Descriptor Building: Parses FileDescriptorProtos and builds indexes

JSON↔Protobuf Conversion

The service handles conversion between JSON and protobuf:

Request (JSON → Protobuf): 1. Try JsonParser.Default.Parse() (preferred) 2. Fall back to field-by-field parsing

Response (Protobuf → JSON): 1. Try msgDesc.Parser.ParseFrom() 2. Fall back to field-by-field binary reading

Well-Known Descriptors

Includes descriptors for: - google.protobuf.FileDescriptor (for proto schema) - google.api.Http (for REST annotations) - google.api.Annotations (for HTTP rules)

These are needed because reflection servers don't return transitive dependencies.


GrpcChannelPool

File: Services/GrpcChannelPool.cs

Thread-safe pool of gRPC channels organized by service name.

Public API

public GrpcChannel GetChannel(string serviceName)

Gets or creates a gRPC channel for the specified service.

Parameters: - serviceName - Service name from configuration

Returns: GrpcChannel for the service

Throws: ArgumentException if service not found in configuration

Channel Management

  • Lazy Creation: Channels created on first access
  • Reuse: Same channel returned for repeated calls to same service
  • Lifecycle: All channels disposed on Dispose()

Configuration Integration

Channels are created using service configuration:

services:
  - name: workmanager
    protocol: grpc
    daprAppId: workmanager
    grpc:
      host: localhost
      port: 5002
    pubsub:
      pubsubName: pubsub
    state:
      storeName: statestore
    jobs:
      crons: []

Hard requirement: Every service in the configuration must implement gRPC reflection. When a gRPC channel is created for a service, the API Gateway probes for reflection support. If reflection is not available, an InvalidOperationException is thrown — the channel is not created and the service cannot be proxied. Ensure all backend services register gRPC reflection via Grpc.AspNetCore.Server.Reflection.

Channel address: http://{host}:{port}


PubsubService

File: Services/PubSubService.cs

Provides pub/sub messaging operations using Dapr pub/sub. Implements the Pubsub gRPC service defined in Protos/pubsub.proto.

gRPC Service Definition

service Pubsub {
  rpc PublishEvent (PublishRequest) returns (PublishResponse);
  rpc Subscribe (PubsubSubscribeRequest) returns (stream PubsubSubscribeResponse);
  rpc Unsubscribe (UnsubscribeRequest) returns (UnsubscribeResponse);
}

Methods

PublishEvent

Publishes an event to a Dapr topic.

public override async Task<PublishResponse> PublishEvent(
    PublishRequest request,
    ServerCallContext context)

Request: - topic - The topic name to publish to - data - The event data as bytes - metadata - Optional key-value metadata

Response: - success - Boolean indicating success - status - Status string ("OK" or "ERROR") - message - Success/error message

Subscribe

Subscribes to a topic and streams events as they arrive. Topics are created dynamically (no configuration needed).

public override async Task Subscribe(
    PubsubSubscribeRequest request,
    IServerStreamWriter<PubsubSubscribeResponse> responseStream,
    ServerCallContext context)

Request: - topic - The topic name to subscribe to

Response: Server-streaming of PubsubSubscribeResponse messages containing: - topic - The topic name - data - The event data as bytes - metadata - Message metadata - message_id - Unique message identifier - timestamp - ISO 8601 timestamp

Unsubscribe

Stops receiving events for a subscription.

public override async Task<UnsubscribeResponse> Unsubscribe(
    UnsubscribeRequest request,
    ServerCallContext context)

Request: - subscription_id - The subscription ID returned from Subscribe

Response: - success - Boolean indicating success - status - Status string - message - Status message


PubsubSubscriptionManager

File: Services/PubSubService.cs (inner class PubsubSubscriptionManager)

Manages pub/sub subscriptions and broadcasts messages to subscribers. Inherits from SubscriptionManagerBase.

Public API

// Subscribe to a topic
string Subscribe(string topic, IServerStreamWriter<PubsubSubscribeResponse> writer, CancellationToken cancellationToken)

// Unsubscribe by ID
void UnsubscribeById(string subscriptionId)

// Broadcast to topic
Task BroadcastToTopicAsync(string topic, byte[] data, string? messageId = null, Dictionary<string, string>? metadata = null)

Key Features

  • Inherits from SubscriptionManagerBase<PubsubSubscribeResponse, PubsubSubscription>
  • Uses scope system: scopes are pubsub/{topic}
  • 5-minute idle timeout per subscription
  • Automatic cleanup on cancellation

StateService

File: Services/StateService.cs

Provides state management operations using Dapr state store. Implements the State gRPC service defined in Protos/state.proto.

gRPC Service Definition

service State {
  rpc SaveState (SaveStateRequest) returns (SaveStateResponse);
  rpc GetState (GetStateRequest) returns (GetStateResponse);
  rpc GetAllState (GetAllStateRequest) returns (GetAllStateResponse);
  rpc RegisterKeys (RegisterKeysRequest) returns (RegisterKeysResponse);
  rpc DeleteState (DeleteStateRequest) returns (DeleteStateResponse);
}

message SaveStateRequest {
  string service = 1;
  string key = 2;
  string value = 3;
  string etag = 4;
  bool include_value = 5;
}

message StatusResponse {
  bool success = 1;
  string status = 2;
  string message = 3;
}

message SaveStateResponse {
  StatusResponse status = 1;
}

message GetStateRequest {
  string service = 1;
  string key = 2;
}

message GetStateResponse {
  string key = 1;
  string value = 2;
  string etag = 3;
}

message GetAllStateRequest {
  string service = 1;
}

message GetAllStateResponse {
  map<string, string> values = 1;
}

message RegisterKeysRequest {
  string service = 1;
  repeated string keys = 2;
}

message RegisterKeysResponse {
  StatusResponse status = 1;
}

message DeleteStateRequest {
  string service = 1;
  string key = 2;
  bool include_value = 3;
}

message DeleteStateResponse {
  StatusResponse status = 1;
}

REST Endpoints

State operations are available at /v1/state/: - POST /v1/state/save-state - Save state - GET /v1/state/{service}/{key} - Get state for a key - GET /v1/state/{service} - Get all registered state for a service - POST /v1/state/register-keys - Register keys for tracking - DELETE /v1/state/{service}/{key} - Delete state

ETag and Optimistic Concurrency

The state store supports optimistic concurrency control via ETags:

Scenario Behavior
No ETag provided Last-write-wins. The write always succeeds.
Matching ETag Write succeeds. State is updated.
Mismatched ETag Write fails with StatusCode.Aborted. The caller should re-read the current state and retry.

ETag conflict detection applies to both SaveState and DeleteState. The Dapr state store's native ConcurrencyMode.FirstWrite provides an additional atomicity layer.

// Safe update with ETag
var current = await client.GetStateAsync("my-service", "my-key");
await client.SaveStateAsync("my-service", "my-key", newValue, etag: current.Etag);
// Throws RpcException with StatusCode.Aborted if ETag doesn't match

State Change Broadcasting

When state changes via SaveState or DeleteState, the service publishes a state change event to Dapr pub/sub topic state-changes.

When include_value=true: - Event payload includes the value for both save and delete operations - Subscribers can access the value directly from the event rather than calling GetState

When include_value=false (default): - Event payload contains only service, key, and action - Subscribers should use GetState to retrieve the current value if needed

Dynamic Key Tracking

State keys are no longer statically configured in YAML. Instead, keys are tracked dynamically:

  • Automatic tracking: Keys are registered automatically on first use via SaveState or GetState
  • Explicit registration: Use RegisterKeys to pre-register keys before calling GetAllState

Note: GetState on a non-existent key will register that key in the tracking table. Over time, this can inflate GetAllState responses with empty keys. Use RegisterKeys on startup to register only the keys your service actually manages.

GetAllState

Returns all state for a service's registered keys.

public override async Task<GetAllStateResponse> GetAllState(
    GetAllStateRequest request,
    ServerCallContext context)

Request: - service - Service name

Response: - values - Map of key-value pairs for all registered keys

Note: Only returns state for keys that have been registered via RegisterKeys or previously used via SaveState/GetState.

RegisterKeys

Explicitly registers keys for a service, enabling GetAllState to return them.

public override async Task<RegisterKeysResponse> RegisterKeys(
    RegisterKeysRequest request,
    ServerCallContext context)

Request: - service - Service name - keys - List of key names to register

Response: - success - Boolean indicating success - status - Status string ("OK" or "ERROR") - message - Status message


DaprPubSubBridgeService

File: Services/DaprPubSubBridgeService.cs

Bridges Dapr pub/sub subscriptions to gRPC streaming for event forwarding.

Public API

// Subscribe to a Dapr topic and forward events via gRPC stream
Task SubscribeToTopicAsync(
    IServerStreamWriter<TopicEventRequest> clientStream,
    string topic,
    CancellationToken cancellationToken)

Key Features

  • Uses DaprPublishSubscribeClient for topic subscription
  • 30-second message handling timeout with retry on failure
  • Forwards events to gRPC clients via TopicEventRequest protobuf
  • Returns TopicResponseAction.Success on successful handling