Utilities
Optimization
cmxflow.opt.optuna.Optimizer(workflow: Workflow, input_path: Path | str, study_name: str | None = None, storage: str | None = None, sampler: BaseSampler | None = None, pruner: BasePruner | None = None)
Bayesian optimizer for cmxflow workflows using Optuna.
Automatically optimizes workflow parameters to maximize or minimize a score computed by a ScoreBlock at the end of the workflow.
Attributes:
| Name | Type | Description |
|---|---|---|
| workflow | | The workflow to optimize. |
| input_path | | Path to the input file for workflow execution. |
| study | Study | The Optuna study instance (available after optimize() is called). |
Initialize the optimizer.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| workflow | Workflow | The workflow to optimize. Must end with a ScoreBlock. | required |
| input_path | Path \| str | Path to the input file for workflow execution. | required |
| study_name | str \| None | Optional name for the Optuna study. | None |
| storage | str \| None | Optional storage URL for distributed optimization. | None |
| sampler | BaseSampler \| None | Optional Optuna sampler for parameter suggestions. | None |
| pruner | BasePruner \| None | Optional Optuna pruner for early stopping. | None |
Raises:
| Type | Description |
|---|---|
| ValueError | If the workflow does not end with a ScoreBlock. |
| ValueError | If the workflow has no optimizable parameters. |
best_params: dict[str, Any] (property)
Get the best parameter values found during optimization.
Returns:
| Type | Description |
|---|---|
| dict[str, Any] | Dictionary mapping parameter names to their optimal values. |
Raises:
| Type | Description |
|---|---|
| RuntimeError | If optimize() has not been called yet. |
best_score: float (property)
Get the best score achieved during optimization.
Returns:
| Type | Description |
|---|---|
| float | The best score value. |
Raises:
| Type | Description |
|---|---|
| RuntimeError | If optimize() has not been called yet. |
study: optuna.Study (property)
Get the Optuna study instance.
Returns:
| Type | Description |
|---|---|
| Study | The Optuna study. |
Raises:
| Type | Description |
|---|---|
| RuntimeError | If optimize() has not been called yet. |
optimize(n_trials: int = 100, timeout: float | None = None, direction: str = 'maximize', n_jobs: int = 1, show_progress_bar: bool = True, callbacks: list[Any] | None = None) -> optuna.Study
Run the optimization.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| n_trials | int | Number of trials to run. | 100 |
| timeout | float \| None | Maximum time in seconds for the optimization. | None |
| direction | str | Optimization direction ("maximize" or "minimize"). | 'maximize' |
| n_jobs | int | Number of parallel jobs (use -1 for all CPUs). | 1 |
| show_progress_bar | bool | Whether to show a progress bar. | True |
| callbacks | list[Any] \| None | Optional list of Optuna callback functions. | None |
Returns:
| Type | Description |
|---|---|
| Study | The Optuna study containing optimization results. |
set_best_params() -> None
Set the workflow parameters to their optimal values.
Raises:
| Type | Description |
|---|---|
| RuntimeError | If optimize() has not been called yet. |
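The lifecycle documented above (results are unavailable until optimize() runs, and direction decides whether larger or smaller scores win) can be illustrated with a small standard-library sketch. This is not the cmxflow implementation: RandomSearchSketch, objective, space, and seed are hypothetical names, and plain random search stands in for Optuna's samplers so the snippet runs without any third-party packages.

```python
import random
from typing import Any, Callable


class RandomSearchSketch:
    """Hypothetical stand-in for the optimizer; random search, not Optuna."""

    def __init__(self, objective: Callable[[dict], float],
                 space: dict, seed: int = 0) -> None:
        if not space:
            # Mirrors the documented ValueError for "no optimizable parameters".
            raise ValueError("no optimizable parameters")
        self._objective = objective
        self._space = space  # maps parameter name -> (low, high) bounds
        self._rng = random.Random(seed)
        self._best = None  # (score, params) once optimize() has run

    def optimize(self, n_trials: int = 100, direction: str = "maximize") -> None:
        if direction not in ("maximize", "minimize"):
            raise ValueError(f"unknown direction: {direction!r}")
        # Flip the sign so one comparison handles both directions.
        sign = 1.0 if direction == "maximize" else -1.0
        for _ in range(n_trials):
            params = {name: self._rng.uniform(low, high)
                      for name, (low, high) in self._space.items()}
            score = self._objective(params)
            if self._best is None or sign * score > sign * self._best[0]:
                self._best = (score, params)

    def _require_best(self) -> tuple:
        if self._best is None:
            raise RuntimeError("optimize() has not been called yet")
        return self._best

    @property
    def best_score(self) -> float:
        return self._require_best()[0]

    @property
    def best_params(self) -> dict:
        # Return a copy so callers cannot mutate the stored result.
        return dict(self._require_best()[1])


opt = RandomSearchSketch(lambda p: -(p["x"] - 2.0) ** 2, {"x": (0.0, 4.0)})
opt.optimize(n_trials=200, direction="maximize")
print(opt.best_params, opt.best_score)
```

Routing every result accessor through one guard keeps the "not optimized yet" error consistent across properties, which matches the RuntimeError contract listed for best_params, best_score, study, and set_best_params().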
Serialization
cmxflow.utils.serial.WorkflowRegistry(path: Path | str = '~/.cmxflow/registry')
A registry for saving, listing, and loading named workflows.
Workflows are stored as gzip-compressed pickle files in a directory, with a JSON metadata file tracking names, dates, and representations.
Attributes:
| Name | Type | Description |
|---|---|---|
| path | | Directory where registry files are stored. |
Initialize the workflow registry.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| path | Path \| str | Directory to store registry files. Created if it doesn't exist. | '~/.cmxflow/registry' |
register(name: str, workflow: Workflow, overwrite: bool = False) -> None
Register a workflow under a given name.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| name | str | Name to register the workflow under. | required |
| workflow | Workflow | The workflow to register. | required |
| overwrite | bool | If True, overwrite an existing entry with the same name. | False |
Raises:
| Type | Description |
|---|---|
| ValueError | If name already exists and overwrite is False. |
| WorkflowValidationError | If the workflow fails validation. |
list() -> pd.DataFrame
List all registered workflows.
Returns:
| Type | Description |
|---|---|
| DataFrame | DataFrame with columns: name, date, representation. |
load(name: str) -> Workflow
Load a registered workflow by name.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| name | str | Name of the workflow to load. | required |
Returns:
| Type | Description |
|---|---|
| Workflow | The loaded workflow. |
Raises:
| Type | Description |
|---|---|
| KeyError | If no workflow with the given name exists. |
remove(name: str) -> None
Remove a registered workflow.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| name | str | Name of the workflow to remove. | required |
Raises:
| Type | Description |
|---|---|
| KeyError | If no workflow with the given name exists. |
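The storage layout described for the registry (one gzip-compressed pickle per entry plus a JSON metadata file) can be sketched with the standard library alone. Everything below is illustrative: RegistrySketch, the index.json file name, the .pkl.gz suffix, and the rows() method are assumptions, not the cmxflow implementation, and rows() returns plain dictionaries where the documented list() returns a DataFrame. Workflow validation is omitted.

```python
import gzip
import json
import pickle
from datetime import datetime, timezone
from pathlib import Path
from typing import Any


class RegistrySketch:
    """Hypothetical registry; file names and metadata layout are assumptions."""

    def __init__(self, path: Path) -> None:
        self.path = Path(path).expanduser()
        self.path.mkdir(parents=True, exist_ok=True)  # created if missing
        self._index = self.path / "index.json"  # assumed metadata file name

    def _read(self) -> dict:
        if not self._index.exists():
            return {}
        return json.loads(self._index.read_text())

    def _write(self, meta: dict) -> None:
        self._index.write_text(json.dumps(meta, indent=2))

    def register(self, name: str, obj: Any, overwrite: bool = False) -> None:
        meta = self._read()
        if name in meta and not overwrite:
            raise ValueError(f"{name!r} already exists; pass overwrite=True")
        with gzip.open(self.path / f"{name}.pkl.gz", "wb") as fh:
            pickle.dump(obj, fh)
        meta[name] = {
            "date": datetime.now(timezone.utc).isoformat(),
            "representation": repr(obj),
        }
        self._write(meta)

    def rows(self) -> list:
        # One dict per entry with the documented columns.
        return [{"name": n, **m} for n, m in sorted(self._read().items())]

    def load(self, name: str) -> Any:
        if name not in self._read():
            raise KeyError(name)
        with gzip.open(self.path / f"{name}.pkl.gz", "rb") as fh:
            return pickle.load(fh)

    def remove(self, name: str) -> None:
        meta = self._read()
        if name not in meta:
            raise KeyError(name)
        (self.path / f"{name}.pkl.gz").unlink()
        del meta[name]
        self._write(meta)
```

Because entries are pickles, loading one executes whatever the pickle stream instructs, so a registry directory should only contain files from trusted sources.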
cmxflow.utils.serial.save_workflow(workflow: Workflow, path: Path | str) -> None
Save a workflow to a gzip-compressed pickle file.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| workflow | Workflow | The workflow to save. | required |
| path | Path \| str | Path to save the workflow to. | required |
Raises:
| Type | Description |
|---|---|
| WorkflowValidationError | If the workflow fails validation. |
cmxflow.utils.serial.load_workflow(path: Path | str) -> Workflow
Load a workflow from a file.
Supports gzip-compressed pickle files and legacy uncompressed pickle files.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| path | Path \| str | Path to load the workflow from. | required |
Returns:
| Type | Description |
|---|---|
| Workflow | The loaded workflow. |
Raises:
| Type | Description |
|---|---|
| WorkflowValidationError | If the loaded workflow fails validation. |
| FileNotFoundError | If the file does not exist. |
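One way a loader can accept both gzip-compressed and legacy uncompressed pickles is to inspect the first two bytes of the file, since every gzip stream begins with the bytes 0x1f 0x8b. The sketch below shows that approach under stated assumptions: save_compressed and load_any are hypothetical names, the detection strategy is a guess at how such a fallback could work rather than a description of cmxflow's code, and validation of the loaded object is left out.

```python
import gzip
import pickle
from pathlib import Path
from typing import Any

GZIP_MAGIC = b"\x1f\x8b"  # first two bytes of every gzip stream


def save_compressed(obj: Any, path: Path) -> None:
    """Write obj as a gzip-compressed pickle."""
    with gzip.open(path, "wb") as fh:
        pickle.dump(obj, fh)


def load_any(path: Path) -> Any:
    """Load a pickle, transparently handling gzip and plain files."""
    path = Path(path)
    if not path.exists():
        raise FileNotFoundError(path)
    # Peek at the header to decide which opener to use.
    with open(path, "rb") as fh:
        is_gzip = fh.read(2) == GZIP_MAGIC
    opener = gzip.open if is_gzip else open
    with opener(path, "rb") as fh:
        return pickle.load(fh)
```

Checking the header rather than the file extension means a renamed or extension-less file still loads correctly.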
Parallel Execution
cmxflow.utils.parallel.ParallelBlock(block: Block, config: ParallelConfig)
Wrapper that executes a block's forward method in parallel.
Supports use as a context manager to reuse the process pool across multiple calls::

    pb = make_parallel(block, max_workers=4)
    with pb:
        result1 = list(pb(iter1))
        result2 = list(pb(iter2))

Attributes:
| Name | Type | Description |
|---|---|---|
| _block | | The wrapped block instance. |
| _config | | Parallel execution configuration. |
| _executor | ProcessPoolExecutor \| None | Executor when used as a context manager, else None. |
Initialize the parallel wrapper.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| block | Block | Block instance to wrap. | required |
| config | ParallelConfig | Parallel execution configuration. | required |
__enter__() -> ParallelBlock
Enter the context manager, creating a reusable process pool.
Returns:
| Type | Description |
|---|---|
| ParallelBlock | Self with an active executor. |
Raises:
| Type | Description |
|---|---|
| RuntimeError | If already inside a |
__exit__(exc_type: type[BaseException] | None, exc_val: BaseException | None, exc_tb: Any) -> None
Exit the context manager, shutting down the process pool.
__call__(items: Iterator[Any]) -> Iterator[Any]
Execute the wrapped block in parallel.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| items | Iterator[Any] | Input iterator of items to process. | required |
Yields:
| Type | Description |
|---|---|
| Any | Processed items from the block's forward method. |
__getattr__(name: str) -> Any
Delegate attribute access to the wrapped block.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| name | str | Attribute name to look up. | required |
Returns:
| Type | Description |
|---|---|
| Any | The attribute value from the wrapped block. |
__repr__() -> str
Return a string representation of the parallel wrapper.
Returns:
| Type | Description |
|---|---|
| str | String representation showing the wrapped block. |
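The combination of behaviours documented for this class (a pool that is reused inside a with block, a temporary pool otherwise, a RuntimeError on nested entry, and attribute delegation to the wrapped object) can be sketched as follows. PooledCallable is a hypothetical name, and the sketch deliberately swaps in ThreadPoolExecutor for the documented ProcessPoolExecutor so that it runs without pickling or start-method concerns; treat it as an illustration of the pattern, not the library's code.

```python
from concurrent.futures import ThreadPoolExecutor
from typing import Any, Callable, Iterable, Iterator


class PooledCallable:
    """Hypothetical wrapper: one pool per `with` block, else one pool per call."""

    def __init__(self, func: Callable[[Any], Any], max_workers: int = 2) -> None:
        self._func = func
        self._max_workers = max_workers
        self._executor = None  # set only while inside a `with` block

    def __enter__(self) -> "PooledCallable":
        if self._executor is not None:
            raise RuntimeError("already inside a context manager")
        # Thread pool used here for portability; an assumption of this sketch.
        self._executor = ThreadPoolExecutor(max_workers=self._max_workers)
        return self

    def __exit__(self, exc_type, exc_val, exc_tb) -> None:
        self._executor.shutdown(wait=True)
        self._executor = None

    def __call__(self, items: Iterable[Any]) -> Iterator[Any]:
        if self._executor is not None:
            # Reuse the long-lived pool created by __enter__.
            yield from self._executor.map(self._func, items)
        else:
            # No active pool: create one for this call and tear it down after.
            with ThreadPoolExecutor(max_workers=self._max_workers) as pool:
                yield from pool.map(self._func, items)

    def __getattr__(self, name: str) -> Any:
        # Only reached when normal lookup fails; forward public names to the
        # wrapped callable and refuse private ones to avoid lookup recursion.
        if name.startswith("_"):
            raise AttributeError(name)
        return getattr(self._func, name)

    def __repr__(self) -> str:
        return f"PooledCallable({self._func!r})"
```

Reusing a pool matters most when the wrapper is called many times on small inputs, because worker start-up cost is then paid once instead of on every call.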
cmxflow.utils.parallel.make_parallel(block: Block, max_workers: int | None = None, chunk_size: int = 1, ordered: bool = True, error_handling: Literal['raise', 'skip', 'log'] = 'skip') -> ParallelBlock
Create a parallel-enabled wrapper around an existing block instance.
Wraps the block to execute its forward method in parallel across multiple processes using ProcessPoolExecutor.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| block | Block | Block instance to wrap. | required |
| max_workers | int \| None | Maximum number of worker processes. Defaults to os.cpu_count(). | None |
| chunk_size | int | Number of items per worker task. | 1 |
| ordered | bool | If True (default), preserve input order in output. | True |
| error_handling | Literal['raise', 'skip', 'log'] | How to handle errors ("raise", "skip", "log"). | 'skip' |
Returns:
| Type | Description |
|---|---|
| ParallelBlock | ParallelBlock wrapper with parallel execution enabled. |
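To make the chunk_size, ordered, and error_handling options more concrete, here is a minimal sketch of how such options could interact in a chunked parallel map. The names chunked, _run_chunk, and parallel_map are hypothetical, a thread pool replaces the documented ProcessPoolExecutor so the snippet runs anywhere, and the meaning given to "skip" (drop the failing item silently) and "log" (log it, then drop it) is an assumption, since the reference above only lists the accepted values.

```python
import logging
from concurrent.futures import ThreadPoolExecutor, as_completed
from itertools import islice
from typing import Any, Callable, Iterable, Iterator, Optional

log = logging.getLogger(__name__)


def chunked(items: Iterable[Any], size: int) -> Iterator[list]:
    """Yield consecutive lists of at most `size` items."""
    it = iter(items)
    while chunk := list(islice(it, size)):
        yield chunk


def _run_chunk(func: Callable[[Any], Any], chunk: list, error_handling: str) -> list:
    out = []
    for item in chunk:
        try:
            out.append(func(item))
        except Exception:
            if error_handling == "raise":
                raise
            if error_handling == "log":
                log.exception("item %r failed", item)
            # Assumed semantics: "skip" and "log" both drop the failing item.
    return out


def parallel_map(func: Callable[[Any], Any], items: Iterable[Any],
                 max_workers: Optional[int] = None, chunk_size: int = 1,
                 ordered: bool = True, error_handling: str = "skip") -> Iterator[Any]:
    with ThreadPoolExecutor(max_workers=max_workers) as pool:
        # One task per chunk, so larger chunks mean fewer scheduling round trips.
        futures = [pool.submit(_run_chunk, func, chunk, error_handling)
                   for chunk in chunked(items, chunk_size)]
        # Submission order preserves input order; as_completed yields chunks
        # in whatever order they finish.
        stream = futures if ordered else as_completed(futures)
        for future in stream:
            yield from future.result()
```

A larger chunk_size generally helps when each item is cheap to process, while ordered=False lets fast chunks be consumed without waiting for slower ones submitted earlier.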