Python Client Library

Overview

A Python client library for interacting with the Virtufin.Api Gateway service via gRPC.

Installation

From Gitea PyPI Registry

Set up authentication with a Gitea personal access token (scope: read:packages):

export PIP_USERNAME=<your-gitea-username>
export PIP_PASSWORD=<your-gitea-token>

pip install virtufin-api \
  --index-url https://pypi.haenerconsulting.com/api/packages/virtufin/pypi/simple/ \
  --extra-index-url https://pypi.org/simple

From Local Source

export PYTHONPATH=/path/to/virtufin-api/src/python

Alternatively, add the source directory to sys.path at runtime before importing the client. The client requires the generated protobuf stubs in the same directory:

import sys
sys.path.insert(0, '/path/to/virtufin-api/src/python')

from virtufin.api.client import ApiClient

Quick Start

Async Client (Required)

import asyncio
from virtufin.api.client import ApiClient

async def main():
    async with ApiClient(host="localhost", grpc_port=5002) as client:
        services = await client.list_services_async()
        print(f"Services: {services}")

asyncio.run(main())

Note: This library is async-only. All methods return coroutines and must be awaited.


ApiClient

Async-only client using grpc.aio channels.

Constructor

client = ApiClient(host="localhost", grpc_port=5002)

| Parameter | Type | Default | Description |
|-----------|------|---------|-------------|
| host | str | "localhost" | gRPC server hostname |
| grpc_port | int | 5002 | Native gRPC server port |

Async Context Manager

async with ApiClient() as client:
    services = await client.list_services_async()
# Channel automatically closed on exit
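
The automatic cleanup relies on Python's async context manager protocol. The sketch below uses a hypothetical stand-in class, FakeClient, rather than the real ApiClient, and only illustrates the general pattern: close() runs on exit even when the body raises. How ApiClient implements this internally is not shown here.

```python
import asyncio


class FakeClient:
    """Hypothetical stand-in for ApiClient; it only tracks whether close() ran."""

    def __init__(self):
        self.closed = False

    async def __aenter__(self):
        return self

    async def __aexit__(self, exc_type, exc, tb):
        # Runs on normal exit and when the body raises.
        await self.close()
        return False  # do not swallow the exception

    async def close(self):
        self.closed = True


async def demo():
    client = FakeClient()
    try:
        async with client:
            raise RuntimeError("request failed")
    except RuntimeError:
        pass
    return client.closed


closed_after_error = asyncio.run(demo())
print(f"Closed after error: {closed_after_error}")
```

If you manage the client without async with, call await client.close() yourself, ideally in a finally block.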

Properties

client.gateway    # GatewayClient for service invocation
client.pubsub    # PubsubClient for publish/subscribe

Methods (all async)

await client.list_services_async()
await client.list_methods_async(service)
await client.invoke_async(service, method, request_data)
await client.save_state_async(service, key, value, etag=None)
await client.get_state_async(service, key)
await client.get_all_state_async(service)
await client.register_keys_async(service, keys)
await client.subscribe_async_with_handler(handler, services=None, topics=None, event_types=None)
await client.close()

Example: List Services

async with ApiClient() as client:
    services = await client.list_services_async()
    print(services)

Example: State Management

async with ApiClient() as client:
    # Save state
    result = await client.save_state_async("workmanager", "mykey", "myvalue")
    print(f"Saved: {result['success']}")

    # Get state
    state = await client.get_state_async("workmanager", "mykey")
    print(f"Value: {state['value']}, Etag: {state['etag']}")

    # Register keys and get all state
    await client.register_keys_async("workmanager", ["key1", "key2"])
    all_state = await client.get_all_state_async("workmanager")
    print(f"All state: {all_state['values']}")
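
Code written against the state methods can be exercised without a running gateway by swapping in a test double. The sketch below is a hypothetical in-memory class, InMemoryState, whose method names and result keys mirror the example above. The etag check is an assumption about how a conditional write might behave (a supplied etag must match the stored one); the documentation above does not specify the server's actual etag semantics.

```python
import asyncio
import itertools


class InMemoryState:
    """Hypothetical test double; method names and result keys mirror the example above."""

    def __init__(self):
        self._data = {}  # (service, key) -> (value, etag)
        self._etags = itertools.count(1)

    async def save_state_async(self, service, key, value, etag=None):
        current = self._data.get((service, key))
        # Assumption: a supplied etag must match the stored one, else the write is rejected.
        if etag is not None and current is not None and current[1] != etag:
            return {"success": False}
        self._data[(service, key)] = (value, str(next(self._etags)))
        return {"success": True}

    async def get_state_async(self, service, key):
        value, etag = self._data[(service, key)]
        return {"value": value, "etag": etag}


async def demo():
    state = InMemoryState()
    await state.save_state_async("workmanager", "mykey", "v1")
    first = await state.get_state_async("workmanager", "mykey")

    # Another writer updates the key, so the etag read above is now stale.
    await state.save_state_async("workmanager", "mykey", "v2")

    stale = await state.save_state_async("workmanager", "mykey", "v3", etag=first["etag"])
    latest = await state.get_state_async("workmanager", "mykey")
    fresh = await state.save_state_async("workmanager", "mykey", "v3", etag=latest["etag"])
    return stale["success"], fresh["success"]


stale_ok, fresh_ok = asyncio.run(demo())
print(f"Stale write accepted: {stale_ok}, fresh write accepted: {fresh_ok}")
```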

Example: Subscribe to Events

import asyncio
from virtufin.api.client import ApiClient, StreamEventHandler

async def main():
    async with ApiClient() as client:
        events = []

        class Handler:
            def on_event(self, event):
                events.append(event)
                print(f"Event: {event.service}/{event.topic}")

            def on_error(self, error):
                print(f"Error: {error}")

        handler = Handler()
        await client.subscribe_async_with_handler(handler)

        # Keep running...
        await asyncio.sleep(10)

asyncio.run(main())

PubsubClient

High-level pub/sub operations via client.pubsub. All methods are async.

Methods

# Publish async — returns {"success": bool, "status": str, "message": str}
await client.pubsub.publish_async(topic, data, metadata=None)

# Subscribe - returns async generator yielding PubsubEvent
async for event in client.pubsub.subscribe_async_iter(topic):
    print(f"Received: {event.topic} - {event.data_as_text()}")

# Subscribe with handler (async)
async def on_event(event):
    print(f"Event: {event.topic}")
await client.pubsub.subscribe_async_with_handler(on_event, topics=["my-topic"])

# Request-reply: publish and await correlated response
result = await client.pubsub.publish_with_result_async(
    topic="requests",
    data={"input": "hello"},
    reply_topic="responses",
    timeout=10.0,
)
print(f"Response: {result.data_as_json()}")

# Unsubscribe (note: subscription_id is not currently returned by Subscribe;
# cancel the async iteration instead to stop a subscription)
# await client.pubsub.unsubscribe_async(subscription_id)

Parameters for publish_with_result_async:

- topic: Topic to publish the request to
- data: Request data (str, bytes, or JSON-serializable object)
- reply_topic: Topic to listen on for the correlated response
- timeout: Maximum wait time in seconds (default: 30.0)
- metadata: Optional dict of additional metadata
- correlation_id: Optional correlation ID (auto-generated UUID if omitted)

Returns: The PubsubEvent whose correlationid matches the request.

Raises: TimeoutError if no response arrives within the timeout.
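
The correlation mechanism described above can be illustrated with a small in-process model. This is a sketch of the general request-reply pattern, not the library's implementation: InMemoryBus, echo_responder and silent_responder are hypothetical names, and the real client matches replies arriving on reply_topic rather than through a direct callback.

```python
import asyncio
import uuid


class InMemoryBus:
    """Hypothetical in-process bus used only to illustrate correlation."""

    def __init__(self):
        self._pending = {}  # correlation_id -> Future awaiting the reply

    async def request(self, data, responder, timeout=30.0, correlation_id=None):
        correlation_id = correlation_id or str(uuid.uuid4())
        future = asyncio.get_running_loop().create_future()
        self._pending[correlation_id] = future
        # Hand the request to the responder, which replies via deliver().
        task = asyncio.create_task(responder(self, correlation_id, data))
        try:
            return await asyncio.wait_for(future, timeout)
        finally:
            self._pending.pop(correlation_id, None)
            task.cancel()

    def deliver(self, correlation_id, payload):
        # Replies with an unknown correlation ID are ignored.
        future = self._pending.get(correlation_id)
        if future is not None and not future.done():
            future.set_result(payload)


async def echo_responder(bus, correlation_id, data):
    bus.deliver(correlation_id, {"echo": data})


async def silent_responder(bus, correlation_id, data):
    await asyncio.sleep(3600)  # never replies within the timeout


async def demo():
    bus = InMemoryBus()
    reply = await bus.request({"input": "hello"}, echo_responder, timeout=1.0)
    try:
        await bus.request({"input": "hello"}, silent_responder, timeout=0.05)
        timed_out = False
    except asyncio.TimeoutError:
        timed_out = True
    return reply, timed_out


reply, timed_out = asyncio.run(demo())
print(reply, timed_out)
```

Keying pending requests by correlation ID lets several requests share one reply topic without their responses being mixed up.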

PubsubEvent

event.topic             # Topic name
event.data              # Raw bytes
event.data_as_text()    # Decode as UTF-8 string
event.data_as_json()    # Decode as JSON object
event.metadata          # Dict of metadata
event.message_id        # Unique message ID
event.timestamp         # ISO timestamp

Example: Pub/Sub

async with ApiClient() as client:
    topic = "my-topic"

    # Subscribe
    async def collect():
        async for event in client.pubsub.subscribe_async_iter(topic):
            print(f"Received: {event.data_as_text()}")

    sub_task = asyncio.create_task(collect())

    # Publish events
    for i in range(5):
        await client.pubsub.publish_async(topic, f"Message {i}")
        await asyncio.sleep(1)

    sub_task.cancel()
    try:
        await sub_task
    except asyncio.CancelledError:
        pass
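
Cancelling the consuming task is what ends the subscription in the example above. The sketch below models that with a hypothetical async generator, subscribe, fed from an asyncio.Queue instead of a gRPC stream; it shows that cancellation reaches the generator's finally block, which is where a real stream could release its resources.

```python
import asyncio


async def subscribe(queue, on_close):
    """Hypothetical subscription: yields queued messages until cancelled."""
    try:
        while True:
            yield await queue.get()
    finally:
        # Reached when the consuming task is cancelled.
        on_close()


async def demo():
    queue = asyncio.Queue()
    received = []
    closed = []

    async def collect():
        async for message in subscribe(queue, lambda: closed.append(True)):
            received.append(message)

    task = asyncio.create_task(collect())
    for i in range(3):
        queue.put_nowait(f"Message {i}")
    await asyncio.sleep(0.05)  # let the consumer drain the queue

    task.cancel()
    try:
        await task
    except asyncio.CancelledError:
        pass
    return received, closed


received, closed = asyncio.run(demo())
print(received, closed)
```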

Example: Request-Reply

async with ApiClient() as client:
    request_topic = "worker-commands"
    reply_topic = "worker-responses"

    # Publish a command and wait for the correlated response
    response = await client.pubsub.publish_with_result_async(
        topic=request_topic,
        data={"command": "status"},
        reply_topic=reply_topic,
        timeout=10.0,
    )
    print(f"Got response: {response.data_as_json()}")

GatewayClient

Low-level gateway operations via client.gateway. All methods are async.

Dynamic Service Access

The client.gateway.<service> accessor returns a proxy that resolves method names dynamically to Gateway.InvokeJson calls. Any RPC on any backend service (workmanager, websocketmanager, custom services) can be invoked this way without importing the backend's proto or generated stub.

async with ApiClient() as client:
    services = await client.list_services_async()
    print(f"Available services: {services}")

Dynamic Dispatch Examples

WorkManager

async with ApiClient() as client:
    workmanager = client.gateway.workmanager

    # ListWorkers — no request fields; returns {"workers": [...]}
    workers = await workmanager.ListWorkers()
    for w in workers["workers"]:
        print(f"  {w['id']}: {w['status']} ({w['language']})")

    # CreateWorker — CodeSource oneof (url | content), mime_type, topic, group
    create_result = await workmanager.CreateWorker({
        "code_source": {"url": "https://example.com/worker.py"},
        "mime_type": "text/x-python",
        "topic": "worker-commands",
    })
    worker_id = create_result["id"]

    # StartWorker — id field
    await workmanager.StartWorker({"id": worker_id})

WebSocketManager

async with ApiClient() as client:
    wsm = client.gateway.websocketmanager

    # List — no request fields; returns {"connections": [...]}
    connections = await wsm.List()
    for c in connections["connections"]:
        print(f"  {c['id']}: {c['url']} ({c['status']})")

    # Connect — url, auto_reconnect
    conn = await wsm.Connect({
        "url": "wss://stream.example.com/feed",
        "auto_reconnect": True,
    })
    connection_id = conn["id"]

    # Disconnect — id field
    await wsm.Disconnect({"id": connection_id})

Note: Service clients are async — they return coroutines that must be awaited. The Gateway.InvokeJson RPC serializes the request dict to JSON, the backend service unmarshals it to the typed proto request, runs the RPC, and marshals the typed proto response back to JSON. See the proto-to-client-mapping spec §Layer 4 (Dynamic dispatch) for the full call chain.
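
The attribute-based dispatch can be pictured with Python's __getattr__ hook. The following is a minimal sketch under stated assumptions, not the library's code: GatewayProxy, ServiceProxy and fake_invoke are hypothetical names, and fake_invoke stands in for the Gateway.InvokeJson round trip by echoing the call it received.

```python
import asyncio
import json


class ServiceProxy:
    """Turns attribute access into an async call for one service."""

    def __init__(self, invoke, service):
        self._invoke = invoke
        self._service = service

    def __getattr__(self, method):
        # Only called for attributes not found normally, i.e. RPC names.
        if method.startswith("_"):
            raise AttributeError(method)

        async def call(request=None):
            payload = json.dumps(request or {})
            return json.loads(await self._invoke(self._service, method, payload))

        return call


class GatewayProxy:
    """Turns attribute access into a ServiceProxy for that service name."""

    def __init__(self, invoke):
        self._invoke = invoke

    def __getattr__(self, service):
        if service.startswith("_"):
            raise AttributeError(service)
        return ServiceProxy(self._invoke, service)


async def fake_invoke(service, method, payload):
    """Hypothetical backend: echoes what it was asked to run."""
    return json.dumps({"service": service, "method": method, "request": json.loads(payload)})


async def demo():
    gateway = GatewayProxy(fake_invoke)
    return await gateway.workmanager.StartWorker({"id": "w-1"})


result = asyncio.run(demo())
print(result)
```

Because names are resolved at call time, a typo in a service or method name surfaces as a runtime error from the gateway rather than as an AttributeError.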


StreamEventHandler

Callback handler for streaming subscriptions.

from virtufin.api.client import StreamEventHandler

handler = StreamEventHandler(
    on_event=lambda e: print(f"Event: {e.service}/{e.topic}"),
    on_error=lambda ex: print(f"Error: {ex}")
)
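
To make the callback flow concrete, here is a sketch of how a stream consumer might forward items to a handler. RecordingHandler, fake_stream and dispatch are hypothetical names; the actual dispatch loop inside the client is not documented here, so treat this as an illustration of the on_event and on_error contract only.

```python
import asyncio


class RecordingHandler:
    """Duck-typed handler exposing on_event and on_error."""

    def __init__(self):
        self.events = []
        self.errors = []

    def on_event(self, event):
        self.events.append(event)

    def on_error(self, error):
        self.errors.append(error)


async def fake_stream():
    """Hypothetical event source: yields two events, then fails."""
    yield "event-1"
    yield "event-2"
    raise ConnectionError("stream dropped")


async def dispatch(stream, handler):
    """Forward each streamed item to on_event and any failure to on_error."""
    try:
        async for event in stream:
            handler.on_event(event)
    except Exception as exc:
        handler.on_error(exc)


handler = RecordingHandler()
asyncio.run(dispatch(fake_stream(), handler))
print(handler.events, handler.errors)
```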

DaprPublisher

Publish events via Dapr sidecar (bypasses Virtufin API, uses Dapr pub/sub directly). The publish_async method is async; use it inside a coroutine with await.

from virtufin.api.client import DaprPublisher

async def main():
    publisher = DaprPublisher(pubsub_name="pubsub")
    await publisher.publish_async("my-topic", {"key": "value"})

Breaking change in 0.0.40: DaprPublisher.publish() was renamed to DaprPublisher.publish_async() and is now async. The previous sync version called the sync dapr.clients.grpc.client.DaprGrpcClient; the new async version uses dapr.aio.clients.grpc.client.DaprGrpcClientAsync. Callers must await the call.
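
Synchronous callers affected by this change need a bridge into async code. One possible approach, sketched below with a hypothetical stand-in class FakePublisher and a hypothetical helper publish_blocking, is to run the coroutine with asyncio.run(). Whether the real DaprPublisher tolerates a fresh event loop per call is an assumption that should be verified before relying on this.

```python
import asyncio


class FakePublisher:
    """Hypothetical stand-in for DaprPublisher; it records messages instead of sending them."""

    def __init__(self):
        self.sent = []

    async def publish_async(self, topic, data):
        self.sent.append((topic, data))


def publish_blocking(publisher, topic, data):
    """Bridge for synchronous callers: runs the coroutine to completion."""
    # asyncio.run() must not be called from a thread that already runs an event loop.
    asyncio.run(publisher.publish_async(topic, data))


publisher = FakePublisher()
publish_blocking(publisher, "my-topic", {"key": "value"})
print(publisher.sent)
```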


Error Handling

import grpc

async with ApiClient() as client:
    try:
        result = await client.invoke_async("unknown-service", "UnknownMethod", {})
    except grpc.RpcError as e:
        print(f"gRPC error {e.code()}: {e.details()}")
    except ValueError as e:
        print(f"Gateway error: {e}")

Dependencies

pip install grpcio grpcio-reflection protobuf

Generated protobuf stubs must be in the same directory:

- gateway_pb2.py
- gateway_pb2_grpc.py
- state_pb2.py
- state_pb2_grpc.py
- pubsub_pb2.py
- pubsub_pb2_grpc.py

Generate with:

python scripts/create_client.py --output src/python


Thread Safety

The client is not thread-safe. Use separate instances for concurrent access:

# Not recommended: one instance shared across threads
client = ApiClient()

# Recommended - separate instance per thread/task
async def handle_request():
    async with ApiClient() as client:
        return await client.list_services_async()
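
The per-thread pattern can be sketched as follows, with each thread running its own event loop and creating its own client. FakeClient is a hypothetical stand-in for ApiClient that returns a fixed service list and asserts it is only used from the thread that created it.

```python
import asyncio
import threading


class FakeClient:
    """Hypothetical stand-in for ApiClient that records which thread created it."""

    def __init__(self):
        self.owner = threading.get_ident()

    async def __aenter__(self):
        return self

    async def __aexit__(self, exc_type, exc, tb):
        return False

    async def list_services_async(self):
        # Fail loudly if the instance is used from a thread other than its creator.
        assert threading.get_ident() == self.owner
        return ["workmanager", "websocketmanager"]


async def handle_request():
    async with FakeClient() as client:
        return await client.list_services_async()


def worker(results, index):
    # Each thread gets its own event loop and its own client instance.
    results[index] = asyncio.run(handle_request())


results = [None] * 3
threads = [threading.Thread(target=worker, args=(results, i)) for i in range(3)]
for t in threads:
    t.start()
for t in threads:
    t.join()
print(results)
```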