
iotoolz


iotoolz is an improvement over e2fyi-utils and is partly inspired by toolz. It is a library that provides a consistent developer experience for interacting with any IO resource. It provides an abstract class, iotoolz.AbcStream, which closely mimics Python's built-in open (with some additional parameters and methods such as save).

API documentation can be found at https://iotoolz.readthedocs.io/en/latest/.

Change logs are available in CHANGELOG.md.

Quickstart

# install the default packages only (most lightweight)
pip install iotoolz

iotoolz.streams

The helper object iotoolz.streams.stream_factory is a default singleton of iotoolz.streams.Streams provided to support most of the common use cases.

iotoolz.streams.open_stream is a utility method provided by the singleton helper to create a stream object. It accepts the same arguments as Python's built-in open function, plus the following additional parameters:

  • data: optional str or bytes that will be passed into the stream
  • fileobj: optional file-like object which will be copied into the stream
  • content_type: optional mime type information to describe the stream (e.g. application/json)
  • inmem_size: determines how much memory to allocate to the stream before rolling over to the local file system. Defaults to no limit (which may result in a MemoryError).
  • schema_kwargs: optional mapping of schemas to their default kwargs.

from iotoolz.streams import open_stream

default_schema_kwargs = {
    "https": {"verify": False}  # pass to requests - i.e. don't verify ssl
}

# this will return a stream that reads from the site
http_google = open_stream(
    "https://google.com",
    mode="r",
    schema_kwargs=default_schema_kwargs
)

html = http_google.read()
content_type = http_google.content_type
encoding = http_google.encoding

# this will write to the https endpoint using the POST method (default is PUT)
with open_stream("https://foo/bar", mode="wb", use_post=True) as stream:
    stream.write(b"hello world")


# this will write to a local path
# save will write the current content to the local file
foo_txt = open_stream(
    "path/to/foo.txt",
    mode="w",
    content_type="text/plain",
    encoding="utf-8",
    data="foo bar",
).save()

# go to the end of the buffer
foo_txt.seek(0, whence=2)
# append more data
foo_txt.write("\nnext line")
# save and close the data
foo_txt.close()
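
The inmem_size rollover described in the parameter list behaves much like Python's standard tempfile.SpooledTemporaryFile, which keeps data in memory until a size threshold is exceeded and then spills to disk. The sketch below uses only the standard library to illustrate the idea. It is not iotoolz's implementation, write_with_rollover is a hypothetical helper name, and the private CPython attribute _rolled is read purely to make the rollover visible.

```python
import tempfile


def write_with_rollover(payload: bytes, inmem_size: int) -> bool:
    """Write payload to a spooled buffer; return True if it spilled to disk.

    Hypothetical helper for illustration only (not part of iotoolz).
    """
    with tempfile.SpooledTemporaryFile(max_size=inmem_size) as buf:
        buf.write(payload)
        # `_rolled` is a private CPython attribute; it is read here only to
        # show whether the buffer moved from memory to a real temp file.
        return bool(buf._rolled)


# 10 bytes fit within the 1024-byte in-memory budget.
small_spilled = write_with_rollover(b"x" * 10, inmem_size=1024)
# 4096 bytes exceed the budget, so the buffer rolls over to disk.
large_spilled = write_with_rollover(b"x" * 4096, inmem_size=1024)
print(small_spilled, large_spilled)
```

A small threshold trades memory for disk IO, while an unlimited in-memory buffer avoids disk IO at the risk of exhausting memory on large payloads.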

Piping streams

pipe is a method that pushes data to a sink (similar to a NodeJS stream, except that it has no watermark or buffering).

from iotoolz.streams import open_stream

local_file = open_stream(
    "path/to/google.html", content_type="text/html", mode="w"
)
temp_file = open_stream(
    "tmp://google.html", content_type="text/html", mode="wb"
)

# when source is closed, all sinks will be closed also
with open_stream("https://google.com") as source:
    # writes to a temp file then to a local file in sequence
    source.pipe(temp_file).pipe(local_file)


local_file2 = open_stream(
    "path/to/google1.html", content_type="text/html", mode="w"
)
local_file3 = open_stream(
    "path/to/google2.html", content_type="text/html", mode="w"
)

# when source is closed, all sinks will be closed also
with open_stream("tmp://foo_src", mode="w") as source:
    # fan-out: data written to the source is pushed to both sinks
    source.pipe(local_file2)
    source.pipe(local_file3)

    source.write("hello world")

TODO: support transform streams so that pipe can be more useful.
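
The chaining and fan-out behaviour shown above can be pictured with a toy model. The sketch below is a simplified illustration and not iotoolz's implementation: ToyStream is a hypothetical class, and it assumes that pipe returns the sink (which is what makes chaining possible) and that closing a source closes its sinks.

```python
import io
from typing import List


class ToyStream:
    """Toy in-memory stream used only to illustrate pipe semantics."""

    def __init__(self, name: str) -> None:
        self.name = name
        self.buffer = io.StringIO()
        self.sinks: List["ToyStream"] = []
        self.closed = False

    def pipe(self, sink: "ToyStream") -> "ToyStream":
        # Register the sink and return it so that calls can be chained.
        self.sinks.append(sink)
        return sink

    def write(self, text: str) -> None:
        # Write locally, then push the same data to every registered sink.
        self.buffer.write(text)
        for sink in self.sinks:
            sink.write(text)

    def close(self) -> None:
        # Closing the source also closes every downstream sink.
        self.closed = True
        for sink in self.sinks:
            sink.close()


source = ToyStream("source")
first, second, chained = ToyStream("a"), ToyStream("b"), ToyStream("c")

source.pipe(first)                 # fan-out: source -> first
source.pipe(second).pipe(chained)  # chain: source -> second -> chained

source.write("hello world")
source.close()
```

Because there is no buffering or watermark in this model, each write is forwarded to the sinks immediately and in registration order.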

Creating a custom AbcStream class

The abstract class iotoolz.AbcStream requires the following methods to be implemented:

# This method does the actual work of fetching the data from the underlying IO
# resource. It returns a Tuple with an Iterable over the data and the corresponding
# StreamInfo.
def read_to_iterable_(
    self, uri: str, chunk_size: int, **kwargs
) -> Tuple[Iterable[bytes], StreamInfo]:
    ...

# This method does the actual work of writing the data to the underlying IO resource.
# This method is only triggered when "close" or "save" is called.
# You should use the "file_" parameter (a file-like obj) to write the current data to
# the actual IO resource.
def write_from_fileobj_(
    self, uri: str, file_: IO[bytes], size: int, **kwargs
) -> StreamInfo:
    ...

StreamInfo is a dataclass to hold the various info about the data stream (e.g. content_type, encoding and etag).
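
To make the two contracts concrete, here is a minimal sketch that reads from and writes to a local file. It uses only the standard library: the StreamInfo dataclass below is a stand-in with the three fields named above (the real class may have more fields or different defaults), and the two functions are written at module level without self purely for illustration.

```python
import io
import os
import shutil
import tempfile
from dataclasses import dataclass
from typing import IO, Iterable, Iterator, Optional, Tuple


@dataclass
class StreamInfo:
    """Stand-in for iotoolz's StreamInfo; the real class may differ."""

    content_type: Optional[str] = None
    encoding: Optional[str] = None
    etag: Optional[str] = None


def _iter_chunks(path: str, chunk_size: int) -> Iterator[bytes]:
    # Lazily yield the file content in chunks of at most chunk_size bytes.
    with open(path, "rb") as file_:
        while True:
            chunk = file_.read(chunk_size)
            if not chunk:
                return
            yield chunk


def read_to_iterable_(
    uri: str, chunk_size: int, **kwargs
) -> Tuple[Iterable[bytes], StreamInfo]:
    # Return an iterable over the data plus metadata describing it.
    info = StreamInfo(content_type="application/octet-stream")
    return _iter_chunks(uri, chunk_size), info


def write_from_fileobj_(
    uri: str, file_: IO[bytes], size: int, **kwargs
) -> StreamInfo:
    # Copy the buffered content from the file-like object to the target.
    with open(uri, "wb") as target:
        shutil.copyfileobj(file_, target)
    return StreamInfo()


# Round trip through a temporary file.
path = os.path.join(tempfile.mkdtemp(), "demo.bin")
payload = b"hello world"
write_from_fileobj_(path, io.BytesIO(payload), size=len(payload))
chunks, info = read_to_iterable_(path, chunk_size=4)
data = b"".join(chunks)
```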

Ideally, any AbcStream implementation should also provide supported_schemas (Set[str]) as a class variable. This class variable will be used in the future to infer which schemas the class supports. For example, since iotoolz.HttpStream supports https and http, any URI that starts with https:// or http:// can be handled by iotoolz.HttpStream.
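
One way such schema-based inference could work is sketched below. This is a hypothetical illustration, not iotoolz's actual lookup logic: FakeHttpStream, FakeTempStream and find_stream_class are made-up names, and the URI scheme is extracted with the standard library's urllib.parse.urlsplit.

```python
from typing import List, Optional, Set
from urllib.parse import urlsplit


class FakeHttpStream:
    """Placeholder class standing in for an HTTP stream implementation."""

    supported_schemas: Set[str] = {"http", "https"}


class FakeTempStream:
    """Placeholder class standing in for a temp-file stream implementation."""

    supported_schemas: Set[str] = {"tmp"}


def find_stream_class(uri: str, candidates: List[type]) -> Optional[type]:
    """Return the first candidate whose supported_schemas contains the URI scheme."""
    scheme = urlsplit(uri).scheme
    for cls in candidates:
        if scheme in getattr(cls, "supported_schemas", set()):
            return cls
    # No candidate declared support for this scheme (e.g. a plain local path).
    return None


candidates = [FakeHttpStream, FakeTempStream]
http_cls = find_stream_class("https://google.com", candidates)
tmp_cls = find_stream_class("tmp://google.html", candidates)
local_cls = find_stream_class("path/to/foo.txt", candidates)
```

In this sketch a plain relative path has an empty scheme and matches nothing, so a real dispatcher would need a fallback (for example, a local file stream).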

Example implementation of an HttpStream using requests

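from typing import IO, Iterable, Tuple

import cytoolz
import requests
import requests_toolbelt

# assumed import path; StreamInfo may live in a different iotoolz module
from iotoolz import AbcStream, StreamInfo


class HttpStream(AbcStream):
    # uses the supported_schemas class variable name described above
    supported_schemas = {"http", "https"}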

    def read_to_iterable_(
        self, uri: str, chunk_size: int, **kwargs
    ) -> Tuple[Iterable[bytes], StreamInfo]:
        resp = requests.get(uri, stream=True, **cytoolz.dissoc(kwargs, "stream"))
        resp.raise_for_status()
        info = StreamInfo(
            content_type=resp.headers.get("Content-Type"),
            encoding=resp.encoding,
            etag=resp.headers.get("etag"),
        )
        return resp.iter_content(chunk_size=chunk_size), info

    def write_from_fileobj_(
        self, uri: str, file_: IO[bytes], size: int, **kwargs
    ) -> StreamInfo:
        use_post = kwargs.get("use_post")
        requests_method = requests.post if use_post else requests.put
        resp = requests_method(
            uri,
            data=requests_toolbelt.StreamingIterator(size, file_),
            **cytoolz.dissoc(kwargs, "use_post", "data")
        )
        resp.raise_for_status()
        return StreamInfo()

Last update: October 19, 2020