from __future__ import annotations
import contextlib
import datetime
import os
import os.path
import pathlib
import shutil
import sys
import time
from collections.abc import Iterator
from io import BufferedReader
import filelock
# `filelock` is also called `py-filelock`.
# Tried `fasteners` also. In one use case,
# `filelock` worked whereas `fasteners.InterprocessLock` failed.
#
# Other options to look into include
# `oslo.concurrency`, `pylocker`, `portalocker`.
from ._upath import FileInfo, LockAcquireError, LockReleaseError, Upath
# End user may want to do this:
# logging.getLogger("filelock").setLevel(logging.WARNING)
[docs]
class LocalUpath(Upath, os.PathLike):
_LOCK_POLL_INTERVAL_SECONDS = 0.03
[docs]
def __init__(self, *pathsegments: str):
"""
Create a path on the local file system.
Both POSIX and Windows platforms are supported.
``*pathsegments`` specify the path, either absolute or relative to the current
working directory. If missing, the constructed path is the current working directory.
This is passed to `pathlib.Path <https://docs.python.org/3/library/pathlib.html#pathlib.Path>`_.
"""
super().__init__(str(pathlib.Path(*pathsegments).absolute()))
self._lock = None
[docs]
def __fspath__(self) -> str:
"""
LocalUpath implements the `os.PathLike <https://docs.python.org/3/library/os.html#os.PathLike>`_ protocol,
hence a LocalUpath object can be used anywhere an object implementing
os.PathLike is accepted. For example, used with the builtin function
`open() <https://docs.python.org/3/library/functions.html#open>`_:
>>> p = LocalUpath('/tmp/test/data.txt')
>>> p.rmrf()
0
>>> p.write_text('abc')
>>> with open(p) as file:
... print(file.read())
abc
>>> p.rmrf()
1
"""
return self.path.__fspath__()
def __getstate__(self):
return None, super().__getstate__()
def __setstate__(self, data):
_, z1 = data
self._lock = None
super().__setstate__(z1)
@property
def path(self) -> pathlib.Path:
"""
Return the `pathlib.Path <https://docs.python.org/3/library/pathlib.html#pathlib.Path>`_ object
of the path.
"""
return pathlib.Path(self._path)
[docs]
def as_uri(self) -> str:
"""
Represent the path as a file URI.
On Linux, this is like 'file:///home/username/path/to/file'.
On Windows, this is like 'file:///C:/Users/username/path/to/file'.
"""
return self.path.as_uri()
[docs]
def is_dir(self) -> bool:
"""
Return whether the current path is a dir.
"""
return self.path.is_dir()
[docs]
def is_file(self) -> bool:
"""
Return whether the current path is a file.
"""
return self.path.is_file()
[docs]
def file_info(self) -> FileInfo | None:
"""
Return file info if the current path is a file;
otherwise return ``None``.
"""
if not self.is_file():
return None
st = self.path.stat()
return FileInfo(
ctime=st.st_ctime,
mtime=st.st_mtime,
time_created=datetime.datetime.fromtimestamp(st.st_ctime),
time_modified=datetime.datetime.fromtimestamp(st.st_mtime),
size=st.st_size,
details=st,
)
# If an existing file is written to again using `write_...`,
# then its `ctime` and `mtime` are both updated.
# My experiments showed that `ctime` and `mtime` are equal.
@property
def root(self) -> LocalUpath:
"""
Return a new path representing the root.
On Windows, this is the root on the same drive, like ``LocalUpath('C:\')``.
On Linux and Mac, this is ``LocalUpath('/')``.
"""
return self.__class__(self.path.root)
[docs]
def read_bytes(self) -> bytes:
"""
Read the content of the current file as bytes.
"""
try:
return self.path.read_bytes()
except (IsADirectoryError, FileNotFoundError) as e:
raise FileNotFoundError(f"No such file: '{self}'") from e
[docs]
def write_bytes(
self, data: bytes | BufferedReader, *, overwrite: bool = False
) -> None:
"""
Write the bytes ``data`` to the current file.
"""
if self.is_file():
if not overwrite:
raise FileExistsError(f"File exists: '{self}'")
self.parent.path.mkdir(exist_ok=True, parents=True)
try:
memoryview(
data
) # bytes-like object, such as bytes, bytearray, array.array, memoryview
except TypeError:
data = data.read() # file-like object, like BytesIO, that is at beginning
self.path.write_bytes(data)
# If `self` is an existing directory, will raise `IsADirectoryError`.
# If `self` is an existing file, will overwrite.
def _copy_file(self, target: Upath, *, overwrite: bool = False):
if isinstance(target, LocalUpath):
if not overwrite and target.is_file():
raise FileExistsError(f"File exists: '{target}'")
os.makedirs(target.parent, exist_ok=True)
# If `p` is a file and we try to `os.makedirs(p / 'subdir`)`,
# on Linux it raises `NotADirectoryError`;
# on Windows it raises `FileNotFoundError`.
shutil.copyfile(self.path, target.path)
# If target already exists, it will be overwritten.
else:
super()._copy_file(target, overwrite=overwrite)
[docs]
def remove_dir(self, **kwargs) -> int:
"""
Remove the current dir along with all its contents recursively.
"""
n = super().remove_dir(**kwargs)
if self.path.is_dir():
shutil.rmtree(self.path)
return n
[docs]
def remove_file(self) -> None:
"""Remove the current file."""
try:
self.path.unlink()
except PermissionError as e: # this happens on Windows if `self` is a dir.
if self.is_dir():
raise IsADirectoryError(f"Is a directory: '{self}'") from e
else:
raise
# On Linux, if `self` is a dir, `IsADirectoryError` will be raised.
[docs]
def rename_dir(
self,
target: str | LocalUpath,
*,
overwrite: bool = False,
quiet: bool = False,
concurrent: bool = True,
) -> LocalUpath:
"""Rename the current dir (i.e. ``self``) to ``target``.
``overwrite`` is applied file-wise. If there are
files under ``target`` that do not have counterparts under ``self``,
they are left untouched.
``quiet`` controls whether to print progress info.
Return the new path.
"""
if not self.is_dir():
raise FileNotFoundError(f"No such file: '{self}'")
if isinstance(target, LocalUpath):
target = target._path
target_ = self.parent / target
if target_ == self:
return self
if not quiet:
print(f"Renaming {self!r} to {target_!r}", file=sys.stderr)
self._copy_dir(
self,
target_,
"rename_file",
overwrite=overwrite,
quiet=quiet,
concurrent=concurrent,
)
def _remove_empty_dir(path):
k = 0
for p in path.iterdir():
if p.is_dir():
k += _remove_empty_dir(p)
else:
k += 1
if k == 0:
path.rmdir()
return k
_remove_empty_dir(self.path)
return target_
def _rename_file(self, target: str, *, overwrite=False):
target = self.parent / target
if not overwrite and target.is_file():
raise FileExistsError(f"File exists: '{target}'")
os.makedirs(target.parent, exist_ok=True)
self.path.rename(target.path)
[docs]
def rename_file(
self, target: str | LocalUpath, *, overwrite: bool = False
) -> LocalUpath:
"""Rename the current file (i.e. ``self``) to ``target`` in the same store.
``target`` is either absolute or relative to ``self.parent``.
For example, if ``self`` is '/a/b/c/d.txt', then
``target='e.txt'`` means '/a/b/c/e.txt'.
If ``overwrite`` is ``False`` (the default) and the target file exists,
``FileExistsError`` is raised.
Return the new path.
"""
if isinstance(target, LocalUpath):
target = target._path
target_ = self.parent / target
if target_ == self:
return self
self._rename_file(target_._path, overwrite=overwrite)
return target_
[docs]
def iterdir(self) -> Iterator[LocalUpath]:
"""
Yield the immediate children under the current dir.
"""
try:
for p in self.path.iterdir():
yield self / p.name
except (NotADirectoryError, FileNotFoundError):
pass
[docs]
def riterdir(self) -> Iterator[LocalUpath]:
"""
Yield all files under the current dir recursively.
"""
for p in self.iterdir():
if p.is_file():
yield p
elif p.is_dir():
yield from p.riterdir()
[docs]
@contextlib.contextmanager
def lock(self, *, timeout=None):
"""
This uses the package `filelock <https://github.com/tox-dev/py-filelock>`_ to implement
a file lock for inter-process communication.
.. note:: At the end, this file is not deleted. If it is purely a dummy file to implement locking
for other things, user may want to delete this file after use.
"""
if timeout is None:
timeout = 60
os.makedirs(self.parent, exist_ok=True)
lockfile = self.with_suffix(self.suffix + ".lock")
lock = filelock.FileLock(str(lockfile)) # this object manages re-entry itself
t0 = time.perf_counter()
try:
lock.acquire(
timeout=timeout, poll_interval=self._LOCK_POLL_INTERVAL_SECONDS
)
except Exception as e:
raise LockAcquireError(
f"Failed to lock '{self}' trying for {time.perf_counter() - t0:.2f} seconds; gave up on {e!r}"
) from e
try:
yield self
finally:
try:
# if lock.lock_counter == 1:
# lockfile.remove_file()
lock.release() # in a re-entry situation, this may not actually "release" the lock
# NOTE: the file is not deleted.
# The reason to not delete it is that at this moment the file could have been locked
# by another worker, while test showed that deletion would go through without issue.
except Exception as e:
raise LockReleaseError(
f"Failed to unlock '{self}'; gave up on {e!r}"
) from e
LocalPathType = str | pathlib.Path | LocalUpath