upathlib#
(Generated on Nov 19, 2024 for upathlib version 0.9.8.)
The package upathlib defines a unified API for cloud blob store (aka “object store”) as well as local file systems.
Attention is focused on identifying the most essential functionalities while working with a blob store for data processing. Functionalities in a traditional local file system that are secondary in these tasks—such as symbolic links, fine-grained permissions, and various access modes—are ignored.
End users should refer to the class Upath for documentation of the API.
The local file system is implemented by LocalUpath, which subclasses Upath.
The client for Google Cloud Storage (i.e. blob store on GCP) is implemented by another Upath subclass,
namely GcsBlobUpath.
One use case is the package biglist, where the class Biglist takes a Upath object to indicate its location of storage. It does not care whether the storage is local or in a cloud blob store—it simply uses the common API to operate the storage.
To install, do one of the following:
$ pip3 install upathlib
$ pip3 install upathlib[gcs]
Quickstart#
Let’s carve out a space in the local file system and poke around.
>>> from upathlib import LocalUpath
>>> p = LocalUpath('/tmp/abc')
This creates a LocalUpath object p that points to the location
'/tmp/abc'. This may be an existing file, or directory, or may be nonexistent.
We know this is a temporary location; to be sure we have a clear playground, let’s
wipe out anything and everything:
>>> p.rmrf()
0
Think rm -rf /tmp/abc. It does just that. The returned 0 means zero files were deleted.
Now let’s create a file and write something to it:
>>> (p / 'x.txt').write_text('first')
This creates file /tmp/abc/x.txt with the content 'first'. Note the directory '/tmp/abc'
did not exist before the call. We did not need to “create the parent directory”.
In fact, upathlib does not provide a way to do that.
In upathlib, “directory” is a “virtual” thing that is embodied by a group of files.
For example, if there exist
/tmp/abc/x.txt
/tmp/abc/d/y.data
we say there are directories '/tmp/abc' and '/tmp/abc/d', but we
don’t create these “directories” by themselves. These directories come into being
if there exist such files.
Let’s actually create these files:
>>> (p / 'x.txt').write_text('second', overwrite=True)
>>> (p / 'd' / 'y.data').write_bytes(b'0101')
Now let’s look into this directory:
>>> p.is_dir()
True
>>> (p / 'd').is_dir()
True
>>> (p / 'x.txt').is_dir()
False
>>> (p / 'x.txt').is_file()
True
We can navigate in the directory. For example,
>>> for v in sorted(p.iterdir()):  # the sort merely makes the result stable
...     print(v)
/tmp/abc/d
/tmp/abc/x.txt
This is only the first level, or “direct children”. We can also use “recursive iterdir” to get all files under the directory, descending into subdirectories recursively:
>>> for v in sorted(p.riterdir()):  # the sort merely makes the result stable
...     print(v)
/tmp/abc/d/y.data
/tmp/abc/x.txt
This time only files are listed. Subdirectories do not show up because,
after all, they are not real in upathlib’s model of storage.
We can as easily read a file, like
>>> (p / 'x.txt').read_text()
'second'
Several common file formats are provided out of the box, including text, bytes, json, and pickle, as well as compressed versions by zlib and Zstandard.
Let’s do some JSON:
>>> pp = p / 'e/f/g/data.json'
>>> pp.write_json({'name': 'John', 'age': 38})
We know the JSON file is also a text file, so we can treat it as such:
>>> pp.read_text()
'{"name": "John", "age": 38}'
But usually we prefer to get back the Python object directly:
>>> v = pp.read_json()
>>> v
{'name': 'John', 'age': 38}
>>> type(v)
<class 'dict'>
We can go “down” the directory tree using /.
Conversely, we can go “up” using the parent property:
>>> pp.path
PosixPath('/tmp/abc/e/f/g/data.json')
>>> pp.parent
LocalUpath('/tmp/abc/e/f/g')
>>> pp.parent.parent
LocalUpath('/tmp/abc/e/f')
>>> pp.parent.parent.is_dir()
True
>>> pp.parent.parent.is_file()
False
or the terminal-lovers’ ..:
>>> pp
LocalUpath('/tmp/abc/e/f/g/data.json')
>>> pp / '..'
LocalUpath('/tmp/abc/e/f/g')
>>> pp / '..' / '..'
LocalUpath('/tmp/abc/e/f')
Under the hood, / delegates to a call to joinpath():
>>> pp.joinpath('../../o/p/q')
LocalUpath('/tmp/abc/e/f/o/p/q')
Let’s see again what we have:
>>> sorted(p.riterdir())
[LocalUpath('/tmp/abc/d/y.data'), LocalUpath('/tmp/abc/e/f/g/data.json'), LocalUpath('/tmp/abc/x.txt')]
and to get rid of them all:
>>> p.rmrf()
3
A nice thing about upathlib is the “unified” nature across local and cloud storages.
Suppose we have set up the environment to use Google Cloud Storage; then we could have started this exercise with
>>> from upathlib import GcsBlobUpath
>>> p = GcsBlobUpath('gs://my-bucket/tmp/abc')
Everything after this would work unchanged. (The printouts would be different at some places,
e.g. LocalUpath would be replaced by GcsBlobUpath.)
Upath#
Upath is an abstract base class that defines the APIs and some of the implementation.
Subclasses tailor to particular storage systems.
Currently there are two production-ready subclasses; they implement Upath
for local file systems and Google Cloud Storage, respectively.
The APIs follow the style of the standard library pathlib where appropriate.
- class upathlib.Upath[source]#
Bases: ABC
- __init__(*pathsegments: str)[source]#
Create a Upath instance. Because Upath is an abstract class, this is always called on a subclass to instantiate a path on the specific storage system.
Subclasses for cloud blob stores may need to add additional parameters representing, e.g., container/bucket name, etc.
- Parameters:
- *pathsegments
Analogous to the input to pathlib.Path. The first segment may or may not start with '/'. The path constructed with *pathsegments is always “absolute” under a known “root”.
For a local POSIX file system, the root is the usual '/'.
For a local Windows file system, the root is resolved to a particular “drive”.
For Azure blob store, the root is that in a “container”.
For AWS and GCP blob stores, the root is that in a “bucket”.
If missing, the path constructed is the “root”. However, the subclass LocalUpath plugs in the current working directory for a missing *pathsegments.
Note
If one segment starts with '/', it will reset to the “root” and discard all the segments that have come before it. For example, Upath('work', 'projects', '/', 'projects') is the same as Upath('/', 'projects').
Note
The first element of *pathsegments may start with some platform-specific strings. For example, '/' on Linux, 'c://' on Windows, 'gs://' on Google Cloud Storage. Please see subclasses for specifics.
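The reset rule in the first note matches how the standard library joins path segments, which *pathsegments is described as analogous to. A minimal sketch, assuming pathlib.PurePosixPath as a stand-in for a Upath subclass (Upath itself is not imported here):

```python
from pathlib import PurePosixPath

# PurePosixPath stands in for Upath (an assumption for illustration only).
# A segment starting with '/' resets to the root and discards earlier segments.
reset = PurePosixPath('work', 'projects', '/', 'projects')
direct = PurePosixPath('/', 'projects')

print(reset)            # /projects
print(reset == direct)  # True
```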
- __truediv__(key: str) Self[source]#
This method is invoked by self / key. This calls the method joinpath().
- property path: PurePath#
The pathlib.PurePath version of the internal path string.
In the subclass LocalUpath, this property is overridden to return a pathlib.Path, which is a subclass of pathlib.PurePath.
In subclasses for cloud blob stores, this implementation stays in effect.
- abstract as_uri() str[source]#
Represent the path as a file URI. See subclasses for platform-dependent specifics.
- property name: str#
A string representing the final path component, excluding the drive and root, if any.
This is the name component of self.path. If self.path is '/' (the root), then an empty string is returned. (The name of the root path is empty.)
Examples
>>> from upathlib import LocalUpath
>>> p = LocalUpath('/tmp/test/upathlib/data/sales.txt.gz')
>>> p.path
PosixPath('/tmp/test/upathlib/data/sales.txt.gz')
>>> p.name
'sales.txt.gz'
>>> p.parent.parent.parent.parent
LocalUpath('/tmp')
>>> p.parent.parent.parent.parent.name
'tmp'
>>> p.parent.parent.parent.parent.parent
LocalUpath('/')
>>> p.parent.parent.parent.parent.parent.name
''
>>> # the parent of root is still root:
>>> p.parent.parent.parent.parent.parent.parent
LocalUpath('/')
- property stem: str#
The final path component, without its suffix.
This is the “stem” part of self.name.
Examples
>>> from upathlib import LocalUpath
>>> p = LocalUpath('/tmp/test/upathlib/data/sales.txt')
>>> p
LocalUpath('/tmp/test/upathlib/data/sales.txt')
>>> p.path
PosixPath('/tmp/test/upathlib/data/sales.txt')
>>> p.name
'sales.txt'
>>> p.stem
'sales'
>>> p = LocalUpath('/tmp/test/upathlib/data/sales.txt.gz')
>>> p.stem
'sales.txt'
- property suffix: str#
The file extension of the final component, if any
- property suffixes: list[str]#
A list of the path’s file extensions.
Examples
>>> p = LocalUpath('/tmp/test/upathlib/data/sales.txt')
>>> p.suffix
'.txt'
>>> p.suffixes
['.txt']
>>> p = LocalUpath('/tmp/test/upathlib/data/sales.txt.gz')
>>> p.suffix
'.gz'
>>> p.suffixes
['.txt', '.gz']
- exists() bool[source]#
Return True if the path is an existing file or dir; False otherwise.
Examples
In a blob store with blobs
/a/b/cd
/a/b/cd/e.txt
'/a/b/cd' exists, and is both a file and a dir; '/a/b/cd/e.txt' exists, and is a file; '/a/b' exists, and is a dir; '/a/b/c' does not exist.
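The rules in this example can be checked against a toy model in which a blob store is just a set of blob names. This is only a sketch of the semantics; the helper functions are hypothetical and are not how upathlib implements exists(), is_file() or is_dir().

```python
# A toy blob store: nothing but a set of blob names.
# All helpers below are hypothetical, for illustration only.
blobs = {'/a/b/cd', '/a/b/cd/e.txt'}

def is_file(path: str) -> bool:
    # A path is a file if a blob has exactly this name.
    return path in blobs

def is_dir(path: str) -> bool:
    # A path is a dir if at least one blob name lies "under" it.
    prefix = path.rstrip('/') + '/'
    return any(name.startswith(prefix) for name in blobs)

def exists(path: str) -> bool:
    return is_file(path) or is_dir(path)

for p in ['/a/b/cd', '/a/b/cd/e.txt', '/a/b', '/a/b/c']:
    print(p, exists(p), is_file(p), is_dir(p))
```

Note that '/a/b/c' does not exist even though it is a string prefix of '/a/b/cd': only whole path components count.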
- abstract is_dir() bool[source]#
Return True if the path is an existing directory; False otherwise.
If there exists a file named like
/a/b/c/d.txt
we say '/a', '/a/b', '/a/b/c' are existing directories.
In a cloud blob store, there’s no such thing as an “empty directory”, because there is no concept of “directory”. A blob store just consists of files (aka blobs) with names, which could contain the character '/', with no special meaning attached to it. We interpret the name '/a/b' as a directory to emulate the familiar concept in a local file system when there exist files named '/a/b/*'.
In a local file system, there can be empty directories. However, it is recommended to not have empty directories.
There is no method for “creating an empty dir” (like the Linux command mkdir). Simply create a file under the dir, and the dir will come into being. This is analogous to how we create files all the time: we don’t “create” an empty file in advance; we simply write to the would-be path of the file to be created.
- abstract is_file() bool[source]#
Return True if the path is an existing file; False otherwise.
In a cloud blob store, a path can be both a file and a dir. For example, if these blobs exist:
/a/b/c/d.txt
/a/b/c
we say /a/b/c is a “file”, and also a “dir”. Users are advised to avoid such naming.
This situation does not happen in a local file system.
- abstract file_info() FileInfo | None[source]#
If is_file() is False, return None; otherwise, return file info.
- property parent: Self#
The parent of the path.
If the path is the root, then the parent is still the root.
- abstract property root: Self#
Return a new path representing the root.
- joinpath(*other: str) Self[source]#
Join this path with more segments, return the new path object.
Calling this method is equivalent to combining the path with each of the other arguments in turn.
If self was created by Upath(*segs), then this method essentially returns Upath(*segs, *other).
If *other is a single string, there is a shortcut by the operator /, implemented by __truediv__().
- with_name(name: str) Self[source]#
Return a new path with the “name” part substituted by the new value. If the original path doesn’t have a name (i.e. the original path is the root), ValueError is raised.
Examples
>>> p = LocalUpath('/tmp/test/upathlib/data/sales.txt.gz')
>>> p.with_name('sales.data')
LocalUpath('/tmp/test/upathlib/data/sales.data')
- with_suffix(suffix: str) Self[source]#
Return a new path with the suffix replaced by the specified value. If the original path doesn’t have a suffix, the new suffix is appended instead. If suffix is an empty string, the original suffix is removed.
suffix should include a dot, like '.txt'.
Examples
>>> p = LocalUpath('/tmp/test/upathlib/data/sales.txt.gz')
>>>
>>> # replace the last suffix:
>>> p.with_suffix('.data')
LocalUpath('/tmp/test/upathlib/data/sales.txt.data')
>>>
>>> # remove the last suffix:
>>> p.with_suffix('')
LocalUpath('/tmp/test/upathlib/data/sales.txt')
>>>
>>> p.with_suffix('').with_suffix('.bin')
LocalUpath('/tmp/test/upathlib/data/sales.bin')
>>>
>>> pp = p.with_suffix('').with_suffix('')
>>> pp
LocalUpath('/tmp/test/upathlib/data/sales')
>>>
>>> # no suffix to remove:
>>> pp.with_suffix('')
LocalUpath('/tmp/test/upathlib/data/sales')
>>>
>>> # add a suffix:
>>> pp.with_suffix('.pickle')
LocalUpath('/tmp/test/upathlib/data/sales.pickle')
- abstract write_bytes(data: bytes | BufferedReader, *, overwrite: bool = False) None[source]#
Write bytes data to the current file.
Parent “directories” are created as needed, if applicable.
If overwrite is False and the current file exists, FileExistsError is raised.
data is either “byte-like” (such as bytes, bytearray, memoryview) or “file-like” open in “binary” mode. In the second case, the file should be positioned at the beginning (such as by calling .seek(0)).
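The contract described above (parents created on demand, overwrite refused by default, bytes-like or file-like input) can be sketched with the standard library alone. The function write_bytes_sketch is a hypothetical stand-in written for this illustration; it is not the implementation used by any Upath subclass.

```python
import io
import tempfile
from pathlib import Path

def write_bytes_sketch(path: Path, data, *, overwrite: bool = False) -> None:
    """Hypothetical stand-in that mimics the documented write_bytes contract."""
    if path.is_file() and not overwrite:
        raise FileExistsError(path)
    # Parent "directories" are created as needed.
    path.parent.mkdir(parents=True, exist_ok=True)
    if hasattr(data, 'read'):
        # File-like input, open in binary mode and positioned at the beginning.
        data = data.read()
    path.write_bytes(bytes(data))

with tempfile.TemporaryDirectory() as tmp:
    target = Path(tmp) / 'd' / 'y.data'
    write_bytes_sketch(target, b'0101')                              # bytes-like
    write_bytes_sketch(target, io.BytesIO(b'1010'), overwrite=True)  # file-like
    final_content = target.read_bytes()
    try:
        write_bytes_sketch(target, b'again')  # overwrite defaults to False
        refused = False
    except FileExistsError:
        refused = True

print(final_content, refused)
```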
- abstract read_bytes() bytes[source]#
Return the binary contents of the pointed-to file as a bytes object.
If self is not a file or does not exist, FileNotFoundError is raised.
- write_text(data: str, *, overwrite: bool = False, encoding: str | None = None, errors: str | None = None) None[source]#
Write text data to the current file.
Parent “directories” are created as needed, if applicable.
If overwrite is False and the current file exists, FileExistsError is raised.
encoding and errors are passed to encode(). Usually you should leave them at the default values.
- read_text(*, encoding: str | None = None, errors: str | None = None) str[source]#
Return the decoded contents of the pointed-to file as a string.
If self is not a file or does not exist, FileNotFoundError is raised.
encoding and errors are passed to decode(). Usually you should leave them at the default values.
- copy_dir(source: str | Upath, *, overwrite: bool = False, quiet: bool = False, concurrent: bool = True) int[source]#
Copy the content of the source directory recursively to self.
If source is a string, then it is in the same store as the current path, and it is either absolute, or relative to self.parent.
If source is not a string, then it must be an instance of a Upath subclass, and it may be in any store system.
Immediate children of source will be copied as immediate children of self.
There is no such error as “target directory exists” as the copy operation only concerns individual files. If the target directory (i.e. self) contains files that do not have counterparts in the source directory, they will stay untouched.
overwrite is file-wise. If False, any existing target file will raise FileExistsError and halt the operation. If True, any existing target file will be overwritten by the source file.
quiet controls whether to print out progress info.
- Returns:
- int
The number of files copied.
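The file-wise behavior of copy_dir can be illustrated on a toy store, modeled here as a dict from blob names to contents. The function copy_dir_sketch and the dict model are assumptions made for this sketch; the real method works on actual storage and may copy concurrently.

```python
def copy_dir_sketch(store: dict, source: str, target: str, *, overwrite: bool = False) -> int:
    """Hypothetical model of copy_dir on a dict mapping blob names to bytes."""
    src_prefix = source.rstrip('/') + '/'
    dst_prefix = target.rstrip('/') + '/'
    n = 0
    for name in sorted(store):  # sorted() snapshots the keys, so inserting is safe
        if not name.startswith(src_prefix):
            continue
        dest = dst_prefix + name[len(src_prefix):]
        if dest in store and not overwrite:
            raise FileExistsError(dest)  # overwrite is checked per file
        store[dest] = store[name]
        n += 1
    return n

store = {
    '/src/x.txt': b'x',
    '/src/d/y.data': b'y',
    '/dst/keep.txt': b'k',  # has no counterpart in the source
}
copied = copy_dir_sketch(store, '/src', '/dst')
print(copied)        # 2
print(sorted(store))
```

Children of '/src' land directly under '/dst', and '/dst/keep.txt' stays untouched because nothing in the source maps onto it.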
- copy_file(source: str | Upath, *, overwrite: bool = False) None[source]#
Copy the source file to the current file (i.e. self).
If source is a string, then it is in the same store as the current path, and it is either absolute, or relative to self.parent. For example, if self is '/a/b/c/d.txt', then source='e.txt' means '/a/b/c/e.txt'.
If source is not a string, then it must be an instance of a Upath subclass, and it may be in any storage system.
If source is not an existing file, FileNotFoundError is raised.
If self is an existing file and overwrite is False, FileExistsError is raised. If overwrite is True, then self will be overwritten.
If type(source) is LocalUpath and self is an existing directory, then IsADirectoryError is raised. In a cloud blob store, there is no concrete “directory”. For example, suppose self is ‘gs://mybucket/backup/record’ on Google Cloud Storage, and source is ‘/experiment/data’, then source is ‘gs://mybucket/experiment/data’. If there exists blob ‘gs://mybucket/backup/record/y’, then we say ‘gs://mybucket/backup/record’ is a “directory”. However, this is merely a “virtual” concept, or an emulation of the “directory” concept on local disk. As long as the self path is not an existing blob, the copy will proceed with no problem. Nevertheless, such naming is confusing and better avoided.
Return the number of files copied (either 0 or 1).
- remove_dir(*, quiet: bool = True, concurrent: bool = True) int[source]#
Remove the current directory (i.e. self) and all its contents recursively.
Essentially, this removes each file that is yielded by riterdir(). Subclasses should take care to remove “empty directories”, if applicable, that are left behind.
quiet controls whether to print progress info.
- Returns:
- int
The number of files removed.
- abstract remove_file() None[source]#
Remove the current file (i.e. self).
If self is not an existing file, FileNotFoundError is raised. If the file exists but can’t be removed, the platform-dependent exception is propagated.
- abstract iterdir() Iterator[Self][source]#
Yield the immediate (i.e. non-recursive) children of the current dir (i.e. self).
The yielded elements are instances of the same class. Each yielded element is either a file or a dir. There is no guarantee on the order of the returned elements.
If self is not a dir (e.g. maybe it’s a file), or does not exist at all, nothing is yielded (resulting in an empty iterable); no exception is raised.
See also
- ls() list[Self][source]#
Return the elements yielded by iterdir() in a sorted list.
Sorting is by a full path string maintained internally.
The returned list may be empty.
- abstract riterdir() Iterator[Self][source]#
Yield files under the current dir (i.e. self) recursively. The method name means “recursive iterdir”.
The yielded elements are instances of the same class. They represent existing files.
Compared to iterdir(), this is recursive, and yields files only. Empty subdirectories will have no representation in the return.
Similar to iterdir(), if self is not a dir or does not exist, then nothing is yielded, and no exception is raised either.
There is no guarantee on the order of the returned elements.
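The difference between iterdir() and riterdir() can be seen on a plain list of blob names. The two helper functions below are hypothetical and operate on strings rather than Upath objects; they only model the documented semantics.

```python
blob_names = ['/tmp/abc/d/y.data', '/tmp/abc/e/f/g/data.json', '/tmp/abc/x.txt']

def riterdir_sketch(names, root):
    """Yield every file under root, recursively (hypothetical helper)."""
    prefix = root.rstrip('/') + '/'
    for name in names:
        if name.startswith(prefix):
            yield name

def iterdir_sketch(names, root):
    """Yield immediate children of root: files and virtual dirs, each once."""
    prefix = root.rstrip('/') + '/'
    seen = set()
    for name in riterdir_sketch(names, root):
        # Keep only the first path component below root.
        child = prefix + name[len(prefix):].split('/', 1)[0]
        if child not in seen:
            seen.add(child)
            yield child

print(sorted(iterdir_sketch(blob_names, '/tmp/abc')))
# ['/tmp/abc/d', '/tmp/abc/e', '/tmp/abc/x.txt']
print(list(iterdir_sketch(blob_names, '/tmp/abc/x.txt')))
# []  (not a dir: nothing is yielded, no exception)
```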
- rmrf(*, quiet: bool = True, concurrent: bool = False) int[source]#
Remove the current file or dir (i.e. self) recursively.
Analogous to the Linux command rm -rf, hence the name of this method.
Return the number of files removed.
For example, if these blobs are present:
/a/b/c/d/e.txt
/a/b/c/kk.data
/a/b/c
then Upath('/a/b/c').rmrf() would remove all of them.
concurrent is False by default because this method is often used in __del__ of user classes, and thread pool is problematic in __del__.
- abstract lock(*, timeout: int = None) Self[source]#
Lock the current file (i.e. self), in order to have exclusive access to the code block that has possession of the lock.
timeout: if the lock can’t be acquired within timeout seconds, LockAcquireError is raised. If None, wait for a default reasonably long time. To wait “forever”, just pass in a large number. timeout=0 is a valid input, meaning making exactly one attempt to acquire a lock.
Once a lock is acquired, it will not expire until this context manager exits. In other words, this is timeout for the “lock acquisition”, not for the lock itself. Actual waiting time could be slightly longer or shorter.
This is a “mandatory lock”, as opposed to an “advisory lock”. The intended use case is for this lock to be used for implementing a (cooperative) “code lock”.
As this abstract method is to be used as a context manager, a subclass should use yield in its implementation. The yield statement should yield self.
One way to achieve cooperative locking on a file via this lock is like this:
f = Upath('abc.txt')
with f.lock():
    ...
    # Now read from or write to `f` with exclusive access.
Depending on the capabilities of the specific storage system, the lock may be on the current file itself or on another, helper file as an implementation detail. In the latter case, this method should delete the helper file upon release of the lock, if possible.
Some storage engines may not provide the capability to implement this lock.
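To make the helper-file variant concrete, here is a minimal sketch of a lock as a generator-based context manager with an acquisition timeout. It assumes a local file system and relies on atomic exclusive file creation; lock_sketch and LockAcquireErrorSketch are hypothetical names, and this cooperative scheme is not the mechanism used by LocalUpath or GcsBlobUpath.

```python
import os
import tempfile
import time
from contextlib import contextmanager

class LockAcquireErrorSketch(TimeoutError):
    """Hypothetical stand-in for upathlib's LockAcquireError."""

@contextmanager
def lock_sketch(path: str, *, timeout: float = 10.0):
    # The lock lives in a helper file next to the target (an assumption).
    lock_path = path + '.lock'
    deadline = time.monotonic() + timeout
    while True:
        try:
            # O_EXCL makes creation atomic: it fails if the file already exists.
            fd = os.open(lock_path, os.O_CREAT | os.O_EXCL | os.O_WRONLY)
            break
        except FileExistsError:
            # With timeout=0 the deadline has already passed: exactly one attempt.
            if time.monotonic() >= deadline:
                raise LockAcquireErrorSketch(lock_path) from None
            time.sleep(0.05)
    try:
        yield path  # the documented method yields self
    finally:
        os.close(fd)
        os.remove(lock_path)  # delete the helper file upon release

with tempfile.TemporaryDirectory() as tmp:
    f = os.path.join(tmp, 'abc.txt')
    with lock_sketch(f, timeout=0):
        try:
            with lock_sketch(f, timeout=0):
                second_acquired = True
        except LockAcquireErrorSketch:
            second_acquired = False
    helper_removed = not os.path.exists(f + '.lock')

print(second_acquired, helper_removed)  # False True
```

The timeout bounds only the wait for acquisition; once the lock is held, it stays held until the with block exits.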
- class upathlib.FileInfo[source]#
Bases: object
- ctime: float#
Creation time as a POSIX timestamp.
- mtime: float#
Last modification time as a POSIX timestamp.
- time_created: datetime#
Creation time as a datetime object.
- time_modified: datetime#
Last modification time as a datetime object.
- size: int#
In bytes.
- details: Any#
Platform-dependent.
- __init__(ctime: float, mtime: float, time_created: datetime, time_modified: datetime, size: int, details: Any) None#
LocalUpath#
- class upathlib.LocalUpath[source]#
Bases: Upath, PathLike
- __init__(*pathsegments: str)[source]#
Create a path on the local file system. Both POSIX and Windows platforms are supported.
*pathsegments specify the path, either absolute or relative to the current working directory. If missing, the constructed path is the current working directory. This is passed to pathlib.Path.
- __fspath__() str[source]#
LocalUpath implements the os.PathLike protocol, hence a LocalUpath object can be used anywhere an object implementing os.PathLike is accepted. For example, used with the builtin function open():
>>> p = LocalUpath('/tmp/test/data.txt')
>>> p.rmrf()
0
>>> p.write_text('abc')
>>> with open(p) as file:
...     print(file.read())
abc
>>> p.rmrf()
1
- property path: Path#
Return the pathlib.Path object of the path.
- as_uri() str[source]#
Represent the path as a file URI. On Linux, this is like ‘file:///home/username/path/to/file’. On Windows, this is like ‘file:///C:/Users/username/path/to/file’.
- file_info() FileInfo | None[source]#
Return file info if the current path is a file; otherwise return None.
- property root: LocalUpath#
Return a new path representing the root.
On Windows, this is the root on the same drive, like LocalUpath('C:'). On Linux and Mac, this is LocalUpath('/').
- write_bytes(data: bytes | BufferedReader, *, overwrite: bool = False)[source]#
Write the bytes data to the current file.
- copy_dir(source: str | Upath, *, overwrite: bool = False, quiet: bool = False, concurrent: bool = True) int[source]#
- rename_dir(target: str | LocalUpath, *, overwrite: bool = False, quiet: bool = False, concurrent: bool = True) LocalUpath[source]#
Rename the current dir (i.e. self) to target.
overwrite is applied file-wise. If there are files under target that do not have counterparts under self, they are left untouched.
quiet controls whether to print progress info.
Return the new path.
- rename_file(target: str | LocalUpath, *, overwrite: bool = False) LocalUpath[source]#
Rename the current file (i.e. self) to target in the same store.
target is either absolute or relative to self.parent. For example, if self is ‘/a/b/c/d.txt’, then target='e.txt' means ‘/a/b/c/e.txt’.
If overwrite is False (the default) and the target file exists, FileExistsError is raised.
Return the new path.
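The “absolute, or relative to self.parent” rule for a string target can be sketched with pathlib.PurePosixPath. The helper resolve_sketch is hypothetical and only models how the target string is interpreted; it performs no renaming.

```python
from pathlib import PurePosixPath

def resolve_sketch(self_path: str, other: str) -> str:
    """Resolve `other` against the parent of `self_path` (hypothetical helper)."""
    # joinpath keeps an absolute `other` as-is and appends a relative one.
    return str(PurePosixPath(self_path).parent.joinpath(other))

print(resolve_sketch('/a/b/c/d.txt', 'e.txt'))     # /a/b/c/e.txt
print(resolve_sketch('/a/b/c/d.txt', '/x/e.txt'))  # /x/e.txt
```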
- iterdir() Iterator[LocalUpath][source]#
Yield the immediate children under the current dir.
- riterdir() Iterator[LocalUpath][source]#
Yield all files under the current dir recursively.
BlobUpath#
- class upathlib._blob.BlobUpath[source]#
Bases: Upath
BlobUpath is a base class for paths in a cloud storage, aka “blob store”. This is in contrast to a local disk storage, which is implemented by LocalUpath.
- property blob_name: str#
Return the “name” of the blob. This is the “path” without a leading '/'. In cloud blob stores, this is exactly the name of the blob. The name often contains '/', which has no special role in the name per se but is interpreted by users to be a directory separator.
- is_dir() bool[source]#
In a typical blob store, there is no such concept as a “directory”. Here we emulate the concept in a local file system. If there is a blob named like
/ab/cd/ef/g.txt
we say there exists directory “/ab/cd/ef”. We should never have a trailing / in a blob’s name, like
/ab/cd/ef/
(I don’t know whether the blob stores allow such blob names.)
Consequently, is_dir is equivalent to “having stuff in the dir”. There is no such thing as an “empty directory” in blob stores.
- iterdir() Iterator[Self][source]#
Yield immediate children under the current dir.
This is a naive, inefficient implementation. Expected to be refined by subclasses.
- download_dir(target: str | Path | LocalUpath, **kwargs) int[source]#
- download_file(target: str | Path | LocalUpath, **kwargs) None[source]#
- upload_dir(source: str | Path | LocalUpath, **kwargs) int[source]#
- upload_file(source: str | Path | LocalUpath, **kwargs) None[source]#
GcsBlobUpath#
- class upathlib.GcsBlobUpath[source]#
Bases: BlobUpath
GcsBlobUpath implements the Upath API for Google Cloud Storage using the package google-cloud-storage.
- __init__(*paths: str, bucket_name: str | None = None)[source]#
If bucket_name is None, then *paths should be a single string starting with ‘gs://<bucket-name>/’.
If bucket_name is specified, then *paths specify path components under the root of the bucket.
Examples
These several calls are equivalent:
>>> GcsBlobUpath('experiments', 'data', 'first.data', bucket_name='backup')
GcsBlobUpath('gs://backup/experiments/data/first.data')
>>> GcsBlobUpath('/experiments/data/first.data', bucket_name='backup')
GcsBlobUpath('gs://backup/experiments/data/first.data')
>>> GcsBlobUpath('gs://backup/experiments/data/first.data')
GcsBlobUpath('gs://backup/experiments/data/first.data')
>>> GcsBlobUpath('gs://backup', 'experiments', 'data/first.data')
GcsBlobUpath('gs://backup/experiments/data/first.data')
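How a ‘gs://<bucket-name>/…’ string breaks down into a bucket name and a blob name can be sketched as follows. The function split_gs_uri is a hypothetical helper written for illustration; it is not part of upathlib and says nothing about how GcsBlobUpath parses its arguments internally.

```python
def split_gs_uri(uri: str) -> tuple[str, str]:
    """Split 'gs://<bucket-name>/<blob-name>' into its two parts (hypothetical helper)."""
    scheme = 'gs://'
    if not uri.startswith(scheme):
        raise ValueError(f"expected a URI starting with {scheme!r}: {uri!r}")
    # Everything up to the first '/' is the bucket; the rest is the blob name.
    bucket, _, blob_name = uri[len(scheme):].partition('/')
    return bucket, blob_name

print(split_gs_uri('gs://backup/experiments/data/first.data'))
# ('backup', 'experiments/data/first.data')
```

The second element has no leading '/', which matches the description of blob_name in BlobUpath.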
- is_file() bool[source]#
The result of this call is not cached, in case the object is modified anytime by other clients.
- is_dir() bool[source]#
If there is a dummy blob with name f"{self.name}/", this will return True. This is the case after creating a “folder” on the GCP dashboard. In programmatic use, it’s recommended to avoid such situations so that is_dir() returns True if and only if there are blobs “under” the current path.
- file_info(*, timeout=None) FileInfo | None[source]#
Return file info if the current path is a file; otherwise return None.
- property root: GcsBlobUpath#
Return a new path representing the root of the same bucket.
- write_bytes(data: bytes | BufferedReader, *, overwrite=False, **kwargs)[source]#
Write bytes data to the current blob.
In the usual case, data is bytes. The case where data is an io.BufferedReader object, such as an open file, is not well tested.
- remove_dir(**kwargs) int[source]#
Remove the current dir and all the content under it recursively. Return the number of blobs removed.
- lock(*, timeout=None)[source]#
This implementation does not prevent the file from being deleted by other workers that do not use this method. It relies on the assumption that this blob is used cooperatively solely between workers in this locking logic.
timeout is the wait time for acquiring or releasing the lease. If None, the default value 600 seconds is used. If 0, exactly one attempt is made to acquire a lock.
Note: timeout = None does not mean infinite wait. It means a default wait time. If the user wants a longer wait, just pass in a large number.
Serializers#
- class upathlib.serializer.JsonSerializer[source]#
Bases: Serializer
- class upathlib.serializer.PickleSerializer[source]#
Bases: Serializer
- class upathlib.serializer.ZPickleSerializer[source]#
Bases: PickleSerializer
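The idea of a compressed pickle format can be sketched with the standard library. This assumes that the “Z” in ZPickleSerializer refers to the zlib-compressed variant mentioned in the Quickstart; the function names, the compression level and the absence of any framing are assumptions for this sketch, not the serializer’s actual API or file format.

```python
import pickle
import zlib

def zpickle_dumps(obj, *, level: int = 3) -> bytes:
    """Pickle, then compress with zlib (hypothetical helper)."""
    return zlib.compress(pickle.dumps(obj), level)

def zpickle_loads(data: bytes):
    """Decompress with zlib, then unpickle (hypothetical helper)."""
    return pickle.loads(zlib.decompress(data))

record = {'name': 'John', 'age': 38}
restored = zpickle_loads(zpickle_dumps(record))
print(restored == record)  # True
```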
- class upathlib.serializer.ZstdCompressor[source]#
Bases: _local
- compress(x, *, level=3, threads=0)[source]#
- Parameters:
- threads
Number of threads to use to compress data concurrently. When set, compression operations are performed on multiple threads. The default value (0) disables multi-threaded compression. A value of -1 means to set the number of threads to the number of detected logical CPUs.
Using upathlib to implement a “multiplexer”#
Multiplexer is a utility for distributing data elements to multiple concurrent or distributed workers.
Its implementation relies on the “locking” capability of Upath.
Suppose we perform some brute-force search on a cluster of machines;
there are 1000 grids, and the algorithm takes on one grid at a time.
Now, the grid is a “hyper-parameter” or “control parameter” that takes 1000 possible values.
We want to distribute these 1000 values to the workers.
This is the kind of use case targeted by Multiplexer.
Let’s show its usage using local data and multiprocessing.
(For real work, we would use cloud storage and a cluster of machines.)
First, create a Multiplexer to hold the values to be distributed:
>>> from upathlib import LocalUpath
>>> from upathlib.multiplexer import Multiplexer
>>> p = LocalUpath('/tmp/abc/mux')
>>> p.rmrf()
0
>>> hyper = Multiplexer.new(range(20), p)
>>> len(hyper)
20
Next, design an interesting worker function:
>>> import multiprocessing, random, time
>>>
>>> def worker(mux_id):
...     for x in Multiplexer(mux_id):
...         time.sleep(random.uniform(0.1, 0.2))  # doing a lot of things
...         print(x, 'done in', multiprocessing.current_process().name)
Back in the main process,
>>> mux_id = hyper.create_read_session()
>>> tasks = [multiprocessing.Process(target=worker, args=(mux_id,)) for _ in range(5)]
>>> for t in tasks:
...     t.start()
>>>
2 done in Process-13
0 done in Process-11
1 done in Process-12
4 done in Process-15
3 done in Process-14
6 done in Process-11
7 done in Process-12
8 done in Process-15
5 done in Process-13
9 done in Process-14
12 done in Process-15
13 done in Process-13
11 done in Process-12
10 done in Process-11
14 done in Process-14
15 done in Process-15
18 done in Process-11
16 done in Process-13
17 done in Process-12
19 done in Process-14
>>>
>>> for t in tasks:
...     t.join()
>>> hyper.done(mux_id)
True
>>> hyper.destroy()
>>>
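The role that locking plays in this kind of distribution can be illustrated within a single process. The sketch below hands out each element to exactly one worker by guarding a shared cursor with a threading.Lock. MuxSketch is a hypothetical class made up for this illustration; the real Multiplexer keeps its state in storage and relies on the locking capability of Upath, and its internals are not shown in this document.

```python
import threading

class MuxSketch:
    """Hypothetical in-process analogue: a shared cursor guarded by a lock."""

    def __init__(self, data):
        self._data = list(data)
        self._next = 0
        self._lock = threading.Lock()

    def __iter__(self):
        while True:
            with self._lock:  # exclusive access to the shared cursor
                if self._next >= len(self._data):
                    return
                x = self._data[self._next]
                self._next += 1
            yield x  # the element is processed outside the lock

mux = MuxSketch(range(20))
seen = []  # list.append is thread-safe in CPython

def worker():
    for x in mux:
        seen.append(x)

threads = [threading.Thread(target=worker) for _ in range(5)]
for t in threads:
    t.start()
for t in threads:
    t.join()

print(sorted(seen) == list(range(20)))  # True: each element went to exactly one worker
```

Because the lock is held only while the cursor advances, slow processing in one worker does not block the others from fetching their next element.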
- class upathlib.multiplexer.Multiplexer[source]#
Bases: Iterable[Element], Sized
Multiplexer is used to distribute data elements to multiple “workers” so that each element is obtained by exactly one worker.
Typically, the data element is small in size but each requires significant time to process by the worker. The data elements are “hyper parameters”.
The usage consists of two main parts:
1. In “coordinator” code, call create_read_session() to start a new “session”. Different sessions (at the same time or otherwise) are independent consumers of the data.
Typically, this dataset, which is small and easy to create, is consumed only once. In this case, the coordinator code typically calls new() to create a new Multiplexer, then calls create_read_session() on it, and then manages to send the ID to workers.
2. In “worker” code, use the ID that was returned by create_read_session() to instantiate a Multiplexer and iterate over it. In so doing, multiple workers will obtain the data elements collectively, i.e., each element is obtained by exactly one worker.
As far as this utility is concerned, the coordinator does not need to wait for the workers to finish consuming the data. The coordinator process or machine may quit and let the workers continue with their job. Usually you should take care of job tracking separately.
See create_read_session() for more details.
- classmethod new(data: Iterable[Element], path: str | Path | Upath, *, tag: str | None = None)[source]#
- Parameters:
- data
The data elements that need to be distributed. The elements should be pickle-able.
Importantly, data (the dataset) is meant to contain a modest number (say, up to thousands) of “control parameters”, not massive number of raw data elements. This is because distributing each data element by this utility incurs nontrivial overhead. Each data element is meant to trigger a substantial amount of processing in a worker, making the overhead of obtaining the data element worthwhile.
Based on this understanding, the elements in data are simply saved in a single pickle file, and each worker will fetch a copy of this file.
- path
A directory where the data and any supporting info will be saved. The directory can be existent or non-existent. A sub-directory will be created under path to store data and info about this particular multiplexer. The name of the subdirectory is a datetime string. tag is appended to the sub-directory name to be more informative, if so desired.
If path is in the cloud, then the workers can be on multiple machines, and in multiple threads or processes on each machine. If path is on the local disk, then the workers are in threads or processes on the same machine.
However, there are no strong reasons to use this facility on a local machine, because the same functionality can be achieved by a queue-based solution.
Usually this class is used to distribute data to a cluster of machines, hence this path points to a location in a cloud storage that is supported by upathlib.
Since path is a “root directory” hosting Multiplexers (each in a randomly named sub-directory), a subclass may choose to fix this directory so that new() does away with this parameter.
- __init__(mux_id: str, worker_id: str | None = None, timeout: int | float | None = None)[source]#
Create a Multiplexer object and use it to distribute the data elements that have been stored by new().
- Parameters:
- mux_id
The value that is returned by create_read_session().
- worker_id
A string representing the current worker (i.e. this instance). If missing, a default is constructed based on thread name and process name.
- property worker_id: str#
- create_read_session() str[source]#
Let’s say there is a “coordinator” and some “workers”; these are programs running in threads, processes, or distributed machines. The coordinator creates a new Multiplexer and calls this method to start a “session” to read (i.e. iterate over) the elements in this Multiplexer:
mux = Multiplexer.new(range(1000), '/tmp/abc/mux/')
mux_id = mux.create_read_session()
The mux_id is then provided to the workers, which will create Multiplexer instances pointing to the same dataset and participating in the reading session that has just been started:
mux = Multiplexer(mux_id)
for x in mux:
    ...
The data that was provided to new() is split between the workers so that each data element will be obtained by exactly one worker.
The returned value (the “mux ID”) encodes info about the location (“path”) of the data storage as well as the newly created read session. All workers that use the same ID participate in the same read session, i.e. the data elements will be split between them. There can be multiple, independent read sessions going on at the same time.
This call does not make the current Multiplexer object a participant in the read session just created. One has to use the returned value to create a new Multiplexer object to participate in the said read session. If the current object is already participating in a read session (an “old” session), making this call on the object does not change its role as a participant in the old session. This call merely creates a new read session but does not modify the current object.
As a rule of thumb, an object created by Multiplexer.new(data, ...) is not a participant of any read session (even after create_read_session() is called on it subsequently). On the other hand, an object created by Multiplexer(mux_id, ...) is participating in the read session that is identified by mux_id.
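The “each element is obtained by exactly one worker” guarantee can be illustrated with a toy, in-memory model. This is only a sketch of the semantics: `ToyReadSession` and `run_workers` are hypothetical names, the workers are threads in one process, and nothing here reflects how upathlib actually coordinates workers through storage.

```python
import threading


class ToyReadSession:
    """In-memory stand-in for one read session (hypothetical, not upathlib code)."""

    def __init__(self, data):
        self._data = list(data)
        self._next = 0
        self._lock = threading.Lock()

    def __iter__(self):
        # Every iterating worker shares the same cursor, so an index that
        # has been claimed by one worker is never handed to another.
        while True:
            with self._lock:
                if self._next >= len(self._data):
                    return
                idx = self._next
                self._next += 1
            yield self._data[idx]


def run_workers(session, n_workers):
    """Let n_workers threads iterate over one session; return what each got."""
    results = [[] for _ in range(n_workers)]

    def work(i):
        for x in session:
            results[i].append(x)

    threads = [threading.Thread(target=work, args=(i,)) for i in range(n_workers)]
    for t in threads:
        t.start()
    for t in threads:
        t.join()
    return results


session = ToyReadSession(range(1000))
per_worker = run_workers(session, n_workers=4)
all_items = sorted(x for got in per_worker for x in got)
```

How the elements are divided among the workers varies from run to run, but their union is always the full dataset with no duplicates.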
- stat(mux_id: str | None = None) dict[source]#
Return status info of an ongoing read session.
This is often called in the “coordinator” code on the object that has had its create_read_session() called. mux_id is the return of create_read_session(). If mux_id is None, then this method is about the read session in which the current object is participating.
- done(mux_id: str | None = None) bool[source]#
Return whether the data iteration is finished.
This is often called in the “coordinator” code on the object that has had its create_read_session() called. mux_id is the return of create_read_session(). If mux_id is None, then this method is about the read session in which the current object is participating.
Using upathlib to implement a manager of versioned datasets with backup in the cloud#
VersionedUploadable helps store and use a “dataset” in an exclusive directory in consistent and convenient ways. Specifically,
The dataset is identified by a version string that is generated and sortable (datetime-based), so that the “newest” is always the “latest” version, and code can infer the latest version. The full path of the storage location is managed for the user, who only needs the version.
The storage can be either local (on disk) or remote (in a cloud blob store). There are methods to download/upload between local and remote storages.
However, “local” and “remote” are just labels for the two storage locations. They can be both on local disk, or both in the same cloud storage (in different “locations”), or in two different cloud storages, or one on local disk and the other in a cloud blob store. This is controlled by the classmethods VersionedUploadable.local_cls_upath() and VersionedUploadable.remote_cls_upath(), which should be implemented by subclasses.
Within the dataset, one can conveniently specify sub-directories and files relative to the “root”, and read/write. This navigation is the same regardless of whether the storage is local or remote.
- class upathlib.versioned_uploadable.VersionedUploadable[source]#
Bases: ABC
A subclass will customize remote_cls_upath() and local_cls_upath().
- classmethod resolve_version(version: str, remote: bool | None = None) tuple[str, bool][source]#
Given version as one of the special values (‘latest-local’, ‘latest-remote’, and ‘latest’) or an actual version string, and remote as None or explicit True/False, figure out the actual version and its remote-ness.
This is called by __init__().
- Parameters:
- version
Either one of the special values ‘latest’, ‘latest-local’, and ‘latest-remote’, or an actual version string like ‘20210322-120529’.
If version is an actual version string, then version and remote are returned as is, even if remote is None. It is checked that version is a valid version string, but existence of the version is not checked.
- remote
If True, look in remote (cloud) storage only. If False, look in local storage only. If None, look in both remote and local.
If version is ‘latest-local’, then remote must be False or None.
If version is ‘latest-remote’, then remote must be True or None.
If version is ‘latest’, then find the latest between local and remote storages if remote is None, otherwise version becomes ‘latest-remote’ or ‘latest-local’ according to the value of remote.
- Returns:
- tuple
A tuple of two elements: the actual version string, and remote-ness.
Raises ValueError if the parameters are incompatible.
Raises VersionNotFoundError if no version is found that satisfies the request.
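The resolution rules above can be written out as a small pure function. This is a sketch of the documented behavior, not the library's code: `resolve_version_sketch` is a hypothetical name, the existing versions are passed in as sorted lists rather than read from storage, `VersionNotFoundError` is a local stand-in for the library's exception, and validation of the version string format is left out. The tie-breaking rule (local wins when both sides have the same latest version) is taken from the description of `__init__`.

```python
class VersionNotFoundError(LookupError):
    """Local stand-in for the library's exception of the same name."""


def resolve_version_sketch(version, remote, local_versions, remote_versions):
    """Return (actual_version, remote) following the documented rules.

    local_versions and remote_versions are lists of version strings sorted
    from old to new; passing them in is a simplification for illustration.
    """
    if version == "latest-local":
        if remote is True:
            raise ValueError("'latest-local' is incompatible with remote=True")
        remote = False
    elif version == "latest-remote":
        if remote is False:
            raise ValueError("'latest-remote' is incompatible with remote=False")
        remote = True
    elif version != "latest":
        # An actual version string: returned as is; existence is not checked.
        return version, remote

    if remote is None:
        # 'latest' with remote=None: compare the newest version on each side.
        candidates = [(v, False) for v in local_versions[-1:]]
        candidates += [(v, True) for v in remote_versions[-1:]]
        if not candidates:
            raise VersionNotFoundError(version)
        # Newest version wins; on a tie, the local entry (remote=False) wins.
        return max(candidates, key=lambda c: (c[0], not c[1]))

    versions = remote_versions if remote else local_versions
    if not versions:
        raise VersionNotFoundError(version)
    return versions[-1], remote


local = ["20210101-000000", "20210322-120529"]
remote = ["20210322-120529"]
latest_any = resolve_version_sketch("latest", None, local, remote)
latest_remote = resolve_version_sketch("latest", True, local, remote)
```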
- abstract classmethod local_cls_upath() LocalUpath[source]#
A subclass implements this method to determine the full path on the local disk for the entity represented by the particular subclass, i.e. a particular type of “dataset”.
The file-system structure directly under this path is determined by this class. Currently it contains a subdirectory called ‘versions’, in which there is one subdirectory per version, named after the version string.
In the directory of one particular version, the content is determined by the user. User can create whatever subdirectories and files they want. This class uses one meta file in the root of the version’s directory and the file is named “info.json”.
- abstract classmethod remote_cls_upath() BlobUpath[source]#
Analogous to local_cls_upath() but on the remote side.
- classmethod local_version_upath(version: str) LocalUpath[source]#
Root directory of the specified version in the local storage.
- classmethod remote_version_upath(version: str) BlobUpath[source]#
Root directory of the specified version in the remote storage.
- classmethod local_versions() list[str][source]#
Get a (potentially empty) list of the versions that exist on the local disk.
The elements in the list are sorted from small (old) to large (new).
Because remote_versions and local_versions get “directories” without checking their content, they might get invalid (corrupt or empty) versions. Users should delete such bad versions as they are discovered.
- classmethod remote_versions() list[str][source]#
Analogous to local_versions() but on the remote side.
- classmethod has_local_version(version: str) bool[source]#
A version is considered existent if and only if the file “info.json” exists in its root directory.
- classmethod has_remote_version(version: str) bool[source]#
Analogous to has_local_version() but on the remote side.
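The difference between listing versions and checking that a version exists can be shown on a plain local directory tree. The sketch below uses the standard `pathlib` rather than upathlib, and `list_versions` and `has_version` are hypothetical helpers; it only mirrors the documented layout (a ‘versions’ subdirectory with one subdirectory per version, and “info.json” marking a valid version).

```python
import tempfile
from pathlib import Path


def list_versions(root: Path) -> list:
    """Names of the sub-directories under root/'versions', oldest first."""
    versions_dir = root / "versions"
    if not versions_dir.is_dir():
        return []
    return sorted(p.name for p in versions_dir.iterdir() if p.is_dir())


def has_version(root: Path, version: str) -> bool:
    """A version counts as existent only if its info.json is present."""
    return (root / "versions" / version / "info.json").is_file()


with tempfile.TemporaryDirectory() as tmp:
    root = Path(tmp)
    good = root / "versions" / "20210322-120529"
    bad = root / "versions" / "20210401-090000"  # no info.json: incomplete
    good.mkdir(parents=True)
    bad.mkdir(parents=True)
    (good / "info.json").write_text("{}")

    listed = list_versions(root)  # includes the incomplete version
    valid = [v for v in listed if has_version(root, v)]  # excludes it
```

Filtering a listing through the existence check, as in the last line, is one way to spot the bad versions that should be deleted.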
- classmethod remove_local_version(version: str, **kwargs) None[source]#
Delete the entire directory of the specified version on the local disk.
By default, there is neither warning before the deletion nor progress printouts.
- Parameters:
- version
The exact version string. If the version does not exist, it’s a no-op.
- **kwargs
Passed on to remove_dir().
- classmethod remove_remote_version(version: str, **kwargs) None[source]#
Analogous to remove_local_version() but on the remote side.
- classmethod new(*, tag: str | None = None, remote: bool = False, **kwargs) VersionedUploadable[source]#
If a subclass needs additional setup on a newly created object, it may choose to override this classmethod new.
The optional tag appends (human readable) info to the auto created version string, which is based on current date and time.
The returned object has attribute info, which is an empty dict. Nothing has been written to storage.
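A datetime-based version string sorts chronologically when compared as plain text, which is what lets code infer the latest version. The sketch below assumes the format suggested by the example ‘20210322-120529’; the helper `make_version` and the `-` separator before the tag are assumptions made for illustration, not the library's actual format.

```python
from datetime import datetime
from typing import Optional


def make_version(now: datetime, tag: Optional[str] = None) -> str:
    """Build a sortable, datetime-based version string (format assumed)."""
    version = now.strftime("%Y%m%d-%H%M%S")
    if tag:
        version = f"{version}-{tag}"  # the separator is an assumption
    return version


v1 = make_version(datetime(2021, 3, 22, 12, 5, 29))
v2 = make_version(datetime(2021, 3, 22, 12, 5, 30), tag="retrained")
v3 = make_version(datetime(2022, 1, 1, 0, 0, 0))

# Plain string sorting reproduces chronological order.
ordered = sorted([v3, v1, v2])
```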
- __init__(version: str, *, remote: bool | None = None, require_exists: bool = True)[source]#
This loads up an existing version for reading and writing. To create a new version, use the classmethod new().
- Parameters:
- version
Either the actual version string, or one of ‘latest’, ‘latest-local’, and ‘latest-remote’.
- remote
Look for the version in local or remote storage?
If an explicit bool, it must be compatible with version. For example, version='latest-remote' and remote=False are not compatible.
If None, and version='latest', then the latest version between local and remote is found and used. If local and remote have the same latest version, then the local one is used.
If None, and version is an exact version, then find it either locally or remotely wherever it exists. If the version exists in both storages, then the local one is used.
- require_exists
Default is True. If version is an exact version string but the version does not exist, VersionNotFoundError is raised. Usually you should leave this at the default. This is mainly for the call of __init__ in new(), where it needs to use require_exists=False.
- version: str#
version of the object
- remote: bool#
remote-ness of the object
- property upath: Upath#
Return the root directory of self.
This is consistent with the remote-ness of self.
- path(*args: str) Upath[source]#
Return a path relative to the root directory of self.
Examples:
self.path()  # the root path
self.path('info.json')  # file in root directory
self.path('abc', 'de', 'data.parquet')  # file 'abc/de/data.parquet' under root directory
self.path('abc/de/data.parquet')  # same as above
Typically, you proceed to read or write with the returned path (object), e.g.,
self.path('info.json').write_json(self.info, overwrite=True)
info = self.path('info.json').read_json()
This is consistent with the remote-ness of self. In other words, if self is local, then the returned path is local (under the local root directory); otherwise, the returned path is remote.
self.path('abc.txt') is equivalent to (self.upath / 'abc.txt').
Note
Don’t start args with '/'.
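The equivalence of the multi-argument and single-string forms can be illustrated with the standard library's `PurePosixPath`, used here as a stand-in for a Upath. The root location and the module-level `path` helper are hypothetical. The last line shows what a leading '/' does in `pathlib`; whether upathlib reacts the same way is not stated here, so treat it only as an illustration of why such arguments are best avoided.

```python
from pathlib import PurePosixPath

# Hypothetical root directory of one version.
root = PurePosixPath("/data/mydataset/versions/20210322-120529")


def path(*args: str) -> PurePosixPath:
    """Join args under the root, mimicking the documented calling styles."""
    return root.joinpath(*args)


a = path("abc", "de", "data.parquet")
b = path("abc/de/data.parquet")  # same location as a
c = path()                       # the root itself
d = path("/abc.txt")             # in pathlib, a leading '/' discards the root
```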
- save() None[source]#
A subclass should re-implement this method to save its own stuff like data, summary, and whatever, and in the end call
super().save().
- download(path: str | None = None, *, overwrite: bool = False, **kwargs) int[source]#
Download the entire dataset or specified parts of it.
If the current object already points to a local version, then UnsupportedOperation is raised.
If you know certain files have changed, you can bring remote/local into sync by downloading/uploading those particular files.
- Parameters:
- path
Specific subdirectory or file to download. If None, the entire version is downloaded.
If None, and the local version already exists, and overwrite is False, then download will not happen. However, if the local version is incomplete or corrupt compared to the remote counterpart (the same version), the code wouldn’t know.
If not None, then the specified sub-directory or file will be downloaded (into the expected locations for the version). This is meant for “repair work” if you know certain parts of the local version are corrupt or missing. If the version does not exist locally, there is hardly a scenario for downloading only parts of it (and that may cause issues later, as you are creating an incomplete local version).
- overwrite
If True, overwrite any file that exists locally.
- **kwargs
If path is None, this is passed on to upathlib.Upath.download_dir. If path is not None, this is ignored.
- Returns:
- int
The number of files downloaded.
Warning
You should not use overwrite=True lightly just to ensure it proceeds. The default overwrite=False prevents re-downloading when the local version already exists. Try to benefit from such savings as far as you can.
- upload(path: str | None = None, *, overwrite: bool = False, **kwargs) int[source]#
Analogous to download.
Return the number of files uploaded.
- ensure_local(*, init_kwargs: dict[str, Any] | None = None, **kwargs) VersionedUploadable[source]#
Return a local object of this version that exists.
If self is local, then self is returned.
Otherwise, if the local version does not exist, it will be downloaded. If the local version exists, downloading will not happen. (This code has to assume the local version is sound, though.) To force downloading regardless, pass in overwrite=True, but don’t do that lightly!
- Parameters:
- init_kwargs
For special needs of a subclass that defines additional arguments for its __init__.
- **kwargs
Passed on to download.
Note
Calling ensure_local does not make the current object local; you need to receive and use the returned object, which is local.
- ensure_remote(*, init_kwargs: dict[str, Any] | None = None, **kwargs) VersionedUploadable[source]#