from __future__ import annotations
__lazy_modules__ = {"contextlib", "plumbum.commands", "random", "threading"}
import contextlib
import logging
import random
import threading
import time
import typing
from typing import IO, Any
from plumbum.commands import BaseCommand, run_proc
from plumbum.commands.processes import ProcessExecutionError
from plumbum.machines.base import PopenAddons
if typing.TYPE_CHECKING:
from collections.abc import Container
from plumbum._compat.typing import Self
from plumbum.machines.base import PopenWithAddons
from plumbum.machines.paramiko_machine import ParamikoPopen
[docs]
class ShellSessionError(Exception):
"""Raises when something goes wrong when calling
:func:`ShellSession.popen <plumbum.machines.session.ShellSession.popen>`"""
[docs]
class SSHCommsError(ProcessExecutionError, EOFError):
"""Raises when the communication channel can't be created on the
remote host or it times out."""
[docs]
class SSHCommsChannel2Error(SSHCommsError):
"""Raises when channel 2 (stderr) is not available"""
[docs]
class IncorrectLogin(SSHCommsError):
"""Raises when incorrect login credentials are provided"""
[docs]
class HostPublicKeyUnknown(SSHCommsError):
"""Raises when the host public key isn't known"""
shell_logger = logging.getLogger("plumbum.shell")
# ===================================================================================================
# Shell Session Popen
# ===================================================================================================
[docs]
class MarkedPipe:
"""A pipe-like object from which you can read lines; the pipe will return report EOF (the
empty string) when a special marker is detected"""
__slots__ = ["__weakref__", "marker", "pipe"]
[docs]
def __init__(self, pipe: IO[bytes], marker: str) -> None:
self.pipe: IO[bytes] | None = pipe
self.marker = bytes(marker, "ascii")
[docs]
def close(self) -> None:
"""'Closes' the marked pipe; following calls to ``readline`` will return "" """
# consume everything
while self.readline():
pass
self.pipe = None
[docs]
def readline(self) -> bytes:
"""Reads the next line from the pipe; returns "" when the special marker is reached.
Raises ``EOFError`` if the underlying pipe has closed"""
if self.pipe is None:
return b""
line = self.pipe.readline()
if not line:
raise EOFError()
if line.strip() == self.marker:
self.pipe = None
return b""
return line
[docs]
class SessionPopen(PopenAddons):
"""A shell-session-based ``Popen``-like object (has the following attributes: ``stdin``,
``stdout``, ``stderr``, ``returncode``)"""
returncode: int | None
[docs]
def __init__(
self,
proc: SessionPopen | PopenWithAddons[Any],
argv: bytes,
isatty: bool,
stdin: IO[bytes],
stdout: MarkedPipe,
stderr: MarkedPipe,
encoding: str,
*,
host: str | None,
) -> None:
self.host = host
self.proc: PopenWithAddons[Any] = proc # type: ignore[assignment]
self.argv = argv
self.isatty = isatty
self.stdin = stdin
self.stdout = stdout
self.stderr = stderr
self.custom_encoding = encoding
self.close_streams_after_communicate = False
self.returncode = None
self._done = False
[docs]
def poll(self) -> int | None:
"""Returns the process' exit code or ``None`` if it's still running"""
return self.returncode if self._done else None
[docs]
def wait(self) -> int:
"""Waits for the process to terminate and returns its exit code"""
self.communicate()
assert self.returncode is not None
return self.returncode
[docs]
def communicate(
self,
input: bytes | bytearray | None = None, # pylint: disable=redefined-builtin
) -> tuple[bytes, bytes]:
"""Consumes the process' stdout and stderr until the it terminates.
:param input: An optional bytes/buffer object to send to the process over stdin
:returns: A tuple of (stdout, stderr)
"""
stdout: list[bytes] = []
stderr: list[bytes] = []
sources = [("1", stdout, self.stdout)]
if not self.isatty:
# in tty mode, stdout and stderr are unified
sources.append(("2", stderr, self.stderr))
i = 0
while sources:
if input:
chunk = input[:1000]
self.stdin.write(chunk)
self.stdin.flush()
input = input[1000:]
i = (i + 1) % len(sources)
name, coll, pipe = sources[i]
try:
line = pipe.readline()
shell_logger.debug("%s> %r", name, line)
except EOFError as err:
shell_logger.debug("%s> Nothing returned.", name)
self.proc.poll()
returncode = self.proc.returncode
assert self.custom_encoding is not None
stdout_txt = b"".join(stdout).decode(self.custom_encoding, "ignore")
stderr_txt = b"".join(stderr).decode(self.custom_encoding, "ignore")
argv = self.argv.decode(self.custom_encoding, "ignore").split(";")[:1]
if returncode == 5:
raise IncorrectLogin(
argv,
returncode,
stdout_txt,
stderr_txt,
message="Incorrect username or password provided",
host=self.host,
) from None
if returncode == 6:
raise HostPublicKeyUnknown(
argv,
returncode,
stdout_txt,
stderr_txt,
message="The authenticity of the host can't be established",
host=self.host,
) from None
if returncode != 0:
raise SSHCommsError(
argv,
returncode,
stdout_txt,
stderr_txt,
message="SSH communication failed",
host=self.host,
) from None
if name == "2":
raise SSHCommsChannel2Error(
argv,
returncode,
stdout_txt,
stderr_txt,
message="No stderr result detected. Does the remote have Bash as the default shell?",
host=self.host,
) from None
raise SSHCommsError(
argv,
returncode,
stdout_txt,
stderr_txt,
message="No communication channel detected. Does the remote exist?",
host=self.host,
) from err
if not line:
del sources[i]
else:
coll.append(line)
if self.isatty:
stdout.pop(0) # discard first line of prompt
try:
self.returncode = int(stdout.pop(-1))
except (IndexError, ValueError):
self.returncode = "Unknown" # type: ignore[assignment]
self._done = True
stdout_bytes = b"".join(stdout)
stderr_bytes = b"".join(stderr)
return stdout_bytes, stderr_bytes
[docs]
class ShellSession:
"""An abstraction layer over *shell sessions*. A shell session is the execution of an
interactive shell (``/bin/sh`` or something compatible), over which you may run commands
(sent over stdin). The output of is then read from stdout and stderr. Shell sessions are
less "robust" than executing a process on its own, and they are susseptible to all sorts
of malformatted-strings attacks, and there is little benefit from using them locally.
However, they can greatly speed up remote connections, and are required for the implementation
of :class:`SshMachine <plumbum.machines.ssh_machine.SshMachine>`, as they allow us to send multiple
commands over a single SSH connection (setting up separate SSH connections incurs a high
overhead). Try to avoid using shell sessions, unless you know what you're doing.
Instances of this class may be used as *context-managers*.
:param proc: The underlying shell process (with open stdin, stdout and stderr)
:param encoding: The encoding to use for the shell session. If ``"auto"``, the underlying
process' encoding is used.
:param isatty: If true, assume the shell has a TTY and that stdout and stderr are unified
:param connect_timeout: The timeout to connect to the shell, after which, if no prompt
is seen, the shell process is killed
"""
__slots__ = (
"_current",
"_lock",
"_startup_result",
"custom_encoding",
"host",
"isatty",
"proc",
)
[docs]
def __init__(
self,
proc: PopenWithAddons[Any] | SessionPopen | ParamikoPopen,
encoding: str = "auto",
isatty: bool = False,
connect_timeout: float | None = 5,
*,
host: str | None = None,
) -> None:
self.host = host
self.proc: PopenWithAddons[Any] | None = proc # type: ignore[assignment]
self.custom_encoding: str | None = (
proc.custom_encoding if encoding == "auto" else encoding
)
self.isatty = isatty
self._lock = threading.RLock()
self._current: SessionPopen | None = None
self._startup_result: tuple[int | None, str, str] | None = None
if connect_timeout:
def closer() -> None:
shell_logger.error(
"Connection to %s timed out (%d sec)", proc, connect_timeout
)
self.close()
timer = threading.Timer(connect_timeout, closer)
timer.start()
try:
self._startup_result = self.run("")
finally:
if connect_timeout:
timer.cancel()
def __enter__(self) -> Self:
return self
def __exit__(self, t: object, v: object, tb: object) -> None:
self.close()
def __del__(self) -> None:
with contextlib.suppress(Exception):
self.close()
[docs]
def alive(self) -> bool:
"""Returns ``True`` if the underlying shell process is alive, ``False`` otherwise"""
return bool(self.proc and self.proc.poll() is None)
[docs]
def close(self) -> None:
"""Closes (terminates) the shell session"""
if not self.alive():
return
assert self.proc is not None
with contextlib.suppress(ValueError, OSError):
self.proc.stdin.write(b"\nexit\n\n\nexit\n\n") # type: ignore[union-attr]
self.proc.stdin.flush() # type: ignore[union-attr]
time.sleep(0.05)
for p in (self.proc.stdin, self.proc.stdout, self.proc.stderr):
with contextlib.suppress(Exception):
p.close() # type: ignore[union-attr]
with contextlib.suppress(OSError):
self.proc.kill()
self.proc = None
[docs]
def popen(self, cmd: str | BaseCommand) -> PopenWithAddons[Any]:
"""Runs the given command in the shell, adding some decoration around it. Only a single
command can be executed at any given time.
:param cmd: The command (string or :class:`Command <plumbum.commands.base.BaseCommand>` object)
to run
:returns: A :class:`SessionPopen <plumbum.machines.session.SessionPopen>` instance
"""
if self.proc is None:
raise ShellSessionError("Shell session has already been closed")
if self._current and not self._current._done:
raise ShellSessionError("Each shell may start only one process at a time")
full_cmd = " ".join(cmd.formulate(1)) if isinstance(cmd, BaseCommand) else cmd
marker = f"--.END{time.time() * random.random()}.--"
if full_cmd.strip():
full_cmd += " ; "
else:
full_cmd = "true ; "
full_cmd += f"echo $? ; echo '{marker}'"
if not self.isatty:
full_cmd += f" ; echo '{marker}' 1>&2"
if self.custom_encoding:
full_cmd_bytes = full_cmd.encode(self.custom_encoding)
else:
# TODO: I don't think bytes are supported above
full_cmd_bytes = full_cmd # type: ignore[assignment]
shell_logger.debug("Running %r", full_cmd)
assert self.proc.stdin is not None
self.proc.stdin.write(full_cmd_bytes + b"\n")
self.proc.stdin.flush()
assert self.proc.stdout is not None
assert self.proc.stderr is not None
assert self.custom_encoding is not None
_current = SessionPopen(
self.proc,
full_cmd_bytes,
self.isatty,
self.proc.stdin,
MarkedPipe(self.proc.stdout, marker),
MarkedPipe(self.proc.stderr, marker),
self.custom_encoding,
host=self.host,
)
self._current = _current
return _current # type: ignore[return-value]
[docs]
def run(
self, cmd: str | BaseCommand, retcode: int | None | Container[int] = 0
) -> tuple[int | None, str, str]:
"""Runs the given command
:param cmd: The command (string or :class:`Command <plumbum.commands.base.BaseCommand>` object)
to run
:param retcode: The expected return code (0 by default). Set to ``None`` in order to
ignore erroneous return codes
:returns: A tuple of (return code, stdout, stderr)
"""
with self._lock:
return run_proc(self.popen(cmd), retcode)
__all__ = [
"HostPublicKeyUnknown",
"IncorrectLogin",
"MarkedPipe",
"SSHCommsChannel2Error",
"SSHCommsError",
"SessionPopen",
"ShellSession",
"ShellSessionError",
]
def __dir__() -> list[str]:
return list(__all__)