""" Dynamic Diffusers Pipeline Loader This module provides dynamic discovery and loading of diffusers pipelines at runtime, eliminating the need for per-pipeline conditional statements. New pipelines added to diffusers become available automatically without code changes. The module also supports discovering other diffusers classes like schedulers, models, and other components, making it a generic solution for dynamic class loading. Usage: from diffusers_dynamic_loader import load_diffusers_pipeline, get_available_pipelines # Load by class name pipe = load_diffusers_pipeline(class_name="StableDiffusionPipeline", model_id="...", torch_dtype=torch.float16) # Load by task alias pipe = load_diffusers_pipeline(task="text-to-image", model_id="...", torch_dtype=torch.float16) # Load using model_id (infers from HuggingFace Hub if possible) pipe = load_diffusers_pipeline(model_id="runwayml/stable-diffusion-v1-5", torch_dtype=torch.float16) # Get list of available pipelines available = get_available_pipelines() # Discover other diffusers classes (schedulers, models, etc.) schedulers = discover_diffusers_classes("SchedulerMixin") models = discover_diffusers_classes("ModelMixin") """ import importlib import re import sys from typing import Any, Dict, List, Optional, Tuple, Type # Global cache for discovered pipelines - computed once per process _pipeline_registry: Optional[Dict[str, Type]] = None _task_aliases: Optional[Dict[str, List[str]]] = None # Global cache for other discovered class types _class_registries: Dict[str, Dict[str, Type]] = {} def _camel_to_kebab(name: str) -> str: """ Convert CamelCase to kebab-case. Examples: StableDiffusionPipeline -> stable-diffusion-pipeline StableDiffusionXLImg2ImgPipeline -> stable-diffusion-xl-img-2-img-pipeline """ # Insert hyphen before uppercase letters (but not at the start) s1 = re.sub('(.)([A-Z][a-z]+)', r'\1-\2', name) # Insert hyphen before uppercase letters following lowercase letters or numbers s2 = re.sub('([a-z0-9])([A-Z])', r'\1-\2', s1) return s2.lower() def _extract_task_keywords(class_name: str) -> List[str]: """ Extract task-related keywords from a pipeline class name. This function derives useful task aliases from the class name without hardcoding per-pipeline branches. Returns a list of potential task aliases for this pipeline. """ aliases = [] name_lower = class_name.lower() # Direct task mappings based on common patterns in class names task_patterns = { 'text2image': ['text-to-image', 'txt2img', 'text2image'], 'texttoimage': ['text-to-image', 'txt2img', 'text2image'], 'txt2img': ['text-to-image', 'txt2img', 'text2image'], 'img2img': ['image-to-image', 'img2img', 'image2image'], 'image2image': ['image-to-image', 'img2img', 'image2image'], 'imagetoimage': ['image-to-image', 'img2img', 'image2image'], 'img2video': ['image-to-video', 'img2vid', 'img2video'], 'imagetovideo': ['image-to-video', 'img2vid', 'img2video'], 'text2video': ['text-to-video', 'txt2vid', 'text2video'], 'texttovideo': ['text-to-video', 'txt2vid', 'text2video'], 'inpaint': ['inpainting', 'inpaint'], 'depth2img': ['depth-to-image', 'depth2img'], 'depthtoimage': ['depth-to-image', 'depth2img'], 'controlnet': ['controlnet', 'control-net'], 'upscale': ['upscaling', 'upscale', 'super-resolution'], 'superresolution': ['upscaling', 'upscale', 'super-resolution'], } # Check for each pattern in the class name for pattern, task_aliases in task_patterns.items(): if pattern in name_lower: aliases.extend(task_aliases) # Also detect general pipeline types from the class name structure # E.g., StableDiffusionPipeline -> stable-diffusion, flux -> flux # Remove "Pipeline" suffix and convert to kebab case if class_name.endswith('Pipeline'): base_name = class_name[:-8] # Remove "Pipeline" kebab_name = _camel_to_kebab(base_name) aliases.append(kebab_name) # Extract model family name (e.g., "stable-diffusion" from "stable-diffusion-xl-img-2-img") parts = kebab_name.split('-') if len(parts) >= 2: # Try the first two words as a family name family = '-'.join(parts[:2]) if family not in aliases: aliases.append(family) # If no specific task pattern matched but class contains "Pipeline", add "text-to-image" as default # since most diffusion pipelines support text-to-image generation if 'text-to-image' not in aliases and 'image-to-image' not in aliases: # Only add for pipelines that seem to be generation pipelines (not schedulers, etc.) if 'pipeline' in name_lower and not any(x in name_lower for x in ['scheduler', 'processor', 'encoder']): # Don't automatically add - let it be explicit pass return list(set(aliases)) # Remove duplicates def discover_diffusers_classes( base_class_name: str, include_base: bool = True ) -> Dict[str, Type]: """ Discover all subclasses of a given base class from diffusers. This function provides a generic way to discover any type of diffusers class, not just pipelines. It can be used to discover schedulers, models, processors, and other components. Args: base_class_name: Name of the base class to search for subclasses (e.g., "DiffusionPipeline", "SchedulerMixin", "ModelMixin") include_base: Whether to include the base class itself in results Returns: Dict mapping class names to class objects Examples: # Discover all pipeline classes pipelines = discover_diffusers_classes("DiffusionPipeline") # Discover all scheduler classes schedulers = discover_diffusers_classes("SchedulerMixin") # Discover all model classes models = discover_diffusers_classes("ModelMixin") # Discover AutoPipeline classes auto_pipelines = discover_diffusers_classes("AutoPipelineForText2Image") """ global _class_registries # Check cache first if base_class_name in _class_registries: return _class_registries[base_class_name] import diffusers # Try to get the base class from diffusers base_class = None try: base_class = getattr(diffusers, base_class_name) except AttributeError: # Try to find in submodules for submodule in ['schedulers', 'models', 'pipelines']: try: module = importlib.import_module(f'diffusers.{submodule}') if hasattr(module, base_class_name): base_class = getattr(module, base_class_name) break except (ImportError, ModuleNotFoundError): continue if base_class is None: raise ValueError(f"Could not find base class '{base_class_name}' in diffusers") registry: Dict[str, Type] = {} # Include base class if requested if include_base: registry[base_class_name] = base_class # Scan diffusers module for subclasses for attr_name in dir(diffusers): try: attr = getattr(diffusers, attr_name) if (isinstance(attr, type) and issubclass(attr, base_class) and (include_base or attr is not base_class)): registry[attr_name] = attr except (ImportError, AttributeError, TypeError, RuntimeError, ModuleNotFoundError): continue # Cache the results _class_registries[base_class_name] = registry return registry def get_available_classes(base_class_name: str) -> List[str]: """ Get a sorted list of all discovered class names for a given base class. Args: base_class_name: Name of the base class (e.g., "SchedulerMixin") Returns: Sorted list of discovered class names """ return sorted(discover_diffusers_classes(base_class_name).keys()) def _discover_pipelines() -> Tuple[Dict[str, Type], Dict[str, List[str]]]: """ Discover all subclasses of DiffusionPipeline from diffusers. This function uses the generic discover_diffusers_classes() internally and adds pipeline-specific task alias generation. It also includes AutoPipeline classes which are special utility classes for automatic pipeline selection. Returns: A tuple of (pipeline_registry, task_aliases) where: - pipeline_registry: Dict mapping class names to class objects - task_aliases: Dict mapping task aliases to lists of class names """ # Use the generic discovery function pipeline_registry = discover_diffusers_classes("DiffusionPipeline", include_base=True) # Also add AutoPipeline classes - these are special utility classes that are # NOT subclasses of DiffusionPipeline but are commonly used import diffusers auto_pipeline_classes = [ "AutoPipelineForText2Image", "AutoPipelineForImage2Image", "AutoPipelineForInpainting", ] for cls_name in auto_pipeline_classes: try: cls = getattr(diffusers, cls_name) if cls is not None: pipeline_registry[cls_name] = cls except AttributeError: # Class not available in this version of diffusers pass # Generate task aliases for pipelines task_aliases: Dict[str, List[str]] = {} for attr_name in pipeline_registry: if attr_name == "DiffusionPipeline": continue # Skip base class for alias generation aliases = _extract_task_keywords(attr_name) for alias in aliases: if alias not in task_aliases: task_aliases[alias] = [] if attr_name not in task_aliases[alias]: task_aliases[alias].append(attr_name) return pipeline_registry, task_aliases def get_pipeline_registry() -> Dict[str, Type]: """ Get the cached pipeline registry. Returns a dictionary mapping pipeline class names to their class objects. The registry is built on first access and cached for subsequent calls. """ global _pipeline_registry, _task_aliases if _pipeline_registry is None: _pipeline_registry, _task_aliases = _discover_pipelines() return _pipeline_registry def get_task_aliases() -> Dict[str, List[str]]: """ Get the cached task aliases dictionary. Returns a dictionary mapping task aliases (e.g., "text-to-image") to lists of pipeline class names that support that task. """ global _pipeline_registry, _task_aliases if _task_aliases is None: _pipeline_registry, _task_aliases = _discover_pipelines() return _task_aliases def get_available_pipelines() -> List[str]: """ Get a sorted list of all discovered pipeline class names. Returns: List of pipeline class names available for loading. """ return sorted(get_pipeline_registry().keys()) def get_available_tasks() -> List[str]: """ Get a sorted list of all available task aliases. Returns: List of task aliases (e.g., ["text-to-image", "image-to-image", ...]) """ return sorted(get_task_aliases().keys()) def resolve_pipeline_class( class_name: Optional[str] = None, task: Optional[str] = None, model_id: Optional[str] = None ) -> Type: """ Resolve a pipeline class from class_name, task, or model_id. Priority: 1. If class_name is provided, look it up directly 2. If task is provided, resolve through task aliases 3. If model_id is provided, try to infer from HuggingFace Hub Args: class_name: Exact pipeline class name (e.g., "StableDiffusionPipeline") task: Task alias (e.g., "text-to-image", "img2img") model_id: HuggingFace model ID (e.g., "runwayml/stable-diffusion-v1-5") Returns: The resolved pipeline class. Raises: ValueError: If no pipeline could be resolved. """ registry = get_pipeline_registry() aliases = get_task_aliases() # 1. Direct class name lookup if class_name: if class_name in registry: return registry[class_name] # Try case-insensitive match for name, cls in registry.items(): if name.lower() == class_name.lower(): return cls raise ValueError( f"Unknown pipeline class '{class_name}'. " f"Available pipelines: {', '.join(sorted(registry.keys())[:20])}..." ) # 2. Task alias lookup if task: task_lower = task.lower().replace('_', '-') if task_lower in aliases: # Return the first matching pipeline for this task matching_classes = aliases[task_lower] if matching_classes: return registry[matching_classes[0]] # Try partial matching for alias, classes in aliases.items(): if task_lower in alias or alias in task_lower: if classes: return registry[classes[0]] raise ValueError( f"Unknown task '{task}'. " f"Available tasks: {', '.join(sorted(aliases.keys())[:20])}..." ) # 3. Try to infer from HuggingFace Hub if model_id: try: from huggingface_hub import model_info info = model_info(model_id) # Check pipeline_tag if hasattr(info, 'pipeline_tag') and info.pipeline_tag: tag = info.pipeline_tag.lower().replace('_', '-') if tag in aliases: matching_classes = aliases[tag] if matching_classes: return registry[matching_classes[0]] # Check model card for hints if hasattr(info, 'cardData') and info.cardData: card = info.cardData if 'pipeline_tag' in card: tag = card['pipeline_tag'].lower().replace('_', '-') if tag in aliases: matching_classes = aliases[tag] if matching_classes: return registry[matching_classes[0]] except ImportError: # huggingface_hub not available pass except (KeyError, AttributeError, ValueError, OSError): # Model info lookup failed - common cases: # - KeyError: Missing keys in model card # - AttributeError: Missing attributes on model info # - ValueError: Invalid model data # - OSError: Network or file access issues pass # Fallback: use DiffusionPipeline.from_pretrained which auto-detects # DiffusionPipeline is always added to registry in _discover_pipelines (line 132) # but use .get() with import fallback for extra safety from diffusers import DiffusionPipeline return registry.get('DiffusionPipeline', DiffusionPipeline) raise ValueError( "Must provide at least one of: class_name, task, or model_id. " f"Available pipelines: {', '.join(sorted(registry.keys())[:20])}... " f"Available tasks: {', '.join(sorted(aliases.keys())[:20])}..." ) def load_diffusers_pipeline( class_name: Optional[str] = None, task: Optional[str] = None, model_id: Optional[str] = None, from_single_file: bool = False, **kwargs ) -> Any: """ Load a diffusers pipeline dynamically. This function resolves the appropriate pipeline class based on the provided parameters and instantiates it with the given kwargs. Args: class_name: Exact pipeline class name (e.g., "StableDiffusionPipeline") task: Task alias (e.g., "text-to-image", "img2img") model_id: HuggingFace model ID or local path from_single_file: If True, use from_single_file() instead of from_pretrained() **kwargs: Additional arguments passed to from_pretrained() or from_single_file() Returns: An instantiated pipeline object. Raises: ValueError: If no pipeline could be resolved. Exception: If pipeline loading fails. Examples: # Load by class name pipe = load_diffusers_pipeline( class_name="StableDiffusionPipeline", model_id="runwayml/stable-diffusion-v1-5", torch_dtype=torch.float16 ) # Load by task pipe = load_diffusers_pipeline( task="text-to-image", model_id="runwayml/stable-diffusion-v1-5", torch_dtype=torch.float16 ) # Load from single file pipe = load_diffusers_pipeline( class_name="StableDiffusionPipeline", model_id="/path/to/model.safetensors", from_single_file=True, torch_dtype=torch.float16 ) """ # Resolve the pipeline class pipeline_class = resolve_pipeline_class( class_name=class_name, task=task, model_id=model_id ) # If no model_id provided but we have a class, we can't load if model_id is None: raise ValueError("model_id is required to load a pipeline") # Load the pipeline try: if from_single_file: # Check if the class has from_single_file method if hasattr(pipeline_class, 'from_single_file'): return pipeline_class.from_single_file(model_id, **kwargs) else: raise ValueError( f"Pipeline class {pipeline_class.__name__} does not support from_single_file(). " f"Use from_pretrained() instead." ) else: return pipeline_class.from_pretrained(model_id, **kwargs) except Exception as e: # Provide helpful error message available = get_available_pipelines() raise RuntimeError( f"Failed to load pipeline '{pipeline_class.__name__}' from '{model_id}': {e}\n" f"Available pipelines: {', '.join(available[:20])}..." ) from e def get_pipeline_info(class_name: str) -> Dict[str, Any]: """ Get information about a specific pipeline class. Args: class_name: The pipeline class name Returns: Dictionary with pipeline information including: - name: Class name - aliases: List of task aliases - supports_single_file: Whether from_single_file() is available - docstring: Class docstring (if available) """ registry = get_pipeline_registry() aliases = get_task_aliases() if class_name not in registry: raise ValueError(f"Unknown pipeline: {class_name}") cls = registry[class_name] # Find all aliases for this pipeline pipeline_aliases = [] for alias, classes in aliases.items(): if class_name in classes: pipeline_aliases.append(alias) return { 'name': class_name, 'aliases': pipeline_aliases, 'supports_single_file': hasattr(cls, 'from_single_file'), 'docstring': cls.__doc__[:200] if cls.__doc__ else None }