iotoolz.streams.Streams

Helper class to open streams with the corresponding AbcStream implementation based on the uri schema.

INMEM_SIZE: int

Methods

__init__(self, *stream_types) special

Creates a new instance of Streams.

exists(self, uri)

Whether a stream points to an existing resource.

glob(self, uri)

Yields streams that match the provided scheme and pattern.

Parameters:

Name | Type | Description | Default
uri | Union[pathlib.Path, str] | base uri to glob. | required
pattern | str | unix shell style pattern. Defaults to "*". | required

Returns:

Type | Description
Iterator[iotoolz._abc.AbcStream] | Iterator["AbcStream"]: iterator of streams that match the pattern.

Yields

AbcStream: stream object
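
The pattern semantics can be illustrated with the standard library alone. The sketch below is not how iotoolz implements glob; it only shows how a unix shell style pattern such as "*" or "*.ndjson" filters names under a base uri. The helper glob_names and the sample listing are hypothetical.

```python
import fnmatch
from typing import Iterable, Iterator


def glob_names(base_uri: str, names: Iterable[str], pattern: str = "*") -> Iterator[str]:
    """Yield full uris under base_uri whose name matches a unix shell style pattern."""
    base = base_uri.rstrip("/")
    for name in names:
        # fnmatchcase is case-sensitive on every platform, unlike fnmatch.fnmatch.
        if fnmatch.fnmatchcase(name, pattern):
            yield f"{base}/{name}"


# Hypothetical listing of a remote "directory".
listing = ["a.ndjson", "b.ndjson", "notes.txt"]
matches = list(glob_names("https://foo.bar/data/", listing, "*.ndjson"))
```

With the default pattern "*", every name in the listing is yielded.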

iter_dir(self, uri)

If the uri is a directory, yields the streams in that directory. Otherwise, yields all streams in the same directory as the provided uri.

Parameters:

Name | Type | Description | Default
uri | Union[pathlib.Path, str] | uri to list. | required

Returns:

Type | Description
Iterator[iotoolz._abc.AbcStream] | Iterator["AbcStream"]: iterator of streams in the directory.

Yields

AbcStream: stream object
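
For local paths, the directory-or-sibling rule described for iter_dir can be sketched with pathlib. This is an illustration under that assumption, not the library's implementation; the function iter_dir_local is hypothetical and yields Path objects rather than streams.

```python
import tempfile
from pathlib import Path
from typing import Iterator


def iter_dir_local(uri: str) -> Iterator[Path]:
    """Yield entries of uri if it is a directory, else entries of its parent directory."""
    path = Path(uri)
    directory = path if path.is_dir() else path.parent
    # Sorting only makes the order deterministic for this example.
    yield from sorted(directory.iterdir())


with tempfile.TemporaryDirectory() as tmp:
    (Path(tmp) / "a.txt").write_text("a")
    (Path(tmp) / "b.txt").write_text("b")
    from_dir = [p.name for p in iter_dir_local(tmp)]
    from_file = [p.name for p in iter_dir_local(str(Path(tmp) / "a.txt"))]
```

Passing the directory or a file inside it produces the same listing.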

mkdir(self, uri, mode=511, parents=False, exist_ok=False)

Create a new directory at this given path. If mode is given, it is combined with the process’ umask value to determine the file mode and access flags. If the path already exists, FileExistsError is raised.

If parents is true, any missing parents of this path are created as needed; they are created with the default permissions without taking mode into account (mimicking the POSIX mkdir -p command).

If parents is false (the default), a missing parent raises FileNotFoundError.

If exist_ok is false (the default), FileExistsError is raised if the target directory already exists.

If exist_ok is true, FileExistsError exceptions will be ignored (same behavior as the POSIX mkdir -p command), but only if the last path component is not an existing non-directory file.

Parameters:

Name | Type | Description | Default
uri | Union[pathlib.Path, str] | uri to create dir. | required
mode | int | mask mode. Defaults to 0o777. | 511
parents | bool | If true, creates any parents if required. Defaults to False. | False
exist_ok | bool | If true, will not raise exception if dir already exists. Defaults to False. | False
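
The wording of the mkdir description matches the semantics of pathlib.Path.mkdir, so the parents and exist_ok flags can be explored with the standard library. The sketch below covers local paths only; how a particular remote stream type treats these flags is not shown here.

```python
import tempfile
from pathlib import Path

# The default mode 511 is the decimal form of the octal permission mask 0o777.
default_mode = 0o777

with tempfile.TemporaryDirectory() as tmp:
    nested = Path(tmp) / "a" / "b"

    # parents=False (the default): a missing parent raises FileNotFoundError.
    try:
        nested.mkdir(mode=default_mode)
        missing_parent_raised = False
    except FileNotFoundError:
        missing_parent_raised = True

    # parents=True creates the missing parent "a" as well.
    nested.mkdir(mode=default_mode, parents=True)
    created = nested.is_dir()

    # exist_ok=False (the default): an existing target raises FileExistsError.
    try:
        nested.mkdir()
        exists_raised = False
    except FileExistsError:
        exists_raised = True

    # exist_ok=True silently accepts an existing directory.
    nested.mkdir(exist_ok=True)
```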

open(self, uri, mode='r', schema=None, data=None, fileobj=None, buffering=-1, encoding=None, newline=None, content_type='', inmem_size=None, delimiter=None, chunk_size=8192, etag='', schema_kwargs=None, **extra_kwargs)

Open an appropriate registered stream based on the uri schema.

For example, if HttpStream is registered for the http/https schemas, any uri starting with http or https will be opened using HttpStream.

You can perform most IO methods on the stream (e.g. read, write, seek, ...) depending on the provided mode (e.g. r, rb, w, wb, ...).

If you open the stream with a context, it will close automatically after exiting the context.

The only difference with a native "open" is that changes to the stream buffer will not be pushed to the actual remote resource until "save" or "close" is called.

Appropriate concrete AbcStreams must be registered with Streams to support the corresponding uris.

import json

# Assumes Streams and HttpStream have already been imported from iotoolz.
streams = Streams().register(HttpStream, {"http", "https"})

# Get some ndjson data from an https endpoint
with streams.open("https://foo.bar/data.ndjson", "r") as stream:
    ndjsons = [json.loads(line) for line in stream]

# Post some binary content to an https endpoint
with streams.open("https://foo.bar/api/data", "wb") as stream:
    stream.write(b"hello world")

Parameters:

Name | Type | Description | Default
uri | Union[str, pathlib.Path] | uri to the resource. | required
mode | str | same as "open". Defaults to "r". | 'r'
schema | str | if provided, supersedes the schema inferred from the uri. Defaults to None. | None
data | Union[bytes, str] | if provided, create a stream with the initial data. Defaults to None. | None
fileobj | Any | if provided, create a stream with the data copied from the fileobj. Defaults to None. | None
buffering | int | same as "open". Defaults to -1. | -1
encoding | str | encoding used to decode binary data to string. Defaults to None. | None
newline | str | same as "open". Defaults to None. | None
content_type | str | resource mime type (e.g. application/json). Defaults to "". | ''
inmem_size | int | max data size before the buffer is rolled over to disk. Defaults to None (i.e. never - may result in MemoryError). | None
delimiter | Union[str, bytes] | delimiter used for determining line boundaries. Defaults to None. | None
chunk_size | int | size of chunks to return in binary mode. Defaults to io.DEFAULT_BUFFER_SIZE. | 8192
etag | str | etag for the stream content. Defaults to "". | ''
schema_kwargs | dict | dict of schema to default parameters for the corresponding AbcStreams. Defaults to None. | None

Returns:

Type | Description
AbcStream | AbcStream: concrete AbcStream based on the uri schema.
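
The deferred-write behaviour described for open (changes stay in the stream buffer until "save" or "close" is called) can be pictured with a small stand-in class. DeferredWriter and the dict that plays the role of the remote resource are hypothetical; this is a sketch of the idea, not the AbcStream implementation.

```python
import io
from typing import Dict


class DeferredWriter:
    """Hypothetical stream that buffers writes and pushes them only on save() or close()."""

    def __init__(self, uri: str, remote: Dict[str, bytes]) -> None:
        self.uri = uri
        self._remote = remote  # stand-in for a remote resource store
        self._buffer = io.BytesIO()

    def write(self, data: bytes) -> int:
        # Writes only touch the local buffer.
        return self._buffer.write(data)

    def save(self) -> None:
        # Push the whole buffer to the "remote" resource.
        self._remote[self.uri] = self._buffer.getvalue()

    def close(self) -> None:
        self.save()
        self._buffer.close()

    def __enter__(self) -> "DeferredWriter":
        return self

    def __exit__(self, *exc_info) -> None:
        # Leaving the context closes the stream, which triggers the push.
        self.close()


remote: Dict[str, bytes] = {}
with DeferredWriter("https://foo.bar/api/data", remote) as stream:
    stream.write(b"hello world")
    pushed_before_close = "https://foo.bar/api/data" in remote
```

Inside the with block nothing has reached the stand-in remote yet; the data appears there only once the context exits.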

register(self, stream_type, schemas=None, **kwargs)

Register a concrete AbcStream (e.g. HttpStream, FileStream, TempStream) and the corresponding schemas to the Streams object.

Streams will use the appropriate registered concrete AbcStream based on the uri schema.

By default, FileStream will already be registered as it is the fallback for any unknown uri.

Exceptions:

Type | Description
ValueError | schema is not provided and {stream_type} has an empty supported_schemas.

Returns:

Type | Description
Streams | Streams: current Streams object.
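
The registration and fallback rules described for register can be sketched as a schema-to-class lookup. Everything below is a stand-in: Registry, FakeFileStream and FakeHttpStream are hypothetical names, and inferring the schema with urllib.parse.urlsplit is an assumption rather than the library's documented method.

```python
from typing import Dict, Iterable, Optional
from urllib.parse import urlsplit


class FakeFileStream:
    supported_schemas = {"", "file"}


class FakeHttpStream:
    supported_schemas = {"http", "https"}


class Registry:
    """Hypothetical schema-to-class registry with a file stream fallback."""

    def __init__(self) -> None:
        self._by_schema: Dict[str, type] = {}

    def register(self, stream_type: type, schemas: Optional[Iterable[str]] = None) -> "Registry":
        # Fall back to the class's own supported_schemas when none are given.
        schemas = set(schemas or getattr(stream_type, "supported_schemas", set()))
        if not schemas:
            raise ValueError(
                f"schema is not provided and {stream_type} has an empty supported_schemas"
            )
        for schema in schemas:
            self._by_schema[schema] = stream_type
        return self  # returning self allows chained calls

    def resolve(self, uri: str, schema: Optional[str] = None) -> type:
        # An explicit schema supersedes the one inferred from the uri.
        key = schema if schema is not None else urlsplit(uri).scheme
        # Unknown schemas fall back to the file stream.
        return self._by_schema.get(key, FakeFileStream)


registry = Registry().register(FakeHttpStream, {"http", "https"})
```

Here registry.resolve("https://foo.bar/data.ndjson") picks the http stand-in, while a plain path or an unregistered schema falls back to the file stand-in.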

rmdir(self, uri, ignore_errors=False, **kwargs)

Remove the entire directory.

set_buffer_rollover_size(value) classmethod

Set the max size of the buffer before the data is rolled over to disk.

Parameters:

Name | Type | Description | Default
value | int | size before rollover | required
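
The rollover idea (keep data in memory until it exceeds a size limit, then continue on disk) can be sketched as follows. RolloverBuffer is a hypothetical class written for this illustration and only handles sequential writes; it does not claim to mirror the buffer iotoolz uses internally.

```python
import io
import tempfile


class RolloverBuffer:
    """Hypothetical buffer that stays in memory until it exceeds max_size bytes."""

    def __init__(self, max_size: int) -> None:
        self.max_size = max_size
        self.rolled_over = False
        self._file = io.BytesIO()

    def write(self, data: bytes) -> int:
        written = self._file.write(data)
        # With sequential writes, the position equals the total size so far.
        if not self.rolled_over and self._file.tell() > self.max_size:
            self._rollover()
        return written

    def _rollover(self) -> None:
        # Copy the in-memory bytes into a temporary file on disk and keep writing there.
        disk_file = tempfile.TemporaryFile()
        disk_file.write(self._file.getvalue())
        self._file.close()
        self._file = disk_file
        self.rolled_over = True

    def getvalue(self) -> bytes:
        # Read everything back, then restore the write position.
        position = self._file.tell()
        self._file.seek(0)
        data = self._file.read()
        self._file.seek(position)
        return data

    def close(self) -> None:
        self._file.close()


buffer = RolloverBuffer(max_size=8)
buffer.write(b"12345")
in_memory_after_first_write = not buffer.rolled_over
buffer.write(b"67890")
on_disk_after_second_write = buffer.rolled_over
content = buffer.getvalue()
buffer.close()
```

The standard library offers the same pattern through tempfile.SpooledTemporaryFile and its max_size argument.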

set_schema_kwargs(self, schema, **kwargs)

Set and update the default parameters for the different AbcStream implementations based on their schema.

# do not verify ssl cert for all https requests
# by passing verify=False flag to requests inside HttpStream
# i.e. HttpStream is implemented with requests
Streams().set_schema_kwargs("https", verify=False).open("https://foo.bar")

Returns:

Type | Description
Streams | Streams: current Streams object.
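
The "set and update" wording suggests that defaults accumulate per schema. A minimal sketch of that bookkeeping is shown below; SchemaDefaults and kwargs_for are hypothetical names, the timeout argument is only an example value, and the rule that call-time arguments override stored defaults is an assumption.

```python
from typing import Any, Dict


class SchemaDefaults:
    """Hypothetical store of per-schema default keyword arguments."""

    def __init__(self) -> None:
        self._defaults: Dict[str, Dict[str, Any]] = {}

    def set_schema_kwargs(self, schema: str, **kwargs: Any) -> "SchemaDefaults":
        # Update (not replace) the defaults already stored for this schema.
        self._defaults.setdefault(schema, {}).update(kwargs)
        return self

    def kwargs_for(self, schema: str, **call_kwargs: Any) -> Dict[str, Any]:
        # Assumption: call-time arguments take precedence over stored defaults.
        return {**self._defaults.get(schema, {}), **call_kwargs}


defaults = SchemaDefaults().set_schema_kwargs("https", verify=False)
defaults.set_schema_kwargs("https", timeout=5)
```

After the two calls, the "https" schema carries both verify=False and timeout=5, while other schemas have no defaults.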

stats(self, uri)

Get the StreamInfo.

Delete and remove a stream resource.


Last update: October 19, 2020