upathlib#

(Generated on Nov 19, 2024 for upathlib version 0.9.8.)

The package upathlib defines a unified API for cloud blob store (aka “object store”) as well as local file systems.

Attention is focused on identifying the most essential functionalities while working with a blob store for data processing. Functionalities in a traditional local file system that are secondary in these tasks—such as symbolic links, fine-grained permissions, and various access modes—are ignored.

End user should look to the class Upath for documentation of the API. Local file system is implemented by LocalUpath, which subclasses Upath. Client for Google Cloud Storage (i.e. blob store on GCP) is implemented by another Upath subclass, namely GcsBlobUpath.

One use case is the package biglist, where the class Biglist takes a Upath object to indicate its location of storage. It does not care whether the storage is local or in a cloud blob store—it simply uses the common API to operate the storage.

To install, do one of the following:

$ pip3 install upathlib
$ pip3 install upathlib[gcs]

Quickstart#

Let’s carve out a space in the local file system and poke around.

>>> from upathlib import LocalUpath
>>> p = LocalUpath('/tmp/abc')

This creates a LocalUpath object p that points to the location '/tmp/abc'. This may be an existing file, or directory, or may be nonexistent. We know this is a temporary location; to be sure we have a clear playground, let’s wipe out anything and everything:

>>> p.rmrf()
0

Think rm -rf /tmp/abc. It does just that. The returned 0 means zero files were deleted.

Now let’s create a file and write something to it:

>>> (p / 'x.txt').write_text('first')

This creates file /tmp/abc/x.txt with the content 'first'. Note the directory '/tmp/abc' did not exist before the call. We did not need to “create the parent directory”. In fact, upathlib does not provide a way to do that. In upathlib, “directory” is a “virtual” thing that is embodied by a group of files. For example, if there exist

/tmp/abc/x.txt
/tmp/abc/d/y.data

we say there is directories '/tmp/abc' and '/tmp/abc/d', but we don’t create these “directories” by themselves. These directories come into being if there exist such files.

Let’s actually create these files:

>>> (p / 'x.txt').write_text('second', overwrite=True)
>>> (p / 'd' / 'y.data').write_bytes(b'0101')

Now let’s look into this directory:

>>> p.is_dir()
True
>>> (p / 'd').is_dir()
True
>>> (p / 'x.txt').is_dir()
False
>>> (p / 'x.txt').is_file()
True

We can navigate in the directory. For example,

>>> for v in sorted(p.iterdir()):  # the sort merely makes the result stable
...     print(v)
/tmp/abc/d
/tmp/abc/x.txt

This is only the first level, or “direct children”. We can also use “recursive iterdir” to get all files under the directory, descending into subdirectories recursively:

>>> for v in sorted(p.riterdir()):  # the sort merely makes the result stable
...     print(v)
/tmp/abc/d/y.data
/tmp/abc/x.txt

This time only files are listed. Subdirectories do not show up because, after all, they are not real in upathlib concept.

We can as easily read a file, like

>>> (p / 'x.txt').read_text()
'second'

Several common file formats are provided out of the box, including text, bytes, json, and pickle, as well as compressed versions by zlib and Zstandard.

Let’s do some JSON:

>>> pp = p / 'e/f/g/data.json'
>>> pp.write_json({'name': 'John', 'age': 38})

We know the JSON file is also a text file, so we can treat it as such:

>>> pp.read_text()
'{"name": "John", "age": 38}'

But usually we prefer to get back the Python object directly:

>>> v = pp.read_json()
>>> v
{'name': 'John', 'age': 38}
>>> type(v)
<class 'dict'>

We can go “down” the directory tree using /. Conversely, we can go “up” using parent():

>>> pp.path
PosixPath('/tmp/abc/e/f/g/data.json')
>>> pp.parent
LocalUpath('/tmp/abc/e/f/g')
>>> pp.parent.parent
LocalUpath('/tmp/abc/e/f')
>>> pp.parent.parent.is_dir()
True
>>> pp.parent.parent.is_file()
False

or the terminal-lovers’ ..:

>>> pp
LocalUpath('/tmp/abc/e/f/g/data.json')
>>> pp / '..'
LocalUpath('/tmp/abc/e/f/g')
>>> pp / '..' / '..'
LocalUpath('/tmp/abc/e/f')

Under the hood, / delegates to a call to joinpath():

>>> pp.joinpath('../../o/p/q')
LocalUpath('/tmp/abc/e/f/o/p/q')

Let’s see again what we have:

>>> sorted(p.riterdir())
[LocalUpath('/tmp/abc/d/y.data'), LocalUpath('/tmp/abc/e/f/g/data.json'), LocalUpath('/tmp/abc/x.txt')]

and to get rid of them all:

>>> p.rmrf()
3

A nice thing about upathlib is the “unified” nature across local and cloud storages. Suppose we have set up the environment to use Google Cloud Storage, then we could have started this excercise with

>>> from upathlib import GcsBlobUpath
>>> p = GcsBlobUpath('gs://my-bucket/tmp/abc')

Everything after this would work unchanged. (The printouts would be different at some places, e.g. LocalUpath would be replaced by GcsBlobUpath.)

Upath#

Upath is an abstract base class that defines the APIs and some of the implementation. Subclasses tailor to particular storage systems. Currently there are two production-ready subclasses; they implement Upath for local file systems and Google Cloud Storage, respectively.

The APIs follow the style of the standard library pathlib where appropriate.

class upathlib.Upath[source]#

Bases: ABC

__init__(*pathsegments: str)[source]#

Create a Upath instance. Because Upath is an abstract class, this is always called on a subclass to instantiate a path on the specific storage system.

Subclasses for cloud blob stores may need to add additional parameters representing, e.g., container/bucket name, etc.

Parameters:
*pathsegments

Analogous to the input to pathlib.Path. The first segment may or may not start with '/'. The path constructed with *pathsegments is always “absolute” under a known “root”.

For a local POSIX file system, the root is the usual '/'.

For a local Windows file system, the root is resolved to a particular “drive”.

For Azure blob store, the root is that in a “container”.

For AWS and GCP blob stores, the root is that in a “bucket”.

If missing, the path constructed is the “root”. However, the subclass LocalUpath plugs in the current working directory for a missing *pathsegments.

Note

If one segment starts with '/', it will reset to the “root” and discard all the segments that have come before it. For example, Upath('work', 'projects', '/', 'projects') is the same as Upath('/', 'projects).

Note

The first element of *pathsegments may start with some platform-specific strings. For example, '/' on Linux, 'c://' on Windows, 'gs://' on Google Cloud Storage. Please see subclasses for specifics.

__truediv__(key: str) Self[source]#

This method is invoked by self / key. This calls the method joinpath().

property path: PurePath#

The pathlib.PurePath version of the internal path string.

In the subclass LocalUpath, this property is overridden to return a pathlib.Path, which is a subclass of pathlib.PurePath.

In subclasses for cloud blob stores, this implementation stays in effect.

abstract as_uri() str[source]#

Represent the path as a file URI. See subclasses for platform-dependent specifics.

property name: str#

A string representing the final path component, excluding the drive and root, if any.

This is the name component of self.path. If self.path is '/' (the root), then an empty string is returned. (The name of the root path is empty.)

Examples

>>> from upathlib import LocalUpath
>>> p = LocalUpath('/tmp/test/upathlib/data/sales.txt.gz')
>>> p.path
PosixPath('/tmp/test/upathlib/data/sales.txt.gz')
>>> p.name
'sales.txt.gz'
>>> p.parent.parent.parent.parent
LocalUpath('/tmp')
>>> p.parent.parent.parent.parent.name
'tmp'
>>> p.parent.parent.parent.parent.parent
LocalUpath('/')
>>> p.parent.parent.parent.parent.parent.name
''
>>> # the parent of root is still root:
>>> p.parent.parent.parent.parent.parent.parent
LocalUpath('/')
property stem: str#

The final path component, without its suffix.

This is the “stem” part of self.name.

Examples

>>> from upathlib import LocalUpath
>>> p = LocalUpath('/tmp/test/upathlib/data/sales.txt')
>>> p
LocalUpath('/tmp/test/upathlib/data/sales.txt')
>>> p.path
PosixPath('/tmp/test/upathlib/data/sales.txt')
>>> p.name
'sales.txt'
>>> p.stem
'sales'
>>> p = LocalUpath('/tmp/test/upathlib/data/sales.txt.gz')
>>> p.stem
'sales.txt'
property suffix: str#

The file extension of the final component, if any

property suffixes: list[str]#

A list of the path’s file extensions.

Examples

>>> p = LocalUpath('/tmp/test/upathlib/data/sales.txt')
>>> p.suffix
'.txt'
>>> p.suffixes
['.txt']
>>> p = LocalUpath('/tmp/test/upathlib/data/sales.txt.gz')
>>> p.suffix
'.gz'
>>> p.suffixes
['.txt', '.gz']
exists() bool[source]#

Return True if the path is an existing file or dir; False otherwise.

Examples

In a blob store with blobs

/a/b/cd
/a/b/cd/e.txt

'/a/b/cd' exists, and is both a file and a dir; '/a/b/cd/e.txt' exists, and is a file; '/a/b' exists, and is a dir; '/a/b/c' does not exist.

abstract is_dir() bool[source]#

Return True if the path is an existing directory; False otherwise.

If there exists a file named like

/a/b/c/d.txt

we say '/a', '/a/b', '/a/b/c' are existing directories.

In a cloud blob store, there’s no such thing as an “empty directory”, because there is no concept of “directory”. A blob store just consists of files (aka blobs) with names, which could contain the letter ‘/’, with no special meaning attached to it. We interpret the name '/a/b' as a directory to emulate the familiar concept in a local file system when there exist files named '/a/b/*'.

In a local file system, there can be empty directories. However, it is recommended to not have empty directories.

There is no method for “creating an empty dir” (like the Linux command mkdir). Simply create a file under the dir, and the dir will come into being. This is analogous to we create files all the time—we don’t “create” an empty file in advance; we simply write to the would-be path of the file to be created.

abstract is_file() bool[source]#

Return True if the path is an existing file; False otherwise.

In a cloud blob store, a path can be both a file and a dir. For example, if these blobs exist:

/a/b/c/d.txt
/a/b/c

we say /a/b/c is a “file”, and also a “dir”. User is recommended to avoid such namings.

This situation does not happen in a local file system.

abstract file_info() FileInfo | None[source]#

If is_file() is False, return None; otherwise, return file info.

property parent: Self#

The parent of the path.

If the path is the root, then the parent is still the root.

abstract property root: Self#

Return a new path representing the root.

joinpath(*other: str) Self[source]#

Join this path with more segments, return the new path object.

Calling this method is equivalent to combining the path with each of the other arguments in turn.

If self was created by Upath(*segs), then this method essentially returns Upath(*segs, *other).

If *other is a single string, there is a shortcut by the operator /, implemented by __truediv__().

with_name(name: str) Self[source]#

Return a new path the the “name” part substituted by the new value. If the original path doesn’t have a name (i.e. the original path is the root), ValueError is raised.

Examples

>>> p = LocalUpath('/tmp/test/upathlib/data/sales.txt.gz')
>>> p.with_name('sales.data')
LocalUpath('/tmp/test/upathlib/data/sales.data')
with_stem(stem: str) Self[source]#
with_suffix(suffix: str) Self[source]#

Return a new path with the suffix replaced by the specified value. If the original path doesn’t have a suffix, the new suffix is appended instead. If suffix is an empty string, the original suffix is removed.

suffix should include a dot, like '.txt'.

Examples

>>> p = LocalUpath('/tmp/test/upathlib/data/sales.txt.gz')
>>>
>>> # replace the last suffix:
>>> p.with_suffix('.data')
LocalUpath('/tmp/test/upathlib/data/sales.txt.data')
>>>
>>> # remove the last suffix:
>>> p.with_suffix('')
LocalUpath('/tmp/test/upathlib/data/sales.txt')
>>>
>>> p.with_suffix('').with_suffix('.bin')
LocalUpath('/tmp/test/upathlib/data/sales.bin')
>>>
>>> pp = p.with_suffix('').with_suffix('')
>>> pp
LocalUpath('/tmp/test/upathlib/data/sales')
>>>
>>> # no suffix to remove:
>>> pp.with_suffix('')
LocalUpath('/tmp/test/upathlib/data/sales')
>>>
>>> # add a suffix:
>>> pp.with_suffix('.pickle')
LocalUpath('/tmp/test/upathlib/data/sales.pickle')
abstract write_bytes(data: bytes | BufferedReader, *, overwrite: bool = False) None[source]#

Write bytes data to the current file.

Parent “directories” are created as needed, if applicable.

If overwrite is False and the current file exists, FileExistsError is raised.

data is either “byte-like” (such as bytes, bytearray, memoryview) or “file-like” open in “binary” mode. In the second case, the file should be positioned at the beginning (such as by calling .seek(0).)

abstract read_bytes() bytes[source]#

Return the binary contents of the pointed-to file as a bytes object.

If self is not a file or does not exist, FileNotFoundError is raised.

write_text(data: str, *, overwrite: bool = False, encoding: str | None = None, errors: str | None = None) None[source]#

Write text data to the current file.

Parent “directories” are created as needed, if applicable.

If overwrite is False and the current file exists, FileExistsError is raised.

encoding and errors are passed to encode(). Usually you should leave them at the default values.

read_text(*, encoding: str | None = None, errors: str | None = None) str[source]#

Return the decoded contents of the pointed-to file as a string.

If self is not a file or does not exist, FileNotFoundError is raised.

encoding and errors are passed to decode(). Usually you should leave them at the default values.

write_json(data: Any, *, overwrite=False, **kwargs) None[source]#
read_json(**kwargs) Any[source]#
write_pickle(data: Any, *, overwrite=False, **kwargs) None[source]#
read_pickle(**kwargs) Any[source]#
write_pickle_zstd(data: Any, *, overwrite=False, **kwargs) None[source]#
read_pickle_zstd(**kwargs) Any[source]#
copy_dir(source: str | Upath, *, overwrite: bool = False, quiet: bool = False, concurrent: bool = True) int[source]#

Copy the content of the source directory recursively to self.

If source is an string, then it is in the same store as the current path, and it is either absolute, or relative to self.parent.

If source is not a string, then it must be an instance of a Upath() subclass, and it may be in any store system.

Immediate children of source will be copied as immediate children of self.

There is no such error as “target directory exists” as the copy-operation only concerns individual files. If the target directory (i.e. self) contains files that do not have counterparts in the source directory, they will stay untouched.

overwrite is file-wise. If False, any existing target file will raise FileExistsError and halt the operation. If True, any existing target file will be overwritten by the source file.

quiet controls whether to print out progress info.

Returns:
int

The number of files copied.

copy_file(source: str | Upath, *, overwrite: bool = False) None[source]#

Copy the source file to the current file (i.e. self).

If source is str, then it is in the same store as the current path, and it is either absolute, or relative to self.parent. For example, if self is '/a/b/c/d.txt', then source='e.txt' means '/a/b/c/e.txt'.

If source is not a string, then it must be an instance of a Upath() subclass, and it may be in any storage system.

If source is not an existing file, FileNotFoundError is raised.

If self is an existing file and overwrite is False, FileExistsError is raised. If overwrite is True, then the self will be overwritten.

If type(source) is LocalUpath and self is an existing directory, then IsADirectoryError is raised. In a cloud blob store, there is no concrete “directory”. For example, suppose self is ‘gs://mybucket/backup/record’ on Google Cloud Storage, and source is ‘/experiment/data’, then source is ‘gs://mybucket/experiment/data’. If there exists blob ‘gs://mybucket/backup/record/y’, then we say ‘gs://mybucket/backup/record’ is a “directory”. However, this is merely a “virtual” concept, or an emulation of the “directory” concept on local disk. As long as the self path is not an existing blob, the copy will proceed with no problem. Nevertheless, such naming is confusing and better avoided.

Return the number of files copied (either 0 or 1).

remove_dir(*, quiet: bool = True, concurrent: bool = True) int[source]#

Remove the current directory (i.e. self) and all its contents recursively.

Essentially, this removes each file that is yielded by riterdir(). Subclasses should take care to remove “empty directories”, if applicable, that are left behind.

quiet controls whether to print progress info.

Returns:
int

The number of files removed.

abstract remove_file() None[source]#

Remove the current file (i.e. self).

If self is not an existing file, FileNotFoundError is raised. If the file exists but can’t be removed, the platform-dependent exception is propagated.

abstract iterdir() Iterator[Self][source]#

Yield the immediate (i.e. non-recursive) children of the current dir (i.e. self).

The yielded elements are instances of the same class. Each yielded element is either a file or a dir. There is no guarantee on the order of the returned elements.

If self is not a dir (e.g. maybe it’s a file), or does not exist at all, nothing is yielded (resulting in an empty iterable); no exception is raised.

See also

riterdir().

ls() list[Self][source]#

Return the elements yielded by iterdir() in a sorted list.

Sorting is by a full path string maintained internally.

The returned list may be empty.

abstract riterdir() Iterator[Self][source]#

Yield files under the current dir (i.e. self) recursively. The method name means “recursive iterdir”.

The yielded elements are instances of the same class. They represent existing files.

Compared to iterdir(), this is recursive, and yields files only. Empty subdirectories will have no representation in the return.

Similar to iterdir(), if self is not a dir or does not exist, then nothing is yielded, and no exception is raised either.

There is no guarantee on the order of the returned elements.

rmrf(*, quiet: bool = True, concurrent: bool = False) int[source]#

Remove the current file or dir (i.e. self) recursively.

Analogous to the Linux command rm -rf, hence the name of this method.

Return the number of files removed.

For example, if these blobs are present:

/a/b/c/d/e.txt
/a/b/c/kk.data
/a/b/c

then Upath('/a/b/c').rmrf() would remove all of them.

concurrent is False by default because this method is often used in __del__ of user classes, and thread pool is problematic in __del__.

abstract lock(*, timeout: int = None) Self[source]#

Lock the current file (i.e. self), in order to have exclusive access to the code block that has possession of the lock.

timeout: if the lock can’t be acquired within timeout seconds, LockAcquireError is raised. If None, wait for a default reasonably long time. To wait “forever”, just pass in a large number.

timeout=0 is a valid input, meaning making exactly one attempt to acquire a lock.

Once a lock is acquired, it will not expire until this contextmanager exits. In other words, this is timeout for the “lock acquisition”, not for the lock itself. Actual waiting time could be slightly longer or shorter.

This is a “mandatory lock”, as opposed to an “advisory lock”. The intended use case is for this lock to be used for implementing a (cooperative) “code lock”.

As this abstract method is to be used as a context manager, a subclass should use yield in its implementation. The yield statement should yield self.

One way to achieve cooperative locking on a file via this lock is like this:

f = Upath('abc.txt')
with f.lock():
    ...
    # Now read from or write to `f` with exclusive access.

Depending on the capabilities of the specific storage system, the lock may be on the current file itself or on another, helper file as an implementation detail. In the latter case, this method should delete the helper file upon release of the lock, if possible.

Some storage engines may not provide the capability to implement this lock.

class upathlib.FileInfo[source]#

Bases: object

ctime: float#

Creation time as a POSIX timestamp.

mtime: float#

Last modification time as a POSIX timestamp.

time_created: datetime#

Creation time as an datetime object.

time_modified: datetime#

Last modification time as an datetime object.

size: int#

In bytes.

details: Any#

Platform-dependent.

__init__(ctime: float, mtime: float, time_created: datetime, time_modified: datetime, size: int, details: Any) None#

LocalUpath#

class upathlib.LocalUpath[source]#

Bases: Upath, PathLike

__init__(*pathsegments: str)[source]#

Create a path on the local file system. Both POSIX and Windows platforms are supported.

*pathsegments specify the path, either absolute or relative to the current working directory. If missing, the constructed path is the current working directory. This is passed to pathlib.Path.

__fspath__() str[source]#

LocalUpath implements the os.PathLike protocol, hence a LocalUpath object can be used anywhere an object implementing os.PathLike is accepted. For example, used with the builtin function open():

>>> p = LocalUpath('/tmp/test/data.txt')
>>> p.rmrf()
0
>>> p.write_text('abc')
>>> with open(p) as file:
...     print(file.read())
abc
>>> p.rmrf()
1
property path: Path#

Return the pathlib.Path object of the path.

as_uri() str[source]#

Represent the path as a file URI. On Linux, this is like ‘file:///home/username/path/to/file’. On Windows, this is like ‘file:///C:/Users/username/path/to/file’.

is_dir() bool[source]#

Return whether the current path is a dir.

is_file() bool[source]#

Return whether the current path is a file.

file_info() FileInfo | None[source]#

Return file info if the current path is a file; otherwise return None.

property root: LocalUpath#

Return a new path representing the root.

On Windows, this is the root on the same drive, like LocalUpath('C:'). On Linux and Mac, this is LocalUpath('/').

read_bytes() bytes[source]#

Read the content of the current file as bytes.

write_bytes(data: bytes | BufferedReader, *, overwrite: bool = False)[source]#

Write the bytes data to the current file.

copy_file(source: str | Upath, *, overwrite: bool = False) None[source]#
copy_dir(source: str | Upath, *, overwrite: bool = False, quiet: bool = False, concurrent: bool = True) int[source]#
remove_dir(**kwargs) int[source]#

Remove the current dir along with all its contents recursively.

remove_file() None[source]#

Remove the current file.

rename_dir(target: str | LocalUpath, *, overwrite: bool = False, quiet: bool = False, concurrent: bool = True) LocalUpath[source]#

Rename the current dir (i.e. self) to target.

overwrite is applied file-wise. If there are files under target that do not have counterparts under self, they are left untouched.

quiet controls whether to print progress info.

Return the new path.

rename_file(target: str | LocalUpath, *, overwrite: bool = False) LocalUpath[source]#

Rename the current file (i.e. self) to target in the same store.

target is either absolute or relative to self.parent. For example, if self is ‘/a/b/c/d.txt’, then target='e.txt' means ‘/a/b/c/e.txt’.

If overwrite is False (the default) and the target file exists, FileExistsError is raised.

Return the new path.

iterdir() Iterator[LocalUpath][source]#

Yield the immediate children under the current dir.

riterdir() Iterator[LocalUpath][source]#

Yield all files under the current dir recursively.

lock(*, timeout=None)[source]#

This uses the package filelock to implement a file lock for inter-process communication.

Note

At the end, this file is not deleted. If it is purely a dummy file to implement locking for other things, user may want to delete this file after use.

BlobUpath#

class upathlib._blob.BlobUpath[source]#

Bases: Upath

BlobUpath is a base class for paths in a cloud storage, aka “blob store”. This is in contrast to a local disk storage, which is implemented by LocalUpath.

property blob_name: str#

Return the “name” of the blob. This is the “path” without a leading '/'. In cloud blob stores, this is exactly the name of the blob. The name often contains '/', which has no special role in the name per se but is interpreted by users to be a directory separator.

is_dir() bool[source]#

In a typical blob store, there is no such concept as a “directory”. Here we emulate the concept in a local file system. If there is a blob named like

/ab/cd/ef/g.txt

we say there exists directory “/ab/cd/ef”. We should never have a trailing / in a blob’s name, like

/ab/cd/ef/

(I don’t know whether the blob stores allow such blob names.)

Consequently, is_dir is equivalent to “having stuff in the dir”. There is no such thing as an “empty directory” in blob stores.

iterdir() Iterator[Self][source]#

Yield immediate children under the current dir.

This is a naive, inefficient implementation. Expected to be refined by subclasses.

download_dir(target: str | Path | LocalUpath, **kwargs) int[source]#
download_file(target: str | Path | LocalUpath, **kwargs) None[source]#
upload_dir(source: str | Path | LocalUpath, **kwargs) int[source]#
upload_file(source: str | Path | LocalUpath, **kwargs) None[source]#

GcsBlobUpath#

class upathlib.GcsBlobUpath[source]#

Bases: BlobUpath

GcsBlobUpath implements the Upath API for Google Cloud Storage using the package google-cloud-storage.

__init__(*paths: str, bucket_name: str | None = None)[source]#

If bucket_name is None, then *paths should be a single string starting with ‘gs://<bucket-name>/’.

If bucket_name is specified, then *paths specify path components under the root of the bucket.

Examples

These several calls are equivalent:

>>> GcsBlobUpath('experiments', 'data', 'first.data', bucket_name='backup')
GcsBlobUpath('gs://backup/experiments/data/first.data')
>>> GcsBlobUpath('/experiments/data/first.data', bucket_name='backup')
GcsBlobUpath('gs://backup/experiments/data/first.data')
>>> GcsBlobUpath('gs://backup/experiments/data/first.data')
GcsBlobUpath('gs://backup/experiments/data/first.data')
>>> GcsBlobUpath('gs://backup', 'experiments', 'data/first.data')
GcsBlobUpath('gs://backup/experiments/data/first.data')
as_uri() str[source]#

Represent the path as a file URI, like ‘gs://bucket-name/path/to/blob’.

is_file() bool[source]#

The result of this call is not cached, in case the object is modified anytime by other clients.

is_dir() bool[source]#

If there is a dummy blob with name f"{self.name}/", this will return True. This is the case after creating a “folder” on the GCP dashboard. In programmatic use, it’s recommended to avoid such situations so that is_dir() returns True if and only if there are blobs “under” the current path.

file_info(*, timeout=None) FileInfo | None[source]#

Return file info if the current path is a file; otherwise return None.

property root: GcsBlobUpath#

Return a new path representing the root of the same bucket.

write_bytes(data: bytes | BufferedReader, *, overwrite=False, **kwargs)[source]#

Write bytes data to the current blob.

In the usual case, data is bytes. The case where data is a io.BufferedReader object, such as an open file, is not well tested.

read_bytes(**kwargs) bytes[source]#

Return the content of the current blob as bytes.

iterdir() Iterator[Self][source]#

Yield immediate children under the current dir.

remove_dir(**kwargs) int[source]#

Remove the current dir and all the content under it recursively. Return the number of blobs removed.

remove_file() None[source]#

Remove the current blob.

riterdir() Iterator[Self][source]#

Yield all blobs recursively under the current dir.

lock(*, timeout=None)[source]#

This implementation does not prevent the file from being deleted by other workers that does not use this method. It relies on the assumption that this blob is used cooperatively solely between workers in this locking logic.

timeout is the wait time for acquiring or releasing the lease. If None, the default value 600 seconds is used. If 0, exactly one attempt is made to acquire a lock.

Note: timeout = None does not mean infinite wait. It means a default wait time. If user wants longer wait, just pass in a large number.

open(mode='r', **kwargs)[source]#

Use this on a blob (not a “directory”) as a context manager. See Google documentation.

read_meta() dict[str, str][source]#
write_meta(data: dict[str, str])[source]#

Serializers#

class upathlib.serializer.Serializer[source]#

Bases: Protocol

classmethod serialize(x: T, **kwargs) bytes[source]#
classmethod deserialize(y: bytes, **kwargs) T[source]#
classmethod dump(x: T, file, *, overwrite: bool = False, **kwargs) None[source]#
classmethod load(file, **kwargs) T[source]#
__init__(*args, **kwargs)#
class upathlib.serializer.JsonSerializer[source]#

Bases: Serializer

classmethod serialize(x, *, encoding=None, errors=None, **kwargs) bytes[source]#
classmethod deserialize(y, *, encoding=None, errors=None, **kwargs)[source]#
class upathlib.serializer.PickleSerializer[source]#

Bases: Serializer

classmethod serialize(x, *, protocol=None, **kwargs) bytes[source]#
classmethod deserialize(y, **kwargs)[source]#
class upathlib.serializer.ZPickleSerializer[source]#

Bases: PickleSerializer

classmethod serialize(x, *, level=3, **kwargs) bytes[source]#
classmethod deserialize(y, **kwargs)[source]#
class upathlib.serializer.ZstdCompressor[source]#

Bases: _local

__init__()[source]#
compress(x, *, level=3, threads=0)[source]#
Parameters:
threads

Number of threads to use to compress data concurrently. When set, compression operations are performed on multiple threads. The default value (0) disables multi-threaded compression. A value of -1 means to set the number of threads to the number of detected logical CPUs.

decompress(y)[source]#
class upathlib.serializer.ZstdPickleSerializer[source]#

Bases: PickleSerializer

classmethod serialize(x, *, level=3, threads=0, **kwargs) bytes[source]#
classmethod deserialize(y, **kwargs)[source]#

Using upathlib to implement a “multiplexer”#

Multiplexer is a utility for distributing data elements to multiple concurrent or distributed workers. Its implementation relies on the “locking” capability of Upath.

Suppose we perform some brute-force search on a cluster of machines; there are 1000 grids, and the algorithm takes on one grid at a time. Now, the grid is a “hyper-parameter” or “control parameter” that takes 1000 possible values. We want to distribute these 1000 values to the workers. This is the kind of use cases targeted by Multiplexer.

Let’s show its usage using local data and multiprocessing. (For real work, we would use cloud storage and a cluster of machines.) First, create a Multiplexer to hold the values to be distributed:

>>> from upathlib import LocalUpath
>>> from upathlib.multiplexer import Multiplexer
>>> p = LocalUpath('/tmp/abc/mux')
>>> p.rmrf()
0
>>> hyper = Multiplexer.new(range(20), p)
>>> len(hyper)
20

Next, design an interesting worker function:

>>> import multiprocessing, random, time
>>>
>>> def worker(mux_id):
...     for x in Multiplexer(mux_id):
...         time.sleep(random.uniform(0.1, 0.2))  # doing a lot of things
...         print(x, 'done in', multiprocessing.current_process().name)

Back in the main process,

>>> mux_id = hyper.create_read_session()
>>> tasks = [multiprocessing.Process(target=worker, args=(mux_id,)) for _ in range(5)]
>>> for t in tasks:
...     t.start()
>>>
2 done in Process-13
0 done in Process-11
1 done in Process-12
4 done in Process-15
3 done in Process-14
6 done in Process-11
7 done in Process-12
8 done in Process-15
5 done in Process-13
9 done in Process-14
12 done in Process-15
13 done in Process-13
11 done in Process-12
10 done in Process-11
14 done in Process-14
15 done in Process-15
18 done in Process-11
16 done in Process-13
17 done in Process-12
19 done in Process-14
>>>
>>> for t in tasks:
...     t.join()
>>> hyper.done(mux_id)
True
>>> hyper.destroy()
>>>
class upathlib.multiplexer.Multiplexer[source]#

Bases: Iterable[Element], Sized

Multiplexer is used to distribute data elements to multiple “workers” so that each element is obtained by exactly one worker.

Typically, the data element is small in size but each requires significant time to process by the worker. The data elements are “hyper parameters”.

The usage consists of two main parts:

1. In “coordinator” code, call create_read_session() to start a new “session”. Different sessions (at the same time or otherwise) are independent consumers of the data.

Typically, this dataset, which is small and easy to create, is consumed only once. In this case, the coordinator code typically calls new() to create a new Multiplexer, then calls create_read_session() on it, and then manages to send the ID to workers.

2. In “worker” code, use the ID that was returned by create_read_session() to instantiate a Multiplexer and iterate over it. In so doing, multiple workers will obtain the data elements collectively, i.e., each element is obtained by exactly one worker.

As far as this utility is concerned, the coordinator does not need to wait for the workers to finish consuming the data. The coordinator process or machine may quit and let the workers continue with their job. Usually you should take care of job tracking separately.

See create_read_session() for more details.

classmethod new(data: Iterable[Element], path: str | Path | Upath, *, tag: str | None = None)[source]#
Parameters:
data

The data elements that need to be distributed. The elements should be pickle-able.

Importantly, data (the dataset) is meant to contain a modest number (say, up to thousands) of “control parameters”, not massive number of raw data elements. This is because distributing each data element by this utility incurs nontrivial overhead. Each data element is meant to trigger a substantial amount of processing in a worker, making the overhead of obtaining the data element worthwhile.

Based on this understanding, the elements in data are simply saved in a single pickle file, and each worker will fetch a copy of this file.

path

A directory where the data and any supporting info will be saved. The directory can be existent or non-existent. A sub-directory will be created under path to store data and info about this particular multiplexer. The name of the subdirectory is a datetime string. tag is appended to the sub-directory name to be more informative, if so desired.

If path is in the cloud, then the workers can be on multiple machines, and in multiple threads or processes on each machine. If path is on the local disk, then the workers are in threads or processes on the same machine.

However, there are no strong reasons to use this facility on a local machine, because the same functionality can be achieved by a queue-based solution.

Usually this class is used to distribute data to a cluster of machines, hence this path points to a location in a cloud storage that is supported by upathlib.

Since path is a “root directory” hosting Multiplexers (each in a randomly named sub-directory), a subclass may choose to fix this directory so that new() does away with this parameter.

__init__(mux_id: str, worker_id: str | None = None, timeout: int | float | None = None)[source]#

Create a Multiplexer object and use it to distribute the data elements that have been stored by new().

Parameters:
mux_id

The value that is returned by create_read_session().

worker_id

A string representing the current worker (i.e. this instance). If missing, a default is constructed based on thread name and process name.

property worker_id: str#
__len__() int[source]#

Return the number of data elements stored in this Multiplexer.

create_read_session() str[source]#

Let’s say there is a “coordinator” and some “workers”; these are programs running in threads, processes, or distributed machines. The coordinator creates a new Multiplexer and calls this method to start a “session” to read (i.e. iterate over) the elements in this Multiplexer:

mux = Multiplexer.new(range(1000), '/tmp/abc/mux/')
mux_id = mux.create_read_session()

The mux_id is then provided to the workers, which will create Multiplexer instances pointing to the same dataset and participating in the reading session that has just been started:

mux = Multiplexer(mux_id)
for x in mux:
    ...

The data that was provided to new() is split between the workers so that each data element will be obtained by exactly one worker.

The returned value (the “mux ID”) encodes info about the location (“path”) of the data storage as well as the newly created read session. All workers that use the same ID participate in the same read session, i.e. the data elements will be split between them. There can be multiple, independent read sessions going on at the same time.

This call does not make the current Multiplexer object a participant in the read session just created. One has to use the returned value to create a new Multiplexer object to participate in the said read session. If the current object is already participating in a read session (an “old” session), making this call on the object does not change its role as a participant in the old session. This call merely creates a new read session but does not modify the current object.

As a rule of thumb, an object created by Multiplexer.new(data, ...) is not a participant of any read session (even after create_read_session() is called on it subsequently). On the other hand, an object created by Multiplexer(mux_id, ...) is participating in the read session that is identified by mux_id.

__iter__() Iterator[Element][source]#

Iterates over the data contained in the Multiplexer.

stat(mux_id: str | None = None) dict[source]#

Return status info of an ongoing read session.

This is often called in the “coordinator” code on the object that has had its create_read_session() called. mux_id is the return of create_read_session(). If mux_id is None, then this method is about the read session in which the current object is participating.

done(mux_id: str | None = None) bool[source]#

Return whether the data iteration is finished.

This is often called in the “coordinator” code on the object that has had its create_read_session() called. mux_id is the return of create_read_session(). If mux_id is None, then this method is about the read session in which the current object is participating.

destroy() None[source]#

Delete all the data stored by this Multiplexer, hence reclaiming the storage space.

Using upathlib to implement a manager of versioned datasets with backup in the cloud#

VersionedUploadable helps store and use a “dataset” in an exclusive directory in consistent and convenient ways. Specifically,

  1. The dataset is identified by a version string that is generated and sortable (datetime-based), so that the “newest” is always the “latest” version, and code can infer the latest version. The full path of the storage location is managed for the user, who only needs the version.

  2. The storage can be either local (on disk) or remote (in a cloud blob store). There are methods to download/upload between local and remote storages.

    However, “local” and “remote” are just labels for the two storage locations. They can be both on local disk, or both in the same cloud storage (in different “locations”), or in two different cloud storages, or one on local disk and the other in a cloud blob store. This is controlled by the classmethods VersionedUploadable.local_cls_upath() and VersionedUploadable.remote_cls_upath(), which should be implemented by subclasses.

  3. Within the dataset, one can conveniently specify sub-directories and files relative to the “root”, and read/write. This navigation is the same regardless of whether the storage is local or remote.

class upathlib.versioned_uploadable.VersionedUploadable[source]#

Bases: ABC

A subclass will customize remote_cls_upath() and local_cls_upath().

classmethod resolve_version(version: str, remote: bool | None = None) tuple[str, bool][source]#

Given version as one of the special values—‘latest-local’, ‘latest-remote’, and ‘latest’—or an actual version string, and remote as None or explicit True/False, figure out the actual version and its remote-ness.

This is called by __init__().

Parameters:
version

Either one of the special values ‘latest’, ‘latest-local’, and ‘latest-remote’, or an actual version string like ‘20210322-120529’.

If version is an actual version string, then version and remote are returned as is, even if remote is None. It is checked that version is a valid version string, but existence of the version is not checked.

remote

If True, look in remote (cloud) storage only. If False, look in local storage only. If None, look in both remote and local.

If version is ‘latest-local’, then remote must be False or None.

If version is ‘latest-remote’, then remote must be True or None.

If version is ‘latest’, then find the latest between local and remote storages if remote is None, otherwise version becomes ‘latest-remote’ or ‘latest-local’ according to the value of remote.

Returns:
tuple

A tuple of two elements: the actual version string, and remote-ness.

Raises ValueError if the parameters are incompatible.

Raises VersionNotFoundError if no version is found that satisfies the request.

classmethod parse_version(version: str) dict[str, str][source]#
abstract classmethod local_cls_upath() LocalUpath[source]#

A subclass implements this method to determine the full path on the local disk for the entity represented by the particular subclass, i.e. a particular type of “dataset”.

The file-system structure directly under this path is determined by this class. Currently it contains a subdirectory called ‘versions’, in which goes on subdirectory per version, named after the version string.

In the directory of one particular version, the content is determined by the user. User can create whatever subdirectories and files they want. This class uses one meta file in the root of the version’s directory and the file is named “info.json”.

abstract classmethod remote_cls_upath() BlobUpath[source]#

Analogous to local_cls_upath() but on the remote side.

classmethod local_version_upath(version: str) LocalUpath[source]#

Root directory of the specified version in the local storage.

classmethod remote_version_upath(version: str) BlobUpath[source]#

Root directory of the specified version in the remote storage.

classmethod local_versions() list[str][source]#

Get a (potentially empty) list of the versions that exist on the local disk.

The elements in the list are sorted from small (old) to large (new).

Because remote_versions and local_versions get “directories” v/o checking their content, they might get invalid (corrupt or empty) versions. User should delete such bad versions as they are discovered.

classmethod remote_versions() list[str][source]#

Analogous to local_versions() but on the remote side.

classmethod has_local_version(version: str) bool[source]#

A version is considered existent if and only if the file “info.json” exists in its root directory.

classmethod has_remote_version(version: str) bool[source]#

Analogous to has_local_version() but on the remote side.

classmethod remove_local_version(version: str, **kwargs) None[source]#

Delete the entire directory of the specified version on the local disk.

By default, there is neither warning before the deletion nor progress printouts.

Parameters:
version

The exact version string. If the version does not exist, it’s a no-op.

**kwargs

Passed on to remove_dir().

classmethod remove_remote_version(version: str, **kwargs) None[source]#

Analogous to remove_local_version() but on the remote side.

classmethod new(*, tag: str | None = None, remote: bool = False, **kwargs) VersionedUploadable[source]#

If a subclass needs additional setup on a newly created object, they may choose to override this classmethod new.

The optional tag appends (human readable) info to the auto created version string, which is based on current date and time.

The returned object has attribute info, which is an empty dict. Nothing has been written to storage.

__init__(version: str, *, remote: bool | None = None, require_exists: bool = True)[source]#

This loads up an existing version for reading and writing. The create a new version, use the classmethod new().

Parameters:
version

Either the actual version string, or one of ‘latest’, ‘latest-local’, and ‘latest-remote’.

remote

Look for the version in local or remote storage?

If an explicit bool, it must be compatible with version. For example, version='latest-remote' and remote=False are not compatible.

If None, and version='latest', then the latest version between local and remote is found and used. If local and remote have the same latest version, then the local one is used.

If None, and version is an exact version, then find it either locally or remotely wherever it exists. If the version exists in both storages, then the local one is used.

require_exists

Default is True. If version is an exact version string but the version does not exist, VersionNotFoundError is raised. Usually you should leave this at the default. This is mainly for the call of __init__ in new(), where it needs to use require_exists=False.

version: str#

version of the object

remote: bool#

remote-ness of the object

property upath: Upath#

Return the root directory of self.

This is consistent with the remote-ness of self.

path(*args: str) Upath[source]#

Return a path relative to the root directory of self.

Examples:

self.path() # the root path self.path(‘info.json’) # file in root directory self.path(‘abc’, ‘de’, ‘data.parquet’) # file ‘abc/de/data.parquet’ under root directory self.path(‘abc/de/data.parquet’) # same as above

Typically, you proceed to read or write with the returned path (object), e.g.,

self.path('info.json').write_json(self.info, overwrite=True)
info = self.path('info.json').read_json()

This is consistent with the remote-ness of self. In other words, if self is local, then the returned path is local (under the local root directory); otherwise, the returned path is remote.

self.path('abc.txt') is equivalent to (self.upath / 'abc.txt').

Note

Don’t start args with '/'.

save() None[source]#

A subclass should re-implement this method to save its own stuff like data, summary, and whatever, and in the end call super().save().

download(path: str | None = None, *, overwrite: bool = False, **kwargs) int[source]#

Download the entire dataset or specified parts of it.

If the current object already points to a local version, then UnsupportedOperation is raised.

If you know certain files have changed, you can bring remote/local into sync by downloading/uploading those particular files.

Parameters:
path

Specific subdirectory or file to download. If None, the entire version is downloaded.

If None, and the local version already exists, and overwrite is False, then download will not happen. However, if the local version is incomplete or corrupt compared to the remote counterpart (the same version), the code wouldn’t know.

If not None, then the specified sub-directory or file will be downloaded (into the expected locations for the version). This is meant for “repair work” if you know certain parts of the local version are corrupt or missing. If the version does not exist locally, there is hardly a scenario for downloading only parts of it (and that may cause issues later, as you are creating an incomplete local version).

overwrite

If True, overwrite any file that exists locally.

**kwargs

If path is None, this is passed on to upathlib.Upath.download_dir. If path is not None, this is ignored.

Returns:
int

The number of files downloaded.

Warning

You should not use overwrite=True lightly just to ensure it proceeds. The default overwrite=False prevents re-downloading when the local version already exists. Try to benefit from such savings as far as you can.

upload(path: str | None = None, *, overwrite: bool = False, **kwargs) int[source]#

Analogous to download.

Return the number of files uploaded.

ensure_local(*, init_kwargs: dict[str, Any] | None = None, **kwargs) VersionedUploadable[source]#

Return a local object of this version that exists.

If self is local, then self is returned.

Otherwise, if the local version does not exist, it will be downloaded. If the local version exists, downloading will not happen. (This code has to assume the local version is sound. though.) To force downloading regardless, pass in overwrite=True, but don’t do that lightly!

Parameters:
init_kwargs

For special needs of a subclass that defines additional arguments for its __init__.

**kwargs

Passed on to download.

.. note:: Calling ``ensure_local`` does not make the current object local;

you need to receive and use the returned object, which is local.

ensure_remote(*, init_kwargs: dict[str, Any] | None = None, **kwargs) VersionedUploadable[source]#

Indices#