from __future__ import annotations
__lazy_modules__ = {
"contextlib",
"copy",
"plumbum.commands",
"urllib",
"urllib.request",
}
import copy
import errno
import os
import typing
import urllib.request as urllib
from contextlib import contextmanager
from plumbum.commands import ProcessExecutionError, shquote
from plumbum.path.base import FSUser, Path
if typing.TYPE_CHECKING:
import builtins
from collections.abc import Generator, Iterable
from plumbum._compat.typing import Self
from plumbum.machines.remote import BaseRemoteMachine
else:
BaseRemoteMachine = typing.Any
Self = typing.Any
[docs]
class StatRes:
"""POSIX-like stat result"""
__slots__ = ("_tup",)
_tup: tuple[int, int, int, int, int, int, int, float, float, float]
[docs]
def __init__(
self, tup: tuple[int, int, int, int, int, int, int, float, float, float]
):
self._tup = copy.copy(tup)
def __getitem__(self, index: int) -> int | float:
return self._tup[index]
@property
def st_mode(self) -> int:
return self._tup[0]
@property
def st_ino(self) -> int:
return self._tup[1]
@property
def st_dev(self) -> int:
return self._tup[2]
@property
def st_nlink(self) -> int:
return self._tup[3]
@property
def st_uid(self) -> int:
return self._tup[4]
@property
def st_gid(self) -> int:
return self._tup[5]
@property
def st_size(self) -> int:
return self._tup[6]
@property
def st_atime(self) -> float:
return self._tup[7]
@property
def st_mtime(self) -> float:
return self._tup[8]
@property
def st_ctime(self) -> float:
return self._tup[9]
# Aliases for backward compatibility / convenience
mode = st_mode
ino = st_ino
dev = st_dev
nlink = st_nlink
uid = st_uid
gid = st_gid
size = st_size
atime = st_atime
mtime = st_mtime
ctime = st_ctime
[docs]
class RemoteStatRes(StatRes):
"""Remote POSIX-like stat result"""
__slots__ = ("text_mode",)
text_mode: str # e.g., "directory", "regular file", etc.
[docs]
class RemotePath(Path):
"""The class implementing remote-machine paths"""
__slots__ = ("CASE_SENSITIVE", "remote")
remote: BaseRemoteMachine
[docs]
def __new__(cls, remote: BaseRemoteMachine, *parts_str: str | Path) -> Self:
if not parts_str:
raise TypeError("At least one path part is required (none given)")
windows = remote.uname.lower() == "windows"
normed: list[str] = []
parts: tuple[str, ...] = tuple(
map(str, parts_str)
) # force the paths into string, so subscription works properly
# Simple skip if path is absolute
if parts[0] and parts[0][0] not in ("/", "\\"):
cwd = (
remote._cwd
if hasattr(remote, "_cwd")
else remote._session.run("pwd")[1].strip()
)
parts = (cwd, *parts)
for p in parts:
if windows:
plist = str(p).replace("\\", "/").split("/")
else:
plist = str(p).split("/")
if not plist[0]:
plist.pop(0)
del normed[:]
for item in plist:
if item in {"", "."}:
continue
if item == "..":
if normed:
normed.pop(-1)
else:
normed.append(item)
if windows:
self = super().__new__(cls, "\\".join(normed))
self.CASE_SENSITIVE = False # On this object only
else:
self = super().__new__(cls, "/" + "/".join(normed))
self.CASE_SENSITIVE = True
self.remote = remote
return self
def _form(self, *parts: str | Path) -> RemotePath:
return RemotePath(self.remote, *parts)
@property
def _path(self) -> str:
return str(self)
@property
def name(self) -> str:
if "/" not in str(self):
return str(self)
return str(self).rsplit("/", 1)[1]
@property
def dirname(self) -> RemotePath | str: # type: ignore[override]
if "/" not in str(self):
return str(self)
return self.__class__(self.remote, str(self).rsplit("/", 1)[0])
@property
def suffix(self) -> str:
return os.path.splitext(self.name)[1]
@property
def suffixes(self) -> list[str]:
exts = []
name = self.name
while True:
name, ext = os.path.splitext(name)
if ext:
exts.append(ext)
else:
return list(reversed(exts))
@property
def uid(self) -> FSUser:
uid, name = self.remote._path_getuid(self)
return FSUser(int(uid), name)
@property
def gid(self) -> FSUser:
gid, name = self.remote._path_getgid(self)
return FSUser(int(gid), name)
def _get_info(self) -> tuple[BaseRemoteMachine, str]: # type: ignore[override]
return (self.remote, self._path)
[docs]
def join(self, *parts: str) -> RemotePath: # type: ignore[override]
return RemotePath(self.remote, self, *parts)
[docs]
def list(self) -> list[RemotePath]:
if not self.is_dir():
return []
return [self.join(fn) for fn in self.remote._path_listdir(self)]
[docs]
def iterdir(self) -> Iterable[RemotePath]:
if not self.is_dir():
return ()
return (self.join(fn) for fn in self.remote._path_listdir(self))
[docs]
def is_dir(self) -> bool:
res = self.remote._path_stat(self)
if not res:
return False
return res.text_mode == "directory"
[docs]
def is_file(self) -> bool:
res = self.remote._path_stat(self)
if not res:
return False
return res.text_mode in ("regular file", "regular empty file")
[docs]
def is_symlink(self) -> bool:
res = self.remote._path_stat(self)
if not res:
return False
return res.text_mode == "symbolic link"
[docs]
def exists(self) -> bool:
return self.remote._path_stat(self) is not None
[docs]
def stat(self) -> RemoteStatRes: # type: ignore[override]
res = self.remote._path_stat(self)
if res is None:
raise OSError(errno.ENOENT, os.strerror(errno.ENOENT), "")
return res
[docs]
def with_name(self, name: str) -> RemotePath:
return self.__class__(self.remote, self.dirname) / name
[docs]
def with_suffix(self, suffix: str, depth: int | None = 1) -> RemotePath:
if (suffix and not suffix.startswith(".")) or suffix == ".":
raise ValueError(f"Invalid suffix {suffix!r}")
name = self.name
depth = len(self.suffixes) if depth is None else min(depth, len(self.suffixes))
for _ in range(depth):
name, _ = name.rsplit(".", 1)
return self.__class__(self.remote, self.dirname) / (name + suffix)
[docs]
def glob(self, pattern: str) -> builtins.list[RemotePath]:
return self._glob(
pattern,
lambda pat: [
RemotePath(self.remote, m) for m in self.remote._path_glob(self, pat)
],
)
[docs]
def delete(self) -> None:
if not self.exists():
return
self.remote._path_delete(self)
unlink = delete
[docs]
def move(self, dst: RemotePath | str) -> RemotePath:
if isinstance(dst, RemotePath):
if dst.remote is not self.remote:
raise TypeError("dst points to a different remote machine")
elif not isinstance(dst, str):
raise TypeError(
f"dst must be a string or a RemotePath (to the same remote machine), got {dst!r}"
)
return self.remote._path_move(self, dst)
[docs]
def copy(self, dst: RemotePath | str, override: bool | None = False) -> RemotePath:
if isinstance(dst, RemotePath):
if dst.remote is not self.remote:
raise TypeError("dst points to a different remote machine")
elif not isinstance(dst, str):
raise TypeError(
f"dst must be a string or a RemotePath (to the same remote machine), got {dst!r}"
)
if override:
if isinstance(dst, str):
dst = RemotePath(self.remote, dst)
dst.delete()
else:
if isinstance(dst, str):
dst = RemotePath(self.remote, dst)
if dst.exists():
raise TypeError("Override not specified and dst exists")
return self.remote._path_copy(self, dst)
[docs]
def mkdir(
self, mode: int | None = None, parents: bool = True, exist_ok: bool = True
) -> None:
if parents and exist_ok:
self.remote._path_mkdir(self, mode=mode, minus_p=True)
else:
if parents and len(self.parts) > 1:
self.remote._path_mkdir(self.parent, mode=mode, minus_p=True)
try:
self.remote._path_mkdir(self, mode=mode, minus_p=False)
except ProcessExecutionError as ex:
if "File exists" not in ex.stderr:
raise
if not exist_ok:
raise OSError(
errno.EEXIST, "File exists (on remote end)", str(self)
) from None
[docs]
def read(self, encoding: str | None = None) -> str | bytes:
data = self.remote._path_read(self)
if encoding:
return data.decode(encoding)
return data
[docs]
def write(self, data: str | bytes, encoding: str | None = None) -> None:
if encoding:
assert isinstance(data, str)
data = data.encode(encoding)
self.remote._path_write(self, data)
[docs]
def touch(self) -> None:
self.remote._path_touch(str(self))
[docs]
def chown(
self,
owner: int | str | None = None,
group: int | str | None = None,
recursive: bool | None = None,
) -> None:
self.remote._path_chown(
self, owner, group, self.is_dir() if recursive is None else recursive
)
[docs]
def chmod(self, mode: int) -> None:
self.remote._path_chmod(mode, self)
[docs]
def access(self, mode: int | str = 0) -> bool:
mode = self._access_mode_to_flags(mode)
res = self.remote._path_stat(self)
if res is None:
return False
mask = res.st_mode & 0x1FF
return bool((mask >> 6) & mode) or bool((mask >> 3) & mode)
[docs]
def link(self, dst: RemotePath | str) -> None:
if isinstance(dst, RemotePath):
if dst.remote is not self.remote:
raise TypeError("dst points to a different remote machine")
elif not isinstance(dst, str):
raise TypeError(
f"dst must be a string or a RemotePath (to the same remote machine), got {dst!r}"
)
self.remote._path_link(self, dst, False)
[docs]
def symlink(self, dst: RemotePath | str) -> None:
if isinstance(dst, RemotePath):
if dst.remote is not self.remote:
raise TypeError("dst points to a different remote machine")
elif not isinstance(dst, str):
raise TypeError(
f"dst must be a string or a RemotePath (to the same remote machine), got {dst!r}"
)
self.remote._path_link(self, dst, True)
[docs]
def open(
self, mode: str = "r", bufsize: int = -1, *, encoding: str | None = None
) -> typing.IO[str] | typing.IO[bytes]:
"""
Opens this path as a file.
Only works for ParamikoMachine-associated paths for now.
"""
if encoding is not None:
raise NotImplementedError(
"encoding not supported for ParamikoMachine paths"
)
if hasattr(self.remote, "sftp") and hasattr(self.remote.sftp, "open"):
return self.remote.sftp.open(self, mode, bufsize) # type: ignore[no-any-return]
raise NotImplementedError(
"RemotePath.open only works for ParamikoMachine-associated paths for now"
)
[docs]
def as_uri(self, scheme: str = "ssh") -> str:
suffix = urllib.pathname2url(str(self))
# TODO: BaseRemoteMachine doesn't have this, but maybe just because it's not typed yet
return f"{scheme}://{self.remote._fqhost}{suffix}" # type: ignore[attr-defined]
@property
def stem(self) -> str:
return os.path.splitext(self.name)[0]
@property
def root(self) -> str:
return "/"
@property
def drive(self) -> str:
return ""
[docs]
class RemoteWorkdir(RemotePath):
"""Remote working directory manipulator"""
__slots__ = ()
[docs]
def __new__(cls, remote: BaseRemoteMachine) -> Self:
return super().__new__(cls, remote, remote._session.run("pwd")[1].strip())
__hash__ = None # type: ignore[assignment]
[docs]
def chdir(self, newdir: RemotePath | str) -> Self:
"""Changes the current working directory to the given one"""
self.remote._session.run(f"cd {shquote(newdir)}")
if hasattr(self.remote, "_cwd"):
del self.remote._cwd
return self.__class__(self.remote)
[docs]
def getpath(self) -> RemotePath:
"""Returns the current working directory as a
`remote path <plumbum.path.remote.RemotePath>` object"""
return RemotePath(self.remote, self)
[docs]
@contextmanager
def __call__(self, newdir: RemotePath | str) -> Generator[RemotePath, None, None]:
"""A context manager used to ``chdir`` into a directory and then ``chdir`` back to
the previous location; much like ``pushd``/``popd``.
:param newdir: The destination director (a string or a
:class:`RemotePath <plumbum.path.remote.RemotePath>`)
"""
prev = self._path
changed_dir = self.chdir(newdir)
try:
yield changed_dir
finally:
self.chdir(prev)
__all__ = [
"RemotePath",
"RemoteStatRes",
"RemoteWorkdir",
"StatRes",
]
def __dir__() -> list[str]:
return list(__all__)