upathlib
(Generated on Feb 08, 2024 for upathlib version 0.9.4.)
The package upathlib defines a unified API for cloud blob stores (aka “object stores”) as well as local file systems.
Attention is focused on identifying the most essential functionalities while working with a blob store for data processing. Functionalities in a traditional local file system that are secondary in these tasks—such as symbolic links, fine-grained permissions, and various access modes—are ignored.
End users should refer to the class Upath for documentation of the API.
The local file system is implemented by LocalUpath, which subclasses Upath.
The client for Google Cloud Storage (i.e. the blob store on GCP) is implemented by another Upath subclass, namely GcsBlobUpath.
One use case is the package biglist, where the class Biglist takes a Upath object to indicate its location of storage. It does not care whether the storage is local or in a cloud blob store—it simply uses the common API to operate the storage.
To install, do one of the following:
$ pip3 install upathlib
$ pip3 install upathlib[gcs]
Quickstart
Let’s carve out a space in the local file system and poke around.
>>> from upathlib import LocalUpath
>>> p = LocalUpath('/tmp/abc')
This creates a LocalUpath object p that points to the location '/tmp/abc'. This may be an existing file or directory, or it may not exist at all.
We know this is a temporary location; to be sure we have a clear playground, let’s
wipe out anything and everything:
>>> p.rmrf()
0
Think rm -rf /tmp/abc. It does just that. The returned 0 means zero files were deleted.
Now let’s create a file and write something to it:
>>> (p / 'x.txt').write_text('first')
This creates the file /tmp/abc/x.txt with the content 'first'. Note that the directory '/tmp/abc' did not exist before the call. We did not need to “create the parent directory”. In fact, upathlib does not provide a way to do that.
In upathlib, a “directory” is a “virtual” thing that is embodied by a group of files.
For example, if there exist
/tmp/abc/x.txt
/tmp/abc/d/y.data
we say there are directories '/tmp/abc' and '/tmp/abc/d', but we don’t create these “directories” by themselves. These directories come into being when such files exist.
Let’s actually create these files:
>>> (p / 'x.txt').write_text('second', overwrite=True)
>>> (p / 'd' / 'y.data').write_bytes(b'0101')
Now let’s look into this directory:
>>> p.is_dir()
True
>>> (p / 'd').is_dir()
True
>>> (p / 'x.txt').is_dir()
False
>>> (p / 'x.txt').is_file()
True
We can navigate in the directory. For example,
>>> for v in sorted(p.iterdir()): # the sort merely makes the result stable
...     print(v)
/tmp/abc/d
/tmp/abc/x.txt
This is only the first level, or “direct children”. We can also use “recursive iterdir” to get all files under the directory, descending into subdirectories recursively:
>>> for v in sorted(p.riterdir()): # the sort merely makes the result stable
...     print(v)
/tmp/abc/d/y.data
/tmp/abc/x.txt
This time only files are listed. Subdirectories do not show up because, after all, they are not real in upathlib’s view.
We can as easily read a file, like
>>> (p / 'x.txt').read_text()
'second'
Several common file formats are provided out of the box, including text, bytes, json, and pickle, as well as compressed versions by zlib and Zstandard.
Let’s do some JSON:
>>> pp = p / 'e/f/g/data.json'
>>> pp.write_json({'name': 'John', 'age': 38})
We know the JSON file is also a text file, so we can treat it as such:
>>> pp.read_text()
'{"name": "John", "age": 38}'
But usually we prefer to get back the Python object directly:
>>> v = pp.read_json()
>>> v
{'name': 'John', 'age': 38}
>>> type(v)
<class 'dict'>
We can go “down” the directory tree using /.
Conversely, we can go “up” using the property parent:
>>> pp.path
PosixPath('/tmp/abc/e/f/g/data.json')
>>> pp.parent
LocalUpath('/tmp/abc/e/f/g')
>>> pp.parent.parent
LocalUpath('/tmp/abc/e/f')
>>> pp.parent.parent.is_dir()
True
>>> pp.parent.parent.is_file()
False
or the terminal lovers’ '..':
>>> pp
LocalUpath('/tmp/abc/e/f/g/data.json')
>>> pp / '..'
LocalUpath('/tmp/abc/e/f/g')
>>> pp / '..' / '..'
LocalUpath('/tmp/abc/e/f')
Under the hood, / delegates to joinpath():
>>> pp.joinpath('../../o/p/q')
LocalUpath('/tmp/abc/e/f/o/p/q')
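The way '..' collapses in the results above can be pictured as purely lexical normalization of the joined path string. The following is a minimal sketch using only the standard library; the helper name `join_and_normalize` is hypothetical, and this is not necessarily how upathlib implements joinpath().

```python
import posixpath


def join_and_normalize(base: str, *segments: str) -> str:
    """Join segments onto base, then collapse '.' and '..' lexically."""
    return posixpath.normpath(posixpath.join(base, *segments))


one_up = join_and_normalize('/tmp/abc/e/f/g/data.json', '..')
sideways = join_and_normalize('/tmp/abc/e/f/g/data.json', '../../o/p/q')
print(one_up)    # /tmp/abc/e/f/g
print(sideways)  # /tmp/abc/e/f/o/p/q
```

No file system access is involved: the result depends only on the strings, which is what makes the same rule usable for blob names.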
Let’s see again what we have:
>>> sorted(p.riterdir())
[LocalUpath('/tmp/abc/d/y.data'), LocalUpath('/tmp/abc/e/f/g/data.json'), LocalUpath('/tmp/abc/x.txt')]
and to get rid of them all:
>>> p.rmrf()
3
A nice thing about upathlib is its “unified” nature across local and cloud storage.
Suppose we have set up the environment to use Google Cloud Storage; then we could have started this exercise with
>>> from upathlib import GcsBlobUpath
>>> p = GcsBlobUpath('gs://my-bucket/tmp/abc')
Everything after this would work unchanged. (The printouts would differ in some places, e.g. LocalUpath would be replaced by GcsBlobUpath.)
Upath
Upath is an abstract base class that defines the APIs and some of the implementation.
Subclasses are tailored to particular storage systems.
Currently there are two production-ready subclasses; they implement Upath for local file systems and Google Cloud Storage, respectively.
The APIs follow the style of the standard library pathlib where appropriate.
- class upathlib.Upath[source]#
Bases: ABC
- __init__(*pathsegments: str)[source]#
Create a Upath instance. Because Upath is an abstract class, this is always called on a subclass to instantiate a path on the specific storage system.
Subclasses for cloud blob stores may need to add additional parameters representing, e.g., the container or bucket name.
- Parameters:
- *pathsegments
Analogous to the input to pathlib.Path. The first segment may or may not start with '/'. The path constructed with *pathsegments is always “absolute” under a known “root”.
For a local POSIX file system, the root is the usual '/'.
For a local Windows file system, the root is resolved to a particular “drive”.
For Azure blob store, the root is that in a “container”.
For AWS and GCP blob stores, the root is that in a “bucket”.
If missing, the path constructed is the “root”. However, the subclass LocalUpath plugs in the current working directory for a missing *pathsegments.
Note
If one segment starts with '/', it will reset to the “root” and discard all the segments that have come before it. For example, Upath('work', 'projects', '/', 'projects') is the same as Upath('/', 'projects').
Note
The first element of *pathsegments may start with some platform-specific strings. For example, '/' on Linux, 'c://' on Windows, 'gs://' on Google Cloud Storage. Please see subclasses for specifics.
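The reset rule for a segment that starts with '/' mirrors the behavior of the standard library's pathlib. The short sketch below uses `pathlib.PurePosixPath` only, not Upath itself, to show the same effect.

```python
from pathlib import PurePosixPath

# A later segment that starts with '/' discards everything before it.
joined = PurePosixPath('work', 'projects', '/', 'projects')
direct = PurePosixPath('/', 'projects')

print(joined)            # /projects
print(joined == direct)  # True
```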
- __truediv__(key: str) Self [source]#
This method is invoked by self / key. It calls the method joinpath().
- property path: PurePath#
The pathlib.PurePath version of the internal path string.
In the subclass LocalUpath, this property is overridden to return a pathlib.Path, which is a subclass of pathlib.PurePath.
In subclasses for cloud blob stores, this implementation stays in effect.
- abstract as_uri() str [source]#
Represent the path as a file URI. See subclasses for platform-dependent specifics.
- property name: str#
A string representing the final path component, excluding the drive and root, if any.
This is the name component of self.path. If self.path is '/' (the root), then an empty string is returned. (The name of the root path is empty.)
Examples
>>> from upathlib import LocalUpath
>>> p = LocalUpath('/tmp/test/upathlib/data/sales.txt.gz')
>>> p.path
PosixPath('/tmp/test/upathlib/data/sales.txt.gz')
>>> p.name
'sales.txt.gz'
>>> p.parent.parent.parent.parent
LocalUpath('/tmp')
>>> p.parent.parent.parent.parent.name
'tmp'
>>> p.parent.parent.parent.parent.parent
LocalUpath('/')
>>> p.parent.parent.parent.parent.parent.name
''
>>> # the parent of root is still root:
>>> p.parent.parent.parent.parent.parent.parent
LocalUpath('/')
- property stem: str#
The final path component, without its suffix.
This is the “stem” part of self.name.
Examples
>>> from upathlib import LocalUpath
>>> p = LocalUpath('/tmp/test/upathlib/data/sales.txt')
>>> p
LocalUpath('/tmp/test/upathlib/data/sales.txt')
>>> p.path
PosixPath('/tmp/test/upathlib/data/sales.txt')
>>> p.name
'sales.txt'
>>> p.stem
'sales'
>>> p = LocalUpath('/tmp/test/upathlib/data/sales.txt.gz')
>>> p.stem
'sales.txt'
- property suffix: str#
The file extension of the final component, if any.
- property suffixes: list[str]#
A list of the path’s file extensions.
Examples
>>> p = LocalUpath('/tmp/test/upathlib/data/sales.txt')
>>> p.suffix
'.txt'
>>> p.suffixes
['.txt']
>>> p = LocalUpath('/tmp/test/upathlib/data/sales.txt.gz')
>>> p.suffix
'.gz'
>>> p.suffixes
['.txt', '.gz']
- exists() bool [source]#
Return True if the path is an existing file or dir; False otherwise.
Examples
In a blob store with blobs
/a/b/cd
/a/b/cd/e.txt
'/a/b/cd' exists, and is both a file and a dir;
'/a/b/cd/e.txt' exists, and is a file;
'/a/b' exists, and is a dir;
'/a/b/c' does not exist.
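The four cases in this example follow from treating the store as a flat set of names. Here is a minimal sketch of that reasoning, assuming an in-memory set of hypothetical blob names; the three functions are illustrations, not the library's implementation.

```python
blobs = {'/a/b/cd', '/a/b/cd/e.txt'}  # hypothetical blob names


def is_file(path: str) -> bool:
    # A "file" is a name that exists exactly as given.
    return path in blobs


def is_dir(path: str) -> bool:
    # A "dir" exists when some name continues past `path + '/'`.
    prefix = path.rstrip('/') + '/'
    return any(name.startswith(prefix) for name in blobs)


def exists(path: str) -> bool:
    return is_file(path) or is_dir(path)


for p in ['/a/b/cd', '/a/b/cd/e.txt', '/a/b', '/a/b/c']:
    print(p, exists(p), is_file(p), is_dir(p))
```

Note that '/a/b/c' is not a directory even though '/a/b/cd' starts with the same characters: the match must include the trailing '/'.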
- abstract is_dir() bool [source]#
Return True if the path is an existing directory; False otherwise.
If there exists a file named like
/a/b/c/d.txt
we say '/a', '/a/b', '/a/b/c' are existing directories.
In a cloud blob store, there’s no such thing as an “empty directory”, because there is no concept of “directory”. A blob store just consists of files (aka blobs) with names, which could contain the character '/', with no special meaning attached to it. We interpret the name '/a/b' as a directory to emulate the familiar concept in a local file system when there exist files named '/a/b/*'.
In a local file system, there can be empty directories. However, it is recommended not to have empty directories.
There is no method for “creating an empty dir” (like the Linux command mkdir). Simply create a file under the dir, and the dir will come into being. This is analogous to how we create files all the time: we don’t “create” an empty file in advance; we simply write to the would-be path of the file to be created.
- abstract is_file() bool [source]#
Return True if the path is an existing file; False otherwise.
In a cloud blob store, a path can be both a file and a dir. For example, if these blobs exist:
/a/b/c/d.txt
/a/b/c
we say /a/b/c is a “file”, and also a “dir”. Users are advised to avoid such naming.
This situation does not happen in a local file system.
- abstract file_info() FileInfo | None [source]#
If is_file() is False, return None; otherwise, return file info.
- property parent: Self#
The parent of the path.
If the path is the root, then the parent is still the root.
- abstract property root: Self#
Return a new path representing the root.
- joinpath(*other: str) Self [source]#
Join this path with more segments, return the new path object.
Calling this method is equivalent to combining the path with each of the other arguments in turn.
If self was created by Upath(*segs), then this method essentially returns Upath(*segs, *other).
If *other is a single string, there is a shortcut by the operator /, implemented by __truediv__().
- with_name(name: str) Self [source]#
Return a new path with the “name” part substituted by the new value. If the original path doesn’t have a name (i.e. the original path is the root), ValueError is raised.
Examples
>>> p = LocalUpath('/tmp/test/upathlib/data/sales.txt.gz')
>>> p.with_name('sales.data')
LocalUpath('/tmp/test/upathlib/data/sales.data')
- with_suffix(suffix: str) Self [source]#
Return a new path with the suffix replaced by the specified value. If the original path doesn’t have a suffix, the new suffix is appended instead. If suffix is an empty string, the original suffix is removed.
suffix should include a dot, like '.txt'.
Examples
>>> p = LocalUpath('/tmp/test/upathlib/data/sales.txt.gz')
>>>
>>> # replace the last suffix:
>>> p.with_suffix('.data')
LocalUpath('/tmp/test/upathlib/data/sales.txt.data')
>>>
>>> # remove the last suffix:
>>> p.with_suffix('')
LocalUpath('/tmp/test/upathlib/data/sales.txt')
>>>
>>> p.with_suffix('').with_suffix('.bin')
LocalUpath('/tmp/test/upathlib/data/sales.bin')
>>>
>>> pp = p.with_suffix('').with_suffix('')
>>> pp
LocalUpath('/tmp/test/upathlib/data/sales')
>>>
>>> # no suffix to remove:
>>> pp.with_suffix('')
LocalUpath('/tmp/test/upathlib/data/sales')
>>>
>>> # add a suffix:
>>> pp.with_suffix('.pickle')
LocalUpath('/tmp/test/upathlib/data/sales.pickle')
- abstract write_bytes(data: bytes | BufferedReader, *, overwrite: bool = False) None [source]#
Write bytes data to the current file.
Parent “directories” are created as needed, if applicable.
If overwrite is False and the current file exists, FileExistsError is raised.
data is either “byte-like” (such as bytes, bytearray, memoryview) or “file-like” open in “binary” mode. In the second case, the file should be positioned at the beginning (such as by calling .seek(0)).
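The overwrite contract described here can be approximated on a local disk with the exclusive-creation file mode. The sketch below is an assumption-laden illustration using only the standard library; the free function `write_bytes` is hypothetical and handles only bytes input, not file-like objects.

```python
import os
import tempfile


def write_bytes(path: str, data: bytes, *, overwrite: bool = False) -> None:
    # Create parent directories as needed.
    os.makedirs(os.path.dirname(path), exist_ok=True)
    # Mode 'xb' raises FileExistsError if the file already exists.
    with open(path, 'wb' if overwrite else 'xb') as f:
        f.write(data)


target = os.path.join(tempfile.mkdtemp(), 'd', 'y.data')
write_bytes(target, b'0101')
try:
    write_bytes(target, b'1111')  # file exists and overwrite is False
    outcome = 'written'
except FileExistsError:
    outcome = 'refused'
write_bytes(target, b'1111', overwrite=True)
with open(target, 'rb') as f:
    final = f.read()
print(outcome, final)  # refused b'1111'
```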
- abstract read_bytes() bytes [source]#
Return the binary contents of the pointed-to file as a bytes object.
If self is not a file or does not exist, FileNotFoundError is raised.
- write_text(data: str, *, overwrite: bool = False, encoding: str | None = None, errors: str | None = None) None [source]#
Write text data to the current file.
Parent “directories” are created as needed, if applicable.
If overwrite is False and the current file exists, FileExistsError is raised.
encoding and errors are passed to encode(). Usually you should leave them at the default values.
- read_text(*, encoding: str | None = None, errors: str | None = None) str [source]#
Return the decoded contents of the pointed-to file as a string.
If self is not a file or does not exist, FileNotFoundError is raised.
encoding and errors are passed to decode(). Usually you should leave them at the default values.
- copy_dir(target: str | Upath, *, overwrite: bool = False, quiet: bool = False, concurrent: bool = True) int [source]#
Copy the content of the current directory (i.e. self) recursively to target.
If target is a string, then it is in the same store as the current path, and it is either absolute, or relative to self.parent. In this case, the directory created by this operation will be the path self.parent / target.
If target is not a string, then it must be an instance of a Upath subclass, and it may be in any storage system.
Immediate children of self will be copied as immediate children of the target path.
There is no such error as “target directory exists” as the copy operation only concerns individual files. If the target “directory” contains files that do not have counterparts in the source directory, they will stay untouched.
overwrite is file-wise. If False, any existing target file will raise FileExistsError and halt the operation. If True, any existing target file will be overwritten by the source file.
quiet controls whether to print out progress info.
- Returns:
- int
The number of files copied.
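The file-wise semantics can be illustrated with two dictionaries standing in for the source and target “directories”. This is a simplified model under stated assumptions (names relative to the directory, contents held in memory); the function below is hypothetical and ignores the quiet and concurrent parameters.

```python
def copy_dir(source: dict, target: dict, *, overwrite: bool = False) -> int:
    """Copy every file in `source` into `target`; return the number copied."""
    n = 0
    for name, data in source.items():
        if name in target and not overwrite:
            # The first clash halts the operation.
            raise FileExistsError(name)
        target[name] = data
        n += 1
    return n


src = {'x.txt': b'new', 'd/y.data': b'0101'}
dst = {'x.txt': b'old', 'extra.log': b'keep me'}

try:
    copy_dir(src, dict(dst))  # work on a copy; 'x.txt' clashes
    halted = False
except FileExistsError:
    halted = True

copied = copy_dir(src, dst, overwrite=True)
print(halted, copied, sorted(dst))
# True 2 ['d/y.data', 'extra.log', 'x.txt']
```

The entry 'extra.log' has no counterpart in the source, so it stays untouched in the target.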
- copy_file(target: str | Upath, *, overwrite: bool = False) None [source]#
Copy the current file (i.e. self) to target.
If target is a string, then it is in the same store as the current path, and it is either absolute, or relative to self.parent. In this case, the file created by this operation will be the path self.parent / target. For example, if self is '/a/b/c/d.txt', then target='e.txt' means '/a/b/c/e.txt'.
If target is not a string, then it must be an instance of a Upath subclass, and it may be in any storage system.
target is the target file, not a target directory to “copy into”.
If self is not an existing file, FileNotFoundError is raised.
If target is an existing file and overwrite is False, FileExistsError is raised. If overwrite is True, then the file will be overwritten.
If type(self) is LocalUpath and target is an existing directory, then IsADirectoryError is raised. In a cloud blob store, there is no concrete “directory”. For example, suppose self is the path ‘gs://mybucket/experiment/data’ on Google Cloud Storage, and target is ‘/backup/record’, then the target path is ‘gs://mybucket/backup/record’. If there exists blob ‘gs://mybucket/backup/record/y’, then we say ‘gs://mybucket/backup/record’ is a “directory”. However, this is merely a “virtual” concept, or an emulation of the “directory” concept on local disk. As long as this path is not an existing blob, the copy will proceed with no problem. Nevertheless, such naming is confusing and better avoided.
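How a string target resolves against self.parent can be sketched with the standard library's pathlib, whose join operator has the same “absolute segment resets the path” behavior. The helper `resolve_target` is hypothetical and works on plain path strings within one store.

```python
from pathlib import PurePosixPath


def resolve_target(source: str, target: str) -> str:
    """Resolve a string target against the parent of `source`."""
    # Joining with an absolute target discards the parent.
    return str(PurePosixPath(source).parent / target)


relative = resolve_target('/a/b/c/d.txt', 'e.txt')
absolute = resolve_target('/experiment/data', '/backup/record')
print(relative)  # /a/b/c/e.txt
print(absolute)  # /backup/record
```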
- remove_dir(*, quiet: bool = True, concurrent: bool = True) int [source]#
Remove the current directory (i.e. self) and all its contents recursively.
Essentially, this removes each file that is yielded by riterdir(). Subclasses should take care to remove “empty directories”, if applicable, that are left behind.
quiet controls whether to print progress info.
- Returns:
- int
The number of files removed.
- abstract remove_file() None [source]#
Remove the current file (i.e. self).
If self is not an existing file, FileNotFoundError is raised. If the file exists but can’t be removed, the platform-dependent exception is propagated.
- abstract iterdir() Iterator[Self] [source]#
Yield the immediate (i.e. non-recursive) children of the current dir (i.e. self).
The yielded elements are instances of the same class. Each yielded element is either a file or a dir. There is no guarantee on the order of the returned elements.
If self is not a dir (e.g. maybe it’s a file), or does not exist at all, nothing is yielded (resulting in an empty iterable); no exception is raised.
- ls() list[Self] [source]#
Return the elements yielded by iterdir() in a sorted list.
Sorting is by a full path string maintained internally.
The returned list may be empty.
- abstract riterdir() Iterator[Self] [source]#
Yield files under the current dir (i.e. self) recursively. The method name means “recursive iterdir”.
The yielded elements are instances of the same class. They represent existing files.
Compared to iterdir(), this is recursive, and yields files only. Empty subdirectories will have no representation in the return.
Similar to iterdir(), if self is not a dir or does not exist, then nothing is yielded, and no exception is raised either.
There is no guarantee on the order of the returned elements.
- rmrf(*, quiet: bool = True, concurrent: bool = False) int [source]#
Remove the current file or dir (i.e. self) recursively.
Analogous to the Linux command rm -rf, hence the name of this method.
Return the number of files removed.
For example, if these blobs are present:
/a/b/c/d/e.txt
/a/b/c/kk.data
/a/b/c
then Upath('/a/b/c').rmrf() would remove all of them.
concurrent is False by default because this method is often used in __del__ of user classes, and a thread pool is problematic in __del__.
- abstract lock(*, timeout: int = None) Self [source]#
Lock the current file (i.e. self), in order to have exclusive access to the code block that has possession of the lock.
timeout: if the lock can’t be acquired within timeout seconds, LockAcquireError is raised. If None, wait for a default, reasonably long time. To wait “forever”, just pass in a large number. timeout=0 is a valid input, meaning that exactly one attempt is made to acquire the lock.
Once a lock is acquired, it will not expire until this context manager exits. In other words, this is the timeout for the “lock acquisition”, not for the lock itself. The actual waiting time could be slightly longer or shorter.
This is a “mandatory lock”, as opposed to an “advisory lock”. The intended use case is for this lock to be used for implementing a (cooperative) “code lock”.
As this abstract method is to be used as a context manager, a subclass should use yield in its implementation. The yield statement should yield self.
One way to achieve cooperative locking on a file via this lock is like this:
f = Upath('abc.txt')
with f.lock():
    ...
    # Now read from or write to `f` with exclusive access.
Depending on the capabilities of the specific storage system, the lock may be on the current file itself or on another, helper file as an implementation detail. In the latter case, this method should delete the helper file upon release of the lock, if possible.
Some storage engines may not provide the capability to implement this lock.
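To make the “helper file” variant and the acquisition timeout concrete, here is a minimal local-disk sketch built on exclusive file creation. It is an illustration under assumptions, not the library's implementation: the function `file_lock`, the '.lock' suffix, the polling interval, and the locally defined `LockAcquireError` are all hypothetical stand-ins, and the sketch yields the path string rather than self.

```python
import contextlib
import os
import tempfile
import time


class LockAcquireError(TimeoutError):
    """Local stand-in for the exception named in the text."""


@contextlib.contextmanager
def file_lock(path: str, *, timeout: float = 10.0, poll: float = 0.05):
    lock_path = path + '.lock'  # hypothetical helper file
    deadline = time.monotonic() + timeout
    while True:
        try:
            # O_EXCL makes creation fail if the helper file already exists.
            fd = os.open(lock_path, os.O_CREAT | os.O_EXCL | os.O_WRONLY)
            break
        except FileExistsError:
            if time.monotonic() >= deadline:
                raise LockAcquireError(lock_path)
            time.sleep(poll)
    try:
        yield path
    finally:
        os.close(fd)
        os.remove(lock_path)  # delete the helper file on release


data_file = os.path.join(tempfile.mkdtemp(), 'abc.txt')
with file_lock(data_file):
    # While the lock is held, a second attempt with timeout=0 tries once and fails.
    try:
        with file_lock(data_file, timeout=0):
            second_attempt = 'acquired'
    except LockAcquireError:
        second_attempt = 'refused'
lock_released = not os.path.exists(data_file + '.lock')
print(second_attempt, lock_released)  # refused True
```

The timeout bounds only the wait for acquisition; once inside the with block, the lock is held until the block exits.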
- class upathlib.FileInfo[source]#
Bases: object
- ctime: float#
Creation time as a POSIX timestamp.
- mtime: float#
Last modification time as a POSIX timestamp.
- time_created: datetime#
Creation time as a datetime object.
- time_modified: datetime#
Last modification time as a datetime object.
- size: int#
In bytes.
- details: Any#
Platform-dependent.
- __init__(ctime: float, mtime: float, time_created: datetime, time_modified: datetime, size: int, details: Any) None #
LocalUpath
- class upathlib.LocalUpath[source]#
Bases: Upath, PathLike
- __init__(*pathsegments: str)[source]#
Create a path on the local file system. Both POSIX and Windows platforms are supported.
*pathsegments specify the path, either absolute or relative to the current working directory. If missing, the constructed path is the current working directory. This is passed to pathlib.Path.
- __fspath__() str [source]#
LocalUpath implements the os.PathLike protocol, hence a LocalUpath object can be used anywhere an object implementing os.PathLike is accepted. For example, used with the builtin function open():
>>> p = LocalUpath('/tmp/test/data.txt')
>>> p.rmrf()
0
>>> p.write_text('abc')
>>> with open(p) as file:
...     print(file.read())
abc
>>> p.rmrf()
1
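The os.PathLike protocol itself is small: any object with an `__fspath__` method that returns a str (or bytes) is accepted by open() and os.fspath(). The class below is a hypothetical minimal example, not LocalUpath, meant only to show the mechanism.

```python
import os
import tempfile


class MyPath:
    """Hypothetical minimal path-like class."""

    def __init__(self, location: str):
        self._location = location

    def __fspath__(self) -> str:
        # open() and os.fspath() call this to obtain the actual path.
        return self._location


p = MyPath(os.path.join(tempfile.mkdtemp(), 'data.txt'))
with open(p, 'w') as f:
    f.write('abc')
with open(p) as f:
    content = f.read()
print(content)                   # abc
print(isinstance(p, os.PathLike))  # True
```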
- property path: Path#
Return the pathlib.Path object of the path.
- as_uri() str [source]#
Represent the path as a file URI. On Linux, this is like ‘file:///home/username/path/to/file’. On Windows, this is like ‘file:///C:/Users/username/path/to/file’.
- file_info() FileInfo | None [source]#
Return file info if the current path is a file; otherwise return None.
- property root: LocalUpath#
Return a new path representing the root.
On Windows, this is the root on the same drive, like LocalUpath('C:'). On Linux and Mac, this is LocalUpath('/').
- write_bytes(data: bytes | BufferedReader, *, overwrite: bool = False) None [source]#
Write the bytes data to the current file.
- rename_dir(target: str | LocalUpath, *, overwrite: bool = False, quiet: bool = False, concurrent: bool = True) LocalUpath [source]#
Rename the current dir (i.e. self) to target.
overwrite is applied file-wise. If there are files under target that do not have counterparts under self, they are left untouched.
quiet controls whether to print progress info.
Return the new path.
- rename_file(target: str | LocalUpath, *, overwrite: bool = False) LocalUpath [source]#
Rename the current file (i.e. self) to target in the same store.
target is either absolute or relative to self.parent. For example, if self is ‘/a/b/c/d.txt’, then target='e.txt' means ‘/a/b/c/e.txt’.
If overwrite is False (the default) and the target file exists, FileExistsError is raised.
Return the new path.
- iterdir() Iterator[LocalUpath] [source]#
Yield the immediate children under the current dir.
- riterdir() Iterator[LocalUpath] [source]#
Yield all files under the current dir recursively.
BlobUpath
- class upathlib._blob.BlobUpath[source]#
Bases: Upath
BlobUpath is a base class for paths in a cloud storage, aka “blob store”. This is in contrast to a local disk storage, which is implemented by LocalUpath.
- property blob_name: str#
Return the “name” of the blob. This is the “path” without a leading '/'. In cloud blob stores, this is exactly the name of the blob. The name often contains '/', which has no special role in the name per se but is interpreted by users to be a directory separator.
- is_dir() bool [source]#
In a typical blob store, there is no such concept as a “directory”. Here we emulate the concept in a local file system. If there is a blob named like
/ab/cd/ef/g.txt
we say there exists directory “/ab/cd/ef”. We should never have a trailing / in a blob’s name, like
/ab/cd/ef/
(I don’t know whether the blob stores allow such blob names.)
Consequently, is_dir is equivalent to “having stuff in the dir”. There is no such thing as an “empty directory” in blob stores.
- iterdir() Iterator[Self] [source]#
Yield immediate children under the current dir.
This is a naive, inefficient implementation. Expected to be refined by subclasses.
- download_dir(target: str | Path | LocalUpath, *, overwrite: bool = False, quiet: bool = False, **kwargs) int [source]#
A specialization of copy_dir() where the target location is on the local disk.
- download_file(target: str | Path | LocalUpath, *, overwrite=False) None [source]#
A specialization of copy_file() where the target location is on the local disk.
Subclasses are expected to override this with a more efficient implementation.
- upload_dir(source: str | Path | LocalUpath, *, overwrite: bool = False, quiet: bool = False, **kwargs) int [source]#
A specialization of copy_dir() for LocalUpath where the target location is in a cloud blob store.
- upload_file(source: str | Path | LocalUpath, *, overwrite=False) None [source]#
A specialization of copy_file() for LocalUpath where the target location is in a cloud blob store.
Subclasses are expected to override this with a more efficient implementation.
GcsBlobUpath
- class upathlib.GcsBlobUpath[source]#
Bases: BlobUpath
GcsBlobUpath implements the Upath API for Google Cloud Storage using the package google-cloud-storage.
- __init__(*paths: str, bucket_name: str | None = None)[source]#
If bucket_name is None, then *paths should be a single string starting with ‘gs://<bucket-name>/’.
If bucket_name is specified, then *paths specify path components under the root of the bucket.
Examples
These several calls are equivalent:
>>> GcsBlobUpath('experiments', 'data', 'first.data', bucket_name='backup')
GcsBlobUpath('gs://backup/experiments/data/first.data')
>>> GcsBlobUpath('/experiments/data/first.data', bucket_name='backup')
GcsBlobUpath('gs://backup/experiments/data/first.data')
>>> GcsBlobUpath('gs://backup/experiments/data/first.data')
GcsBlobUpath('gs://backup/experiments/data/first.data')
>>> GcsBlobUpath('gs://backup', 'experiments', 'data/first.data')
GcsBlobUpath('gs://backup/experiments/data/first.data')
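The equivalence of these calls comes down to separating the bucket name from the blob name and then joining the remaining segments under the bucket root. The following sketch shows one way that could be done with the standard library; the helper `to_gs_uri` is hypothetical, does no '..' handling, and is not the class's actual constructor logic.

```python
import posixpath
from typing import Optional


def to_gs_uri(*paths: str, bucket_name: Optional[str] = None) -> str:
    """Hypothetical helper: combine the arguments into one 'gs://' URI."""
    if bucket_name is None:
        first, rest = paths[0], paths[1:]
        if not first.startswith('gs://'):
            raise ValueError("expected a path starting with 'gs://<bucket-name>'")
        # Split 'gs://bucket/some/blob' into the bucket and the rest.
        bucket_name, _, head = first[len('gs://'):].partition('/')
        paths = (head, *rest)
    # Join under the bucket root, then drop the leading '/' to get the blob name.
    blob_name = posixpath.join('/', *paths).lstrip('/')
    return f'gs://{bucket_name}/{blob_name}'


uris = [
    to_gs_uri('experiments', 'data', 'first.data', bucket_name='backup'),
    to_gs_uri('/experiments/data/first.data', bucket_name='backup'),
    to_gs_uri('gs://backup/experiments/data/first.data'),
    to_gs_uri('gs://backup', 'experiments', 'data/first.data'),
]
print(set(uris))  # {'gs://backup/experiments/data/first.data'}
```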
- is_file() bool [source]#
The result of this call is not cached, in case the object is modified anytime by other clients.
- is_dir() bool [source]#
If there is a dummy blob with name f"{self.name}/", this will return True. This is the case after creating a “folder” on the GCP dashboard. In programmatic use, it’s recommended to avoid such situations so that is_dir() returns True if and only if there are blobs “under” the current path.
- file_info(*, timeout=None) FileInfo | None [source]#
Return file info if the current path is a file; otherwise return None.
- property root: GcsBlobUpath#
Return a new path representing the root of the same bucket.
- write_bytes(data: bytes | BufferedReader, *, overwrite=False, **kwargs)[source]#
Write bytes data to the current blob.
In the usual case, data is bytes. The case where data is an io.BufferedReader object, such as an open file, is not well tested.
- download_file(target: str | Path | LocalUpath, *, overwrite=False, concurrent=True) None [source]#
Download the content of the current blob to target.
- upload_file(source: str | Path | LocalUpath, *, overwrite=False) None [source]#
Upload the content of source to the current blob.
- remove_dir(**kwargs) int [source]#
Remove the current dir and all the content under it recursively. Return the number of blobs removed.
- lock(*, timeout=None)[source]#
This implementation does not prevent the file from being deleted by other workers that do not use this method. It relies on the assumption that this blob is used cooperatively solely between workers in this locking logic.
timeout is the wait time for acquiring or releasing the lease. If None, the default value 600 seconds is used. If 0, exactly one attempt is made to acquire a lock.
Note: timeout = None does not mean infinite wait. It means a default wait time. If the user wants a longer wait, just pass in a large number.
Serializers
- class upathlib.serializer.JsonSerializer[source]#
Bases: Serializer
- class upathlib.serializer.PickleSerializer[source]#
Bases: Serializer
- class upathlib.serializer.ZPickleSerializer[source]#
Bases: PickleSerializer
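The serializer classes are listed here without descriptions. As a rough illustration of what a zlib-compressed pickle format involves, the sketch below pairs the standard library's pickle and zlib modules. The function names and the compression level are assumptions for illustration; they are not the interface of ZPickleSerializer.

```python
import pickle
import zlib


def zpickle_dumps(obj, *, level: int = 3) -> bytes:
    # Pickle first, then compress the resulting bytes.
    return zlib.compress(pickle.dumps(obj), level)


def zpickle_loads(data: bytes):
    # Reverse the two steps in the opposite order.
    return pickle.loads(zlib.decompress(data))


record = {'name': 'John', 'age': 38, 'tags': ['a'] * 1000}
blob = zpickle_dumps(record)
print(zpickle_loads(blob) == record)           # True
print(len(blob) < len(pickle.dumps(record)))   # True for this repetitive data
```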
- class upathlib.serializer.ZstdCompressor[source]#
Bases: _local
- compress(x, *, level=3, threads=0)[source]#
- Parameters:
- threads
Number of threads to use to compress data concurrently. When set, compression operations are performed on multiple threads. The default value (0) disables multi-threaded compression. A value of -1 means to set the number of threads to the number of detected logical CPUs.
Using upathlib to implement a “multiplexer”
Multiplexer is a utility for distributing data elements to multiple concurrent or distributed workers.
Its implementation relies on the “locking” capability of Upath.
Suppose we perform some brute-force search on a cluster of machines;
there are 1000 grids, and the algorithm takes on one grid at a time.
Now, the grid is a “hyper-parameter” or “control parameter” that takes 1000 possible values.
We want to distribute these 1000 values to the workers.
This is the kind of use case targeted by Multiplexer.
Let’s show its usage using local data and multiprocessing.
(For real work, we would use cloud storage and a cluster of machines.)
First, create a Multiplexer to hold the values to be distributed:
>>> from upathlib import LocalUpath
>>> from upathlib.multiplexer import Multiplexer
>>> p = LocalUpath('/tmp/abc/mux')
>>> p.rmrf()
0
>>> hyper = Multiplexer.new(range(20), p)
>>> len(hyper)
20
Next, design an interesting worker function:
>>> import multiprocessing, random, time
>>>
>>> def worker(mux_id):
...     for x in Multiplexer(mux_id):
...         time.sleep(random.uniform(0.1, 0.2))  # doing a lot of things
...         print(x, 'done in', multiprocessing.current_process().name)
Back in the main process,
>>> mux_id = hyper.create_read_session()
>>> tasks = [multiprocessing.Process(target=worker, args=(mux_id,)) for _ in range(5)]
>>> for t in tasks:
...     t.start()
>>>
2 done in Process-13
0 done in Process-11
1 done in Process-12
4 done in Process-15
3 done in Process-14
6 done in Process-11
7 done in Process-12
8 done in Process-15
5 done in Process-13
9 done in Process-14
12 done in Process-15
13 done in Process-13
11 done in Process-12
10 done in Process-11
14 done in Process-14
15 done in Process-15
18 done in Process-11
16 done in Process-13
17 done in Process-12
19 done in Process-14
>>>
>>> for t in tasks:
...     t.join()
>>> hyper.done(mux_id)
True
>>> hyper.destroy()
>>>
- class upathlib.multiplexer.Multiplexer[source]#
Bases: Iterable[Element], Sized
Multiplexer is used to distribute data elements to multiple “workers” so that each element is obtained by exactly one worker.
Typically, each data element is small in size but requires significant time for the worker to process. The data elements are “hyper parameters”.
The usage consists of two main parts:
1. In “coordinator” code, call create_read_session() to start a new “session”. Different sessions (at the same time or otherwise) are independent consumers of the data.
Typically, this dataset, which is small and easy to create, is consumed only once. In this case, the coordinator code typically calls new() to create a new Multiplexer, then calls create_read_session() on it, and then manages to send the ID to workers.
2. In “worker” code, use the ID that was returned by create_read_session() to instantiate a Multiplexer and iterate over it. In so doing, multiple workers will obtain the data elements collectively, i.e., each element is obtained by exactly one worker.
- classmethod new(data: Iterable[Element], path: str | Path | Upath, *, tag: str | None = None)[source]#
- Parameters:
- data
The data elements that need to be distributed. The elements should be pickle-able.
- path
A directory where the data and any supporting info will be saved. The directory can be existent or non-existent. A sub-directory will be created under path to store data and info about this particular multiplexer. The name of the subdirectory is a datetime string. tag is appended to the sub-directory name to be more informative, if so desired.
If path is in the cloud, then the workers can be on multiple machines, and in multiple threads or processes on each machine. If path is on the local disk, then the workers are in threads or processes on the same machine.
However, there are no strong reasons to use this facility on a local machine.
Usually this class is used to distribute data to a cluster of machines, hence this path points to a location in a cloud storage that is supported by upathlib.
- __init__(mux_id: str, worker_id: str | None = None, timeout: int | float | None = None)[source]#
Create a Multiplexer object and use it to distribute the data elements that have been stored by new().
- Parameters:
- mux_id
The value that is returned by create_read_session().
- worker_id
A string representing the current worker (i.e. this instance). If missing, a default is constructed based on thread name and process name.
- property worker_id: str#
- create_read_session() str [source]#
Let’s say there is a “coordinator” and some “workers”; these are programs running in threads, processes, or distributed machines. The coordinator creates a new Multiplexer and calls this method to start a “session” to read (i.e. iterate over) the elements in this Multiplexer:
mux = Multiplexer.new(range(1000), '/tmp/abc/mux/')
mux_id = mux.create_read_session()
The mux_id is then provided to the workers, which will create Multiplexer instances pointing to the same dataset and participating in the reading session that has just been started:
mux = Multiplexer(mux_id)
for x in mux:
    ...
The data that was provided to new() is split between the workers so that each data element will be obtained by exactly one worker.
The returned value (the “mux ID”) encodes info about the location (“path”) of the data storage as well as the newly created read session. All workers that use the same ID participate in the same read session, i.e. the data elements will be split between them. There can be multiple, independent read sessions going on at the same time.
This call does not make the current Multiplexer object a participant in the read session just created. One has to use the returned value to create a new Multiplexer object to participate in the said read session. If the current object is already participating in a read session (an “old” session), making this call on the object does not change its role as a participant in the old session. This call merely creates a new read session but does not modify the current object.
As a rule of thumb, an object created by Multiplexer.new(data, ...) is not a participant of any read session (even after create_read_session() is called on it subsequently). On the other hand, an object created by Multiplexer(mux_id, ...) is participating in the read session that is identified by mux_id.
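The guarantee that each element is obtained by exactly one worker rests on advancing a shared cursor under a lock. The sketch below reproduces that idea inside a single process with threads and an in-memory lock; the class `SharedSession` is a hypothetical stand-in, whereas Multiplexer keeps its state in storage and relies on the locking capability of Upath.

```python
import threading


class SharedSession:
    """Hypothetical in-process stand-in for a read session."""

    def __init__(self, data):
        self._data = list(data)
        self._next = 0
        self._lock = threading.Lock()

    def __iter__(self):
        while True:
            with self._lock:  # only one worker advances the cursor at a time
                if self._next >= len(self._data):
                    return
                x = self._data[self._next]
                self._next += 1
            yield x  # the lock is released before the element is processed


session = SharedSession(range(20))
received = {f'worker-{i}': [] for i in range(5)}


def worker(name):
    # Every worker iterates over the same session and shares its cursor.
    for x in session:
        received[name].append(x)


threads = [threading.Thread(target=worker, args=(name,)) for name in received]
for t in threads:
    t.start()
for t in threads:
    t.join()

all_items = sorted(x for items in received.values() for x in items)
print(all_items == list(range(20)))  # True
```

How the 20 elements are divided among the five workers varies from run to run; only the union is fixed.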
- stat(mux_id: str | None = None) dict [source]#
Return status info of an ongoing read session.
This is often called in the “coordinator” code on the object that has had its create_read_session() called.
mux_id is the return of create_read_session(). If mux_id is None, then this method is about the read session in which the current object is participating.
- done(mux_id: str | None = None) bool [source]#
Return whether the data iteration is finished.
This is often called in the “coordinator” code on the object that has had its create_read_session() called.
mux_id is the return of create_read_session(). If mux_id is None, then this method is about the read session in which the current object is participating.