|
Server : LiteSpeed System : Linux barito.iixcp.rumahweb.net 5.14.0-611.49.1.el9_7.x86_64 #1 SMP PREEMPT_DYNAMIC Tue Apr 21 16:39:08 EDT 2026 x86_64 User : elvh3918 ( 1528) PHP Version : 8.2.31 Disable Function : mail Directory : /opt/cloudlinux/venv/lib64/python3.11/site-packages/xray/internal/ |
# -*- coding: utf-8 -*-
# Copyright © Cloud Linux GmbH & Cloud Linux Software, Inc 2010-2021 All Rights Reserved
#
# Licensed under CLOUD LINUX LICENSE AGREEMENT
# http://cloudlinux.com/docs/LICENSE.TXT
import grp
import json
import logging
import os
import pwd
import socket
import struct
from functools import wraps
from typing import Callable, Optional, Tuple, Any
from clcommon.cpapi import get_main_username_by_uid
from clcommon.lib.cledition import is_cl_solo_edition, is_cl_shared_pro_edition
from xray import gettext as _
from .constants import user_tasks_count, fpm_reload_timeout
from .exceptions import XRayError, XRayMissingDomain
from .fpm_utils import FPMReloadController
from .nginx_utils import NginxUserCache
# --------- GLOBALS ---------
logger = logging.getLogger('user_plugin_utils')
_format = '>I'
# --------- FUNCTIONS ---------
def pack_request(_input: Any) -> bytes:
"""
Pack input for sending with length-prefix framing
"""
data = json.dumps(_input).encode()
return struct.pack(_format, len(data)) + data
def unpack_request(byte_command: bytes) -> Any:
"""
Unpack incoming command
"""
_command = byte_command.decode()
logger.info('Command requested => %s', _command)
return json.loads(_command)
def pack_response(msg: bytes) -> bytes:
"""
Prefix message with a 4-byte length
"""
logger.debug('Packing message of %i length', len(msg))
return struct.pack(_format, len(msg)) + msg
def unpack_response(sock_object: 'socket object') -> bytes:
"""
Read length-prefixed amount of data from socket
"""
chunk = 4096
max_msglen = 10 * 1024 * 1024
msg = bytes()
raw_msglen = sock_object.recv(4)
if not raw_msglen:
return sock_object.recv(chunk)
msglen = struct.unpack(_format, raw_msglen)[0]
if msglen > max_msglen:
raise ConnectionError(
'Response message too large: %d bytes' % msglen)
while len(msg) < msglen:
part = sock_object.recv(chunk)
if not part:
raise ConnectionError(
'Connection closed before full message received')
msg += part
return msg
def extract_creds(sock_object: 'socket object') -> Tuple[Any, Any, Any]:
"""
Retrieve credentials from SO_PEERCRED option
"""
_format = '3i'
creds = sock_object.getsockopt(socket.SOL_SOCKET,
socket.SO_PEERCRED,
struct.calcsize(_format))
_pid, _uid, _gid = struct.unpack(_format, creds)
try:
user, group = pwd.getpwuid(_uid).pw_name, grp.getgrgid(_gid).gr_name
except KeyError:
logger.info('Connected by proc %i of %i:%i',
_pid, _uid, _gid)
else:
logger.info('Connected by proc %i of %i:%i (%s:%s)',
_pid, _uid, _gid,
user, group)
return _pid, _uid, _gid
def check_for_root(_uid: int = None) -> bool:
"""
Check for execution as root | command from root
"""
if _uid is None:
_uid = os.geteuid()
return _uid == 0
def get_xray_exec_user() -> Optional[str]:
"""
Retrieve the value of XRAYEXEC_UID env and resolve it to username
"""
proxyuid = os.getenv('XRAYEXEC_UID')
if proxyuid is not None:
_proxyuser = get_main_username_by_uid(int(proxyuid))
logger.info('Got XRAYEXEC_UID: %s (%s), working in USER_MODE',
proxyuid, _proxyuser)
return _proxyuser
def sock_receive(sock_object: 'socket object') -> bytes:
"""
Read all data from socket object
"""
data = b''
while True:
chunk = sock_object.recv(1024)
if not chunk:
logger.debug('All data read, connection ended')
break
data += chunk
return data
def error_response(msg: str) -> 'json str':
"""
Construct an appropriate formatted response in case of error
"""
return json.dumps({'result': msg}, ensure_ascii=False)
def nginx_user_cache() -> Optional[bool]:
"""
Check nginx cache status for current user
"""
proxyuser = get_xray_exec_user()
if proxyuser is not None:
return NginxUserCache(proxyuser).is_enabled
def root_execution_only_check() -> None:
"""
Check if utility is executed as root and throw error in case if no
"""
if not check_for_root():
raise SystemExit(
error_response(_('Only root is allowed to execute this utility')))
# --------- DECORATORS ---------
def user_mode_verification(func: Callable) -> Callable:
"""
Decorator aimed to verify domain owner in X-Ray Manager user mode
Applies to get_domain_info method
"""
@wraps(func)
def wrapper(*args, **kwargs):
"""
Wraps func
"""
proxyuser = get_xray_exec_user()
if proxyuser is None:
return func(*args, **kwargs)
try:
info = func(*args, **kwargs)
except XRayMissingDomain as e:
# Convert to a generic XRayError so callers cannot distinguish
# "domain does not exist" from "domain belongs to another user"
# (information-oracle defense). The `from None` suppresses the
# original exception context so traceback printing also keeps
# the two cases indistinguishable.
#
# Internal-only marker: lets the original caller (e.g.
# base.py:stop()) re-detect the "missing domain" case to
# preserve graceful cleanup of orphaned task data / cron jobs
# for an already-deleted domain. The marker is an INSTANCE
# attribute on a plain XRayError; it is NOT serialized by
# XRayError.__str__ (which only emits self.reason and
# self.context), so it is not observable to user-mode
# callers via the user_agent socket. Subclassing would
# have changed type(err) and broken the uniform-class
# invariant — see tests in user_mode_verification_tests.py.
err = XRayError(
_('%s cannot be found') % f"Domain {e.domain_name}"
)
err._internal_missing_domain = True
raise err from None
if info.user != proxyuser:
logger.warning('%s does not belong to user %s', info, proxyuser)
# No marker here — the "not your domain" path must not be
# able to trigger the graceful cleanup branch in callers.
raise XRayError(_('%s cannot be found') % str(info))
return info
return wrapper
def user_mode_restricted(func: Callable) -> Callable:
"""
Decorator aimed to check if user is not hitting limit of running tasks,
set in X-Ray Manager user mode.
Applies to start and continue methods.
Limiting of user's running tasks is applied to Shared PRO only.
"""
def check(*args):
"""
If XRAYEXEC_UID exists, check if user does not exceed
limit of running tasks
"""
# TODO: [unification] ensure is_cl_shared_pro_edition really needed here
# https://cloudlinux.atlassian.net/browse/XRAY-244 - (seems yes)
if not is_cl_shared_pro_edition(skip_jwt_check=True):
return
proxyuser = get_xray_exec_user()
if proxyuser is not None:
ui_api_cli_instanse = args[0].ui_api_client
resp = ui_api_cli_instanse.get_task_list()
list_of_tasks = resp.get('result')
if list_of_tasks is not None:
running_count = len([item for item in list_of_tasks if
item.get('status') == 'running'])
if running_count >= user_tasks_count:
raise XRayError(
_('Limit of running tasks is {}. '
'You already have {} running task'.format(str(user_tasks_count),
str(user_tasks_count))))
@wraps(func)
def wrapper(*args, **kwargs):
"""
Wraps func
"""
check(*args)
return func(*args, **kwargs)
return wrapper
def with_fpm_reload_restricted(func: Callable) -> Callable:
"""
Decorator aimed to restrict frequent reloads of FPM service
Applies to get_domain_info method.
Decorator ordering invariant: this decorator MUST sit OUTSIDE
user_mode_verification (i.e. ``@with_fpm_reload_restricted`` above
``@user_mode_verification``) on every get_domain_info implementation.
The wrapper below runs ``check`` only AFTER ``func`` returns, so any
ownership-failure XRayError raised by the inner user_mode_verification
wrapper short-circuits the throttle probe. Reversing the order would
let the throttle check inspect another user's FPM service name and
raise a distinguishable XRayError(flag='warning') before the outer
ownership check could uniformise the response, leaking a side channel
on whether the queried domain exists and whose service is currently
throttled (see XRAY-244 / CLPRO-3079 information-oracle defense).
"""
def check(*args, data):
"""
"""
# TODO: [unification] ensure is_cl_solo_edition really needed here
# https://cloudlinux.atlassian.net/browse/XRAY-244 (seems yes)
if is_cl_solo_edition(skip_jwt_check=True):
return
proxyuser = get_xray_exec_user()
if proxyuser is not None and data.panel_fpm:
_fpm_service = args[0].fpm_service_name(data)
if FPMReloadController(_fpm_service).restrict():
raise XRayError(
_('The X-Ray User service is currently busy. Operation is temporarily not permitted. '
'Try again in %s minute') % str(fpm_reload_timeout),
flag='warning')
@wraps(func)
def wrapper(*args, **kwargs):
"""
Wraps func
"""
info = func(*args, **kwargs)
check(*args, data=info)
return info
return wrapper
def username_verification(func: Callable) -> Callable:
def validate(username: str):
"""
If exists, check XRAYEXEC_UID against user passed param
"""
proxyuser = get_xray_exec_user()
if proxyuser is not None and username != proxyuser:
raise XRayError(_('Incorrect user for request'))
@wraps(func)
def wrapper(*args, **kwargs):
username = kwargs['username']
validate(username)
return func(*args, **kwargs)
return wrapper
def user_mode_advice_verification(func: Callable) -> Callable:
"""
Decorator aimed to verify user in X-Ray Smart Advice user mode
Applies to get_detailed_advice method, which takes part in
advice_details and advice_apply methods
"""
def verify(data: dict) -> None:
"""
If exists, check XRAYEXEC_UID against user in metadata of an advice
"""
proxyuser = get_xray_exec_user()
try:
username = data['metadata']['username']
except KeyError:
raise XRayError(_('Requested advice cannot be verified'))
if proxyuser is not None and username != proxyuser:
raise XRayError(_('Requested advice does not exist'))
@wraps(func)
def wrapper(*args, **kwargs):
"""
Wraps func
"""
advice_info, _ = func(*args, **kwargs)
verify(advice_info)
return advice_info, _
return wrapper