Yohohohohohooho | Sanrei Aya
Sanrei Aya


Server : LiteSpeed
System : Linux barito.iixcp.rumahweb.net 5.14.0-611.49.1.el9_7.x86_64 #1 SMP PREEMPT_DYNAMIC Tue Apr 21 16:39:08 EDT 2026 x86_64
User : elvh3918 ( 1528)
PHP Version : 8.2.31
Disable Function : mail
Directory :  /proc/thread-self/root/proc/self/root/usr/local/lib/python3.9/site-packages/kombu/utils/

Upload File :
current_dir [ Writeable ] document_root [ Writeable ]

 

Current File : //proc/thread-self/root/proc/self/root/usr/local/lib/python3.9/site-packages/kombu/utils/imports.py
"""Import related utilities."""

from __future__ import annotations

import importlib
import sys

from kombu.exceptions import reraise


def symbol_by_name(name, aliases=None, imp=None, package=None,
                   sep='.', default=None, **kwargs):
    """Resolve a qualified name to the object it refers to.

    ``name`` is normally a string of the form ``module.path.Symbol``::

        celery.concurrency.processes.TaskPool

    A colon may be used instead to mark where the module path ends
    and the symbol begins::

        celery.concurrency.processes:TaskPool

    Resolution steps:

    * A ``name`` that is not a string is returned unchanged.
    * If ``aliases`` (a mapping of short name to long name) has a truthy
      entry for ``name``, that entry replaces ``name``.
    * The name is split on its last separator: ``':'`` when the name
      contains one, otherwise ``sep``.
    * The module part is imported with ``imp`` (defaults to
      :func:`importlib.import_module`), which is called with
      ``package=package`` and any extra ``**kwargs``.
    * If the name has no module part, ``package`` is imported when given,
      otherwise the name itself, and the module object is returned.

    Error handling:

    * A :exc:`ValueError` from the import is re-raised through ``reraise``
      as a :exc:`ValueError` whose message includes the name.
    * On :exc:`ImportError` or :exc:`AttributeError`, ``default`` is
      returned when it is not ``None``; otherwise the error propagates.

    Examples
    --------
        >>> symbol_by_name('celery.concurrency.processes.TaskPool')
        <class 'celery.concurrency.processes.TaskPool'>

        >>> symbol_by_name('default', {
        ...     'default': 'celery.concurrency.processes.TaskPool'})
        <class 'celery.concurrency.processes.TaskPool'>

        # Non-string names are handed back untouched.
        >>> from celery.concurrency.processes import TaskPool
        >>> symbol_by_name(TaskPool) is TaskPool
        True
    """
    # Anything that is not a string is treated as the symbol itself.
    if not isinstance(name, str):
        return name

    importer = importlib.import_module if imp is None else imp
    name = (aliases or {}).get(name) or name

    # A ':' anywhere in the name overrides the configured separator.
    if ':' in name:
        sep = ':'
    module_name, _, attr_name = name.rpartition(sep)
    if not module_name:
        # No separator found: there is no attribute to fetch, so import
        # ``package`` if one was given, else the bare name as a module.
        module_name = package if package else attr_name
        attr_name = None

    try:
        try:
            module = importer(module_name, package=package, **kwargs)
        except ValueError as exc:
            reraise(ValueError,
                    ValueError(f"Couldn't import {name!r}: {exc}"),
                    sys.exc_info()[2])
        if attr_name:
            return getattr(module, attr_name)
        return module
    except (ImportError, AttributeError):
        # ``None`` doubles as "no default supplied": propagate the error.
        if default is None:
            raise
        return default

Yohohohohohooho | Sanrei Aya