# Python Client Library
## Overview
A Python client library for interacting with the Virtufin.Api Gateway service via gRPC.
## Installation
### From Gitea PyPI Registry
Set up authentication with a Gitea personal access token (scope: read:packages):
```bash
export PIP_USERNAME=<your-gitea-username>
export PIP_PASSWORD=<your-gitea-token>
pip install virtufin-api \
    --index-url https://pypi.haenerconsulting.com/api/packages/virtufin/pypi/simple/ \
    --extra-index-url https://pypi.org/simple
```
### From Local Source
```bash
export PYTHONPATH=/path/to/virtufin-api/src/python
```
Or install the package. The client requires the generated protobuf stubs in the same directory. To add the source directory to the import path from code instead:
```python
import sys
sys.path.insert(0, '/path/to/virtufin-api/src/python')

from virtufin.api.client import ApiClient
```
## Quick Start
### Async Client (Required)
```python
import asyncio
from virtufin.api.client import ApiClient

async def main():
    async with ApiClient(host="localhost", grpc_port=5002) as client:
        services = await client.list_services_async()
        print(f"Services: {services}")

asyncio.run(main())
```
Note: This library is async-only. All methods return coroutines and must be awaited.
## ApiClient
Async-only client using grpc.aio channels.
### Constructor
```python
client = ApiClient(host="localhost", grpc_port=5002)
```
| Parameter | Type | Default | Description |
|---|---|---|---|
| host | str | "localhost" | gRPC server hostname |
| grpc_port | int | 5002 | Native gRPC server port |
### Async Context Manager
```python
async with ApiClient() as client:
    services = await client.list_services_async()
# Channel automatically closed on exit
```
### Properties
```python
client.gateway  # GatewayClient for service invocation
client.pubsub   # PubsubClient for publish/subscribe
```
### Methods (all async)
```python
await client.list_services_async()
await client.list_methods_async(service)
await client.invoke_async(service, method, request_data)
await client.save_state_async(service, key, value, etag=None)
await client.get_state_async(service, key)
await client.get_all_state_async(service)
await client.register_keys_async(service, keys)
await client.subscribe_async_with_handler(handler, services=None, topics=None, event_types=None)
await client.close()
```
### Example: List Services
```python
async with ApiClient() as client:
    services = await client.list_services_async()
    print(services)
```
### Example: State Management
```python
async with ApiClient() as client:
    # Save state
    result = await client.save_state_async("workmanager", "mykey", "myvalue")
    print(f"Saved: {result['success']}")

    # Get state
    state = await client.get_state_async("workmanager", "mykey")
    print(f"Value: {state['value']}, Etag: {state['etag']}")

    # Register keys and get all state
    await client.register_keys_async("workmanager", ["key1", "key2"])
    all_state = await client.get_all_state_async("workmanager")
    print(f"All state: {all_state['values']}")
```
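This document does not say how the `etag` argument of `save_state_async` is interpreted. Assuming it follows the common optimistic-concurrency convention (as Dapr state stores do), the idea can be sketched with a toy in-memory store. `InMemoryStateStore` and its methods are hypothetical and are not part of the client library.

```python
import itertools
from typing import Dict, Optional, Tuple


class InMemoryStateStore:
    """Toy store illustrating etag checks; not the Virtufin state service."""

    def __init__(self) -> None:
        self._data: Dict[str, Tuple[str, str]] = {}  # key -> (value, etag)
        self._versions = itertools.count(1)

    def get(self, key: str) -> dict:
        value, etag = self._data.get(key, ("", ""))
        return {"value": value, "etag": etag}

    def save(self, key: str, value: str, etag: Optional[str] = None) -> dict:
        current_etag = self._data.get(key, ("", ""))[1]
        # Assumption: with an etag, the write succeeds only if the key is unchanged.
        if etag is not None and etag != current_etag:
            return {"success": False, "etag": current_etag}
        new_etag = str(next(self._versions))
        self._data[key] = (value, new_etag)
        return {"success": True, "etag": new_etag}


store = InMemoryStateStore()
first = store.save("mykey", "v1")                      # unconditional write
stale_etag = first["etag"]
store.save("mykey", "v2")                              # another writer updates the key
conflict = store.save("mykey", "v3", etag=stale_etag)  # rejected: etag is stale
fresh = store.get("mykey")
retry = store.save("mykey", "v3", etag=fresh["etag"])  # accepted: etag is current
print(conflict["success"], retry["success"])
```

If the real service behaves this way, a caller would re-read the state with `get_state_async` and retry with the fresh etag whenever a save reports failure.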
### Example: Subscribe to Events
```python
import asyncio
from virtufin.api.client import ApiClient, StreamEventHandler

async def main():
    async with ApiClient() as client:
        events = []

        class Handler:
            def on_event(self, event):
                events.append(event)
                print(f"Event: {event.service}/{event.topic}")

            def on_error(self, error):
                print(f"Error: {error}")

        handler = Handler()
        await client.subscribe_async_with_handler(handler)
        # Keep running...
        await asyncio.sleep(10)

asyncio.run(main())
```
## PubsubClient
High-level pub/sub operations via client.pubsub. All methods are async.
### Methods
```python
# Publish async: returns {"success": bool, "status": str, "message": str}
await client.pubsub.publish_async(topic, data, metadata=None)

# Subscribe: returns an async generator yielding PubsubEvent
async for event in client.pubsub.subscribe_async_iter(topic):
    print(f"Received: {event.topic} - {event.data_as_text()}")

# Subscribe with handler (async)
async def on_event(event):
    print(f"Event: {event.topic}")

await client.pubsub.subscribe_async_with_handler(on_event, topics=["my-topic"])

# Request-reply: publish and await correlated response
result = await client.pubsub.publish_with_result_async(
    topic="requests",
    data={"input": "hello"},
    reply_topic="responses",
    timeout=10.0,
)
print(f"Response: {result.data_as_json()}")

# Unsubscribe (note: subscription_id is not currently returned by Subscribe;
# cancel the async iteration instead to stop a subscription)
# await client.pubsub.unsubscribe_async(subscription_id)
```
Parameters for publish_with_result_async:
- topic — Topic to publish the request to
- data — Request data (str, bytes, or JSON-serializable object)
- reply_topic — Topic to listen for the correlated response on
- timeout — Maximum wait time in seconds (default: 30.0)
- metadata — Optional dict of additional metadata
- correlation_id — Optional correlation ID (auto-generated UUID if omitted)
Returns: The PubsubEvent whose correlationid matches the request.
Raises: TimeoutError if no response arrives within the timeout.
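The correlation and timeout behaviour described above can be illustrated without a broker. The following is a minimal sketch, not the library's implementation: `InMemoryRequestReply`, `publish_with_result`, and `deliver_reply` are hypothetical names, and the reply topic is replaced by a direct method call.

```python
import asyncio
import uuid
from typing import Dict, Optional


class InMemoryRequestReply:
    """Toy stand-in for a broker: matches replies to requests by correlation ID."""

    def __init__(self) -> None:
        self._pending: Dict[str, asyncio.Future] = {}

    async def publish_with_result(self, data: dict, timeout: float = 30.0,
                                  correlation_id: Optional[str] = None) -> dict:
        # Auto-generate a correlation ID when the caller does not supply one.
        correlation_id = correlation_id or str(uuid.uuid4())
        future = asyncio.get_running_loop().create_future()
        self._pending[correlation_id] = future
        try:
            # A real client would publish `data` to the request topic here.
            return await asyncio.wait_for(future, timeout)
        except asyncio.TimeoutError:
            raise TimeoutError(f"no reply for {correlation_id} within {timeout}s")
        finally:
            self._pending.pop(correlation_id, None)

    def deliver_reply(self, correlation_id: str, data: dict) -> bool:
        # Called for each message on the reply topic; unmatched IDs are ignored.
        future = self._pending.get(correlation_id)
        if future is None or future.done():
            return False
        future.set_result(data)
        return True


async def demo() -> tuple:
    bus = InMemoryRequestReply()

    async def responder() -> None:
        await asyncio.sleep(0.01)
        bus.deliver_reply("other-id", {"status": "ignored"})  # not our request
        bus.deliver_reply("req-1", {"status": "ok"})

    task = asyncio.create_task(responder())
    reply = await bus.publish_with_result({"command": "status"}, timeout=1.0,
                                          correlation_id="req-1")
    await task

    timed_out = False
    try:
        await bus.publish_with_result({"command": "status"}, timeout=0.01)
    except TimeoutError:
        timed_out = True
    return reply, timed_out


reply, timed_out = asyncio.run(demo())
print(reply, timed_out)
```

Keeping one pending future per correlation ID lets several requests share a reply topic, since a reply carrying an unknown ID is simply dropped.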
### PubsubEvent
```python
event.topic           # Topic name
event.data            # Raw bytes
event.data_as_text()  # Decode as UTF-8 string
event.data_as_json()  # Decode as JSON object
event.metadata        # Dict of metadata
event.message_id      # Unique message ID
event.timestamp       # ISO timestamp
```
### Example: Pub/Sub
```python
async with ApiClient() as client:
    topic = "my-topic"

    # Subscribe
    async def collect():
        async for event in client.pubsub.subscribe_async_iter(topic):
            print(f"Received: {event.data_as_text()}")

    sub_task = asyncio.create_task(collect())

    # Publish events
    for i in range(5):
        await client.pubsub.publish_async(topic, f"Message {i}")

    await asyncio.sleep(1)
    sub_task.cancel()
    try:
        await sub_task
    except asyncio.CancelledError:
        pass
```
### Example: Request-Reply
```python
async with ApiClient() as client:
    request_topic = "worker-commands"
    reply_topic = "worker-responses"

    # Publish a command and wait for the correlated response
    response = await client.pubsub.publish_with_result_async(
        topic=request_topic,
        data={"command": "status"},
        reply_topic=reply_topic,
        timeout=10.0,
    )
    print(f"Got response: {response.data_as_json()}")
```
## GatewayClient
Low-level gateway operations via client.gateway. All methods are async.
### Dynamic Service Access
The client.gateway.<service> accessor returns a proxy that resolves method
names dynamically to Gateway.InvokeJson calls. Any RPC on any backend
service (workmanager, websocketmanager, custom services) can be invoked
this way without importing the backend's proto or generated stub.
```python
async with ApiClient() as client:
    services = await client.list_services_async()
    print(f"Available services: {services}")
```
### Dynamic Dispatch Examples
#### WorkManager
```python
async with ApiClient() as client:
    workmanager = client.gateway.workmanager

    # ListWorkers: no request fields; returns {"workers": [...]}
    workers = await workmanager.ListWorkers()
    for w in workers["workers"]:
        print(f"  {w['id']}: {w['status']} ({w['language']})")

    # CreateWorker: CodeSource oneof (url | content), mime_type, topic, group
    create_result = await workmanager.CreateWorker({
        "code_source": {"url": "https://example.com/worker.py"},
        "mime_type": "text/x-python",
        "topic": "worker-commands",
    })
    worker_id = create_result["id"]

    # StartWorker: id field
    await workmanager.StartWorker({"id": worker_id})
```
#### WebSocketManager
```python
async with ApiClient() as client:
    wsm = client.gateway.websocketmanager

    # List: no request fields; returns {"connections": [...]}
    connections = await wsm.List()
    for c in connections["connections"]:
        print(f"  {c['id']}: {c['url']} ({c['status']})")

    # Connect: url, auto_reconnect
    conn = await wsm.Connect({
        "url": "wss://stream.example.com/feed",
        "auto_reconnect": True,
    })
    connection_id = conn["id"]

    # Disconnect: id field
    await wsm.Disconnect({"id": connection_id})
```
Note: Service clients are async; they return coroutines that must be awaited.
The Gateway.InvokeJson RPC serializes the request dict to JSON; the backend
service unmarshals it to the typed proto request, runs the RPC, and marshals
the typed proto response back to JSON. See the proto-to-client-mapping spec,
§Layer 4 (Dynamic dispatch), for the full call chain.
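To make the dynamic dispatch concrete, here is a minimal sketch of how attribute access could be turned into a single JSON invoke call. It is an illustration under stated assumptions, not the library's code: `GatewaySketch`, `ServiceProxy`, and `fake_invoke_json` are hypothetical names, and the transport is an in-memory stand-in that echoes its arguments.

```python
import asyncio
import json
from typing import Awaitable, Callable, Optional

# Hypothetical transport signature: (service, method, json_request) -> json_response
InvokeJson = Callable[[str, str, str], Awaitable[str]]


class ServiceProxy:
    """Turns attribute access (e.g. proxy.StartWorker) into an async call."""

    def __init__(self, service: str, invoke_json: InvokeJson) -> None:
        self._service = service
        self._invoke_json = invoke_json

    def __getattr__(self, method: str) -> Callable[..., Awaitable[dict]]:
        # Only reached for names not found normally, i.e. RPC method names.
        if method.startswith("_"):
            raise AttributeError(method)

        async def call(request: Optional[dict] = None) -> dict:
            payload = json.dumps(request or {})
            response = await self._invoke_json(self._service, method, payload)
            return json.loads(response)

        return call


class GatewaySketch:
    """Turns attribute access (e.g. gateway.workmanager) into a ServiceProxy."""

    def __init__(self, invoke_json: InvokeJson) -> None:
        self._invoke_json = invoke_json

    def __getattr__(self, service: str) -> ServiceProxy:
        if service.startswith("_"):
            raise AttributeError(service)
        return ServiceProxy(service, self._invoke_json)


async def fake_invoke_json(service: str, method: str, request_json: str) -> str:
    # Stand-in for the real RPC: echoes what it was asked to invoke.
    return json.dumps({"service": service, "method": method,
                       "request": json.loads(request_json)})


async def demo() -> dict:
    gateway = GatewaySketch(fake_invoke_json)
    return await gateway.workmanager.StartWorker({"id": "w-1"})


echo = asyncio.run(demo())
print(echo)
```

Because the proxy never inspects the method name, a typo in a service or method name would only surface as an error from the backend at call time, not at attribute access.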
## StreamEventHandler
Callback handler for streaming subscriptions.
```python
from virtufin.api.client import StreamEventHandler

handler = StreamEventHandler(
    on_event=lambda e: print(f"Event: {e.service}/{e.topic}"),
    on_error=lambda ex: print(f"Error: {ex}")
)
```
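How such a handler might be driven by a stream can be sketched as follows. This assumes, without confirmation from the library, that each event goes to `on_event` and a stream failure goes to `on_error`. `CallbackHandler`, `fake_stream`, and `drive` are hypothetical stand-ins.

```python
import asyncio
from dataclasses import dataclass
from typing import Any, AsyncIterator, Callable, List


@dataclass
class CallbackHandler:
    """Hypothetical stand-in with the same two callbacks as StreamEventHandler."""
    on_event: Callable[[Any], None]
    on_error: Callable[[Exception], None]


async def fake_stream() -> AsyncIterator[str]:
    # Stand-in for a server stream: two events, then a failure.
    yield "workmanager/started"
    yield "workmanager/stopped"
    raise ConnectionError("stream closed by server")


async def drive(handler: CallbackHandler, stream: AsyncIterator[Any]) -> None:
    # Forward each event to on_event; route a stream failure to on_error.
    try:
        async for event in stream:
            handler.on_event(event)
    except Exception as exc:
        handler.on_error(exc)


events: List[str] = []
errors: List[str] = []
handler = CallbackHandler(on_event=events.append,
                          on_error=lambda exc: errors.append(str(exc)))
asyncio.run(drive(handler, fake_stream()))
print(events, errors)
```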
## DaprPublisher
Publish events via Dapr sidecar (bypasses Virtufin API, uses Dapr pub/sub directly).
The publish_async method is async; use it inside a coroutine with await.
```python
from virtufin.api.client import DaprPublisher

async def main():
    publisher = DaprPublisher(pubsub_name="pubsub")
    await publisher.publish_async("my-topic", {"key": "value"})
```
Breaking change in 0.0.40: DaprPublisher.publish() was renamed to
DaprPublisher.publish_async() and is now async. The previous sync version
called the sync dapr.clients.grpc.client.DaprGrpcClient; the new async
version uses dapr.aio.clients.grpc.client.DaprGrpcClientAsync. Callers
must await the call.
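Code that called the old synchronous `publish()` from non-async code needs a bridge into an event loop. One possible approach, sketched with a hypothetical `FakeAsyncPublisher` standing in for `DaprPublisher` and a hypothetical helper `publish_blocking`, is to wrap the coroutine in `asyncio.run`:

```python
import asyncio
from typing import Any, List, Tuple


class FakeAsyncPublisher:
    """Hypothetical stand-in exposing the publish_async call shape shown above."""

    def __init__(self) -> None:
        self.sent: List[Tuple[str, Any]] = []

    async def publish_async(self, topic: str, data: Any) -> None:
        await asyncio.sleep(0)  # yield to the loop, as real I/O would
        self.sent.append((topic, data))


def publish_blocking(publisher: Any, topic: str, data: Any) -> None:
    # Bridge for legacy synchronous callers. It must not be called from
    # inside a running event loop, where asyncio.run raises RuntimeError.
    asyncio.run(publisher.publish_async(topic, data))


publisher = FakeAsyncPublisher()
publish_blocking(publisher, "my-topic", {"key": "value"})
print(publisher.sent)
```

Each `asyncio.run` call creates and tears down an event loop, so this suits occasional publishes from scripts. Code that is already async should await `publish_async` directly.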
## Error Handling
```python
import grpc

async with ApiClient() as client:
    try:
        result = await client.invoke_async("unknown-service", "UnknownMethod", {})
    except grpc.RpcError as e:
        print(f"gRPC error {e.code()}: {e.details()}")
    except ValueError as e:
        print(f"Gateway error: {e}")
```
## Dependencies
```bash
pip install grpcio grpcio-reflection protobuf
```
Generated protobuf stubs must be in the same directory:
- gateway_pb2.py
- gateway_pb2_grpc.py
- state_pb2.py
- state_pb2_grpc.py
- pubsub_pb2.py
- pubsub_pb2_grpc.py

Generate with:
```bash
python scripts/create_client.py --output src/python
```
## Thread Safety
The client is not thread-safe. Use separate instances for concurrent access:
```python
# Not recommended: one instance shared across threads
client = ApiClient()

# Recommended: separate instance per thread/task
async def handle_request():
    async with ApiClient() as client:
        return await client.list_services_async()
```
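The per-task pattern can be tried without a server by substituting a stand-in client. In this sketch `FakeClient` is hypothetical and only mimics the async context manager and `list_services_async` call shape used above. It runs three concurrent tasks, each opening and closing its own instance.

```python
import asyncio
from typing import List


class FakeClient:
    """Hypothetical stand-in for ApiClient; counts currently open instances."""
    open_count = 0

    async def __aenter__(self) -> "FakeClient":
        FakeClient.open_count += 1
        return self

    async def __aexit__(self, exc_type, exc, tb) -> None:
        FakeClient.open_count -= 1

    async def list_services_async(self) -> List[str]:
        await asyncio.sleep(0)  # yield so the tasks interleave
        return ["workmanager", "websocketmanager"]


async def handle_request() -> List[str]:
    # Each task owns its client for the duration of the request.
    async with FakeClient() as client:
        return await client.list_services_async()


async def main() -> List[List[str]]:
    return await asyncio.gather(*(handle_request() for _ in range(3)))


results = asyncio.run(main())
print(len(results), FakeClient.open_count)
```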