# Utilities

## Optimization

### `cmxflow.opt.optuna.Optimizer(workflow: Workflow, input_path: Path | str, study_name: str | None = None, storage: str | None = None, sampler: BaseSampler | None = None, pruner: BasePruner | None = None)`

Bayesian optimizer for cmxflow workflows using Optuna.

Automatically optimizes workflow parameters to maximize or minimize a score computed by a ScoreBlock at the end of the workflow.

Attributes:

| Name | Type | Description |
| --- | --- | --- |
| `workflow` | `Workflow` | The workflow to optimize. |
| `input_path` | `Path \| str` | Path to the input file for workflow execution. |
| `study` | `Study` | The Optuna study instance (available after `optimize()` is called). |

Initialize the optimizer.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| `workflow` | `Workflow` | The workflow to optimize. Must end with a ScoreBlock. | *required* |
| `input_path` | `Path \| str` | Path to the input file for workflow execution. | *required* |
| `study_name` | `str \| None` | Optional name for the Optuna study. | `None` |
| `storage` | `str \| None` | Optional storage URL for distributed optimization. | `None` |
| `sampler` | `BaseSampler \| None` | Optional Optuna sampler for parameter suggestions. | `None` |
| `pruner` | `BasePruner \| None` | Optional Optuna pruner for early stopping. | `None` |

Raises:

| Type | Description |
| --- | --- |
| `ValueError` | If the workflow does not end with a ScoreBlock. |
| `ValueError` | If the workflow has no optimizable parameters. |

#### `best_params: dict[str, Any]` (property)

Get the best parameter values found during optimization.

Returns:

| Type | Description |
| --- | --- |
| `dict[str, Any]` | Dictionary mapping parameter names to their optimal values. |

Raises:

| Type | Description |
| --- | --- |
| `RuntimeError` | If `optimize()` has not been called yet. |

#### `best_score: float` (property)

Get the best score achieved during optimization.

Returns:

| Type | Description |
| --- | --- |
| `float` | The best score value. |

Raises:

| Type | Description |
| --- | --- |
| `RuntimeError` | If `optimize()` has not been called yet. |

#### `study: optuna.Study` (property)

Get the Optuna study instance.

Returns:

| Type | Description |
| --- | --- |
| `Study` | The Optuna study. |

Raises:

| Type | Description |
| --- | --- |
| `RuntimeError` | If `optimize()` has not been called yet. |

#### `optimize(n_trials: int = 100, timeout: float | None = None, direction: str = 'maximize', n_jobs: int = 1, show_progress_bar: bool = True, callbacks: list[Any] | None = None) -> optuna.Study`

Run the optimization.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| `n_trials` | `int` | Number of trials to run. | `100` |
| `timeout` | `float \| None` | Maximum time in seconds for the optimization. | `None` |
| `direction` | `str` | Optimization direction (`"maximize"` or `"minimize"`). | `'maximize'` |
| `n_jobs` | `int` | Number of parallel jobs (use -1 for all CPUs). | `1` |
| `show_progress_bar` | `bool` | Whether to show a progress bar. | `True` |
| `callbacks` | `list[Any] \| None` | Optional list of Optuna callback functions. | `None` |

Returns:

| Type | Description |
| --- | --- |
| `Study` | The Optuna study containing optimization results. |

#### `set_best_params() -> None`

Set the workflow parameters to their optimal values.

Raises:

| Type | Description |
| --- | --- |
| `RuntimeError` | If `optimize()` has not been called yet. |
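A typical end-to-end use of the optimizer might look like the following sketch. The workflow object and file names are hypothetical placeholders; only `Optimizer`, `optimize()`, `best_score`, `best_params`, and `set_best_params()` come from this reference:

```python
from cmxflow.opt.optuna import Optimizer

# `my_workflow` is a hypothetical Workflow whose final block is a ScoreBlock;
# Optimizer raises ValueError otherwise.
opt = Optimizer(my_workflow, input_path="input.sdf", study_name="demo")

# Run 50 trials, maximizing the score (the default direction).
study = opt.optimize(n_trials=50, direction="maximize")

print(opt.best_score)   # best score achieved across trials
print(opt.best_params)  # parameter values that produced it

# Write the best parameter values back onto the workflow.
opt.set_best_params()
```

Passing a `storage` URL and a shared `study_name` would let several processes contribute trials to the same study, as is usual with Optuna.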

## Serialization

### `cmxflow.utils.serial.WorkflowRegistry(path: Path | str = '~/.cmxflow/registry')`

A registry for saving, listing, and loading named workflows.

Workflows are stored as gzip-compressed pickle files in a directory, with a JSON metadata file tracking names, dates, and representations.

Attributes:

| Name | Type | Description |
| --- | --- | --- |
| `path` | `Path \| str` | Directory where registry files are stored. |

Initialize the workflow registry.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| `path` | `Path \| str` | Directory to store registry files. Created if it doesn't exist. | `'~/.cmxflow/registry'` |

#### `register(name: str, workflow: Workflow, overwrite: bool = False) -> None`

Register a workflow under a given name.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| `name` | `str` | Name to register the workflow under. | *required* |
| `workflow` | `Workflow` | The workflow to register. | *required* |
| `overwrite` | `bool` | If True, overwrite an existing entry with the same name. | `False` |

Raises:

| Type | Description |
| --- | --- |
| `ValueError` | If `name` already exists and `overwrite` is False. |
| `WorkflowValidationError` | If the workflow fails validation. |

#### `list() -> pd.DataFrame`

List all registered workflows.

Returns:

| Type | Description |
| --- | --- |
| `DataFrame` | DataFrame with columns: `name`, `date`, `representation`. |

#### `load(name: str) -> Workflow`

Load a registered workflow by name.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| `name` | `str` | Name of the workflow to load. | *required* |

Returns:

| Type | Description |
| --- | --- |
| `Workflow` | The loaded workflow. |

Raises:

| Type | Description |
| --- | --- |
| `KeyError` | If no workflow with the given name exists. |

#### `remove(name: str) -> None`

Remove a registered workflow.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| `name` | `str` | Name of the workflow to remove. | *required* |

Raises:

| Type | Description |
| --- | --- |
| `KeyError` | If no workflow with the given name exists. |
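The registry methods above compose into a simple lifecycle. In this sketch, `my_workflow` is a hypothetical, already-built `Workflow` instance; everything else is the registry API documented here:

```python
from cmxflow.utils.serial import WorkflowRegistry

registry = WorkflowRegistry()  # defaults to ~/.cmxflow/registry

registry.register("docking-v1", my_workflow)
# Registering the same name again raises ValueError unless overwrite=True:
registry.register("docking-v1", my_workflow, overwrite=True)

print(registry.list())  # DataFrame with columns: name, date, representation

wf = registry.load("docking-v1")  # returns the Workflow
registry.remove("docking-v1")     # KeyError if the name is unknown
```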

### `cmxflow.utils.serial.save_workflow(workflow: Workflow, path: Path | str) -> None`

Save a workflow to a gzip-compressed pickle file.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| `workflow` | `Workflow` | The workflow to save. | *required* |
| `path` | `Path \| str` | Path to save the workflow to. | *required* |

Raises:

| Type | Description |
| --- | --- |
| `WorkflowValidationError` | If the workflow fails validation. |

### `cmxflow.utils.serial.load_workflow(path: Path | str) -> Workflow`

Load a workflow from a file.

Supports gzip-compressed pickle files and legacy uncompressed pickle files.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| `path` | `Path \| str` | Path to load the workflow from. | *required* |

Returns:

| Type | Description |
| --- | --- |
| `Workflow` | The loaded workflow. |

Raises:

| Type | Description |
| --- | --- |
| `WorkflowValidationError` | If the loaded workflow fails validation. |
| `FileNotFoundError` | If the file does not exist. |
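For one-off files outside the registry, the two module-level functions form a round trip. `my_workflow` and the file name are hypothetical; the calls and their error behavior are as documented above:

```python
from cmxflow.utils.serial import save_workflow, load_workflow

# Both calls validate the workflow and raise WorkflowValidationError on failure.
save_workflow(my_workflow, "workflow.pkl.gz")  # gzip-compressed pickle
wf = load_workflow("workflow.pkl.gz")          # also reads legacy uncompressed pickles
```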

## Parallel Execution

### `cmxflow.utils.parallel.ParallelBlock(block: Block, config: ParallelConfig)`

Wrapper that executes a block's forward method in parallel.

Supports use as a context manager to reuse the process pool across multiple calls:

```python
pb = make_parallel(block, max_workers=4)
with pb:
    result1 = list(pb(iter1))
    result2 = list(pb(iter2))
```

Attributes:

| Name | Type | Description |
| --- | --- | --- |
| `_block` | `Block` | The wrapped block instance. |
| `_config` | `ParallelConfig` | Parallel execution configuration. |
| `_executor` | `ProcessPoolExecutor \| None` | Executor when used as a context manager, else `None`. |

Initialize the parallel wrapper.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| `block` | `Block` | Block instance to wrap. | *required* |
| `config` | `ParallelConfig` | Parallel execution configuration. | *required* |

#### `__enter__() -> ParallelBlock`

Enter the context manager, creating a reusable process pool.

Returns:

| Type | Description |
| --- | --- |
| `ParallelBlock` | Self, with an active executor. |

Raises:

| Type | Description |
| --- | --- |
| `RuntimeError` | If already inside a `with` block (non-reentrant). |

#### `__exit__(exc_type: type[BaseException] | None, exc_val: BaseException | None, exc_tb: Any) -> None`

Exit the context manager, shutting down the process pool.

#### `__call__(items: Iterator[Any]) -> Iterator[Any]`

Execute the wrapped block in parallel.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| `items` | `Iterator[Any]` | Input iterator of items to process. | *required* |

Yields:

| Type | Description |
| --- | --- |
| `Any` | Processed items from the block's forward method. |

#### `__getattr__(name: str) -> Any`

Delegate attribute access to the wrapped block.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| `name` | `str` | Attribute name to look up. | *required* |

Returns:

| Type | Description |
| --- | --- |
| `Any` | The attribute value from the wrapped block. |

#### `__repr__() -> str`

Return a string representation of the parallel wrapper.

Returns:

| Type | Description |
| --- | --- |
| `str` | String representation showing the wrapped block. |

### `cmxflow.utils.parallel.make_parallel(block: Block, max_workers: int | None = None, chunk_size: int = 1, ordered: bool = True, error_handling: Literal['raise', 'skip', 'log'] = 'skip') -> ParallelBlock`

Create a parallel-enabled wrapper around an existing block instance.

Wraps the block to execute its forward method in parallel across multiple processes using ProcessPoolExecutor.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| `block` | `Block` | Block instance to wrap. | *required* |
| `max_workers` | `int \| None` | Maximum number of worker processes. Defaults to `os.cpu_count()`. | `None` |
| `chunk_size` | `int` | Number of items per worker task. | `1` |
| `ordered` | `bool` | If True (default), preserve input order in the output. | `True` |
| `error_handling` | `Literal['raise', 'skip', 'log']` | How to handle errors (`"raise"`, `"skip"`, or `"log"`). | `'skip'` |

Returns:

| Type | Description |
| --- | --- |
| `ParallelBlock` | ParallelBlock wrapper with parallel execution enabled. |

Example:

```python
block = MoleculeAlignBlock()
block.input_files["query"] = "refs.sdf"
parallel_block = make_parallel(block, max_workers=4)
results = list(parallel_block(molecule_iterator))
```
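The tuning knobs and the context-manager protocol combine as in the sketch below. `ScoringBlock` and the molecule iterables are hypothetical; `make_parallel`, its parameters, and the pool-reuse behavior are as documented above:

```python
from cmxflow.utils.parallel import make_parallel

# Larger chunks amortize per-task overhead; ordered=False yields results
# as workers finish; error_handling="log" records failures instead of
# raising or silently skipping.
pb = make_parallel(ScoringBlock(), max_workers=8, chunk_size=16,
                   ordered=False, error_handling="log")

# Reuse one process pool across batches instead of paying
# pool startup cost on every call.
with pb:
    train_scores = list(pb(iter(train_molecules)))
    test_scores = list(pb(iter(test_molecules)))
```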