import re
import socket
import warnings
from contextlib import closing
from plumbum.commands import ProcessExecutionError, shquote
from plumbum.lib import IS_WIN32
from plumbum.machines.local import local
from plumbum.machines.remote import BaseRemoteMachine
from plumbum.machines.session import ShellSession
from plumbum.path.local import LocalPath
from plumbum.path.remote import RemotePath
def _get_free_port():
"""Attempts to find a free port."""
s = socket.socket()
with closing(s):
s.bind(("localhost", 0))
return s.getsockname()[1]
[docs]class SshTunnel:
"""An object representing an SSH tunnel (created by
:func:`SshMachine.tunnel <plumbum.machines.remote.SshMachine.tunnel>`)"""
__slots__ = ["_session", "_lport", "_dport", "_reverse", "__weakref__"]
[docs] def __init__(self, session, lport, dport, reverse):
self._session = session
self._lport = lport
self._dport = dport
self._reverse = reverse
if reverse and str(dport) == "0" and session._startup_result is not None:
# Try to detect assigned remote port.
regex = re.compile(
r"^Allocated port (\d+) for remote forward to .+$", re.MULTILINE
)
match = regex.search(session._startup_result[2])
if match:
self._dport = match.group(1)
[docs] def __repr__(self):
tunnel = self._session.proc if self._session.alive() else "(defunct)"
return f"<SshTunnel {tunnel}>"
def __enter__(self):
return self
def __exit__(self, t, v, tb):
self.close()
[docs] def close(self):
"""Closes(terminates) the tunnel"""
self._session.close()
@property
def lport(self):
"""Tunneled port or socket on the local machine."""
return self._lport
@property
def dport(self):
"""Tunneled port or socket on the remote machine."""
return self._dport
@property
def reverse(self):
"""Represents if the tunnel is a reverse tunnel."""
return self._reverse
[docs]class SshMachine(BaseRemoteMachine):
"""
An implementation of :class:`remote machine <plumbum.machines.remote.BaseRemoteMachine>`
over SSH. Invoking a remote command translates to invoking it over SSH ::
with SshMachine("yourhostname") as rem:
r_ls = rem["ls"]
# r_ls is the remote `ls`
# executing r_ls() translates to `ssh yourhostname ls`
:param host: the host name to connect to (SSH server)
:param user: the user to connect as (if ``None``, the default will be used)
:param port: the server's port (if ``None``, the default will be used)
:param keyfile: the path to the identity file (if ``None``, the default will be used)
:param ssh_command: the ``ssh`` command to use; this has to be a ``Command`` object;
if ``None``, the default ssh client will be used.
:param scp_command: the ``scp`` command to use; this has to be a ``Command`` object;
if ``None``, the default scp program will be used.
:param ssh_opts: any additional options for ``ssh`` (a list of strings)
:param scp_opts: any additional options for ``scp`` (a list of strings)
:param password: the password to use; requires ``sshpass`` be installed. Cannot be used
in conjunction with ``ssh_command`` or ``scp_command`` (will be ignored).
NOTE: THIS IS A SECURITY RISK!
:param encoding: the remote machine's encoding (defaults to UTF8)
:param connect_timeout: specify a connection timeout (the time until shell prompt is seen).
The default is 10 seconds. Set to ``None`` to disable
:param new_session: whether or not to start the background session as a new
session leader (setsid). This will prevent it from being killed on
Ctrl+C (SIGINT)
"""
[docs] def __init__(
self,
host,
user=None,
port=None,
keyfile=None,
ssh_command=None,
scp_command=None,
ssh_opts=(),
scp_opts=(),
password=None,
encoding="utf8",
connect_timeout=10,
new_session=False,
):
if ssh_command is None:
if password is not None:
ssh_command = local["sshpass"]["-p", password, "ssh"]
else:
ssh_command = local["ssh"]
if scp_command is None:
if password is not None:
scp_command = local["sshpass"]["-p", password, "scp"]
else:
scp_command = local["scp"]
scp_args = []
ssh_args = []
self.host = host
if user:
self._fqhost = f"{user}@{host}"
else:
self._fqhost = host
if port:
ssh_args.extend(["-p", str(port)])
scp_args.extend(["-P", str(port)])
if keyfile:
ssh_args.extend(["-i", str(keyfile)])
scp_args.extend(["-i", str(keyfile)])
scp_args.append("-r")
ssh_args.extend(ssh_opts)
scp_args.extend(scp_opts)
self._ssh_command = ssh_command[tuple(ssh_args)]
self._scp_command = scp_command[tuple(scp_args)]
BaseRemoteMachine.__init__(
self,
encoding=encoding,
connect_timeout=connect_timeout,
new_session=new_session,
)
[docs] def __str__(self):
return f"ssh://{self._fqhost}"
[docs] def popen(self, args, ssh_opts=(), env=None, cwd=None, **kwargs):
cmdline = []
cmdline.extend(ssh_opts)
cmdline.append(self._fqhost)
if args:
envdelta = {}
if hasattr(self, "env"):
envdelta.update(self.env.getdelta())
if env:
envdelta.update(env)
if cwd is None:
cwd = getattr(self, "cwd", None)
if cwd:
cmdline.extend(["cd", str(cwd), "&&"])
if envdelta:
cmdline.append("env")
cmdline.extend(f"{k}={shquote(v)}" for k, v in envdelta.items())
if isinstance(args, (tuple, list)):
cmdline.extend(args)
else:
cmdline.append(args)
return self._ssh_command[tuple(cmdline)].popen(**kwargs)
[docs] def nohup(self, command):
"""
Runs the given command using ``nohup`` and redirects std handles,
allowing the command to run "detached" from its controlling TTY or parent.
Does not return anything. Depreciated (use command.nohup or daemonic_popen).
"""
warnings.warn(
"Use .nohup on the command or use daemonic_popen)",
FutureWarning,
stacklevel=2,
)
self.daemonic_popen(command, cwd=".", stdout=None, stderr=None, append=False)
[docs] def daemonic_popen(self, command, cwd=".", stdout=None, stderr=None, append=True):
"""
Runs the given command using ``nohup`` and redirects std handles,
allowing the command to run "detached" from its controlling TTY or parent.
Does not return anything.
.. versionadded:: 1.6.0
"""
if stdout is None:
stdout = "/dev/null"
if stderr is None:
stderr = "&1"
args = [] if str(cwd) == "." else ["cd", str(cwd), "&&"]
args.append("nohup")
args.extend(command.formulate())
args.extend(
[
(">>" if append else ">") + str(stdout),
"2" + (">>" if (append and stderr != "&1") else ">") + str(stderr),
"</dev/null",
]
)
proc = self.popen(args, ssh_opts=["-f"])
rc = proc.wait()
try:
if rc != 0:
raise ProcessExecutionError(
args, rc, proc.stdout.read(), proc.stderr.read()
)
finally:
proc.stdin.close()
proc.stdout.close()
proc.stderr.close()
[docs] def session(self, isatty=False, new_session=False):
return ShellSession(
self.popen(
["/bin/sh"], (["-tt"] if isatty else ["-T"]), new_session=new_session
),
self.custom_encoding,
isatty,
self.connect_timeout,
host=self.host,
)
[docs] def tunnel(
self,
lport,
dport,
lhost="localhost",
dhost="localhost",
connect_timeout=5, # noqa: ARG002
reverse=False,
):
r"""Creates an SSH tunnel from the TCP port (``lport``) of the local machine
(``lhost``, defaults to ``"localhost"``, but it can be any IP you can ``bind()``)
to the remote TCP port (``dport``) of the destination machine (``dhost``, defaults
to ``"localhost"``, which means *this remote machine*). This function also
supports Unix sockets, in which case the local socket should be passed in as
``lport`` and the local bind address should be ``None``. The same can be done
for a remote socket, by following the same pattern with ``dport`` and ``dhost``.
The returned :class:`SshTunnel <plumbum.machines.remote.SshTunnel>` object can
be used as a *context-manager*.
The more conventional use case is the following::
+---------+ +---------+
| Your | | Remote |
| Machine | | Machine |
+----o----+ +---- ----+
| ^
| |
lport dport
| |
\______SSH TUNNEL____/
(secure)
Here, you wish to communicate safely between port ``lport`` of your machine and
port ``dport`` of the remote machine. Communication is tunneled over SSH, so the
connection is authenticated and encrypted.
The more general case is shown below (where ``dport != "localhost"``)::
+---------+ +-------------+ +-------------+
| Your | | Remote | | Destination |
| Machine | | Machine | | Machine |
+----o----+ +---- ----o---+ +---- --------+
| ^ | ^
| | | |
lhost:lport | | dhost:dport
| | | |
\_____SSH TUNNEL_____/ \_____SOCKET____/
(secure) (not secure)
Usage::
rem = SshMachine("megazord")
with rem.tunnel(1234, "/var/lib/mysql/mysql.sock", dhost=None):
sock = socket.socket()
sock.connect(("localhost", 1234))
# sock is now tunneled to the MySQL socket on megazord
"""
formatted_lhost = "" if lhost is None else f"[{lhost}]:"
formatted_dhost = "" if dhost is None else f"[{dhost}]:"
if str(lport) == "0":
lport = _get_free_port()
ssh_opts = (
[
"-L",
f"{formatted_lhost}{lport}:{formatted_dhost}{dport}",
]
if not reverse
else [
"-R",
f"{formatted_dhost}{dport}:{formatted_lhost}{lport}",
]
)
proc = self.popen((), ssh_opts=ssh_opts, new_session=True)
return SshTunnel(
ShellSession(
proc, self.custom_encoding, connect_timeout=self.connect_timeout
),
lport,
dport,
reverse,
)
@staticmethod
def _translate_drive_letter(path):
# replace c:\some\path with /c/some/path
path = str(path)
if ":" in path:
return "/" + path.replace(":", "").replace("\\", "/")
return path
[docs] def download(self, src, dst):
if isinstance(src, LocalPath):
raise TypeError(f"src of download cannot be {src!r}")
if isinstance(src, RemotePath) and src.remote != self:
raise TypeError(f"src {src!r} points to a different remote machine")
if isinstance(dst, RemotePath):
raise TypeError(f"dst of download cannot be {dst!r}")
if IS_WIN32:
src = self._translate_drive_letter(src)
dst = self._translate_drive_letter(dst)
self._scp_command(f"{self._fqhost}:{shquote(src)}", dst)
[docs] def upload(self, src, dst):
if isinstance(src, RemotePath):
raise TypeError(f"src of upload cannot be {src!r}")
if isinstance(dst, LocalPath):
raise TypeError(f"dst of upload cannot be {dst!r}")
if isinstance(dst, RemotePath) and dst.remote != self:
raise TypeError(f"dst {dst!r} points to a different remote machine")
if IS_WIN32:
src = self._translate_drive_letter(src)
dst = self._translate_drive_letter(dst)
self._scp_command(src, f"{self._fqhost}:{shquote(dst)}")
[docs]class PuttyMachine(SshMachine):
"""
PuTTY-flavored SSH connection. The programs ``plink`` and ``pscp`` are expected to
be in the path (or you may provide your own ``ssh_command`` and ``scp_command``)
Arguments are the same as for :class:`plumbum.machines.remote.SshMachine`
"""
[docs] def __init__(
self,
host,
user=None,
port=None,
keyfile=None,
ssh_command=None,
scp_command=None,
ssh_opts=(),
scp_opts=(),
encoding="utf8",
connect_timeout=10,
new_session=False,
):
if ssh_command is None:
ssh_command = local["plink"]
if scp_command is None:
scp_command = local["pscp"]
if not ssh_opts:
ssh_opts = ["-ssh"]
if user is None:
user = local.env.user
if port is not None:
ssh_opts.extend(["-P", str(port)])
scp_opts = [*list(scp_opts), "-P", str(port)]
port = None
SshMachine.__init__(
self,
host,
user,
port,
keyfile=keyfile,
ssh_command=ssh_command,
scp_command=scp_command,
ssh_opts=ssh_opts,
scp_opts=scp_opts,
encoding=encoding,
connect_timeout=connect_timeout,
new_session=new_session,
)
[docs] def __str__(self):
return f"putty-ssh://{self._fqhost}"
def _translate_drive_letter(self, path):
# pscp takes care of windows paths automatically
return path
[docs] def session(self, isatty=False, new_session=False):
return ShellSession(
self.popen((), (["-t"] if isatty else ["-T"]), new_session=new_session),
self.custom_encoding,
isatty,
self.connect_timeout,
)