"""Asyncio support for Plumbum commands.
This module provides async versions of Plumbum commands that can be used with
Python's asyncio framework. Commands can be awaited directly or used with
async context managers.
For async machines (AsyncLocalMachine, AsyncSshMachine), see plumbum.machines.local
and plumbum.machines.ssh_machine, respectively.
Design Philosophy
-----------------
This implementation uses **delegation over inheritance** to wrap existing sync
commands rather than reimplementing their functionality. This approach:
- Maximizes code reuse (~100 lines of logic delegated to sync commands)
- Ensures consistency between sync and async APIs
- Reduces maintenance burden (changes to sync code automatically apply)
- Enables automatic support for features like `with_env()` and `with_cwd()`
Why Delegation Instead of Inheritance?
---------------------------------------
Sync and async methods are fundamentally incompatible in Python:
- Sync methods return values directly: `def run() -> tuple[int, str, str]`
- Async methods return coroutines: `async def run() -> AsyncResult`
You cannot override a sync method with an async one in the same class hierarchy.
Therefore, we use separate classes that wrap and delegate to sync commands,
reusing all their formulation, binding, and pipeline logic.
Example Usage
-------------
::

    from plumbum import async_local

    async def main():
        # Simple command execution
        result = await async_local["ls"]("-la")
        print(result)

        # With explicit run method
        ls = async_local["ls"]
        result = await ls.run(["-la"])
        print(result.stdout)

        # Pipeline support
        result = await (async_local["ls"] | async_local["grep"]["py"])()
        print(result)

.. versionadded:: 2.0
"""
from __future__ import annotations
__lazy_modules__ = {
"asyncio",
"contextlib",
"plumbum.commands.processes",
"plumbum.machines.local",
"typing_extensions",
}
import asyncio
import contextlib
import os
import sys
from typing import TYPE_CHECKING, Any, TextIO
from plumbum.commands.base import BoundCommand, BoundEnvCommand, Pipeline
from plumbum.commands.processes import ProcessExecutionError, ProcessTimedOut
from plumbum.machines.local import local
if sys.version_info >= (3, 11):
from typing import Self
else:
from typing_extensions import Self
if TYPE_CHECKING:
from collections.abc import Container, Coroutine, Sequence
from plumbum.commands.base import BaseCommand
[docs]
class AsyncResult:
    """Outcome of a completed async command.

    Attributes:
        returncode: The exit code of the process
        stdout: Standard output as a string
        stderr: Standard error as a string
    """

    __slots__ = ("returncode", "stderr", "stdout")

    def __init__(self, returncode: int, stdout: str, stderr: str):
        """Store the three pieces of a finished command's result."""
        self.returncode, self.stdout, self.stderr = returncode, stdout, stderr

    def __str__(self) -> str:
        """Return the captured standard output."""
        return self.stdout

    def __repr__(self) -> str:
        """Return a constructor-like representation showing all three fields."""
        fields = (
            f"returncode={self.returncode}",
            f"stdout={self.stdout!r}",
            f"stderr={self.stderr!r}",
        )
        return "AsyncResult(" + ", ".join(fields) + ")"
[docs]
class AsyncPipelineProcess:
    """Proxy around the downstream process of an async pipeline.

    Output attributes (``stdout``, ``stderr``, ``pid``, ...) are delegated to the
    downstream :class:`asyncio.subprocess.Process`, while ``stdin`` is taken from
    the *upstream* process so writes feed the head of the pipeline (mirroring
    :meth:`plumbum.commands.base.Pipeline.popen`, which sets
    ``dstproc.stdin = srcproc.stdin``). :meth:`wait` and :meth:`communicate` reap
    both stages and report a combined return code -- the downstream code, or the
    upstream code if the downstream stage succeeded. :meth:`kill`,
    :meth:`terminate` and :meth:`send_signal` are propagated to every stage
    (recursing through a nested upstream pipeline) rather than only the last one.

    A separate proxy is needed (rather than patching ``wait`` on the downstream
    process) because :attr:`asyncio.subprocess.Process.returncode` is a read-only
    property and cannot be reassigned to the combined value.

    Instances are returned by :meth:`AsyncCommandMixin.popen` for pipelines; you
    don't normally construct them directly.

    .. versionadded:: 1.11
    """

    def __init__(
        self,
        dstproc: asyncio.subprocess.Process | AsyncPipelineProcess,
        srcproc: asyncio.subprocess.Process | AsyncPipelineProcess,
    ) -> None:
        """Wrap the two ends of a pipeline.

        Args:
            dstproc: The downstream (last) stage, whose output is exposed.
            srcproc: The upstream stage, whose ``stdin`` is exposed.
        """
        # ``dstproc`` is always a real Process (a pipeline's last stage is a
        # single command); ``srcproc`` may be another proxy for nested pipelines.
        self._dstproc = dstproc
        self.srcproc = srcproc
        # Combined return code; stays ``None`` until wait()/communicate() ran.
        self._returncode: int | None = None

    def __getattr__(self, name: str) -> Any:
        """Delegate any attribute the proxy does not define to the downstream process.

        Raises:
            AttributeError: If the proxy has no downstream process yet, or the
                downstream process lacks the attribute.
        """
        # Only called when `name` isn't a real attribute on the proxy. Read
        # ``_dstproc`` straight from the instance dict: going through
        # ``self._dstproc`` on an instance whose ``__init__`` has not run
        # (``__new__``, ``copy``, ``pickle``) would re-enter ``__getattr__`` and
        # recurse until RecursionError instead of raising AttributeError.
        try:
            dstproc = self.__dict__["_dstproc"]
        except KeyError:
            raise AttributeError(
                f"{type(self).__name__!r} object has no attribute {name!r}"
            ) from None
        return getattr(dstproc, name)

    @property
    def stdin(self) -> asyncio.StreamWriter | None:
        """The upstream stage's ``stdin``, i.e. the input of the whole pipeline."""
        # The downstream stage reads from the OS pipe, so its own stdin is None;
        # the pipeline's stdin is the head (upstream) process's stdin.
        return self.srcproc.stdin

    @property
    def returncode(self) -> int | None:
        """The combined code once computed, else the downstream stage's own code."""
        if self._returncode is not None:
            return self._returncode
        return self._dstproc.returncode

    def _combine(self, rc_dst: int | None, rc_src: int | None) -> int:
        """Record and return the pipeline's return code.

        The downstream code wins when it is non-zero; otherwise the upstream
        code is used, and ``0`` if neither is set or both are zero.
        """
        self._returncode = (rc_dst or rc_src) or 0
        return self._returncode

    def _signal_both(self, method: str, *args: Any) -> None:
        """Call ``method(*args)`` on the downstream and then the upstream stage."""
        # Signal both ends; for a nested upstream pipeline ``srcproc`` is itself
        # an ``AsyncPipelineProcess``, so this recurses through every stage.
        # Each stage may already have exited, so ignore "no such process".
        for proc in (self._dstproc, self.srcproc):
            with contextlib.suppress(ProcessLookupError):
                getattr(proc, method)(*args)

    def kill(self) -> None:
        """Kill every stage of the pipeline."""
        self._signal_both("kill")

    def terminate(self) -> None:
        """Terminate every stage of the pipeline."""
        self._signal_both("terminate")

    def send_signal(self, signal: int) -> None:
        """Send ``signal`` to every stage of the pipeline."""
        self._signal_both("send_signal", signal)

    async def wait(self) -> int:
        """Wait for the downstream and then the upstream stage.

        Returns:
            The combined return code (see :meth:`_combine`).
        """
        rc_dst = await self._dstproc.wait()
        rc_src = await self.srcproc.wait()
        return self._combine(rc_dst, rc_src)

    async def _reap(self) -> tuple[bytes | None, bytes | None]:
        """Drain and wait for every stage; return the last stage's output."""
        # Reap the whole pipeline, draining every stage via communicate() so a
        # full stderr pipe can't deadlock the wait. The last stage's stdout is
        # its captured output; an upstream stage's stdout is an OS pipe (no
        # StreamReader), so communicate() there just drains its stderr. Does not
        # feed stdin -- the head's stdin is fed by communicate() below.
        src = self.srcproc
        reap_src = (
            src._reap() if isinstance(src, AsyncPipelineProcess) else src.communicate()
        )
        (stdout, stderr), _ = await asyncio.gather(
            self._dstproc.communicate(), reap_src
        )
        self._combine(self._dstproc.returncode, src.returncode)
        return stdout, stderr

    async def communicate(
        self, input: bytes | None = None
    ) -> tuple[bytes | None, bytes | None]:
        """Feed ``input`` to the head stage and collect the last stage's output.

        Args:
            input: Bytes to write to the pipeline's stdin before closing it;
                nothing is written when it is ``None`` or empty.

        Returns:
            ``(stdout, stderr)`` of the downstream stage.
        """

        # Feed the pipeline's stdin (the head stage) concurrently with reaping
        # every stage -- draining each one's stderr, and the last stage's
        # stdout/stderr, so a full pipe on any stage can't deadlock the wait.
        async def feed() -> None:
            writer = self.stdin
            if writer is None:
                return
            # As in asyncio's own Process.communicate(), a stage that exits
            # early closes the pipe, so ignore the resulting write errors.
            with contextlib.suppress(BrokenPipeError, ConnectionResetError):
                try:
                    if input:
                        writer.write(input)
                        await writer.drain()
                finally:
                    writer.close()
                    await writer.wait_closed()

        _, (stdout, stderr) = await asyncio.gather(feed(), self._reap())
        return stdout, stderr
class AsyncCommandMixin:
    """Mixin that adds async execution capabilities to BaseCommand.

    This mixin wraps a sync BaseCommand and provides async execution methods
    while reusing all the existing formulation, binding, and pipeline logic.

    The delegation pattern allows us to:

    - Reuse BaseCommand.formulate() for command-to-argv conversion
    - Reuse BaseCommand.__getitem__() for argument binding
    - Reuse BaseCommand.__or__() for pipeline creation
    - Reuse BoundEnvCommand for environment and cwd handling
    - Maintain consistency with the sync API

    Every method that derives a new command (``[...]``, ``with_env``,
    ``with_cwd``, ``|``) re-wraps the sync result in ``self.__class__``, so the
    concrete async subclass is preserved.
    """

    __slots__ = ("_base_cmd",)

    # The wrapped sync command that every method below delegates to.
    _base_cmd: BaseCommand

    def __init__(self, base_cmd: BaseCommand):
        """Initialize with a sync BaseCommand to wrap.

        Args:
            base_cmd: The sync command to wrap and delegate to
        """
        self._base_cmd = base_cmd

    @property
    def _concrete(self) -> BaseCommand:
        """The innermost command, unwrapping any ``Bound``/``BoundEnv`` layers.

        Binding arguments or an environment wraps the base command, so the
        concrete command (the one carrying ``executable``/``remote``) lives
        underneath. This is used by the subclass ``executable``/``remote``
        properties so they keep working after ``[...]``/``with_env``/``with_cwd``.
        """
        cmd = self._base_cmd
        # Follow ``.cmd`` until something that is neither bound layer remains.
        while isinstance(cmd, (BoundCommand, BoundEnvCommand)):
            cmd = cmd.cmd
        return cmd

    def __getitem__(self, args: Any) -> Self:
        """Bind arguments using the base command's logic.

        This delegates to the sync command's __getitem__ method, which handles
        all the argument binding logic, then re-wraps the result in the same
        async type as ``self`` (preserving e.g. ``AsyncLocalCommand``).
        """
        bound = self._base_cmd[args]
        return self.__class__(bound)

    def with_env(self, **env: str) -> Self:
        """Return a new async command with the given environment variables.

        Delegates to the sync command's ``with_env`` (which produces a
        ``BoundEnvCommand``) and re-wraps it in the same async type.
        """
        return self.__class__(self._base_cmd.with_env(**env))

    def with_cwd(self, path: Any) -> Self:
        """Return a new async command with the given working directory.

        Delegates to the sync command's ``with_cwd`` and re-wraps the result.
        """
        return self.__class__(self._base_cmd.with_cwd(path))

    def __call__(self, *args: Any, **kwargs: Any) -> Coroutine[Any, Any, str]:
        """Execute the command asynchronously and return stdout.

        This is a shortcut for run() that returns only stdout, matching the
        behavior of the sync API's __call__ method.

        Args:
            *args: Passed to :meth:`run` as its ``args`` sequence.
            **kwargs: Forwarded to :meth:`run` unchanged.

        Returns:
            A coroutine that resolves to the ``stdout`` of the run's result.
        """

        async def _run() -> str:
            result = await self.run(args, **kwargs)
            return result.stdout

        # A plain (non-async) method that hands back the coroutine object, so
        # nothing executes until the caller awaits it.
        return _run()

    def __or__(self, other: AsyncCommandMixin) -> Self:
        """Create a pipeline using the base command's logic.

        This delegates to the sync command's __or__ method to create a sync
        Pipeline, then wraps it with the same type as self.
        """
        # ``other`` must itself be an async wrapper: its ``_base_cmd`` is read.
        sync_pipeline = self._base_cmd | other._base_cmd
        return self.__class__(sync_pipeline)

    def formulate(self, level: int = 0, args: Sequence[Any] = ()) -> list[str]:
        """Delegate formulation to the base command.

        This reuses the sync command's formulation logic, which handles:

        - Converting the command to an argv list
        - Proper shell quoting based on nesting level
        - Handling of bound arguments
        - Support for nested commands
        """
        return self._base_cmd.formulate(level, args)

    async def run(
        self,
        args: Sequence[Any] = (),
        retcode: int | Container[int] | None = 0,
        timeout: float | None = None,
        cwd: str | None = None,
        env: dict[str, str] | None = None,
    ) -> AsyncResult:
        """Run the command asynchronously.

        The sync command's ``run`` is executed through the running loop's
        ``run_in_executor`` (default executor), and its
        ``(retcode, stdout, stderr)`` result is repackaged as an
        :class:`AsyncResult`.

        Args:
            args: Additional arguments to pass to the command
            retcode: Expected return code(s). None to disable checking.
            timeout: Maximum time to wait for command completion
            cwd: Working directory for the command
            env: Environment variables for the command

        Returns:
            AsyncResult with returncode, stdout, and stderr

        Raises:
            ProcessExecutionError: If return code doesn't match expected
            asyncio.TimeoutError: If timeout is exceeded
        """
        loop = asyncio.get_running_loop()

        def _run_sync() -> tuple[int, str, str]:
            # All options are forwarded to the sync implementation unchanged.
            retcode_val, stdout, stderr = self._base_cmd.run(
                args, retcode=retcode, timeout=timeout, cwd=cwd, env=env
            )
            return retcode_val, stdout, stderr

        try:
            retcode_val, stdout, stderr = await loop.run_in_executor(None, _run_sync)
        except ProcessTimedOut as e:
            # Translate the sync timeout into the asyncio one, keeping the
            # original exception as the cause.
            raise asyncio.TimeoutError() from e
        return AsyncResult(retcode_val, stdout, stderr)

    async def popen(
        self,
        args: Sequence[Any] = (),
        cwd: str | None = None,
        env: dict[str, str] | None = None,
    ) -> asyncio.subprocess.Process | AsyncPipelineProcess:
        """Create an async subprocess without waiting for it to complete.

        This is useful for long-running processes or when you need to
        interact with stdin/stdout/stderr.

        .. note::
            Streaming is only supported for **local** commands. Remote commands
            raise :class:`NotImplementedError` here -- run them with ``.run()``
            or by calling the command directly (those execute the underlying
            sync command in a thread).

        Args:
            args: Additional arguments to pass to the command
            cwd: Working directory for the command
            env: Environment variables for the command

        Returns:
            An :class:`asyncio.subprocess.Process` for a plain command. For a
            pipeline, an :class:`AsyncPipelineProcess` proxy that exposes the same
            interface (``stdout``/``stderr`` from the last stage, ``stdin`` to
            the first) while reaping every stage on ``wait``/``communicate``.

        Raises:
            NotImplementedError: If the wrapped command's machine is not ``local``.
        """
        # stdin/stdout/stderr are left at ``_popen``'s defaults (all PIPE).
        return await self._popen(args, cwd=cwd, env=env)

    async def _popen(
        self,
        args: Sequence[Any] = (),
        *,
        cwd: str | None = None,
        env: dict[str, str] | None = None,
        stdin: int | None = asyncio.subprocess.PIPE,
        stdout: int | None = asyncio.subprocess.PIPE,
        stderr: int | None = asyncio.subprocess.PIPE,
    ) -> asyncio.subprocess.Process | AsyncPipelineProcess:
        """Spawn the subprocess, threading stdin/stdout through pipeline stages.

        The public ``popen`` always uses pipes for all three streams, but
        pipelines need to connect one stage's stdout to the next stage's stdin,
        so this internal helper exposes those handles.

        Args:
            args: Additional arguments; appended after any bound arguments.
            cwd: Working directory; takes precedence over a bound ``cwd``.
            env: Extra environment variables; override bound ones of the same name.
            stdin: Passed to ``asyncio.create_subprocess_exec`` (head stage of a
                pipeline).
            stdout: Passed to ``asyncio.create_subprocess_exec`` (last stage of a
                pipeline).
            stderr: Passed to ``asyncio.create_subprocess_exec`` for every stage.

        Returns:
            The spawned process, or an :class:`AsyncPipelineProcess` for a pipeline.

        Raises:
            NotImplementedError: If the wrapped command's machine is not ``local``.
        """
        # Streaming via ``asyncio.create_subprocess_exec`` only works for local
        # commands -- a RemoteCommand would be formulated without its ssh wrapper
        # and silently run on *this* machine. Remote commands are still supported
        # through ``run()``/``__call__`` (which delegate to the sync command in a
        # thread); fail loudly here rather than executing the wrong thing.
        if self._base_cmd.machine is not local:
            raise NotImplementedError(
                "Async popen/streaming (popen, AsyncTEE) is only supported for "
                "local commands. Use .run() or call the command directly for "
                "remote commands -- those execute the sync command in a thread."
            )
        # Binding args/env/cwd wraps the command in BoundCommand/BoundEnvCommand
        # (possibly around a Pipeline -- e.g. (a | b)["--flag"]). Unwrap those to
        # reach the concrete command (or Pipeline) and thread the bound
        # args/env/cwd inward, so they take effect for plain commands too.
        base: BaseCommand = self._base_cmd
        cmd_args = list(args)
        cmd_env = env
        cmd_cwd = cwd
        # Layers are visited outermost first: inner bound args are prepended,
        # and values gathered so far (outer layers / explicit parameters) win
        # over an inner layer's env entries and cwd.
        while isinstance(base, (BoundCommand, BoundEnvCommand)):
            if isinstance(base, BoundCommand):
                cmd_args = [*base.args, *cmd_args]
            else:
                cmd_env = {**base.env, **(cmd_env or {})}
                cmd_cwd = base.cwd if cmd_cwd is None else cmd_cwd
            base = base.cmd
        # A pipeline has no single argv; connect the two stages with an OS pipe,
        # mirroring the synchronous Pipeline.popen (extra args go to the head
        # stage, as in that implementation).
        if isinstance(base, Pipeline):
            read_fd, write_fd = os.pipe()
            # The parent closes each fd once the child has inherited it. The
            # nested try ensures both fds are closed even if either spawn raises
            # (write_fd by the inner finally, read_fd by the outer one).
            try:
                try:
                    srcproc = await self.__class__(base.srccmd)._popen(
                        cmd_args,
                        cwd=cmd_cwd,
                        env=cmd_env,
                        stdin=stdin,
                        stdout=write_fd,
                        stderr=stderr,
                    )
                finally:
                    os.close(write_fd)
                try:
                    dstproc = await self.__class__(base.dstcmd)._popen(
                        cwd=cmd_cwd,
                        env=cmd_env,
                        stdin=read_fd,
                        stdout=stdout,
                        stderr=stderr,
                    )
                except BaseException:
                    # The upstream stage is already running; reap it rather than
                    # leaking a child process and its open pipe ends.
                    with contextlib.suppress(ProcessLookupError):
                        srcproc.kill()
                    await srcproc.wait()
                    raise
            finally:
                os.close(read_fd)
            # Waiting on the pipeline reaps the upstream process too, and the
            # return code is the downstream stage's, or the upstream stage's if
            # the downstream one succeeded (a pipefail-like combination, as in
            # the synchronous Pipeline.popen).
            return AsyncPipelineProcess(dstproc, srcproc)
        # Formulate at level 0 (no shell-quoting -- we exec the argv directly),
        # matching the synchronous LocalCommand.popen.
        argv = base.formulate(0, cmd_args)
        # Start from the local machine's environment and layer overrides on top.
        full_env = dict(local.env.getdict())
        if cmd_env:
            full_env.update(cmd_env)
        # Fall back to the local machine's current directory when none was given.
        working_dir = cmd_cwd or str(local.cwd)
        return await asyncio.create_subprocess_exec(
            *argv,
            stdin=stdin,
            stdout=stdout,
            stderr=stderr,
            cwd=working_dir,
            env=full_env,
        )
[docs]
class AsyncCommand(AsyncCommandMixin):
    """Async wrapper for BaseCommand.

    This class wraps any BaseCommand and provides async execution capabilities.
    It reuses all the formulation, binding, and pipeline logic from the base command.
    All behavior is inherited from :class:`AsyncCommandMixin`; this class adds
    no methods of its own.

    Example::

        # The sync command is looked up and wrapped
        async_cmd = async_local["ls"]

        # Binding works via delegation to sync command
        bound_cmd = async_cmd["-la"]

        # Execution is async
        result = await bound_cmd.run()
    """

    # No additional per-instance attributes beyond the mixin's ``_base_cmd`` slot.
    __slots__ = ()
[docs]
class AsyncLocalCommand(AsyncCommand):
    """Async counterpart of LocalCommand.

    Wraps a LocalCommand and exposes the async execution methods, delegating
    formulation, binding, etc. to the wrapped command.
    """

    __slots__ = ()

    @property
    def executable(self) -> Any:
        """The path to the executable."""
        # Bound layers do not carry `executable`; look it up on the concrete
        # LocalCommand underneath them.
        concrete = self._concrete
        return concrete.executable  # type: ignore[attr-defined]
[docs]
class AsyncRemoteCommand(AsyncCommand):
    """Async wrapper around a RemoteCommand.

    Wraps a RemoteCommand and exposes the async execution methods, delegating
    formulation, binding, etc. to the wrapped command.

    Example::

        async with AsyncSshMachine("host") as rem:
            ls = rem["ls"]
            result = await ls("-la")

    .. versionadded:: 2.0
    """

    __slots__ = ()

    @property
    def remote(self) -> Any:
        """The remote machine this command belongs to."""
        # Bound layers do not carry `remote`; look it up on the concrete
        # RemoteCommand underneath them.
        concrete = self._concrete
        return concrete.remote  # type: ignore[attr-defined]
# ===================================================================================================
# Async execution modifiers
# ===================================================================================================
class AsyncExecutionModifier:
    """Common base for the async execution modifiers."""

    __slots__ = ("__weakref__",)

    def __repr__(self) -> str:
        """Build ``ClassName(slot = value, ...)`` from the public slots.

        Slots are collected along the MRO; names starting with an underscore
        are left out.
        """
        public: dict[str, Any] = {}
        for klass in self.__class__.__mro__:
            declared = getattr(klass, "__slots__", ())
            # A bare string declares a single slot, not a sequence of characters.
            names = (declared,) if isinstance(declared, str) else declared
            for attr in names:
                if attr[0] == "_":
                    continue
                public[attr] = getattr(self, attr)
        rendered = ", ".join(f"{key} = {val}" for key, val in public.items())
        return f"{self.__class__.__name__}({rendered})"

    @classmethod
    def __call__(cls, *args: Any, **kwargs: Any) -> Self:
        """Create a fresh modifier of the same class with the given arguments."""
        # As a classmethod, calling an existing instance builds a new instance
        # of its class instead of acting on that instance.
        return cls(*args, **kwargs)
class _AsyncTF(AsyncExecutionModifier):
    """Async execution modifier that maps the exit code to True/False.

    This is the async equivalent of the sync TF modifier. The command is run,
    and the result is True when the exit code matches the expected value and
    False otherwise.

    Unlike the sync version, there is no FG parameter because async commands
    don't have a concept of foreground/background execution - they're all
    non-blocking by nature.

    Example::

        # Check if a file exists
        exists = await (async_local["test"]["-f", "file.txt"] & AsyncTF)

        # Check for specific exit code
        result = await (async_local["grep"]["pattern", "file.txt"] & AsyncTF(retcode=(0, 1)))

    .. versionadded:: 2.0
    """

    __slots__ = ("retcode", "timeout")

    def __init__(
        self,
        retcode: int | Container[int] = 0,
        timeout: float | None = None,
    ) -> None:
        """Configure the modifier.

        Args:
            retcode: Expected return code(s). Default is 0.
            timeout: Maximum time to wait for command completion.
        """
        self.retcode, self.timeout = retcode, timeout

    async def __rand__(self, cmd: AsyncCommandMixin) -> bool:
        """Run ``cmd`` and report whether its return code was an expected one."""
        try:
            await cmd.run(retcode=self.retcode, timeout=self.timeout)
        except ProcessExecutionError:
            # ``run`` signals an unexpected exit code with this exception.
            succeeded = False
        else:
            succeeded = True
        return succeeded
class _AsyncRETCODE(AsyncExecutionModifier):
    """Async execution modifier that yields just the exit code.

    This is the async equivalent of the sync RETCODE modifier. The command is
    run and only its exit code is returned; stdout/stderr are discarded.

    Unlike the sync version, there is no FG parameter because async commands
    don't have a concept of foreground/background execution.

    Example::

        # Get exit code
        code = await (async_local["ls"]["/nonexistent"] & AsyncRETCODE)
        print(f"Exit code: {code}")

    .. versionadded:: 2.0
    """

    __slots__ = ("timeout",)

    def __init__(self, timeout: float | None = None) -> None:
        """Configure the modifier.

        Args:
            timeout: Maximum time to wait for command completion.
        """
        self.timeout = timeout

    async def __rand__(self, cmd: AsyncCommandMixin) -> int:
        """Run ``cmd`` with return-code checking disabled and return its exit code."""
        # retcode=None turns off the expected-code check, so any exit status is
        # handed back instead of raising.
        outcome = await cmd.run(retcode=None, timeout=self.timeout)
        return outcome.returncode
class _AsyncTEE(AsyncExecutionModifier):
    """Async execution modifier that displays output in real-time and returns it.

    This is the async equivalent of the sync TEE modifier. It runs the command,
    displays stdout/stderr to the console in real-time, and also returns them.

    Unlike the sync version, buffering is always enabled because async I/O
    handles buffering differently. Output is forwarded in chunks as they
    arrive, so partial lines are shown without waiting for a newline.

    Example::

        # Run command and see output in real-time
        retcode, stdout, stderr = await (async_local["npm"]["install"] & AsyncTEE)

        # With custom expected return code
        retcode, stdout, stderr = await (async_local["grep"]["pattern"] & AsyncTEE(retcode=(0, 1)))

    .. versionadded:: 2.0
    """

    __slots__ = ("retcode", "timeout")

    def __init__(
        self,
        retcode: int | Container[int] = 0,
        timeout: float | None = None,
    ) -> None:
        """Initialize AsyncTEE modifier.

        Args:
            retcode: Expected return code(s). Default is 0.
            timeout: Maximum time to wait for command completion.
        """
        self.retcode = retcode
        self.timeout = timeout

    async def __rand__(self, cmd: AsyncCommandMixin) -> tuple[int, str, str]:
        """Execute command, display output, and return (retcode, stdout, stderr).

        Args:
            cmd: The async command (or pipeline) to run.

        Returns:
            ``(retcode, stdout, stderr)`` where stdout/stderr are the decoded
            output of the final stage.

        Raises:
            ProcessExecutionError: If the return code is not an expected one.
            asyncio.TimeoutError: If ``timeout`` elapses before all output has
                been read and every stage has exited; the process is killed
                and reaped first.
            NotImplementedError: Propagated from ``_popen`` for non-local commands.
        """
        # Imported here so the block does not depend on a new module-level import.
        import codecs

        encoding = cmd._base_cmd._get_encoding() or local.custom_encoding
        # Reuse ``_popen`` so this works for pipelines too (its own
        # create_subprocess_exec path only handled single commands), and so the
        # local-only guard applies. stdin is inherited from the parent process.
        proc = await cmd._popen((), stdin=None)

        # Collect output while displaying it
        stdout_parts: list[str] = []
        stderr_parts: list[str] = []

        async def pump(
            stream: asyncio.StreamReader | None,
            target: TextIO,
            collected: list[str] | None = None,
        ) -> None:
            """Copy ``stream`` to ``target`` chunk by chunk, optionally collecting it.

            Reads fixed-size chunks rather than lines: ``readline`` raises
            ``ValueError`` once a line exceeds the stream's buffer limit. An
            incremental decoder carries a multibyte character split across two
            chunks over to the next read instead of dropping it.
            """
            if stream is None:
                return
            decoder = codecs.getincrementaldecoder(encoding)(errors="ignore")
            while True:
                chunk = await stream.read(4096)
                # An empty read means EOF; flush whatever the decoder buffered.
                text = decoder.decode(chunk, final=not chunk)
                if text:
                    if collected is not None:
                        collected.append(text)
                    target.write(text)
                    target.flush()
                if not chunk:
                    break

        # ``proc.stdout``/``proc.stderr`` are the pipeline's *final* stage. Each
        # upstream stage of a pipeline has its own stderr pipe that must also be
        # drained -- otherwise a chatty upstream stderr fills its buffer and
        # deadlocks the wait() below (only communicate() drains every stage). We
        # display these too, like a shell pipeline; the returned stderr stays the
        # final stage's, matching ``popen().communicate()``. The loop adds nothing
        # for a plain command (no upstream stages).
        pumps = [
            pump(proc.stdout, sys.stdout, stdout_parts),
            pump(proc.stderr, sys.stderr, stderr_parts),
        ]
        node: Any = proc
        while isinstance(node, AsyncPipelineProcess):
            node = node.srcproc
            pumps.append(pump(node.stderr, sys.stderr))

        async def pump_and_reap() -> None:
            # Read every stage's streams concurrently, then wait for the process
            # to complete (reaps every pipeline stage).
            await asyncio.gather(*pumps)
            await proc.wait()

        # The timeout covers reading *and* the final wait: a process that closes
        # its output streams but keeps running must still be bounded by it.
        try:
            await asyncio.wait_for(pump_and_reap(), timeout=self.timeout)
        except asyncio.TimeoutError:
            # The process may already have exited (e.g. a grandchild is holding
            # the pipe open); a failed kill must not mask the timeout.
            with contextlib.suppress(ProcessLookupError):
                proc.kill()
            await proc.wait()
            raise

        # Combine output
        stdout = "".join(stdout_parts)
        stderr = "".join(stderr_parts)
        retcode = proc.returncode

        # Check return code. A container is tested by membership, so it only
        # needs ``__contains__`` and does not have to be iterable.
        expected = self.retcode
        if expected is not None:
            if isinstance(expected, int):
                matched = retcode == expected
            else:
                matched = retcode in expected
            if not matched:
                raise ProcessExecutionError(
                    argv=cmd.formulate(0, ()),
                    retcode=retcode,
                    stdout=stdout,
                    stderr=stderr,
                )
        return (retcode or 0), stdout, stderr
# Singleton instances. ``AsyncExecutionModifier.__call__`` is a classmethod that
# builds a new instance, so both ``cmd & AsyncTF`` and
# ``cmd & AsyncTF(retcode=(0, 1))`` work.
AsyncTF = _AsyncTF()
AsyncRETCODE = _AsyncRETCODE()
AsyncTEE = _AsyncTEE()

# Public names of this module; the module-level ``__dir__`` returns this list.
# ``AsyncCommandMixin``, ``AsyncExecutionModifier`` and the underscore-prefixed
# modifier classes are not listed.
__all__ = (
    "AsyncCommand",
    "AsyncLocalCommand",
    "AsyncPipelineProcess",
    "AsyncRETCODE",
    "AsyncRemoteCommand",
    "AsyncResult",
    "AsyncTEE",
    "AsyncTF",
)
def __dir__() -> list[str]:
    """Return the module's public names, as listed in ``__all__``."""
    return [*__all__]