iotoolz.streams.Streams
Helper class to open streams with the corresponding AbcStream implementation based on the uri schema.
INMEM_SIZE: int
Methods
__init__(self, *stream_types)
special
Creates a new instance of Streams.
exists(self, uri)
Whether a stream points to an existing resource.
glob(self, uri)
Yield streams that match the provided scheme and pattern.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
uri | Union[pathlib.Path, str] | base uri to glob. | required |
pattern | str | unix shell style pattern. Defaults to "*". | required |
Returns:
Type | Description |
---|---|
Iterator[iotoolz._abc.AbcStream] | Iterator["AbcStream"]: iterator of streams that match the pattern. |
Yields:
AbcStream: stream object.
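A "unix shell style pattern" uses the wildcards `*`, `?` and `[...]`. As a rough local illustration of how such a pattern filters a listing, the standard library's `fnmatch` can be applied to the last path component of each uri. The helper `match_uris` and the listing are hypothetical; this is not how iotoolz implements `glob`.

```python
import fnmatch
from typing import Iterable, Iterator


def match_uris(uris: Iterable[str], pattern: str = "*") -> Iterator[str]:
    """Yield the uris whose final path component matches a shell-style pattern."""
    for uri in uris:
        # Compare only the last component, e.g. "a.csv" in ".../data/a.csv".
        name = uri.rstrip("/").rsplit("/", 1)[-1]
        if fnmatch.fnmatchcase(name, pattern):
            yield uri


# Hypothetical listing of a remote "directory".
listing = [
    "s3://bucket/data/a.csv",
    "s3://bucket/data/b.csv",
    "s3://bucket/data/notes.txt",
]
print(list(match_uris(listing, "*.csv")))
# ['s3://bucket/data/a.csv', 's3://bucket/data/b.csv']
```

`fnmatchcase` is used so matching is case-sensitive on every platform, which is closer to how object-store keys behave.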
iter_dir(self, uri)
If the uri is a directory, yields the streams in that directory. Otherwise, yields all streams in the same directory as the provided uri.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
uri | Union[pathlib.Path, str] | uri to list. | required |
Returns:
Type | Description |
---|---|
Iterator[iotoolz._abc.AbcStream] | Iterator["AbcStream"]: iterator of streams in the directory. |
Yields:
AbcStream: stream object.
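For local paths, the "directory or the uri's own directory" rule can be sketched with `pathlib`. The function `iter_dir_local` is hypothetical and yields plain `pathlib.Path` objects rather than streams; it only assumes the rule stated above.

```python
import pathlib
import tempfile
from typing import Iterator, Union


def iter_dir_local(uri: Union[pathlib.Path, str]) -> Iterator[pathlib.Path]:
    """Yield the entries of uri if it is a directory, else those of its parent."""
    path = pathlib.Path(uri)
    directory = path if path.is_dir() else path.parent
    # iterdir() order is arbitrary, so sort for a deterministic listing.
    yield from sorted(directory.iterdir())


with tempfile.TemporaryDirectory() as tmp:
    root = pathlib.Path(tmp)
    (root / "a.txt").write_text("a")
    (root / "b.txt").write_text("b")
    # A directory uri and a file uri inside it list the same entries.
    print([p.name for p in iter_dir_local(root)])  # ['a.txt', 'b.txt']
    print([p.name for p in iter_dir_local(root / "a.txt")])  # ['a.txt', 'b.txt']
```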
mkdir(self, uri, mode=511, parents=False, exist_ok=False)
Create a new directory at the given uri. If mode is given, it is combined with the process's umask value to determine the file mode and access flags. If the path already exists, FileExistsError is raised (unless exist_ok is true).
If parents is true, any missing parents of this path are created as needed; they are created with the default permissions without taking mode into account (mimicking the POSIX mkdir -p command).
If parents is false (the default), a missing parent raises FileNotFoundError.
If exist_ok is false (the default), FileExistsError is raised if the target directory already exists.
If exist_ok is true, FileExistsError exceptions will be ignored (same behavior as the POSIX mkdir -p command), but only if the last path component is not an existing non-directory file.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
uri | Union[pathlib.Path, str] | uri to create dir. | required |
mode | int | mask mode. Defaults to 0o777. | 511 |
parents | bool | If true, creates any parents if required. Defaults to False. | False |
exist_ok | bool | If true, will not raise exception if dir already exists. Defaults to False. | False |
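Because these parameters mirror `pathlib.Path.mkdir`, the `parents` and `exist_ok` semantics can be checked locally with `pathlib` itself. This exercises the standard library, not iotoolz, and assumes `Streams.mkdir` behaves the same way for local paths.

```python
import pathlib
import tempfile

events = []
with tempfile.TemporaryDirectory() as tmp:
    target = pathlib.Path(tmp) / "a" / "b" / "c"

    # parents=False (default): the missing parent "a/b" raises FileNotFoundError.
    try:
        target.mkdir()
    except FileNotFoundError:
        events.append("missing parent")

    # parents=True: like `mkdir -p`, intermediate directories are created.
    target.mkdir(mode=0o777, parents=True)  # 0o777 == 511, the documented default

    # exist_ok=False (default): creating the same directory again fails.
    try:
        target.mkdir()
    except FileExistsError:
        events.append("already exists")

    # exist_ok=True: an existing directory is accepted silently.
    target.mkdir(exist_ok=True)
    events.append(target.is_dir())

print(events)  # ['missing parent', 'already exists', True]
```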
open(self, uri, mode='r', schema=None, data=None, fileobj=None, buffering=-1, encoding=None, newline=None, content_type='', inmem_size=None, delimiter=None, chunk_size=8192, etag='', schema_kwargs=None, **extra_kwargs)
Open an appropriate registered stream based on the uri schema.
For example, if HttpStream is registered for the http/https schemas, any uri starting with http or https will be opened using HttpStream.
You can perform most IO methods on the stream (e.g. read, write, seek, ...) depending on the provided mode (e.g. r, rb, w, wb, ...).
If you open the stream as a context manager (in a with statement), it is closed automatically when the context exits.
The only difference from the built-in "open" is that changes to the stream buffer are not pushed to the actual remote resource until "save" or "close" is called.
Appropriate concrete AbcStreams must be registered with Streams to support the corresponding uris.
```python
import json
from iotoolz import HttpStream  # import location assumed
from iotoolz.streams import Streams

streams = Streams().register(HttpStream, {"http", "https"})
# Get some ndjson data from a https endpoint
with streams.open("https://foo.bar/data.ndjson", "r") as stream:
    ndjsons = [json.loads(line) for line in stream]
# Post some binary content to a http endpoint
with streams.open("https://foo.bar/api/data", "wb") as stream:
    stream.write(b"hello world")
```
Parameters:
Name | Type | Description | Default |
---|---|---|---|
uri | Union[str, pathlib.Path] | uri to the resource. | required |
mode | str | same as the built-in "open". Defaults to "r". | 'r' |
schema | str | if provided, supersedes the schema inferred from the uri. Defaults to None. | None |
data | Union[bytes, str] | if provided, create a stream with the initial data. Defaults to None. | None |
fileobj | Any | if provided, create a stream with the data copied from the fileobj. Defaults to None. | None |
buffering | int | same as the built-in "open". Defaults to -1. | -1 |
encoding | str | encoding used to decode binary data to string. Defaults to None. | None |
newline | str | same as the built-in "open". Defaults to None. | None |
content_type | str | resource mime type (e.g. application/json). Defaults to "". | '' |
inmem_size | int | max data size before the buffer is rolled over to disk. Defaults to None (i.e. never roll over, which may result in a MemoryError). | None |
delimiter | Union[str, bytes] | delimiter used for determining line boundaries. Defaults to None. | None |
chunk_size | int | size of chunks to return in binary mode. Defaults to io.DEFAULT_BUFFER_SIZE. | 8192 |
etag | str | etag for the stream content. Defaults to "". | '' |
schema_kwargs | dict | dict mapping each schema to the default parameters for the corresponding AbcStream. Defaults to None. | None |
Returns:
Type | Description |
---|---|
AbcStream | AbcStream: concrete AbcStream based on the uri schema. |
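The key difference from the built-in `open` (writes are buffered locally and only pushed on "save" or "close") can be sketched with an in-memory buffer and a push callback. `BufferedRemoteStream`, `upload` and the `remote` dict are hypothetical stand-ins; the sketch assumes nothing about how iotoolz's AbcStream is built beyond the behavior described above.

```python
import io
from typing import Callable, Dict


class BufferedRemoteStream:
    """Hypothetical stream: writes stay in memory until save() or close()."""

    def __init__(self, push: Callable[[bytes], None]) -> None:
        self._buffer = io.BytesIO()
        self._push = push  # e.g. an HTTP POST or an object-store upload
        self.closed = False

    def write(self, data: bytes) -> int:
        return self._buffer.write(data)  # local only, nothing is sent yet

    def save(self) -> None:
        self._push(self._buffer.getvalue())  # send the whole buffer

    def close(self) -> None:
        if not self.closed:
            self.save()
            self._buffer.close()
            self.closed = True

    def __enter__(self) -> "BufferedRemoteStream":
        return self

    def __exit__(self, *exc_info) -> None:
        self.close()


remote: Dict[str, bytes] = {}  # stands in for the remote resource


def upload(data: bytes) -> None:
    remote["data"] = data


with BufferedRemoteStream(upload) as stream:
    stream.write(b"hello world")
    seen_before_close = dict(remote)  # {} : nothing pushed yet

print(remote)  # {'data': b'hello world'}
```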
register(self, stream_type, schemas=None, **kwargs)
Register a concrete AbcStream (e.g. HttpStream, FileStream, TempStream) and its corresponding schemas with the Streams object.
Streams will use the appropriate registered concrete AbcStream based on the uri schema.
By default, FileStream is already registered, as it is the fallback for any unknown uri.
Exceptions:
Type | Description |
---|---|
ValueError | schemas is not provided and {stream_type} has an empty supported_schemas. |
Returns:
Type | Description |
---|---|
Streams | Streams: current Streams object. |
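The dispatch described above (schema to registered stream type, with a local-file fallback, and `register` returning the object for chaining) can be sketched as a small registry keyed on `urllib.parse.urlparse(uri).scheme`. All class names here (`Stream`, `LocalStream`, `WebStream`, `Registry`) are hypothetical and only model the documented behavior, not iotoolz's actual code.

```python
from typing import Dict, Iterable, Optional, Type
from urllib.parse import urlparse


class Stream:
    """Hypothetical stand-in for a concrete AbcStream."""

    supported_schemas: frozenset = frozenset()

    def __init__(self, uri: str) -> None:
        self.uri = uri


class LocalStream(Stream):
    supported_schemas = frozenset({"", "file"})


class WebStream(Stream):
    supported_schemas = frozenset({"http", "https"})


class Registry:
    """Minimal schema -> stream-type dispatch with a local-file fallback."""

    def __init__(self, fallback: Type[Stream] = LocalStream) -> None:
        self._fallback = fallback
        self._schema2stream: Dict[str, Type[Stream]] = {}

    def register(
        self, stream_type: Type[Stream], schemas: Optional[Iterable[str]] = None
    ) -> "Registry":
        chosen = set(schemas) if schemas is not None else set(stream_type.supported_schemas)
        if not chosen:
            raise ValueError(
                f"schemas is not provided and {stream_type} has an empty supported_schemas"
            )
        for schema in chosen:
            self._schema2stream[schema] = stream_type
        return self  # returning self allows chained calls

    def open(self, uri: str) -> Stream:
        schema = urlparse(uri).scheme  # "" for a plain local path
        return self._schema2stream.get(schema, self._fallback)(uri)


registry = Registry().register(WebStream)
print(type(registry.open("https://foo.bar/data.ndjson")).__name__)  # WebStream
print(type(registry.open("/tmp/data.csv")).__name__)  # LocalStream
```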
rmdir(self, uri, ignore_errors=False, **kwargs)
Remove the entire directory.
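For local paths, "remove the entire directory" corresponds to a recursive delete such as `shutil.rmtree`, whose `ignore_errors` flag has the same name as the parameter here. The sketch below assumes, without confirmation from this page, that `Streams.rmdir` follows the same semantics for local files; it uses only the standard library.

```python
import pathlib
import shutil
import tempfile

root = pathlib.Path(tempfile.mkdtemp())
(root / "nested").mkdir()
(root / "nested" / "file.txt").write_text("data")

# Removes the directory together with everything below it.
shutil.rmtree(root)
print(root.exists())  # False

# With ignore_errors=True, removing a directory that is already gone is not an error.
shutil.rmtree(root, ignore_errors=True)
```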
set_buffer_rollover_size(value)
classmethod
Set the max size of the buffer before the data is rolled over to disk.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
value | int | size before rollover. | required |
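The standard library's `tempfile.SpooledTemporaryFile` implements the same "keep in memory, roll over to disk past a size" idea, and its `max_size=0` means "never roll over", which matches the `inmem_size=None` default of `open`. Whether iotoolz actually uses `SpooledTemporaryFile`, and that sizes are in bytes, are assumptions; `make_buffer` is a hypothetical helper.

```python
import tempfile
from typing import Optional


def make_buffer(inmem_size: Optional[int] = None) -> tempfile.SpooledTemporaryFile:
    """Return a buffer kept in memory until it grows past inmem_size bytes."""
    # max_size=0 disables rollover, matching "None means never".
    return tempfile.SpooledTemporaryFile(max_size=inmem_size or 0)


small = make_buffer(inmem_size=4)
small.write(b"hello world")  # 11 bytes > 4, so the data moves to a temp file on disk

unbounded = make_buffer()
unbounded.write(b"x" * 100_000)  # stays in memory however large it grows
```

Reading is unaffected by the rollover: after `small.seek(0)`, `small.read()` still returns the full payload.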
set_schema_kwargs(self, schema, **kwargs)
Set and update the default parameters for the different AbcStream implementations based on their schema.
```python
from iotoolz.streams import Streams
# Do not verify the SSL cert for any https request by passing
# verify=False to requests inside HttpStream
# (i.e. HttpStream is implemented with requests).
Streams().set_schema_kwargs("https", verify=False).open("https://foo.bar")
```
Returns:
Type | Description |
---|---|
Streams | Streams: current Streams object. |
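"Set and update" suggests that repeated calls merge into the stored defaults for a schema rather than replace them. A minimal sketch of that bookkeeping follows; `SchemaDefaults` and `resolve` are hypothetical, and the rule that explicit call-time kwargs override stored defaults is an assumption, not something this page states.

```python
from collections import defaultdict
from typing import Any, Dict


class SchemaDefaults:
    """Hypothetical per-schema default kwargs, merged into each open() call."""

    def __init__(self) -> None:
        self._schema2kwargs: Dict[str, Dict[str, Any]] = defaultdict(dict)

    def set_schema_kwargs(self, schema: str, **kwargs: Any) -> "SchemaDefaults":
        # Later calls update (not replace) the stored defaults for the schema.
        self._schema2kwargs[schema].update(kwargs)
        return self  # chainable, like Streams.set_schema_kwargs

    def resolve(self, schema: str, **call_kwargs: Any) -> Dict[str, Any]:
        # Assumption: explicit call-time kwargs take precedence over defaults.
        return {**self._schema2kwargs[schema], **call_kwargs}


defaults = SchemaDefaults().set_schema_kwargs("https", verify=False)
defaults.set_schema_kwargs("https", timeout=10)
print(defaults.resolve("https"))  # {'verify': False, 'timeout': 10}
print(defaults.resolve("https", verify=True))  # {'verify': True, 'timeout': 10}
print(defaults.resolve("http"))  # {}
```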
stats(self, uri)
Get the StreamInfo for the resource at the given uri.
unlink(self, uri, missing_ok=True, **kwargs)
Delete and remove a stream resource.
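`missing_ok` has the same meaning as in `pathlib.Path.unlink` (Python 3.8+), but note the different default: here it is `True`, while pathlib defaults to `missing_ok=False`. The local illustration below uses `pathlib` only and assumes `Streams.unlink` behaves the same way for local files.

```python
import pathlib
import tempfile

path = pathlib.Path(tempfile.mkdtemp()) / "stream.bin"
path.write_bytes(b"payload")

path.unlink(missing_ok=True)  # deletes the file
path.unlink(missing_ok=True)  # already gone: no error because missing_ok=True

try:
    path.unlink()  # pathlib's own default is missing_ok=False
except FileNotFoundError:
    print("FileNotFoundError")

path.parent.rmdir()  # clean up the temporary directory
```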