Yohohohohohooho | Sanrei Aya
Sanrei Aya


Server : LiteSpeed
System : Linux barito.iixcp.rumahweb.net 5.14.0-611.49.1.el9_7.x86_64 #1 SMP PREEMPT_DYNAMIC Tue Apr 21 16:39:08 EDT 2026 x86_64
User : elvh3918 ( 1528)
PHP Version : 8.2.31
Disable Function : mail
Directory :  /opt/cloudlinux/venv/lib64/python3.11/site-packages/xray/internal/

Upload File :
current_dir [ Writeable ] document_root [ Writeable ]

 

Current File : //opt/cloudlinux/venv/lib64/python3.11/site-packages/xray/internal/user_plugin_utils.py
# -*- coding: utf-8 -*-

# Copyright © Cloud Linux GmbH & Cloud Linux Software, Inc 2010-2021 All Rights Reserved
#
# Licensed under CLOUD LINUX LICENSE AGREEMENT
# http://cloudlinux.com/docs/LICENSE.TXT

import grp
import json
import logging
import os
import pwd
import socket
import struct
from functools import wraps
from typing import Callable, Optional, Tuple, Any

from clcommon.cpapi import get_main_username_by_uid
from clcommon.lib.cledition import is_cl_solo_edition, is_cl_shared_pro_edition

from xray import gettext as _
from .constants import user_tasks_count, fpm_reload_timeout
from .exceptions import XRayError, XRayMissingDomain
from .fpm_utils import FPMReloadController
from .nginx_utils import NginxUserCache

# --------- GLOBALS ---------

logger = logging.getLogger('user_plugin_utils')
_format = '>I'


# --------- FUNCTIONS ---------


def pack_request(_input: Any) -> bytes:
    """
    Pack input for sending with length-prefix framing
    """
    data = json.dumps(_input).encode()
    return struct.pack(_format, len(data)) + data


def unpack_request(byte_command: bytes) -> Any:
    """
    Unpack incoming command
    """
    _command = byte_command.decode()
    logger.info('Command requested => %s', _command)
    return json.loads(_command)


def pack_response(msg: bytes) -> bytes:
    """
    Prefix message with a 4-byte length
    """
    logger.debug('Packing message of %i length', len(msg))
    return struct.pack(_format, len(msg)) + msg


def unpack_response(sock_object: 'socket object') -> bytes:
    """
    Read length-prefixed amount of data from socket
    """
    chunk = 4096
    max_msglen = 10 * 1024 * 1024
    msg = bytes()
    raw_msglen = sock_object.recv(4)
    if not raw_msglen:
        return sock_object.recv(chunk)
    msglen = struct.unpack(_format, raw_msglen)[0]

    if msglen > max_msglen:
        raise ConnectionError(
            'Response message too large: %d bytes' % msglen)

    while len(msg) < msglen:
        part = sock_object.recv(chunk)
        if not part:
            raise ConnectionError(
                'Connection closed before full message received')
        msg += part

    return msg


def extract_creds(sock_object: 'socket object') -> Tuple[Any, Any, Any]:
    """
    Retrieve credentials from SO_PEERCRED option
    """
    _format = '3i'
    creds = sock_object.getsockopt(socket.SOL_SOCKET,
                                   socket.SO_PEERCRED,
                                   struct.calcsize(_format))
    _pid, _uid, _gid = struct.unpack(_format, creds)
    try:
        user, group = pwd.getpwuid(_uid).pw_name, grp.getgrgid(_gid).gr_name
    except KeyError:
        logger.info('Connected by proc %i of %i:%i',
        _pid, _uid, _gid)
    else:
        logger.info('Connected by proc %i of %i:%i (%s:%s)',
                    _pid, _uid, _gid,
                    user, group)
    return _pid, _uid, _gid


def check_for_root(_uid: int = None) -> bool:
    """
    Check for execution as root | command from root
    """
    if _uid is None:
        _uid = os.geteuid()
    return _uid == 0


def get_xray_exec_user() -> Optional[str]:
    """
    Retrieve the value of XRAYEXEC_UID env and resolve it to username
    """
    proxyuid = os.getenv('XRAYEXEC_UID')
    if proxyuid is not None:
        _proxyuser = get_main_username_by_uid(int(proxyuid))
        logger.info('Got XRAYEXEC_UID: %s (%s), working in USER_MODE',
                    proxyuid, _proxyuser)
        return _proxyuser


def sock_receive(sock_object: 'socket object') -> bytes:
    """
    Read all data from socket object
    """
    data = b''
    while True:
        chunk = sock_object.recv(1024)
        if not chunk:
            logger.debug('All data read, connection ended')
            break
        data += chunk
    return data


def error_response(msg: str) -> 'json str':
    """
    Construct an appropriate formatted response in case of error
    """
    return json.dumps({'result': msg}, ensure_ascii=False)


def nginx_user_cache() -> Optional[bool]:
    """
    Check nginx cache status for current user
    """
    proxyuser = get_xray_exec_user()
    if proxyuser is not None:
        return NginxUserCache(proxyuser).is_enabled


def root_execution_only_check() -> None:
    """
    Check if utility is executed as root and throw error in case if no
    """
    if not check_for_root():
        raise SystemExit(
            error_response(_('Only root is allowed to execute this utility')))


# --------- DECORATORS ---------


def user_mode_verification(func: Callable) -> Callable:
    """
    Decorator aimed to verify domain owner in X-Ray Manager user mode
    Applies to get_domain_info method
    """

    @wraps(func)
    def wrapper(*args, **kwargs):
        """
        Wraps func
        """
        proxyuser = get_xray_exec_user()
        if proxyuser is None:
            return func(*args, **kwargs)
        try:
            info = func(*args, **kwargs)
        except XRayMissingDomain as e:
            # Convert to a generic XRayError so callers cannot distinguish
            # "domain does not exist" from "domain belongs to another user"
            # (information-oracle defense). The `from None` suppresses the
            # original exception context so traceback printing also keeps
            # the two cases indistinguishable.
            #
            # Internal-only marker: lets the original caller (e.g.
            # base.py:stop()) re-detect the "missing domain" case to
            # preserve graceful cleanup of orphaned task data / cron jobs
            # for an already-deleted domain. The marker is an INSTANCE
            # attribute on a plain XRayError; it is NOT serialized by
            # XRayError.__str__ (which only emits self.reason and
            # self.context), so it is not observable to user-mode
            # callers via the user_agent socket. Subclassing would
            # have changed type(err) and broken the uniform-class
            # invariant — see tests in user_mode_verification_tests.py.
            err = XRayError(
                _('%s cannot be found') % f"Domain {e.domain_name}"
            )
            err._internal_missing_domain = True
            raise err from None
        if info.user != proxyuser:
            logger.warning('%s does not belong to user %s', info, proxyuser)
            # No marker here — the "not your domain" path must not be
            # able to trigger the graceful cleanup branch in callers.
            raise XRayError(_('%s cannot be found') % str(info))
        return info

    return wrapper


def user_mode_restricted(func: Callable) -> Callable:
    """
    Decorator aimed to check if user is not hitting limit of running tasks,
    set in X-Ray Manager user mode.
    Applies to start and continue methods.
    Limiting of user's running tasks is applied to Shared PRO only.
    """

    def check(*args):
        """
        If XRAYEXEC_UID exists, check if user does not exceed
        limit of running tasks
        """
        # TODO: [unification] ensure is_cl_shared_pro_edition really needed here
        # https://cloudlinux.atlassian.net/browse/XRAY-244 - (seems yes)
        if not is_cl_shared_pro_edition(skip_jwt_check=True):
            return

        proxyuser = get_xray_exec_user()
        if proxyuser is not None:
            ui_api_cli_instanse = args[0].ui_api_client
            resp = ui_api_cli_instanse.get_task_list()
            list_of_tasks = resp.get('result')
            if list_of_tasks is not None:
                running_count = len([item for item in list_of_tasks if
                                     item.get('status') == 'running'])
                if running_count >= user_tasks_count:
                    raise XRayError(
                        _('Limit of running tasks is {}. '
                          'You already have {} running task'.format(str(user_tasks_count),
                                                                    str(user_tasks_count))))

    @wraps(func)
    def wrapper(*args, **kwargs):
        """
        Wraps func
        """
        check(*args)
        return func(*args, **kwargs)

    return wrapper


def with_fpm_reload_restricted(func: Callable) -> Callable:
    """
    Decorator aimed to restrict frequent reloads of FPM service
    Applies to get_domain_info method.

    Decorator ordering invariant: this decorator MUST sit OUTSIDE
    user_mode_verification (i.e. ``@with_fpm_reload_restricted`` above
    ``@user_mode_verification``) on every get_domain_info implementation.
    The wrapper below runs ``check`` only AFTER ``func`` returns, so any
    ownership-failure XRayError raised by the inner user_mode_verification
    wrapper short-circuits the throttle probe. Reversing the order would
    let the throttle check inspect another user's FPM service name and
    raise a distinguishable XRayError(flag='warning') before the outer
    ownership check could uniformise the response, leaking a side channel
    on whether the queried domain exists and whose service is currently
    throttled (see XRAY-244 / CLPRO-3079 information-oracle defense).
    """

    def check(*args, data):
        """
        """
        # TODO: [unification] ensure is_cl_solo_edition really needed here
        # https://cloudlinux.atlassian.net/browse/XRAY-244 (seems yes)
        if is_cl_solo_edition(skip_jwt_check=True):
            return

        proxyuser = get_xray_exec_user()
        if proxyuser is not None and data.panel_fpm:
            _fpm_service = args[0].fpm_service_name(data)
            if FPMReloadController(_fpm_service).restrict():
                raise XRayError(
                    _('The X-Ray User service is currently busy. Operation is temporarily not permitted. '
                      'Try again in %s minute') % str(fpm_reload_timeout),
                    flag='warning')

    @wraps(func)
    def wrapper(*args, **kwargs):
        """
        Wraps func
        """
        info = func(*args, **kwargs)
        check(*args, data=info)
        return info

    return wrapper


def username_verification(func: Callable) -> Callable:
    def validate(username: str):
        """
        If exists, check XRAYEXEC_UID against user passed param
        """
        proxyuser = get_xray_exec_user()
        if proxyuser is not None and username != proxyuser:
            raise XRayError(_('Incorrect user for request'))

    @wraps(func)
    def wrapper(*args, **kwargs):
        username = kwargs['username']
        validate(username)
        return func(*args, **kwargs)

    return wrapper


def user_mode_advice_verification(func: Callable) -> Callable:
    """
    Decorator aimed to verify user in X-Ray Smart Advice user mode
    Applies to get_detailed_advice method, which takes part in
    advice_details and advice_apply methods
    """

    def verify(data: dict) -> None:
        """
        If exists, check XRAYEXEC_UID against user in metadata of an advice
        """
        proxyuser = get_xray_exec_user()
        try:
            username = data['metadata']['username']
        except KeyError:
            raise XRayError(_('Requested advice cannot be verified'))
        if proxyuser is not None and username != proxyuser:
            raise XRayError(_('Requested advice does not exist'))

    @wraps(func)
    def wrapper(*args, **kwargs):
        """
        Wraps func
        """
        advice_info, _ = func(*args, **kwargs)
        verify(advice_info)
        return advice_info, _

    return wrapper

Yohohohohohooho | Sanrei Aya