Yohohohohohooho | Sanrei Aya
Sanrei Aya


Server : LiteSpeed
System : Linux barito.iixcp.rumahweb.net 5.14.0-611.49.1.el9_7.x86_64 #1 SMP PREEMPT_DYNAMIC Tue Apr 21 16:39:08 EDT 2026 x86_64
User : elvh3918 ( 1528)
PHP Version : 8.2.31
Disable Function : mail
Directory :  /opt/cloudlinux/venv/lib64/python3.11/site-packages/xray/internal/

Upload File :
current_dir [ Writeable ] document_root [ Writeable ]

 

Current File : //opt/cloudlinux/venv/lib64/python3.11/site-packages/xray/internal/phpinfo_utils.py
# -*- coding: utf-8 -*-

# Copyright © Cloud Linux GmbH & Cloud Linux Software, Inc 2010-2021 All Rights Reserved
#
# Licensed under CLOUD LINUX LICENSE AGREEMENT
# http://cloudlinux.com/docs/LICENSE.TXT
import dataclasses
import os
import pwd
import secrets
import subprocess
from contextlib import contextmanager
from pathlib import Path

import requests
import urllib3
from clcommon.clpwd import drop_privileges
from clcommon.cpapi import docroot
from requests.exceptions import ChunkedEncodingError
from secureio import disable_quota
from urllib3.exceptions import ReadTimeoutError

from xray import gettext as _
from xray.internal import utils, exceptions
from xray.internal.constants import ALLOWED_INI_PREFIXES

# long timeout is set because our tested
# sites may be really slow
TIMEOUT: int = 10
# Cap the response body at 1 MiB. The phpinfo payload we ship in
# `_temporary_phpinfo_file` is well under 1 KiB, so 1 MiB leaves a
# generous margin while preventing a tenant-controlled webserver from
# inflating the root daemon's RSS via an unbounded response body.
_MAX_RESPONSE_BYTES: int = 1024 * 1024
_CHUNK_BYTES: int = 8192
HEADERS = {
    'User-Agent': 'Mozilla/5.0 (X11; U; Linux i686; en-US; rv:1.9.2.13) '
                  'Gecko/20101209 CentOS/3.6-2.el5.centos Firefox/3.6.13'
}


class WebsiteNotResponding(exceptions.XRayManagerError):
    """
    Raised by `_request_url` when a connection-level error prevents the
    requested URL from being fetched.
    """
    # NOTE(review): the base class __init__ is not invoked here, so any
    # attributes XRayManagerError normally sets are absent -- confirm that
    # handlers of XRayManagerError do not rely on them.
    def __init__(self, url, details):
        # URL that was being requested when the failure happened
        self.url = url
        # textual description of the underlying error
        self.details = details


class _BoundedResponse:
    """
    A drop-in replacement exposing `.text` from a bounded streamed read.

    Holds only the already-decoded body text; `__slots__` restricts
    instances to that single attribute.
    """

    __slots__ = ('text',)

    def __init__(self, text: str) -> None:
        # decoded response body
        self.text = text


@utils.retry_on_exceptions(3, [ChunkedEncodingError])
def _request_url(url):
    """
    Fetch `url` with a streamed GET and return the decoded body wrapped
    in a `_BoundedResponse`, buffering at most `_MAX_RESPONSE_BYTES`.

    TLS certificates are not verified (verify=False).

    retry on:
     - ChunkedEncodingError -> sometimes error happens due to network issues/glitch
    NOTE(review): ChunkedEncodingError derives from requests.RequestException,
    so the generic handler below converts it into XRayManagerError before
    the decorator can observe it -- confirm whether the retry is expected
    to trigger at all.

    :param url: full URL to request
    :return: `_BoundedResponse` whose `.text` is the decoded body
    :raises WebsiteNotResponding: on a connection-level failure
    :raises exceptions.XRayManagerError: when the body exceeds the size cap
        or on any other requests failure (including an HTTP error status)
    """
    try:
        with requests.get(url, timeout=TIMEOUT, verify=False,
                          headers=HEADERS, stream=True) as response:
            response.raise_for_status()
            buf = bytearray()
            for chunk in response.iter_content(chunk_size=_CHUNK_BYTES):
                if not chunk:
                    continue
                # check BEFORE extending, so the buffer never grows past the cap
                if len(buf) + len(chunk) > _MAX_RESPONSE_BYTES:
                    raise exceptions.XRayManagerError(
                        _("phpinfo response for %s exceeds %d bytes; "
                          "refusing to buffer attacker-controlled body.")
                        % (url, _MAX_RESPONSE_BYTES))
                buf.extend(chunk)
            # NB: do not use response.apparent_encoding here — with stream=True
            # the body is already consumed by iter_content(), so accessing it
            # touches response.content and raises RuntimeError. The buffered
            # bytes are decoded with errors='replace', so utf-8 is a safe
            # fallback when the server sends no charset in Content-Type.
            encoding = response.encoding or 'utf-8'
            try:
                text = buf.decode(encoding, errors='replace')
            except LookupError:
                # response.encoding comes from the tenant-controlled
                # Content-Type charset; an unknown codec name (e.g.
                # charset=bogus) raises LookupError that errors='replace'
                # does not catch. requests' own .text falls back here, so
                # mirror that and decode as utf-8.
                text = buf.decode('utf-8', errors='replace')
    except ConnectionError as e:
        # builtin ConnectionError that reached us without being wrapped
        # by requests
        if e.args and isinstance(e.args[0], ReadTimeoutError):
            raise
        raise WebsiteNotResponding(url, details=str(e)) from e
    except requests.RequestException as e:
        # requests.exceptions.ConnectionError derives from RequestException,
        # NOT from the builtin ConnectionError, so it has to be recognised
        # here; otherwise WebsiteNotResponding is never raised for the
        # errors requests reports.
        #
        # really strange behavior of requests that wrap
        # errors inside of ConnectionError: a read timeout wrapped this way
        # is not a connection failure, so it keeps being reported through
        # the generic error below.
        wrapped_read_timeout = (
            bool(e.args) and isinstance(e.args[0], ReadTimeoutError))
        if (isinstance(e, requests.exceptions.ConnectionError)
                and not wrapped_read_timeout):
            raise WebsiteNotResponding(url, details=str(e)) from e
        raise exceptions.XRayManagerError(
            _("Unable to detect php version for website "
              "because it is not accessible. "
              "Try again and contact an administrator if the issue persists. "
              "Original error: %s. ") % str(e)) from e

    return _BoundedResponse(text)


@contextmanager
def _temporary_phpinfo_file(username: str, document_root: Path):
    """
    Context manager that places a small, randomly named PHP script into
    `document_root` and removes it again on exit.

    The script prints `key=value` lines (phpversion, ini_scan_dir,
    php_sapi_name, include_path), one per line.

    :param username: user whose privileges are assumed while writing the file
    :param document_root: directory the script is written to
    :return: yields the bare file name (not the full path) of the script
    """
    php_file_contents = """
<?php

$php_ini_scan_dir = getenv("PHP_INI_SCAN_DIR");
if(!empty($php_ini_scan_dir)) {
  // get first non-empty path
  $php_ini_scan_dir = array_values(array_filter(explode(":", $php_ini_scan_dir)))[0];
}


echo "phpversion=" . phpversion() . "\n";
echo "ini_scan_dir=" . ($php_ini_scan_dir ? $php_ini_scan_dir: PHP_CONFIG_FILE_SCAN_DIR) . "\n";
echo "php_sapi_name=". php_sapi_name() . "\n";
echo "include_path=" . get_include_path() . "\n";

"""
    # random suffix makes the file name unpredictable
    php_file_name = f'xray_info_{secrets.token_hex(8)}.php'
    php_file_path = document_root / php_file_name

    try:
        # the write is inside the try block so that a partially written
        # file is cleaned up as well when writing fails
        with drop_privileges(username), disable_quota():
            php_file_path.write_text(php_file_contents)
        yield php_file_name
    finally:
        # the directory belongs to the user, who may have removed the file
        # already; a missing file must not mask the result (or the original
        # error) of the managed block
        php_file_path.unlink(missing_ok=True)

@dataclasses.dataclass
class PhpConfiguration:
    """PHP settings of a website as reported by the temporary phpinfo script."""

    # 'user'
    username: str
    # '8.3.30'
    phpversion: str
    # '/etc/php.d/'
    ini_scan_dir: str
    # 'cgi-fcgi'
    php_sapi_name: str
    # '.:/opt/alt/php80/usr/share/pear'
    include_path: str

    @property
    def short_php_version(self) -> str:
        """First two dot-separated version components glued together, e.g. '83'."""
        leading_components = self.phpversion.split('.')[:2]
        return ''.join(leading_components)

    def get_full_php_version(self, default_prefix: str):
        """
        Return the short version prefixed with 'alt-php' when the include
        path mentions '/opt/alt', otherwise prefixed with `default_prefix`.
        """
        prefix = 'alt-php' if '/opt/alt' in self.include_path else default_prefix
        return f"{prefix}{self.short_php_version}"

    @property
    def absolute_ini_scan_dir(self):
        """
        Return the real path of the ini scan directory.

        A falsy directory is returned unchanged; a directory resolving
        outside of ALLOWED_INI_PREFIXES raises ValueError.
        """
        # the only directory that we expect to be changed in cagefs
        # is our conf link which is managed by selectorctl
        if 'link/conf' in self.ini_scan_dir:
            candidate = _resolve_ini_path_in_cagefs(
                self.username, self.ini_scan_dir)
        else:
            candidate = self.ini_scan_dir

        if not candidate:
            return candidate

        real_dir = os.path.realpath(candidate)
        # trailing slash is appended before comparing with the prefixes
        with_slash = real_dir + '/'
        if any(with_slash.startswith(prefix) for prefix in ALLOWED_INI_PREFIXES):
            return real_dir
        raise ValueError(
            f'ini_scan_dir outside allowed paths: {real_dir!r}'
        )

    @property
    def is_php_fpm(self):
        """True when the reported SAPI name is 'fpm-fcgi'."""
        return 'fpm-fcgi' == self.php_sapi_name

def _parse_configuration(username: str, response: str) -> PhpConfiguration:
    """
    Build a PhpConfiguration from the `key=value` lines of `response`.

    Blank lines are skipped. Each remaining line is split on its FIRST
    '=' only, so values that themselves contain '=' are kept intact.

    :param username: stored as `username` of the resulting configuration
    :param response: body text with one `key=value` pair per line
    :return: PhpConfiguration populated from the parsed pairs
    :raises ValueError: when a non-blank line contains no '='
    :raises TypeError: when the parsed keys do not match the
        PhpConfiguration fields (raised by its constructor)
    """
    config = {}
    for line in response.split('\n'):
        if not line.strip():
            continue

        key, separator, value = line.partition('=')
        if not separator:
            # the line is truncated in the message, as the body is not
            # under our control and may be arbitrarily long
            raise ValueError(
                f'Unexpected line in phpinfo response: {line[:80]!r}')

        config[key.strip()] = value.strip()

    return PhpConfiguration(username=username, **config)


def _resolve_ini_path_in_cagefs(username: str, path: str):
    """
    ini path inside cagefs can be a symlink
    and as cagefs has different namespace for each user,
    the only way to know that for sure is to dive into cage
    and resolve path there
    """
    try:
        pwd.getpwnam(username)
    except KeyError:
        return None
    cmd = ['/sbin/cagefs_enter_user', username, '/usr/bin/realpath', path]
    try:
        resolved_path = subprocess.check_output(
            cmd, text=True, stderr=subprocess.DEVNULL).strip()
    except subprocess.CalledProcessError:
        return None

    if resolved_path.startswith('/etc/cl.php.d/'):
        prefix = utils.cagefsctl_get_prefix(username)
        if prefix is None:
            raise ValueError(
                _('CageFS prefix resolved as None, but should be a number'))
        return f'/var/cagefs/{prefix}/{username}{resolved_path}'

    return resolved_path



def get_php_configuration(username: str, domain: str) -> PhpConfiguration:
    """
    Detect the php version and configuration currently used by `domain`.

    A temporary phpinfo-like file is written to the document root of the
    domain, requested through the website and its output is parsed.
    """
    # if certificate is bad, but the site itself works,
    # we consider it ok
    urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning)

    site_docroot = Path(docroot(domain)[0])
    with _temporary_phpinfo_file(username, site_docroot) as probe_file_name:
        probe_location = f'{domain}/{probe_file_name}'

        try:
            response = _request_url(f'http://{probe_location}')
        except WebsiteNotResponding:
            # Some websites did not enable HTTP to HTTPS redirection.
            # Try connecting with HTTPS protocol.
            response = _request_url(f'https://{probe_location}')

    # json is not an option for the probe output, because it is an
    # optional php module on older php versions
    return _parse_configuration(username, response.text)


Yohohohohohooho | Sanrei Aya