Source code for plumbum.machines.ssh_machine

from __future__ import annotations

__lazy_modules__ = {
    "contextlib",
    "plumbum.commands",
    "plumbum.lib",
    "plumbum.machines.local",
    "plumbum.machines.session",
    "plumbum.path",
    "plumbum.path.local",
    "plumbum.path.remote",
    "re",
    "socket",
}

import re
import socket
from contextlib import closing
from typing import TYPE_CHECKING, Any

from plumbum.commands import BaseCommand, ProcessExecutionError, shquote
from plumbum.lib import IS_WIN32
from plumbum.machines.local import local
from plumbum.machines.remote import BaseRemoteMachine, RemoteEnv
from plumbum.machines.session import ShellSession
from plumbum.path.local import LocalPath
from plumbum.path.remote import RemotePath, RemoteWorkdir

if TYPE_CHECKING:
    from collections.abc import Sequence

    from plumbum._compat.typing import Self
    from plumbum.commands.async_ import AsyncRemoteCommand
    from plumbum.machines.base import PopenWithAddons


def _get_free_port() -> int:
    """Attempts to find a free port."""
    s = socket.socket()
    with closing(s):
        s.bind(("localhost", 0))
        port: int = s.getsockname()[1]
        return port


[docs] class SshTunnel: """An object representing an SSH tunnel (created by :func:`SshMachine.tunnel <plumbum.machines.ssh_machine.SshMachine.tunnel>`)""" __slots__ = ["__weakref__", "_dport", "_lport", "_reverse", "_session"]
[docs] def __init__( self, session: ShellSession, lport: int | str, dport: int | str, reverse: bool, ) -> None: self._session = session self._lport = lport self._dport = dport self._reverse = reverse if reverse and str(dport) == "0" and session._startup_result is not None: # Try to detect assigned remote port. regex = re.compile( r"^Allocated port (\d+) for remote forward to .+$", re.MULTILINE ) match = regex.search(str(session._startup_result[2])) if match: self._dport = match.group(1)
[docs] def __repr__(self) -> str: tunnel = self._session.proc if self._session.alive() else "(defunct)" return f"<SshTunnel {tunnel}>"
def __enter__(self) -> Self: return self def __exit__(self, t: object, v: object, tb: object) -> None: self.close()
[docs] def close(self) -> None: """Closes(terminates) the tunnel""" self._session.close()
@property def lport(self) -> int | str: """Tunneled port or socket on the local machine.""" return self._lport @property def dport(self) -> int | str: """Tunneled port or socket on the remote machine.""" return self._dport @property def reverse(self) -> bool: """Represents if the tunnel is a reverse tunnel.""" return self._reverse
[docs] class SshMachine(BaseRemoteMachine): """ An implementation of :class:`remote machine <plumbum.machines.remote.BaseRemoteMachine>` over SSH. Invoking a remote command translates to invoking it over SSH :: with SshMachine("yourhostname") as rem: r_ls = rem["ls"] # r_ls is the remote `ls` # executing r_ls() translates to `ssh yourhostname ls` :param host: the host name to connect to (SSH server) :param user: the user to connect as (if ``None``, the default will be used) :param port: the server's port (if ``None``, the default will be used) :param keyfile: the path to the identity file (if ``None``, the default will be used) :param ssh_command: the ``ssh`` command to use; this has to be a ``Command`` object; if ``None``, the default ssh client will be used. :param scp_command: the ``scp`` command to use; this has to be a ``Command`` object; if ``None``, the default scp program will be used. :param ssh_opts: any additional options for ``ssh`` (a list of strings) :param scp_opts: any additional options for ``scp`` (a list of strings) :param password: the password to use; requires ``sshpass`` be installed. Cannot be used in conjunction with ``ssh_command`` or ``scp_command`` (will be ignored). NOTE: THIS IS A SECURITY RISK! :param encoding: the remote machine's encoding (defaults to UTF8) :param connect_timeout: specify a connection timeout (the time until shell prompt is seen). The default is 10 seconds. Set to ``None`` to disable :param new_session: whether or not to start the background session as a new session leader (setsid). This will prevent it from being killed on Ctrl+C (SIGINT) """ __slots__ = ("_fqhost", "_scp_command", "_scp_translate", "_ssh_command", "host")
[docs] def __init__( self, host: str, user: str | None = None, port: int | None = None, keyfile: str | None = None, ssh_command: BaseCommand | None = None, scp_command: BaseCommand | None = None, ssh_opts: Sequence[str] = (), scp_opts: Sequence[str] = (), password: str | None = None, encoding: str = "utf8", connect_timeout: float | None = 10, new_session: bool = False, ) -> None: if ssh_command is None: if password is not None: ssh_command = local["sshpass"]["-p", password, "ssh"] else: ssh_command = local["ssh"] if scp_command is None: if password is not None: scp_command = local["sshpass"]["-p", password, "scp"] else: scp_command = local["scp"] scp_args = [] ssh_args = [] self.host = host if user: self._fqhost = f"{user}@{host}" else: self._fqhost = host if port: ssh_args.extend(["-p", str(port)]) scp_args.extend(["-P", str(port)]) if keyfile: ssh_args.extend(["-i", str(keyfile)]) scp_args.extend(["-i", str(keyfile)]) scp_args.append("-r") ssh_args.extend(ssh_opts) scp_args.extend(scp_opts) self._ssh_command = ssh_command[tuple(ssh_args)] self._scp_command = scp_command[tuple(scp_args)] self._scp_translate = IS_WIN32 and self._scp_uses_posix_paths(self._scp_command) BaseRemoteMachine.__init__( self, encoding=encoding, connect_timeout=connect_timeout, new_session=new_session, )
[docs] def __str__(self) -> str: return f"ssh://{self._fqhost}"
[docs] def popen( self, args: Sequence[str] | str, ssh_opts: Sequence[str] = (), env: dict[str, str] | None = None, cwd: str | LocalPath | None = None, **kwargs: Any, ) -> PopenWithAddons[str]: cmdline: list[str] = [] cmdline.extend(ssh_opts) cmdline.append(self._fqhost) if args: envdelta = {} if hasattr(self, "env"): envdelta.update(self.env.getdelta()) if env: envdelta.update(env) if cwd is None: cwd = getattr(self, "cwd", None) if cwd: cmdline.extend(["cd", str(cwd), "&&"]) if envdelta: cmdline.append("env") cmdline.extend(f"{k}={shquote(v)}" for k, v in envdelta.items()) if isinstance(args, (tuple, list)): cmdline.extend(args) else: cmdline.append(args) # type: ignore[arg-type] return self._ssh_command[tuple(cmdline)].popen(**kwargs)
[docs] def daemonic_popen( self, command: BaseCommand, cwd: str = ".", stdout: str | None = None, stderr: str | None = None, append: bool = True, ) -> PopenWithAddons[str]: """ Runs the given command using ``nohup`` and redirects std handles, allowing the command to run "detached" from its controlling TTY or parent. Does not return anything. .. versionadded:: 1.6.0 """ if stdout is None: stdout = "/dev/null" if stderr is None: stderr = "&1" args = [] if str(cwd) == "." else ["cd", str(cwd), "&&"] args.append("nohup") args.extend(command.formulate()) args.extend( [ (">>" if append else ">") + str(stdout), "2" + (">>" if (append and stderr != "&1") else ">") + str(stderr), "</dev/null", ] ) proc = self.popen(args, ssh_opts=["-f"]) rc = proc.wait() assert proc.stdin is not None assert proc.stdout is not None assert proc.stderr is not None try: if rc != 0: raise ProcessExecutionError( args, rc, proc.stdout.read(), proc.stderr.read() ) finally: proc.stdin.close() proc.stdout.close() proc.stderr.close() return proc
[docs] def session(self, isatty: bool = False, new_session: bool = False) -> ShellSession: return ShellSession( self.popen( ["/bin/sh"], (["-tt"] if isatty else ["-T"]), new_session=new_session ), self.custom_encoding, isatty, self.connect_timeout, host=self.host, )
[docs] def tunnel( self, lport: int | str, dport: int | str, lhost: str | None = "localhost", dhost: str | None = "localhost", connect_timeout: float | None = None, reverse: bool = False, ) -> SshTunnel: r"""Creates an SSH tunnel from the TCP port (``lport``) of the local machine (``lhost``, defaults to ``"localhost"``, but it can be any IP you can ``bind()``) to the remote TCP port (``dport``) of the destination machine (``dhost``, defaults to ``"localhost"``, which means *this remote machine*). This function also supports Unix sockets, in which case the local socket should be passed in as ``lport`` and the local bind address should be ``None``. The same can be done for a remote socket, by following the same pattern with ``dport`` and ``dhost``. The returned :class:`SshTunnel <plumbum.machines.ssh_machine.SshTunnel>` object can be used as a *context-manager*. The more conventional use case is the following:: +---------+ +---------+ | Your | | Remote | | Machine | | Machine | +----o----+ +---- ----+ | ^ | | lport dport | | \______SSH TUNNEL____/ (secure) Here, you wish to communicate safely between port ``lport`` of your machine and port ``dport`` of the remote machine. Communication is tunneled over SSH, so the connection is authenticated and encrypted. The more general case is shown below (where ``dport != "localhost"``):: +---------+ +-------------+ +-------------+ | Your | | Remote | | Destination | | Machine | | Machine | | Machine | +----o----+ +---- ----o---+ +---- --------+ | ^ | ^ | | | | lhost:lport | | dhost:dport | | | | \_____SSH TUNNEL_____/ \_____SOCKET____/ (secure) (not secure) Usage:: rem = SshMachine("megazord") with rem.tunnel(1234, "/var/lib/mysql/mysql.sock", dhost=None): sock = socket.socket() sock.connect(("localhost", 1234)) # sock is now tunneled to the MySQL socket on megazord The ``connect_timeout`` is the time to wait for the tunnel's shell prompt; if not given, the machine-level ``connect_timeout`` is used. """ formatted_lhost = "" if lhost is None else f"[{lhost}]:" formatted_dhost = "" if dhost is None else f"[{dhost}]:" if str(lport) == "0": lport = _get_free_port() ssh_opts = ( [ "-L", f"{formatted_lhost}{lport}:{formatted_dhost}{dport}", ] if not reverse else [ "-R", f"{formatted_dhost}{dport}:{formatted_lhost}{lport}", ] ) proc = self.popen((), ssh_opts=ssh_opts, new_session=True) if connect_timeout is None: connect_timeout = self.connect_timeout return SshTunnel( ShellSession(proc, self.custom_encoding, connect_timeout=connect_timeout), lport, dport, reverse, )
@staticmethod def _translate_drive_letter(path: str | LocalPath) -> str: # replace c:\some\path with /c/some/path path = str(path) if ":" in path: return "/" + path.replace(":", "").replace("\\", "/") return path @staticmethod def _scp_uses_posix_paths(scp_command: BaseCommand) -> bool: """Whether ``scp_command`` expects POSIX-style ``/c/...`` local paths. Cygwin- and MSYS2-based builds of ``scp`` (such as the one bundled with Git for Windows) cannot make sense of native ``c:\\...`` paths and need drive letters rewritten as ``/c/...``. They are recognisable by the ``cygwin1.dll`` / ``msys-2.0.dll`` runtime that sits next to the executable. The native Windows OpenSSH ``scp`` accepts drive-letter paths directly, so for it (and on POSIX hosts) no translation is needed. """ try: cmd: Any = scp_command while not hasattr(cmd, "executable"): cmd = getattr(cmd, "cmd", None) if cmd is None: return False folder = local.path(cmd.executable).dirname return any( (folder / dll).exists() for dll in ("cygwin1.dll", "msys-2.0.dll") ) except (AttributeError, OSError): return False
[docs] def download(self, src: str | RemotePath, dst: str | LocalPath) -> None: if isinstance(src, LocalPath): raise TypeError(f"src of download cannot be {src!r}") if isinstance(src, RemotePath) and src.remote != self: raise TypeError(f"src {src!r} points to a different remote machine") if isinstance(dst, RemotePath): raise TypeError(f"dst of download cannot be {dst!r}") if self._scp_translate: # only the local path (dst) is parsed by the local scp binary dst = self._translate_drive_letter(dst) self._scp_command(f"{self._fqhost}:{shquote(src)}", dst)
[docs] def upload(self, src: str | LocalPath, dst: str | RemotePath) -> None: if isinstance(src, RemotePath): raise TypeError(f"src of upload cannot be {src!r}") if isinstance(dst, LocalPath): raise TypeError(f"dst of upload cannot be {dst!r}") if isinstance(dst, RemotePath) and dst.remote != self: raise TypeError(f"dst {dst!r} points to a different remote machine") if self._scp_translate: # only the local path (src) is parsed by the local scp binary src = self._translate_drive_letter(src) self._scp_command(src, f"{self._fqhost}:{shquote(dst)}")
[docs] class PuttyMachine(SshMachine): """ PuTTY-flavored SSH connection. The programs ``plink`` and ``pscp`` are expected to be in the path (or you may provide your own ``ssh_command`` and ``scp_command``) Arguments are the same as for :class:`plumbum.machines.ssh_machine.SshMachine` """ __slots__ = ()
[docs] def __init__( self, host: str, user: str | None = None, port: int | None = None, keyfile: str | None = None, ssh_command: BaseCommand | None = None, scp_command: BaseCommand | None = None, ssh_opts: Sequence[str] = (), scp_opts: Sequence[str] = (), encoding: str = "utf8", connect_timeout: float | None = 10, new_session: bool = False, ) -> None: if ssh_command is None: ssh_command = local["plink"] if scp_command is None: scp_command = local["pscp"] if not ssh_opts: ssh_opts = ["-ssh"] if user is None: user = local.env.user if port is not None: ssh_opts = [*ssh_opts, "-P", str(port)] scp_opts = [*scp_opts, "-P", str(port)] port = None SshMachine.__init__( self, host, user, port, keyfile=keyfile, ssh_command=ssh_command, scp_command=scp_command, ssh_opts=ssh_opts, scp_opts=scp_opts, encoding=encoding, connect_timeout=connect_timeout, new_session=new_session, )
[docs] def __str__(self) -> str: return f"putty-ssh://{self._fqhost}"
@staticmethod def _translate_drive_letter(path: str | LocalPath) -> str: # pscp takes care of windows paths automatically return str(path)
[docs] def session(self, isatty: bool = False, new_session: bool = False) -> ShellSession: return ShellSession( self.popen((), (["-t"] if isatty else ["-T"]), new_session=new_session), self.custom_encoding, isatty, self.connect_timeout, )
[docs] class AsyncSshMachine: """Async version of SshMachine. This class provides async SSH command execution. It wraps a sync SshMachine and provides async execution methods. Example:: from plumbum.machines.ssh_machine import AsyncSshMachine async with AsyncSshMachine("hostname") as rem: result = await rem["ls"]("-la") # Concurrent execution results = await asyncio.gather( rem["echo"]("task1"), rem["echo"]("task2"), ) .. versionadded:: 2.0 """ __slots__ = ("_sync_machine",)
[docs] def __init__( self, host: str, user: str | None = None, port: int | None = None, keyfile: str | None = None, ssh_command: BaseCommand | None = None, scp_command: BaseCommand | None = None, ssh_opts: Sequence[str] = (), scp_opts: Sequence[str] = (), password: str | None = None, encoding: str = "utf8", connect_timeout: float | None = 10, new_session: bool = False, ) -> None: """Initialize async SSH machine. Args: host: The host name to connect to (SSH server) user: The user to connect as (if None, the default will be used) port: The server's port (if None, the default will be used) keyfile: The path to the identity file (if None, the default will be used) ssh_command: The ssh command to use (if None, the default will be used) scp_command: The scp command to use (if None, the default will be used) ssh_opts: Any additional options for ssh (a list of strings) scp_opts: Any additional options for scp (a list of strings) password: The password to use (requires sshpass) encoding: The remote machine's encoding (defaults to UTF8) connect_timeout: Connection timeout (default 10 seconds) new_session: Whether to start as a new session leader """ sync_machine = SshMachine( host=host, user=user, port=port, keyfile=keyfile, ssh_command=ssh_command, scp_command=scp_command, ssh_opts=ssh_opts, scp_opts=scp_opts, password=password, encoding=encoding, connect_timeout=connect_timeout, new_session=new_session, ) self._sync_machine = sync_machine
[docs] def __getitem__(self, cmd: str | RemotePath | LocalPath) -> AsyncRemoteCommand: """Get an async remote command by name or path.""" from plumbum.commands.async_ import AsyncRemoteCommand sync_cmd = self._sync_machine[cmd] return AsyncRemoteCommand(sync_cmd)
[docs] def __contains__(self, cmd: str) -> bool: """Check if a command exists in remote PATH.""" return cmd in self._sync_machine
@property def cwd(self) -> RemoteWorkdir: """Current working directory on remote machine.""" return self._sync_machine.cwd @property def env(self) -> RemoteEnv: """Environment variables on remote machine.""" return self._sync_machine.env
[docs] def path(self, *parts: str | RemotePath | LocalPath) -> RemotePath: """Create a RemotePath from parts.""" return self._sync_machine.path(*parts)
[docs] def close(self) -> None: """Close the connection to the remote machine.""" self._sync_machine.close()
[docs] async def __aenter__(self) -> Self: """Async context manager entry.""" return self
[docs] async def __aexit__(self, t: object, v: object, tb: object) -> None: """Async context manager exit.""" self.close()
__all__ = [ "AsyncSshMachine", "PuttyMachine", "SshMachine", "SshTunnel", ] def __dir__() -> list[str]: return list(__all__)