|
Server : LiteSpeed System : Linux barito.iixcp.rumahweb.net 5.14.0-611.49.1.el9_7.x86_64 #1 SMP PREEMPT_DYNAMIC Tue Apr 21 16:39:08 EDT 2026 x86_64 User : elvh3918 ( 1528) PHP Version : 8.2.31 Disable Function : mail Directory : /proc/thread-self/root/opt/cloudlinux/venv/lib64/python3.11/site-packages/xray/ |
# -*- coding: utf-8 -*-
# Copyright © Cloud Linux GmbH & Cloud Linux Software, Inc 2010-2021 All Rights Reserved
#
# Licensed under CLOUD LINUX LICENSE AGREEMENT
# http://cloudlinux.com/docs/LICENSE.TXT
"""
This module contains X-Ray User Manager service implementation
"""
import collections
import json
import logging
import queue
import re
import socket
import struct
import subprocess
from threading import Lock, current_thread
from typing import Tuple
from xray.agent.executor import BoundedThreadExecutor
from xray.console_utils.run_user.runners import get_runner, Runner
from xray.internal.constants import user_agent_sock, user_agent_log
from xray.internal.exceptions import XRayError
from xray import gettext as _
from xray.internal.user_plugin_utils import (
unpack_request,
pack_response,
extract_creds,
check_for_root,
error_response
)
from xray.internal.utils import create_socket, read_sys_id, configure_logging
from clcommon.clwpos_lib import get_locale_from_envars
logger = logging.getLogger('user_agent')
_LOCALE_RE = re.compile(r'^[a-zA-Z]{1,5}(_[a-zA-Z]{2})?(\.[a-zA-Z0-9-]+)?$')
_MAX_WORKERS = 50
_MAX_CONNECTIONS_PER_UID = 25
_uid_connections = collections.defaultdict(int)
_uid_connections_lock = Lock()
_MAX_REQUEST_SIZE = 1048576
def general_exec_error() -> Tuple[bytes, bytes]:
"""
General format of message in case of errors during manager execution
"""
_err = _('X-Ray User Plugin failed to execute your request. Please, contact your server administrator')
return error_response(_err).encode(), b''
def log_truncate(orig_msg: bytes) -> str:
"""
Cut data field from the original message, because it could be huge
"""
return re.sub('(?<="data": {).+(?=}, "result")', '...', orig_msg.decode())
def duplicate_warning_cast(orig_msg: bytes) -> bytes:
"""
Extend warning 'Task is duplicated by URL' for end user
"""
def gen(m):
"""
Add more text for duplicate warning, leave others unchanged
"""
additional = b""". \
In case if you do not see running task for the same URL in your list of tasks below, \
contact your server administrator and ask him to check whether the requested URL \
is in the tasks list of X-Ray Admin Plugin or is scheduled for continuous tracing."""
warn = m.group(0)
if warn == b'Task is duplicated by URL':
return warn + additional
return warn
return re.sub(b'(?<="warning": ").+(?="})', gen, orig_msg)
def execute_manager(command: dict, user: str, runner: Runner) -> Tuple[bytes, bytes]:
"""
Trigger runner.target utility with requested parameters
"""
if runner.name == 'manager':
try:
command['system_id'] = read_sys_id()
except XRayError:
return general_exec_error()
def runner_cast_opt(opt):
"""runner may define a special cast for each option"""
return runner.option_cast[opt](
opt) if opt in runner.option_cast else opt
def hide_true_val(v):
"""Hide True value from cmd representation for bool-flag options"""
return '' if v is True else f'={v}'
api_version_key = 'api_version'
locale_option = 'lang'
# options that is not needed to be passed to utility
skip_options = [api_version_key, locale_option]
api_version = command.get(api_version_key)
# could be passed from WordPress Plugin; guard against None to avoid
# TypeError in subprocess.run() when LANG=None in the env dict.
target_locale = command.get(locale_option, get_locale_from_envars())
if not isinstance(target_locale, str) or not _LOCALE_RE.match(target_locale):
target_locale = get_locale_from_envars() or 'C.UTF-8'
if api_version:
casted_api_version = runner_cast_opt(api_version_key)
cmd = [f'/usr/sbin/{runner.target}', f'--{casted_api_version}', api_version, command.pop('command')]
else:
cmd = [f'/usr/sbin/{runner.target}', command.pop('command')]
# bool options are added only if they are True
cmd.extend([f'--{runner_cast_opt(k)}{hide_true_val(v)}' for k, v in
command.items() if v and k not in skip_options])
with_env = {'XRAYEXEC_UID': user, 'LANG': target_locale}
logger.info('Going to execute: %s, with environment: %s', cmd, str(with_env))
try:
p = subprocess.run(cmd, capture_output=True,
env=with_env, timeout=120)
except (OSError, ValueError, TypeError, subprocess.SubprocessError):
return general_exec_error()
byte_out, byte_err = p.stdout.strip(), p.stderr.strip()
if byte_out:
logger.info('[%s] Proxied command stdout: %s', current_thread().name,
log_truncate(byte_out))
if byte_err:
logger.info('[%s] Proxied command stderr: %s', current_thread().name,
byte_err.decode())
return duplicate_warning_cast(byte_out), byte_err
def handle(connection: 'socket object') -> None:
"""
Handle incoming connection
:param connection: socket object usable to
send and receive data on the connection
"""
root_error = json.dumps({
'result': _('Commands from root are not accepted')
}, ensure_ascii=False)
with connection:
connection.settimeout(5.0)
_pid, _uid, _gid = extract_creds(connection)
if check_for_root(_uid):
connection.sendall(pack_response(root_error.encode()))
return
with _uid_connections_lock:
if _uid_connections.get(_uid, 0) >= _MAX_CONNECTIONS_PER_UID:
logger.warning('Too many concurrent connections from uid=%s, rejecting', _uid)
return
_uid_connections[_uid] += 1
try:
raw_msglen = connection.recv(4)
if not raw_msglen or len(raw_msglen) < 4:
return
msglen = struct.unpack('>I', raw_msglen)[0]
if msglen == 0 or msglen > _MAX_REQUEST_SIZE:
return
data = b''
while len(data) < msglen:
chunk = connection.recv(msglen - len(data))
if not chunk:
return
data += chunk
try:
args = unpack_request(data)
except (json.JSONDecodeError, UnicodeDecodeError) as e:
connection.sendall(pack_response(
json.dumps({'result': str(e)}, ensure_ascii=False).encode()))
return
try:
runner = get_runner(args.pop('runner'))
except XRayError as e:
connection.sendall(pack_response(str(e).encode()))
return
try:
runner.validator(args)
except SystemExit as e:
connection.sendall(pack_response(str(e).encode()))
return
_out, _err = execute_manager(args, str(_uid), runner)
connection.sendall(pack_response(_out or _err))
except (OSError, socket.timeout):
return
finally:
with _uid_connections_lock:
_uid_connections[_uid] -= 1
if _uid_connections[_uid] <= 0:
del _uid_connections[_uid]
def run() -> None:
"""
Run listening service
"""
configure_logging(user_agent_log)
with create_socket(user_agent_sock) as s, \
BoundedThreadExecutor(max_workers=_MAX_WORKERS, maxqueuesize=0) as pool:
while True:
conn, _ = s.accept()
try:
pool.submit(handle, conn)
except queue.Full:
logger.error('Connection rejected: thread pool is full')
conn.close()