Files
OpenLLM/typings/docker/utils/socket.pyi
Aaron Pham c7f4dc7bb2 feat(test): snapshot testing (#107)
Co-authored-by: pre-commit-ci[bot] <66853113+pre-commit-ci[bot]@users.noreply.github.com>
2023-07-10 17:23:19 -04:00

68 lines
2.1 KiB
Python
Generated

"""This type stub file was generated by pyright."""
STDOUT = ...
STDERR = ...
class SocketError(Exception): ...
NPIPE_ENDED = ...
def read(socket, n=...):
"""Reads at most n bytes from socket."""
...
def read_exactly(socket, n): # -> bytes:
"""Reads exactly n bytes from socket
Raises SocketError if there isn't enough data.
"""
...
def next_frame_header(socket): # -> tuple[Literal[-1], Literal[-1]] | tuple[Any, Any]:
"""Returns the stream and size of the next frame of data waiting to be read
from socket, according to the protocol defined here:
https://docs.docker.com/engine/api/v1.24/#attach-to-a-container
"""
...
def frames_iter(
socket, tty
): # -> Generator[tuple[Literal[1], Unknown], None, None] | Generator[tuple[Any | Literal[-1], Unknown], Any, None]:
"""Return a generator of frames read from socket. A frame is a tuple where
the first item is the stream number and the second item is a chunk of data.
If the tty setting is enabled, the streams are multiplexed into the stdout
stream.
"""
...
def frames_iter_no_tty(socket): # -> Generator[tuple[Any | Literal[-1], Unknown], Any, None]:
"""Returns a generator of data read from the socket when the tty setting is
not enabled.
"""
...
def frames_iter_tty(socket): # -> Generator[Unknown, Any, None]:
"""Return a generator of data read from the socket when the tty setting is
enabled.
"""
...
def consume_socket_output(frames, demux=...): # -> bytes | tuple[None, ...]:
"""Iterate through frames read from the socket and return the result.
Args:
demux (bool):
If False, stdout and stderr are multiplexed, and the result is the
concatenation of all the frames. If True, the streams are
demultiplexed, and the result is a 2-tuple where each item is the
concatenation of frames belonging to the same stream.
"""
...
def demux_adaptor(stream_id, data): # -> tuple[Unknown, None] | tuple[None, Unknown]:
"""Utility to demultiplex stdout and stderr when reading frames from the
socket.
"""
...