upathlib#

(Generated on Feb 08, 2024 for upathlib version 0.9.4.)

The package upathlib defines a unified API for cloud blob store (aka “object store”) as well as local file systems.

Attention is focused on identifying the most essential functionalities while working with a blob store for data processing. Functionalities in a traditional local file system that are secondary in these tasks—such as symbolic links, fine-grained permissions, and various access modes—are ignored.

End user should look to the class Upath for documentation of the API. Local file system is implemented by LocalUpath, which subclasses Upath. Client for Google Cloud Storage (i.e. blob store on GCP) is implemented by another Upath subclass, namely GcsBlobUpath.

One use case is the package biglist, where the class Biglist takes a Upath object to indicate its location of storage. It does not care whether the storage is local or in a cloud blob store—it simply uses the common API to operate the storage.

To install, do one of the following:

$ pip3 install upathlib
$ pip3 install upathlib[gcs]

Quickstart#

Let’s carve out a space in the local file system and poke around.

>>> from upathlib import LocalUpath
>>> p = LocalUpath('/tmp/abc')

This creates a LocalUpath object p that points to the location '/tmp/abc'. This may be an existing file, or directory, or may be nonexistent. We know this is a temporary location; to be sure we have a clear playground, let’s wipe out anything and everything:

>>> p.rmrf()
0

Think rm -rf /tmp/abc. It does just that. The returned 0 means zero files were deleted.

Now let’s create a file and write something to it:

>>> (p / 'x.txt').write_text('first')

This creates file /tmp/abc/x.txt with the content 'first'. Note the directory '/tmp/abc' did not exist before the call. We did not need to “create the parent directory”. In fact, upathlib does not provide a way to do that. In upathlib, “directory” is a “virtual” thing that is embodied by a group of files. For example, if there exist

/tmp/abc/x.txt
/tmp/abc/d/y.data

we say there is directories '/tmp/abc' and '/tmp/abc/d', but we don’t create these “directories” by themselves. These directories come into being if there exist such files.

Let’s actually create these files:

>>> (p / 'x.txt').write_text('second', overwrite=True)
>>> (p / 'd' / 'y.data').write_bytes(b'0101')

Now let’s look into this directory:

>>> p.is_dir()
True
>>> (p / 'd').is_dir()
True
>>> (p / 'x.txt').is_dir()
False
>>> (p / 'x.txt').is_file()
True

We can navigate in the directory. For example,

>>> for v in sorted(p.iterdir()):  # the sort merely makes the result stable
...     print(v)
/tmp/abc/d
/tmp/abc/x.txt

This is only the first level, or “direct children”. We can also use “recursive iterdir” to get all files under the directory, descending into subdirectories recursively:

>>> for v in sorted(p.riterdir()):  # the sort merely makes the result stable
...     print(v)
/tmp/abc/d/y.data
/tmp/abc/x.txt

This time only files are listed. Subdirectories do not show up because, after all, they are not real in upathlib concept.

We can as easily read a file, like

>>> (p / 'x.txt').read_text()
'second'

Several common file formats are provided out of the box, including text, bytes, json, and pickle, as well as compressed versions by zlib and Zstandard.

Let’s do some JSON:

>>> pp = p / 'e/f/g/data.json'
>>> pp.write_json({'name': 'John', 'age': 38})

We know the JSON file is also a text file, so we can treat it as such:

>>> pp.read_text()
'{"name": "John", "age": 38}'

But usually we prefer to get back the Python object directly:

>>> v = pp.read_json()
>>> v
{'name': 'John', 'age': 38}
>>> type(v)
<class 'dict'>

We can go “down” the directory tree using /. Conversely, we can go “up” using parent():

>>> pp.path
PosixPath('/tmp/abc/e/f/g/data.json')
>>> pp.parent
LocalUpath('/tmp/abc/e/f/g')
>>> pp.parent.parent
LocalUpath('/tmp/abc/e/f')
>>> pp.parent.parent.is_dir()
True
>>> pp.parent.parent.is_file()
False

or the terminal-lovers’ ..:

>>> pp
LocalUpath('/tmp/abc/e/f/g/data.json')
>>> pp / '..'
LocalUpath('/tmp/abc/e/f/g')
>>> pp / '..' / '..'
LocalUpath('/tmp/abc/e/f')

Under the hood, / delegates to a call to joinpath():

>>> pp.joinpath('../../o/p/q')
LocalUpath('/tmp/abc/e/f/o/p/q')

Let’s see again what we have:

>>> sorted(p.riterdir())
[LocalUpath('/tmp/abc/d/y.data'), LocalUpath('/tmp/abc/e/f/g/data.json'), LocalUpath('/tmp/abc/x.txt')]

and to get rid of them all:

>>> p.rmrf()
3

A nice thing about upathlib is the “unified” nature across local and cloud storages. Suppose we have set up the environment to use Google Cloud Storage, then we could have started this excercise with

>>> from upathlib import GcsBlobUpath
>>> p = GcsBlobUpath('gs://my-bucket/tmp/abc')

Everything after this would work unchanged. (The printouts would be different at some places, e.g. LocalUpath would be replaced by GcsBlobUpath.)

Upath#

Upath is an abstract base class that defines the APIs and some of the implementation. Subclasses tailor to particular storage systems. Currently there are two production-ready subclasses; they implement Upath for local file systems and Google Cloud Storage, respectively.

The APIs follow the style of the standard library pathlib where appropriate.

class upathlib.Upath[source]#

Bases: ABC

__init__(*pathsegments: str)[source]#

Create a Upath instance. Because Upath is an abstract class, this is always called on a subclass to instantiate a path on the specific storage system.

Subclasses for cloud blob stores may need to add additional parameters representing, e.g., container/bucket name, etc.

Parameters:
*pathsegments

Analogous to the input to pathlib.Path. The first segment may or may not start with '/'. The path constructed with *pathsegments is always “absolute” under a known “root”.

For a local POSIX file system, the root is the usual '/'.

For a local Windows file system, the root is resolved to a particular “drive”.

For Azure blob store, the root is that in a “container”.

For AWS and GCP blob stores, the root is that in a “bucket”.

If missing, the path constructed is the “root”. However, the subclass LocalUpath plugs in the current working directory for a missing *pathsegments.

Note

If one segment starts with '/', it will reset to the “root” and discard all the segments that have come before it. For example, Upath('work', 'projects', '/', 'projects') is the same as Upath('/', 'projects).

Note

The first element of *pathsegments may start with some platform-specific strings. For example, '/' on Linux, 'c://' on Windows, 'gs://' on Google Cloud Storage. Please see subclasses for specifics.

__truediv__(key: str) Self[source]#

This method is invoked by self / key. This calls the method joinpath().

property path: PurePath#

The pathlib.PurePath version of the internal path string.

In the subclass LocalUpath, this property is overriden to return a pathlib.Path, which is a subclass of pathlib.PurePath.

In subclasses for cloud blob stores, this implementation stays in effect.

abstract as_uri() str[source]#

Represent the path as a file URI. See subclasses for platform-dependent specifics.

property name: str#

A string representing the final path component, excluding the drive and root, if any.

This is the name component of self.path. If self.path is '/' (the root), then an empty string is returned. (The name of the root path is empty.)

Examples

>>> from upathlib import LocalUpath
>>> p = LocalUpath('/tmp/test/upathlib/data/sales.txt.gz')
>>> p.path
PosixPath('/tmp/test/upathlib/data/sales.txt.gz')
>>> p.name
'sales.txt.gz'
>>> p.parent.parent.parent.parent
LocalUpath('/tmp')
>>> p.parent.parent.parent.parent.name
'tmp'
>>> p.parent.parent.parent.parent.parent
LocalUpath('/')
>>> p.parent.parent.parent.parent.parent.name
''
>>> # the parent of root is still root:
>>> p.parent.parent.parent.parent.parent.parent
LocalUpath('/')
property stem: str#

The final path component, without its suffix.

This is the “stem” part of self.name.

Examples

>>> from upathlib import LocalUpath
>>> p = LocalUpath('/tmp/test/upathlib/data/sales.txt')
>>> p
LocalUpath('/tmp/test/upathlib/data/sales.txt')
>>> p.path
PosixPath('/tmp/test/upathlib/data/sales.txt')
>>> p.name
'sales.txt'
>>> p.stem
'sales'
>>> p = LocalUpath('/tmp/test/upathlib/data/sales.txt.gz')
>>> p.stem
'sales.txt'
property suffix: str#

The file extension of the final component, if any

property suffixes: list[str]#

A list of the path’s file extensions.

Examples

>>> p = LocalUpath('/tmp/test/upathlib/data/sales.txt')
>>> p.suffix
'.txt'
>>> p.suffixes
['.txt']
>>> p = LocalUpath('/tmp/test/upathlib/data/sales.txt.gz')
>>> p.suffix
'.gz'
>>> p.suffixes
['.txt', '.gz']
exists() bool[source]#

Return True if the path is an existing file or dir; False otherwise.

Examples

In a blobstore with blobs

/a/b/cd
/a/b/cd/e.txt

'/a/b/cd' exists, and is both a file and a dir; '/a/b/cd/e.txt' exists, and is a file; '/a/b' exists, and is a dir; '/a/b/c' does not exist.

abstract is_dir() bool[source]#

Return True if the path is an existing directory; False otherwise.

If there exists a file named like

/a/b/c/d.txt

we say '/a', '/a/b', '/a/b/c' are existing directories.

In a cloud blob store, there’s no such thing as an “empty directory”, because there is no concept of “directory”. A blob store just consists of files (aka blobs) with names, which could contain the letter ‘/’, with no special meaning attached to it. We interpret the name '/a/b' as a directory to emulate the familiar concept in a local file system when there exist files named '/a/b/*'.

In a local file system, there can be empty directories. However, it is recommended to not have empty directories.

There is no method for “creating an tempty dir” (like the Linux command mkdir). Simply create a file under the dir, and the dir will come into being. This is analogous to we create files all the time—we don’t “create” an empty file in advance; we simply write to the would-be path of the file to be created.

abstract is_file() bool[source]#

Return True if the path is an existing file; False otherwise.

In a cloud blob store, a path can be both a file and a dir. For example, if these blobs exist:

/a/b/c/d.txt
/a/b/c

we say /a/b/c is a “file”, and also a “dir”. User is recommended to avoid such namings.

This situation does not happen in a local file system.

abstract file_info() FileInfo | None[source]#

If is_file() is False, return None; otherwise, return file info.

property parent: Self#

The parent of the path.

If the path is the root, then the parent is still the root.

abstract property root: Self#

Return a new path representing the root.

joinpath(*other: str) Self[source]#

Join this path with more segments, return the new path object.

Calling this method is equivalent to combining the path with each of the other arguments in turn.

If self was created by Upath(*segs), then this method essentially returns Upath(*segs, *other).

If *other is a single string, there is a shortcut by the operator /, implemented by __truediv__().

with_name(name: str) Self[source]#

Return a new path the the “name” part substituted by the new value. If the original path doesn’t have a name (i.e. the original path is the root), ValueError is raised.

Examples

>>> p = LocalUpath('/tmp/test/upathlib/data/sales.txt.gz')
>>> p.with_name('sales.data')
LocalUpath('/tmp/test/upathlib/data/sales.data')
with_suffix(suffix: str) Self[source]#

Return a new path with the suffix replaced by the specified value. If the original path doesn’t have a suffix, the new suffix is appended instead. If suffix is an empty string, the original suffix is removed.

suffix should include a dot, like '.txt'.

Examples

>>> p = LocalUpath('/tmp/test/upathlib/data/sales.txt.gz')
>>>
>>> # replace the last suffix:
>>> p.with_suffix('.data')
LocalUpath('/tmp/test/upathlib/data/sales.txt.data')
>>>
>>> # remove the last suffix:
>>> p.with_suffix('')
LocalUpath('/tmp/test/upathlib/data/sales.txt')
>>>
>>> p.with_suffix('').with_suffix('.bin')
LocalUpath('/tmp/test/upathlib/data/sales.bin')
>>>
>>> pp = p.with_suffix('').with_suffix('')
>>> pp
LocalUpath('/tmp/test/upathlib/data/sales')
>>>
>>> # no suffix to remove:
>>> pp.with_suffix('')
LocalUpath('/tmp/test/upathlib/data/sales')
>>>
>>> # add a suffix:
>>> pp.with_suffix('.pickle')
LocalUpath('/tmp/test/upathlib/data/sales.pickle')
abstract write_bytes(data: bytes | BufferedReader, *, overwrite: bool = False) None[source]#

Write bytes data to the current file.

Parent “directories” are created as needed, if applicable.

If overwrite is False and the current file exists, FileExistsError is raised.

data is either “byte-like” (such as bytes, bytearray, memoryview) or “file-like” open in “binary” mode. In the second case, the file should be positioned at the beginning (such as by calling .seek(0).)

abstract read_bytes() bytes[source]#

Return the binary contents of the pointed-to file as a bytes object.

If self is not a file or does not exist, FileNotFoundError is raised.

write_text(data: str, *, overwrite: bool = False, encoding: str | None = None, errors: str | None = None) None[source]#

Write text data to the current file.

Parent “directories” are created as needed, if applicable.

If overwrite is False and the current file exists, FileExistsError is raised.

encoding and errors are passed to encode(). Usually you should leave them at the default values.

read_text(*, encoding: str | None = None, errors: str | None = None) str[source]#

Return the decoded contents of the pointed-to file as a string.

If self is not a file or does not exist, FileNotFoundError is raised.

encoding and errors are passed to decode(). Usually you should leave them at the default values.

write_json(data: Any, *, overwrite=False, **kwargs) None[source]#
read_json(**kwargs) Any[source]#
write_pickle(data: Any, *, overwrite=False, **kwargs) None[source]#
read_pickle(**kwargs) Any[source]#
write_pickle_zstd(data: Any, *, overwrite=False, **kwargs) None[source]#
read_pickle_zstd(**kwargs) Any[source]#
copy_dir(target: str | Upath, *, overwrite: bool = False, quiet: bool = False, concurrent: bool = True) int[source]#

Copy the content of the current directory (i.e. self) recursively to target.

If target is an string, then it is in the same store the the current path, and it is either absolute, or relative to self.parent. In this case, the directory created by this operation will be the path self.parent / target.

If target is not a string, then it must be an instance of a Upath() subclass, and it may be in any store system.

Immediate children of self will be copied as immediate children of the target path.

There is no such error as “target directory exists” as the copy-operation only concerns invidivual files. If the target “directory” contains files that do not have counterparts in the source directory, they will stay untouched.

overwrite is file-wise. If False, any existing target file will raise FileExistsError and halt the operation. If True, any existing target file will be overwritten by the source file.

quiet controls whether to print out progress info.

Returns:
int

The number of files copied.

copy_file(target: str | Upath, *, overwrite: bool = False) None[source]#

Copy the current file (i.e. self) to target.

If target is str, then it is in the same store as the current path, and it is either absolute, or relative to self.parent. In this case, the file created by this operation will the path self.parent / target. For example, if self is '/a/b/c/d.txt', then target='e.txt' means '/a/b/c/e.txt'.

If target is not a string, then it must be an instance of a Upath() subclass, and it may be in any storage system.

target is the target file, not a target directory to “copy into”.

If self is not an existing file, FileNotFoundError is raised.

If target is an existing file and overwrite is False, FileExistsError is raised. If overwrite is True, then the file will be overwritten.

If type(self) is LocalUpath and target is an existing directory, then IsADirectoryError is raised. In a cloud blob store, there is no concrete “directory”. For example, suppose self is the path ‘gs://mybucket/experiment/data’ on Google Cloud Storage, and target is ‘/backup/record’, then the target path is ‘gs://mybucket/backup/record’. If there exists blob ‘gs://mybucket/backup/record/y’, then we say ‘gs://mybucket/backup/record’ is a “directory”. However, this is merely a “virtual” concept, or an emulation of the “directory” concept on local disk. As long as this path is not an existing blob, the copy will proceed with no problem. Nevertheless, such naming is confusing and better avoided.

remove_dir(*, quiet: bool = True, concurrent: bool = True) int[source]#

Remove the current directory (i.e. self) and all its contents recursively.

Essentially, this removes each file that is yielded by riterdir(). Subclasses should take care to remove “empty directories”, if applicable, that are left behind.

quiet controls whether to print progress info.

Returns:
int

The number of files removed.

abstract remove_file() None[source]#

Remove the current file (i.e. self).

If self is not an existing file, FileNotFoundError is raised. If the file exists but can’t be removed, the platform-dependent exception is propagated.

abstract iterdir() Iterator[Self][source]#

Yield the immediate (i.e. non-recursive) children of the current dir (i.e. self).

The yieled elements are instances of the same class. Each yielded element is either a file or a dir. There is no guarantee on the order of the returned elements.

If self is not a dir (e.g. maybe it’s a file), or does not exist at all, nothing is yielded (resulting in an empty iterable); no exception is raised.

See also

riterdir().

ls() list[Self][source]#

Return the elements yielded by iterdir() in a sorted list.

Sorting is by a full path string maintained internally.

The returned list may be empty.

abstract riterdir() Iterator[Self][source]#

Yield files under the current dir (i.e. self) recursively. The method name means “recursive iterdir”.

The yieled elements are instances of the same class. They represent existing files.

Compared to iterdir(), this is recursive, and yields files only. Empty subdirectories will have no representation in the return.

Similar to iterdir(), if self is not a dir or does not exist, then nothing is yielded, and no exception is raised either.

There is no guarantee on the order of the returned elements.

rmrf(*, quiet: bool = True, concurrent: bool = False) int[source]#

Remove the current file or dir (i.e. self) recursively.

Analogous to the Linux command rm -rf, hence the name of this method.

Return the number of files removed.

For example, if these blobs are present:

/a/b/c/d/e.txt
/a/b/c/kk.data
/a/b/c

then Upath('/a/b/c').rmrf() would remove all of them.

concurrent is False by default because this method is often used in __del__ of user classes, and thread pool is problematic in __del__.

abstract lock(*, timeout: int = None) Self[source]#

Lock the current file (i.e. self), in order to have exclusive access to the code block that has possesion of the lock.

timeout: if the lock can’t be acquired within timeout seconds, LockAcquireError is raised. If None, wait for a default reasonably long time. To wait “forever”, just pass in a large number.

timeout=0 is a valid input, meaning making exactly one attempt to acquire a lock.

Once a lock is acquired, it will not expire until this contexmanager exits. In other words, this is timeout for the “lock acquisition”, not for the lock itself. Actual waiting time could be slightly longer or shorter.

This is a “mandatory lock”, as opposed to an “advisory lock”. The intended use case is for this lock to be used for implementing a (cooperative) “code lock”.

As this abstract method is to be used as a context manager, a subclass should use yield in its implementation. The yield statement should yield self.

One way to achive cooperative locking on a file via this lock is like this:

f = Upath('abc.txt')
with f.lock():
    ...
    # Now read from or write to `f` with exclusive access.

Depending on the capabilities of the specific storage system, the lock may be on the current file itself or on another, helper file as an implementation detail. In the latter case, this method should delete the helper file upon release of the lock, if possible.

Some storage engines may not provide the capability to implement this lock.

class upathlib.FileInfo[source]#

Bases: object

ctime: float#

Creation time as a POSIX timetamp.

mtime: float#

Last modification time as a POSIX timestamp.

time_created: datetime#

Creation time as an datetime object.

time_modified: datetime#

Last modification time as an datetime object.

size: int#

In bytes.

details: Any#

Platform-dependent.

__init__(ctime: float, mtime: float, time_created: datetime, time_modified: datetime, size: int, details: Any) None#

LocalUpath#

class upathlib.LocalUpath[source]#

Bases: Upath, PathLike

__init__(*pathsegments: str)[source]#

Create a path on the local file system. Both POSIX and Windows platforms are supported.

*pathsegments specify the path, either absolute or relative to the current working directory. If missing, the constructed path is the current working directory. This is passed to pathlib.Path.

__fspath__() str[source]#

LocalUpath implements the os.PathLike protocol, hence a LocalUpath object can be used anywhere an object implementing os.PathLike is accepted. For example, used with the builtin function open():

>>> p = LocalUpath('/tmp/test/data.txt')
>>> p.rmrf()
0
>>> p.write_text('abc')
>>> with open(p) as file:
...     print(file.read())
abc
>>> p.rmrf()
1
property path: Path#

Return the pathlib.Path object of the path.

as_uri() str[source]#

Represent the path as a file URI. On Linux, this is like ‘file:///home/username/path/to/file’. On Windows, this is like ‘file:///C:/Users/username/path/to/file’.

is_dir() bool[source]#

Return whether the current path is a dir.

is_file() bool[source]#

Return whether the current path is a file.

file_info() FileInfo | None[source]#

Return file info if the current path is a file; otherwise return None.

property root: LocalUpath#

Return a new path representing the root.

On Windows, this is the root on the same drive, like LocalUpath('C:'). On Linux and Mac, this is LocalUpath('/').

read_bytes() bytes[source]#

Read the content of the current file as bytes.

write_bytes(data: bytes | BufferedReader, *, overwrite: bool = False) None[source]#

Write the bytes data to the current file.

remove_dir(**kwargs) int[source]#

Remove the current dir along with all its contents recursively.

remove_file() None[source]#

Remove the current file.

rename_dir(target: str | LocalUpath, *, overwrite: bool = False, quiet: bool = False, concurrent: bool = True) LocalUpath[source]#

Rename the current dir (i.e. self) to target.

overwrite is applied file-wise. If there are files under target that do not have counterparts under self, they are left untouched.

quiet controls whether to print progress info.

Return the new path.

rename_file(target: str | LocalUpath, *, overwrite: bool = False) LocalUpath[source]#

Rename the current file (i.e. self) to target in the same store.

target is either absolute or relative to self.parent. For example, if self is ‘/a/b/c/d.txt’, then target='e.txt' means ‘/a/b/c/e.txt’.

If overwrite is False (the default) and the target file exists, FileExistsError is raised.

Return the new path.

iterdir() Iterator[LocalUpath][source]#

Yield the immediate children under the current dir.

riterdir() Iterator[LocalUpath][source]#

Yield all files under the current dir recursively.

lock(*, timeout=None)[source]#

This uses the package filelock to implement a file lock for inter-process communication.

Note

At the end, this file is not deleted. If it is purely a dummy file to implement locking for other things, user may want to delete this file after use.

BlobUpath#

class upathlib._blob.BlobUpath[source]#

Bases: Upath

BlobUpath is a base class for paths in a cloud storage, aka “blob store”. This is in contrast to a local disk storage, which is implemnted by LocalUpath.

property blob_name: str#

Return the “name” of the blob. This is the “path” without a leading '/'. In cloud blob stores, this is exactly the name of the blob. The name often contains '/', which has no special role in the name per se but is interpreted by users to be a directory separator.

is_dir() bool[source]#

In a typical blob store, there is no such concept as a “directory”. Here we emulate the concept in a local file system. If there is a blob named like

/ab/cd/ef/g.txt

we say there exists directory “/ab/cd/ef”. We should never have a trailing / in a blob’s name, like

/ab/cd/ef/

(I don’t know whether the blob stores allow such blob names.)

Consequently, is_dir is equivalent to “having stuff in the dir”. There is no such thing as an “empty directory” in blob stores.

iterdir() Iterator[Self][source]#

Yield immediate children under the current dir.

This is a naive, inefficient implementation. Expected to be refined by subclasses.

download_dir(target: str | Path | LocalUpath, *, overwrite: bool = False, quiet: bool = False, **kwargs) int[source]#

A specialization of copy_dir() where the target location is on the local disk.

download_file(target: str | Path | LocalUpath, *, overwrite=False) None[source]#

A specialization of copy_file() where the target location is on the local disk.

Subclass is expected to override with a more efficient implementation.

upload_dir(source: str | Path | LocalUpath, *, overwrite: bool = False, quiet: bool = False, **kwargs) int[source]#

A specialization of copy_dir() for LocalUpath where the target location is in a cloud blob store.

upload_file(source: str | Path | LocalUpath, *, overwrite=False) None[source]#

A specialization of copy_file() for LocalUpath where the target location is in a cloud blob store.

Subclass is expected to override with a more efficient implementation.

GcsBlobUpath#

class upathlib.GcsBlobUpath[source]#

Bases: BlobUpath

GcsBlobUpath implements the Upath API for Google Cloud Storage using the package google-cloud-storage.

__init__(*paths: str, bucket_name: str | None = None)[source]#

If bucket_name is None, then *paths should be a single string starting with ‘gs://<bucket-name>/’.

If bucket_name is specified, then *paths specify path components under the root of the bucket.

Examples

These several calls are equivalent:

>>> GcsBlobUpath('experiments', 'data', 'first.data', bucket_name='backup')
GcsBlobUpath('gs://backup/experiments/data/first.data')
>>> GcsBlobUpath('/experiments/data/first.data', bucket_name='backup')
GcsBlobUpath('gs://backup/experiments/data/first.data')
>>> GcsBlobUpath('gs://backup/experiments/data/first.data')
GcsBlobUpath('gs://backup/experiments/data/first.data')
>>> GcsBlobUpath('gs://backup', 'experiments', 'data/first.data')
GcsBlobUpath('gs://backup/experiments/data/first.data')
as_uri() str[source]#

Represent the path as a file URI, like ‘gs://bucket-name/path/to/blob’.

is_file() bool[source]#

The result of this call is not cached, in case the object is modified anytime by other clients.

is_dir() bool[source]#

If there is a dummy blob with name f"{self.name}/", this will return True. This is the case after creating a “folder” on the GCP dashboard. In programatic use, it’s recommended to avoid such situations so that is_dir() returns True if and only if there are blobs “under” the current path.

file_info(*, timeout=None) FileInfo | None[source]#

Return file info if the current path is a file; otherwise return None.

property root: GcsBlobUpath#

Return a new path representing the root of the same bucket.

write_bytes(data: bytes | BufferedReader, *, overwrite=False, **kwargs)[source]#

Write bytes data to the current blob.

In the usual case, data is bytes. The case where data is a io.BufferedReader object, such as an open file, is not well tested.

read_bytes(**kwargs) bytes[source]#

Return the content of the current blob as bytes.

download_file(target: str | Path | LocalUpath, *, overwrite=False, concurrent=True) None[source]#

Download the content of the current blob to target.

upload_file(source: str | Path | LocalUpath, *, overwrite=False) None[source]#

Upload the content of source to the current blob.

iterdir() Iterator[Self][source]#

Yield immediate children under the current dir.

remove_dir(**kwargs) int[source]#

Remove the current dir and all the content under it recursively. Return the number of blobs removed.

remove_file() None[source]#

Remove the current blob.

riterdir() Iterator[Self][source]#

Yield all blobs recursively under the current dir.

lock(*, timeout=None)[source]#

This implementation does not prevent the file from being deleted by other workers that does not use this method. It relies on the assumption that this blob is used cooperatively solely between workers in this locking logic.

timeout is the wait time for acquiring or releasing the lease. If None, the default value 600 seconds is used. If 0, exactly one attempt is made to acquire a lock.

Note: timeout = None does not mean infinite wait. It means a default wait time. If user wants longer wait, just pass in a large number.

open(mode='r', **kwargs)[source]#

Use this on a blob (not a “directory”) as a context manager. See Google documentation.

read_meta() dict[str, str][source]#
write_meta(data: dict[str, str], retry: Retry | None = None)[source]#

Serializers#

class upathlib.serializer.Serializer[source]#

Bases: Protocol

classmethod serialize(x: T, **kwargs) bytes[source]#
classmethod deserialize(y: bytes, **kwargs) T[source]#
classmethod dump(x: T, file, *, overwrite: bool = False, **kwargs) None[source]#
classmethod load(file, **kwargs) T[source]#
__init__(*args, **kwargs)#
class upathlib.serializer.JsonSerializer[source]#

Bases: Serializer

classmethod serialize(x, *, encoding=None, errors=None, **kwargs) bytes[source]#
classmethod deserialize(y, *, encoding=None, errors=None, **kwargs)[source]#
class upathlib.serializer.PickleSerializer[source]#

Bases: Serializer

classmethod serialize(x, *, protocol=None, **kwargs) bytes[source]#
classmethod deserialize(y, **kwargs)[source]#
class upathlib.serializer.ZPickleSerializer[source]#

Bases: PickleSerializer

classmethod serialize(x, *, level=3, **kwargs) bytes[source]#
classmethod deserialize(y, **kwargs)[source]#
class upathlib.serializer.ZstdCompressor[source]#

Bases: _local

__init__()[source]#
compress(x, *, level=3, threads=0)[source]#
Parameters:
threads

Number of threads to use to compress data concurrently. When set, compression operations are performed on multiple threads. The default value (0) disables multi-threaded compression. A value of -1 means to set the number of threads to the number of detected logical CPUs.

decompress(y)[source]#
class upathlib.serializer.ZstdPickleSerializer[source]#

Bases: PickleSerializer

classmethod serialize(x, *, level=3, threads=0, **kwargs) bytes[source]#
classmethod deserialize(y, **kwargs)[source]#

Using upathlib to implement a “multiplexer”#

Multiplexer is a utility for distributing data elements to multiple concurrent or distributed workers. Its implementation relies on the “locking” capability of Upath.

Suppose we perform some brute-force search on a cluster of machines; there are 1000 grids, and the algorithm takes on one grid at a time. Now, the grid is a “hyper-parameter” or “control parameter” that takes 1000 possible values. We want to distribute these 1000 values to the workers. This is the kind of use cases targeted by Multiplexer.

Let’s show its usage using local data and multiprocessing. (For real work, we would use cloud storage and a cluster of machines.) First, create a Multiplexer to hold the values to be distributed:

>>> from upathlib import LocalUpath
>>> from upathlib.multiplexer import Multiplexer
>>> p = LocalUpath('/tmp/abc/mux')
>>> p.rmrf()
0
>>> hyper = Multiplexer.new(range(20), p)
>>> len(hyper)
20

Next, design an interesting worker function:

>>> import multiprocessing, random, time
>>>
>>> def worker(mux_id):
...     for x in Multiplexer(mux_id):
...         time.sleep(random.uniform(0.1, 0.2))  # doing a lot of things
...         print(x, 'done in', multiprocessing.current_process().name)

Back in the main process,

>>> mux_id = hyper.create_read_session()
>>> tasks = [multiprocessing.Process(target=worker, args=(mux_id,)) for _ in range(5)]
>>> for t in tasks:
...     t.start()
>>>
2 done in Process-13
0 done in Process-11
1 done in Process-12
4 done in Process-15
3 done in Process-14
6 done in Process-11
7 done in Process-12
8 done in Process-15
5 done in Process-13
9 done in Process-14
12 done in Process-15
13 done in Process-13
11 done in Process-12
10 done in Process-11
14 done in Process-14
15 done in Process-15
18 done in Process-11
16 done in Process-13
17 done in Process-12
19 done in Process-14
>>>
>>> for t in tasks:
...     t.join()
>>> hyper.done(mux_id)
True
>>> hyper.destroy()
>>>
class upathlib.multiplexer.Multiplexer[source]#

Bases: Iterable[Element], Sized

Multiplexer is used to distribute data elements to multiple “workers” so that each element is obtained by exactly one worker.

Typically, the data element is small in size but each requires significant time to process by the worker. The data elements are “hyper parameters”.

The usage consists of two main parts:

1. In “coordinator” code, call create_read_session() to start a new “session”. Different sessions (at the same time or otherwise) are independent consumers of the data.

Typically, this dataset, which is small and easy to create, is consumed only once. In this case, the coordinator code typically calls new() to create a new Multiplexer, then calls create_read_session() on it, and then manages to send the ID to workers.

2. In “worker” code, use the ID that was returned by create_read_session() to instantiate a Multiplexer and iterate over it. In so doing, multiple workers will obtain the data elements collectively, i.e., each element is obtained by exactly one worker.

classmethod new(data: Iterable[Element], path: str | Path | Upath, *, tag: str | None = None)[source]#
Parameters:
data

The data elements that need to be distributed. The elements should be pickle-able.

path

A directory where the data and any supporting info will be saved. The directory can be existent or non-existent. A sub-directory will be created under path to storage data and info about this particular multiplexer. The name of the subdirectory is a datetime string. tag is appended to the sub-directory name to be more informative, if so desired.

If path is in the cloud, then the workers can be on multiple machines, and in multiple threads or processes on each machine. If path is on the local disk, then the workers are in threads or processes on the same machine.

However, there are no strong reasons to use this facility on a local machine.

Usually this class is used to distribute data to a cluster of machines, hence this path points to a location in a cloud storage that is supported by upathlib.

__init__(mux_id: str, worker_id: str | None = None, timeout: int | float | None = None)[source]#

Create a Multiplexer object and use it to distribute the data elements that have been stored by new().

Parameters:
mux_id

The value that is returned by create_read_session().

worker_id

A string representing the current worker (i.e. this instance). If missing, a default is constructed based on thread name and process name.

property worker_id: str#
__len__() int[source]#

Return the number of data elements stored in this Multiplexer.

create_read_session() str[source]#

Let’s say there is a “coordinator” and some “workers”; these are programs running in threads, processes, or distributed machines. The coordinator creates a new Multiplexer and calls this method to start a “session” to read (i.e. iterate over) the elements in this Multiplexer:

mux = Multiplexer.new(range(1000), '/tmp/abc/mux/')
mux_id = mux.create_read_session()

The mux_id is then provided to the workers, which will create Multiplexer instances pointing to the same dataset and participating in the reading session that has just been started:

mux = Multiplexer(mux_id)
for x in mux:
    ...

The data that was provided to new() is split between the workers so that each data element will be obtained by exactly one worker.

The returned value (the “mux ID”) encodes info about the location (“path”) of the data storage as well as the newly created read session. All workers that use the same ID participate in the same read session, i.e. the data elements will be split between them. There can be multiple, independent read sessions going on at the same time.

This call does not make the current Multiplexer object a participant in the read session just created. One has to use the returned value to create a new Multiplexer object to participate in the said read session. If the current object is already participating in a read session (an “old” session), making this call on the object does not change its role as a participant in the old session. This call merely creates a new read session but does not modify the current object.

As a rule of thumb, an object created by Multiplexer.new(data, ...) is not a participant of any read session (even after create_read_session() is called on it subsequently). On the other hand, an object created by Multiplexer(mux_id, ...) is participating in the read session that is identified by mux_id.

__iter__() Iterator[Element][source]#

Iterates over the data contained in the Multiplexer.

stat(mux_id: str | None = None) dict[source]#

Return status info of an ongoing read session.

This is often called in the “coordinator” code on the object that has had its create_read_session() called. mux_id is the return of create_read_session(). If mux_id is None, then this method is about the read session in which the current object is participating.

done(mux_id: str | None = None) bool[source]#

Return whether the data iteration is finished.

This is often called in the “coordinator” code on the object that has had its create_read_session() called. mux_id is the return of create_read_session(). If mux_id is None, then this method is about the read session in which the current object is participating.

destroy() None[source]#

Delete all the data stored by this Multiplexer, hence reclaiming the storage space.

Indices#