gen_ai_hub.orchestration.sse_client
index
/home/jenkins/agent/workspace/ation_generative-ai-hub-sdk_main/gen_ai_hub/orchestration/sse_client.py

Module for Server-Sent Events (SSE) clients for orchestration responses.
 
This module provides both synchronous and asynchronous SSE clients for iterating over streaming responses.
Each client is responsible for handling HTTP errors and for closing the underlying HTTP stream
when iteration is complete.

 
Modules
       
dacite
httpx
json

 
Classes
       
builtins.object
AsyncSSEClient
SSEClient

 
class AsyncSSEClient(builtins.object)
    AsyncSSEClient(client: httpx.AsyncClient, response_cm, prefix: str = 'data: ', final_message: str = '[DONE]')
 
An asynchronous SSE client for iterating over streaming responses.
 
This client wraps an asynchronous HTTP stream (provided as a context manager) and ensures
that the stream is properly opened and closed. It also checks for HTTP errors upon entering the stream.
 
  Methods defined here:
async __aenter__(self)
Asynchronously enters the context for the streaming response.
 
It awaits the response, checks for HTTP errors, and if an error occurs,
reads the content and raises an OrchestrationError.
 
Returns:
    Self, with the streaming response stored.
async __aexit__(self, exc_type, exc_val, exc_tb)
Asynchronously exits the context, ensuring that both the HTTP stream and the client are properly closed.
__aiter__(self)
Returns the async iterator (self). The initialization of the stream is deferred until the first
call to __anext__.
async __anext__(self)
Asynchronously retrieves the next event from the stream. On the first call, it enters the asynchronous
context to start the stream. When the stream is exhausted or the final message is received, it properly
exits the context.
 
Returns:
    The next parsed event from the stream.
 
Raises:
    StopAsyncIteration: When the stream is exhausted.
__init__(self, client: httpx.AsyncClient, response_cm, prefix: str = 'data: ', final_message: str = '[DONE]')
Initializes the AsyncSSEClient.
 
Args:
    client: The httpx.AsyncClient instance that created the stream.
    response_cm: An asynchronous context manager for the HTTP streaming response.
    prefix: The SSE data prefix (default "data: ").
    final_message: The message indicating the end of the stream (default "[DONE]").

Data descriptors defined here:
__dict__
dictionary for instance variables (if defined)
__weakref__
list of weak references to the object (if defined)

 
class SSEClient(builtins.object)
    SSEClient(client: httpx.Client, response_cm, prefix: str = 'data: ', final_message: str = '[DONE]')
 
A synchronous Server-Sent Events (SSE) client that wraps an httpx.Response for iterating
over streaming responses.
 
This client reads data chunks from the HTTP stream, parses each SSE event, and closes the
underlying HTTP stream once iteration is complete.
 
  Methods defined here:
__enter__(self)
Synchronously enters the context for the streaming response.
 
It awaits the response, checks for HTTP errors, and if an error occurs,
reads the content and raises an OrchestrationError.
 
Returns:
    Self, with the streaming response stored.
__exit__(self, exc_type, exc_val, exc_tb)
Synchronously exits the context, ensuring that both the HTTP stream and the client are properly closed.
__init__(self, client: httpx.Client, response_cm, prefix: str = 'data: ', final_message: str = '[DONE]')
Initializes the SSEClient.
 
Args:
    prefix: The prefix string that identifies SSE event data (default "data: ").
    final_message: The message that indicates the end of the stream (default "[DONE]").
__iter__(self) -> Iterator
Returns self as an iterator. Opens the HTTP stream and initializes the internal iterator.
__next__(self)
Retrieves the next parsed SSE event from the stream.
It skips any lines that do not start with the expected prefix. When the final message is encountered
or the stream is exhausted, it closes the stream and raises StopIteration.
iter_lines(self) -> Iterable[str]
Reads data chunks from the HTTP stream and yields complete lines.
 
This method accumulates incoming chunks until a newline is encountered, yielding one complete
line at a time.
 
Yields:
    Complete lines of text from the streaming response.

Data descriptors defined here:
__dict__
dictionary for instance variables (if defined)
__weakref__
list of weak references to the object (if defined)

 
Data
        AsyncIterator = typing.AsyncIterator
Iterable = typing.Iterable
Iterator = typing.Iterator