Source code for plumbum.machines.session

import contextlib
import logging
import random
import threading
import time

from plumbum.commands import BaseCommand, run_proc
from plumbum.commands.processes import ProcessExecutionError
from plumbum.machines.base import PopenAddons


class ShellSessionError(Exception):
    """Raised when something goes wrong when calling
    :func:`ShellSession.popen <plumbum.session.ShellSession.popen>`"""
class SSHCommsError(ProcessExecutionError, EOFError):
    """Raised when the communication channel can't be created on the
    remote host or it times out."""
class SSHCommsChannel2Error(SSHCommsError):
    """Raised when channel 2 (stderr) is not available"""
class IncorrectLogin(SSHCommsError):
    """Raised when incorrect login credentials are provided"""
class HostPublicKeyUnknown(SSHCommsError):
    """Raised when the host public key isn't known"""
# Logger used by the shell-session classes in this module for debug/error output.
shell_logger = logging.getLogger("plumbum.shell")


# ===================================================================================================
# Shell Session Popen
# ===================================================================================================
class MarkedPipe:
    """A pipe-like object from which you can read lines; the pipe will report EOF
    (the empty bytes string) when a special marker line is detected.

    :param pipe: The underlying pipe object; its ``readline()`` is compared against
                 the marker as ``bytes``

    :param marker: The marker line, given either as an ASCII ``str`` or as
                   ``bytes``/``bytearray``
    """

    __slots__ = ["pipe", "marker", "__weakref__"]

    def __init__(self, pipe, marker):
        self.pipe = pipe
        # Lines coming from the pipe are compared as bytes, so normalise the
        # marker once here. Anything that is neither bytes-like nor str still
        # raises TypeError, as before.
        if isinstance(marker, (bytes, bytearray)):
            self.marker = bytes(marker)
        else:
            self.marker = bytes(marker, "ascii")

    def close(self):
        """'Closes' the marked pipe; following calls to ``readline`` will return ``b""``.

        The remaining lines up to (and including) the marker are read and
        discarded. ``EOFError`` propagates if the underlying pipe ends before
        the marker is seen.
        """
        # consume everything up to the marker
        while self.readline():
            pass
        self.pipe = None

    def readline(self):
        """Reads the next line from the pipe; returns ``b""`` when the special marker
        is reached (and on every call after that).

        :raises EOFError: if the underlying pipe has closed (returned an empty line)
        """
        if self.pipe is None:
            return b""
        line = self.pipe.readline()
        if not line:
            raise EOFError()
        if line.strip() == self.marker:
            # Detach from the pipe so later reads do not consume output that
            # follows the marker.
            self.pipe = None
            return b""
        return line
class SessionPopen(PopenAddons):
    """A shell-session-based ``Popen``-like object (has the following attributes: ``stdin``,
    ``stdout``, ``stderr``, ``returncode``)"""

    def __init__(self, proc, argv, isatty, stdin, stdout, stderr, encoding, *, host):
        """Stores the session process, the command and its three pipes.

        :param proc: The underlying shell process; polled for its return code on EOF
        :param argv: The encoded command line that was sent to the shell
        :param isatty: If true, stdout and stderr are unified and only stdout is read
        :param stdin: The pipe that ``communicate`` writes ``input`` to
        :param stdout: The pipe read for standard output (``readline()`` returning ``b""``
                       marks the end of the command's output)
        :param stderr: The pipe read for standard error (ignored in tty mode)
        :param encoding: The encoding used to decode output for error reporting
        :param host: The host passed along to the raised communication errors
        """
        self.host = host
        self.proc = proc
        self.argv = argv
        self.isatty = isatty
        self.stdin = stdin
        self.stdout = stdout
        self.stderr = stderr
        self.custom_encoding = encoding
        self.returncode = None
        self._done = False

    def poll(self):
        """Returns the process' exit code or ``None`` if it's still running"""
        return self.returncode if self._done else None

    def wait(self):
        """Waits for the process to terminate and returns its exit code"""
        # Once the command has completed, its pipes are exhausted; running
        # communicate() again would collect nothing and overwrite the real
        # exit code, so only consume the output if that has not happened yet.
        if not self._done:
            self.communicate()
        return self.returncode

    def communicate(self, input=None):  # pylint: disable=redefined-builtin
        """Consumes the process' stdout and stderr until it terminates.

        The last line collected from stdout is taken as the exit code and stored
        in ``returncode``; if it is missing or not an integer, ``returncode`` is
        set to the string ``"Unknown"``.

        :param input: An optional bytes/buffer object to send to the process over stdin

        :returns: A tuple of (stdout, stderr), both as ``bytes``

        :raises IncorrectLogin: a pipe hit EOF and the shell process exited with code 5
        :raises HostPublicKeyUnknown: a pipe hit EOF and the shell process exited with code 6
        :raises SSHCommsChannel2Error: stderr hit EOF while the process return code is 0
        :raises SSHCommsError: any other EOF on one of the pipes
        """
        stdout = []
        stderr = []
        sources = [("1", stdout, self.stdout)]
        if not self.isatty:
            # in tty mode, stdout and stderr are unified
            sources.append(("2", stderr, self.stderr))
        i = 0
        while sources:
            if input:
                # feed stdin in chunks of 1000, interleaved with reading output
                chunk = input[:1000]
                self.stdin.write(chunk)
                self.stdin.flush()
                input = input[1000:]
            # alternate between the pipes that are still open
            i = (i + 1) % len(sources)
            name, coll, pipe = sources[i]
            try:
                line = pipe.readline()
                shell_logger.debug("%s> %r", name, line)
            except EOFError as err:
                shell_logger.debug("%s> Nothing returned.", name)

                self.proc.poll()
                returncode = self.proc.returncode
                stdout = b"".join(stdout).decode(self.custom_encoding, "ignore")
                stderr = b"".join(stderr).decode(self.custom_encoding, "ignore")
                # report only the part of the command line before the first ";"
                argv = self.argv.decode(self.custom_encoding, "ignore").split(";")[:1]

                if returncode == 5:
                    raise IncorrectLogin(
                        argv,
                        returncode,
                        stdout,
                        stderr,
                        message="Incorrect username or password provided",
                        host=self.host,
                    ) from None
                if returncode == 6:
                    raise HostPublicKeyUnknown(
                        argv,
                        returncode,
                        stdout,
                        stderr,
                        message="The authenticity of the host can't be established",
                        host=self.host,
                    ) from None
                if returncode != 0:
                    raise SSHCommsError(
                        argv,
                        returncode,
                        stdout,
                        stderr,
                        message="SSH communication failed",
                        host=self.host,
                    ) from None
                if name == "2":
                    raise SSHCommsChannel2Error(
                        argv,
                        returncode,
                        stdout,
                        stderr,
                        message="No stderr result detected. Does the remote have Bash as the default shell?",
                        host=self.host,
                    ) from None

                raise SSHCommsError(
                    argv,
                    returncode,
                    stdout,
                    stderr,
                    message="No communication channel detected. Does the remote exist?",
                    host=self.host,
                ) from err
            if not line:
                # an empty line means this pipe is finished
                del sources[i]
            else:
                coll.append(line)
        if self.isatty:
            stdout.pop(0)  # discard first line of prompt
        try:
            self.returncode = int(stdout.pop(-1))
        except (IndexError, ValueError):
            self.returncode = "Unknown"
        self._done = True
        stdout = b"".join(stdout)
        stderr = b"".join(stderr)
        return stdout, stderr
class ShellSession:
    """An abstraction layer over *shell sessions*. A shell session is the execution of an
    interactive shell (``/bin/sh`` or something compatible), over which you may run commands
    (sent over stdin). The output is then read from stdout and stderr. Shell sessions are
    less "robust" than executing a process on its own, and they are susceptible to all sorts
    of malformatted-strings attacks, and there is little benefit from using them locally.
    However, they can greatly speed up remote connections, and are required for the
    implementation of :class:`SshMachine <plumbum.machines.remote.SshMachine>`, as they allow
    us to send multiple commands over a single SSH connection (setting up separate SSH
    connections incurs a high overhead). Try to avoid using shell sessions, unless you know
    what you're doing.

    Instances of this class may be used as *context-managers*.

    :param proc: The underlying shell process (with open stdin, stdout and stderr)

    :param encoding: The encoding to use for the shell session. If ``"auto"``, the underlying
                     process' encoding is used.

    :param isatty: If true, assume the shell has a TTY and that stdout and stderr are unified

    :param connect_timeout: The timeout to connect to the shell, after which, if no prompt
                            is seen, the shell process is killed
    """

    def __init__(
        self, proc, encoding="auto", isatty=False, connect_timeout=5, *, host=None
    ):
        self.host = host
        self.proc = proc
        self.custom_encoding = proc.custom_encoding if encoding == "auto" else encoding
        self.isatty = isatty
        self._lock = threading.RLock()
        self._current = None
        self._startup_result = None
        if connect_timeout:

            def closer():
                shell_logger.error(
                    "Connection to %s timed out (%d sec)", proc, connect_timeout
                )
                self.close()

            # close the session if the initial empty command below does not
            # complete within connect_timeout seconds
            timer = threading.Timer(connect_timeout, closer)
            timer.start()
        try:
            # run an empty command to verify that the shell responds
            self._startup_result = self.run("")
        finally:
            if connect_timeout:
                timer.cancel()

    def __enter__(self):
        return self

    def __exit__(self, t, v, tb):
        self.close()

    def __del__(self):
        # best-effort cleanup; errors during finalization are ignored
        with contextlib.suppress(Exception):
            self.close()

    def alive(self):
        """Returns ``True`` if the underlying shell process is alive, ``False`` otherwise"""
        # bool() so that a closed session (``proc is None``) yields False rather than None
        return bool(self.proc and self.proc.poll() is None)

    def close(self):
        """Closes (terminates) the shell session"""
        if not self.alive():
            return
        # ask the shell to exit; write errors are ignored
        with contextlib.suppress(ValueError, OSError):
            self.proc.stdin.write(b"\nexit\n\n\nexit\n\n")
            self.proc.stdin.flush()
            time.sleep(0.05)
        for p in (self.proc.stdin, self.proc.stdout, self.proc.stderr):
            with contextlib.suppress(Exception):
                p.close()
        with contextlib.suppress(OSError):
            self.proc.kill()
        self.proc = None

    def popen(self, cmd):
        """Runs the given command in the shell, adding some decoration around it. Only a single
        command can be executed at any given time.

        The command is followed by ``echo $?`` and an ``echo`` of a unique marker line on
        stdout (and on stderr too, unless in tty mode), which is how the end of the output
        and the exit code are detected.

        :param cmd: The command (string or :class:`Command <plumbum.commands.BaseCommand>` object)
                    to run

        :returns: A :class:`SessionPopen <plumbum.session.SessionPopen>` instance

        :raises ShellSessionError: if the session has been closed, or if the previously
                                   started command has not completed yet
        """
        if self.proc is None:
            raise ShellSessionError("Shell session has already been closed")
        if self._current and not self._current._done:
            raise ShellSessionError("Each shell may start only one process at a time")
        full_cmd = cmd.formulate(1) if isinstance(cmd, BaseCommand) else cmd
        marker = f"--.END{time.time() * random.random()}.--"
        if full_cmd.strip():
            full_cmd += " ; "
        else:
            # an empty command still has to produce an exit code
            full_cmd = "true ; "
        full_cmd += f"echo $? ; echo '{marker}'"
        if not self.isatty:
            full_cmd += f" ; echo '{marker}' 1>&2"
        if self.custom_encoding:
            full_cmd = full_cmd.encode(self.custom_encoding)
        shell_logger.debug("Running %r", full_cmd)
        self.proc.stdin.write(full_cmd + b"\n")
        self.proc.stdin.flush()
        self._current = SessionPopen(
            self.proc,
            full_cmd,
            self.isatty,
            self.proc.stdin,
            MarkedPipe(self.proc.stdout, marker),
            MarkedPipe(self.proc.stderr, marker),
            self.custom_encoding,
            host=self.host,
        )
        return self._current

    def run(self, cmd, retcode=0):
        """Runs the given command

        :param cmd: The command (string or :class:`Command <plumbum.commands.BaseCommand>` object)
                    to run

        :param retcode: The expected return code (0 by default). Set to ``None`` in order to
                        ignore erroneous return codes

        :returns: A tuple of (return code, stdout, stderr)
        """
        with self._lock:
            return run_proc(self.popen(cmd), retcode)