Core

Base classes, workflow orchestration, and parameter types.

Block Base Classes

cmxflow.block.BlockBase(name: str | None = None, input_files: list[str] | None = None, input_text: list[str] | None = None)

Bases: ABC

Base class with shared functionality for all block types.

Attributes:

Name Type Description
name

Human-readable name for the block.

params dict[str, Continuous | Categorical | Integer]

Dictionary of mutable parameters.

input_files dict[str, Path]

Required input files, keyed by input name.

Initialize the block.

Parameters:

Name Type Description Default
name str | None

Optional name for the block. Defaults to class name.

None
input_files list[str] | None

Optional files that will be surfaced as required input at run time.

None
input_text list[str] | None

Optional text that will be surfaced as required input at run time.

None

set_inputs(**config: str) -> None

Set inputs whose names match required files, required text, or registered parameters.

Parameters:

Name Type Description Default
**config str

Keyword arguments mapping input names to values. For file inputs, values should be file paths. For text inputs, values should be strings. For params, values should match the parameter type.

{}

get_params() -> dict[str, Any]

Get all mutable parameters for this block.

Returns:

Type Description
dict[str, Any]

Dictionary mapping parameter names to Parameter objects.

get_param(key: str) -> Any

Get the current value of a mutable parameter.

Parameters:

Name Type Description Default
key str

Name of the parameter to retrieve.

required

Returns:

Type Description
Any

The current value of the parameter.

Raises:

Type Description
KeyError

If the parameter name is not registered.

mutable(*parameters: Continuous | Integer | Categorical) -> None

Register parameters as mutable for optimization.

Parameters:

Name Type Description Default
*parameters Continuous | Integer | Categorical

Parameter objects to register as mutable.

()

reset_cache() -> None

Reset any cached state for a new optimization iteration.

Called at the start of each optimization trial to ensure blocks don't retain stale cached data. Override in subclasses that maintain internal caches.
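The parameter methods above can be illustrated with a small stand-in. The sketch below does not import cmxflow: `SketchParam`, `SketchBlockBase`, and `Threshold` are hypothetical classes that only mimic the documented behavior of `mutable`, `get_params`, and `get_param`, and the library's own implementation may differ.

```python
from typing import Any, Dict, Optional


class SketchParam:
    """Hypothetical stand-in for a parameter object with a name and a value."""

    def __init__(self, name: str, default: Any) -> None:
        self.name = name
        self._value = default

    def get(self) -> Any:
        return self._value


class SketchBlockBase:
    """Hypothetical stand-in mimicking the documented BlockBase methods."""

    def __init__(self, name: Optional[str] = None) -> None:
        # The documentation states that the name defaults to the class name.
        self.name = name if name is not None else type(self).__name__
        self.params: Dict[str, SketchParam] = {}

    def mutable(self, *parameters: SketchParam) -> None:
        # Register each parameter under its own name.
        for parameter in parameters:
            self.params[parameter.name] = parameter

    def get_params(self) -> Dict[str, Any]:
        return dict(self.params)

    def get_param(self, key: str) -> Any:
        # A plain dict lookup raises KeyError for unregistered names.
        return self.params[key].get()


class Threshold(SketchBlockBase):
    """Example subclass that registers one mutable parameter."""

    def __init__(self) -> None:
        super().__init__()
        self.mutable(SketchParam("cutoff", 0.5))


block = Threshold()
cutoff = block.get_param("cutoff")
```

In this sketch, asking for a name that was never passed to `mutable` raises `KeyError`, matching the documented contract of `get_param`.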

cmxflow.block.Block(name: str | None = None, input_files: list[str] | None = None, input_text: list[str] | None = None)

Bases: BlockBase

Block that transforms items from an iterator.

Subclasses must implement forward to define the transformation.

forward(arg: Any) -> Any abstractmethod

Define a single unit of work.

Parameters:

Name Type Description Default
arg Any

Input item to transform.

required

Returns:

Type Description
Any

Transformed item.

__call__(iter: Iterator[Any]) -> Iterator[Any]

Execute the block on an iterator of items.

Parameters:

Name Type Description Default
iter Iterator[Any]

Iterator of input items to process.

required

Yields:

Type Description
Any

Transformed items that pass input and output checks.

check_input(arg: Any) -> bool

Validate an input item before processing.

Override this method to filter out invalid inputs.

Parameters:

Name Type Description Default
arg Any

Input item to validate.

required

Returns:

Type Description
bool

True if the item should be processed, False to skip.

check_output(arg: Any) -> bool

Validate an output item before yielding.

Override this method to filter out invalid outputs.

Parameters:

Name Type Description Default
arg Any

Output item to validate.

required

Returns:

Type Description
bool

True if the item should be yielded, False to discard.
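The interplay of `check_input`, `forward`, and `check_output` can be sketched as follows. This is a self-contained stand-in, not cmxflow code: `SketchBlock` and `Halve` are hypothetical, and the ordering (input check, then `forward`, then output check) is inferred from the descriptions above rather than taken from the library source.

```python
from abc import ABC, abstractmethod
from typing import Any, Iterator


class SketchBlock(ABC):
    """Hypothetical stand-in mirroring the documented Block interface."""

    @abstractmethod
    def forward(self, arg: Any) -> Any:
        """Define a single unit of work."""

    def check_input(self, arg: Any) -> bool:
        return True

    def check_output(self, arg: Any) -> bool:
        return True

    def __call__(self, items: Iterator[Any]) -> Iterator[Any]:
        for item in items:
            if not self.check_input(item):
                continue  # skip invalid inputs
            result = self.forward(item)
            if self.check_output(result):
                yield result  # only valid outputs are passed on


class Halve(SketchBlock):
    """Example block: integer-halve each item, dropping non-positive results."""

    def check_input(self, arg: Any) -> bool:
        return isinstance(arg, int)

    def forward(self, arg: Any) -> Any:
        return arg // 2

    def check_output(self, arg: Any) -> bool:
        return arg > 0


halved = list(Halve()(iter([8, "x", 1, 5])))
```

Here `"x"` is skipped by the input check and `1` is discarded by the output check (it halves to `0`), so `halved` is `[4, 2]`.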

cmxflow.block.SourceBlock(reader: Callable[[Path], Iterator[Any]])

Bases: BlockBase

Block that produces items from a source file.

Initialize a source block with a reader function.

Parameters:

Name Type Description Default
reader Callable[[Path], Iterator[Any]]

Callable that takes a Path and yields items.

required

forward(path: Path) -> Iterator[Any]

Read items from the source file.

Parameters:

Name Type Description Default
path Path

Path to the source file.

required

Yields:

Type Description
Any

Items read from the source file.

__call__(path: Path) -> Iterator[Any]

Execute the source block.

Parameters:

Name Type Description Default
path Path

Path to the source file.

required

Returns:

Type Description
Iterator[Any]

Iterator of items from the source file.
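A reader matching the documented `Callable[[Path], Iterator[Any]]` signature might look like the sketch below. `read_lines` is a hypothetical name, and the line-per-item text format is an assumption chosen for illustration; the example runs without cmxflow installed.

```python
import tempfile
from pathlib import Path
from typing import Iterator


def read_lines(path: Path) -> Iterator[str]:
    """Yield each non-empty line of a text file with whitespace stripped."""
    with path.open("r", encoding="utf-8") as handle:
        for line in handle:
            line = line.strip()
            if line:
                yield line


# Exercise the reader on a temporary file.
with tempfile.TemporaryDirectory() as tmp:
    sample = Path(tmp) / "items.txt"
    sample.write_text("alpha\n\nbeta\n", encoding="utf-8")
    items = list(read_lines(sample))
```

Given the constructor signature above, such a function would be supplied as the `reader` argument of `SourceBlock`.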

cmxflow.block.SinkBlock(writer: Callable[[Iterator[Any], Path], None])

Bases: BlockBase

Block that terminates a workflow by writing to a file.

Initialize a sink block with a writer function.

Parameters:

Name Type Description Default
writer Callable[[Iterator[Any], Path], None]

Callable that takes an iterator of items and a destination Path, and writes the items to that path.

required

forward(iter: Iterator[Any], path: Path) -> None

Write items to the destination file.

Parameters:

Name Type Description Default
iter Iterator[Any]

Iterator of items to write.

required
path Path

Path to the destination file.

required

__call__(iter: Iterator[Any], path: Path) -> None

Execute the sink block.

Parameters:

Name Type Description Default
iter Iterator[Any]

Iterator of items to write.

required
path Path

Path to the destination file.

required
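A writer matching the documented `Callable[[Iterator[Any], Path], None]` signature could be sketched as below. `write_lines` is a hypothetical name, and the one-item-per-line output format is an assumption for illustration; the example runs without cmxflow installed.

```python
import tempfile
from pathlib import Path
from typing import Any, Iterator


def write_lines(items: Iterator[Any], path: Path) -> None:
    """Write each item on its own line, consuming the iterator lazily."""
    with path.open("w", encoding="utf-8") as handle:
        for item in items:
            handle.write(f"{item}\n")


# Exercise the writer on a temporary file and read the result back.
with tempfile.TemporaryDirectory() as tmp:
    target = Path(tmp) / "out.txt"
    write_lines(iter(["alpha", "beta"]), target)
    written = target.read_text(encoding="utf-8")
```

Given the constructor signature above, such a function would be supplied as the `writer` argument of `SinkBlock`.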

cmxflow.block.ScoreBlock(name: str | None = None, input_files: list[str] | None = None, input_text: list[str] | None = None)

Bases: BlockBase

Block with state-dependent behavior for optimization vs normal execution.

During optimization (when uid is provided), computes a score via objective(). During normal execution, passes items through via forward().

Subclasses must implement objective() for optimization scoring. Optionally override forward() to transform items during normal execution.

Initialize the score block.

Parameters:

Name Type Description Default
name str | None

Optional name for the block.

None
input_files list[str] | None

Optional files required at run time.

None
input_text list[str] | None

Optional text required at run time.

None

objective(iter: Iterator[Any]) -> float abstractmethod

Compute the optimization objective score.

Called only during optimization. Must be implemented by subclasses.

Parameters:

Name Type Description Default
iter Iterator[Any]

Iterator of items to score.

required

Returns:

Type Description
float

Score value. Higher is better when maximizing; lower is better when minimizing.

forward(item: Any) -> Any

Transform a single item during normal (non-optimization) execution.

Default implementation passes items through unchanged. Override to filter or transform items.

Parameters:

Name Type Description Default
item Any

Input item.

required

Returns:

Type Description
Any

Transformed item, or None to filter it out.

__call__(iter: Iterator[Any], uid: tuple[str, ...] | None = None) -> float | Iterator[Any]

Execute the score block.

Parameters:

Name Type Description Default
iter Iterator[Any]

Iterator of input items.

required
uid tuple[str, ...] | None

Unique identifier for caching (present during optimization).

None

Returns:

Type Description
float | Iterator[Any]

If uid is provided (optimization mode): float score. If uid is None (normal mode): Iterator of transformed items.
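The two execution modes can be illustrated with a stand-in. The sketch below is not cmxflow code: `SketchScoreBlock` and `MeanScore` are hypothetical, and the dispatch on `uid` and the dropping of `None` results follow the descriptions above rather than the library source.

```python
from abc import ABC, abstractmethod
from typing import Any, Iterator, Optional, Tuple, Union


class SketchScoreBlock(ABC):
    """Hypothetical stand-in mirroring the documented ScoreBlock interface."""

    @abstractmethod
    def objective(self, items: Iterator[Any]) -> float:
        """Compute the optimization objective score."""

    def forward(self, item: Any) -> Any:
        return item  # default: pass items through unchanged

    def _transform(self, items: Iterator[Any]) -> Iterator[Any]:
        for item in items:
            result = self.forward(item)
            if result is not None:  # None filters the item out
                yield result

    def __call__(
        self, items: Iterator[Any], uid: Optional[Tuple[str, ...]] = None
    ) -> Union[float, Iterator[Any]]:
        if uid is not None:
            return self.objective(items)  # optimization mode
        return self._transform(items)  # normal mode


class MeanScore(SketchScoreBlock):
    """Example block that scores a stream by its arithmetic mean."""

    def objective(self, items: Iterator[Any]) -> float:
        values = list(items)
        return sum(values) / len(values)


block = MeanScore()
score = block(iter([1, 2, 3]), uid=("trial", "0"))
passed = list(block(iter([1, 2, 3])))
```

The generator lives in a helper method so that `__call__` itself can return either a plain float or an iterator, depending on whether `uid` is given.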

Workflow

cmxflow.workflow.Workflow(name: str = 'Workflow')

A workflow that chains multiple blocks together.

Workflows allow dynamic composition of blocks into executable pipelines. Blocks are executed sequentially, with the output of each block passed as input to the next.

Attributes:

Name Type Description
name

Human-readable name for the workflow.

blocks list[BlockBase]

List of blocks in execution order.

Initialize an empty workflow.

Parameters:

Name Type Description Default
name str

Optional name for the workflow.

'Workflow'

reset_cache() -> None

Reset cached state in all blocks for a new optimization iteration.

add(*blocks: Any) -> Workflow

Add one or more blocks to the end of the workflow.

Parameters:

Name Type Description Default
*blocks Any

The block(s) to add.

()

Returns:

Type Description
Workflow

Self for method chaining.

insert(index: int, block: BlockBase) -> Workflow

Insert a block at a specific position.

Parameters:

Name Type Description Default
index int

Position to insert the block.

required
block BlockBase

The block to insert.

required

Returns:

Type Description
Workflow

Self for method chaining.

clear() -> Workflow

Remove all blocks from the workflow.

Returns:

Type Description
Workflow

Self for method chaining.

get_params() -> list[Parameter]

Get all mutable parameters from all blocks.

Returns:

Type Description
list[Parameter]

List of parameter objects linked to blocks.

check() -> None

Validate the workflow structure.

Ensures the workflow has a SourceBlock first, SinkBlock last, and only Block instances in between.

Raises:

Type Description
IndexError

If fewer than 2 blocks are present.

ValueError

If block types are in incorrect positions.
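The structural rules stated for `check()` can be sketched with placeholder types. `SketchSource`, `SketchBlock`, `SketchSink`, `check`, and `outcome` are hypothetical names, and the error messages are invented for illustration; only the exception types and the ordering rule come from the description above.

```python
from typing import Any, List


class SketchSource:
    """Hypothetical stand-in for SourceBlock."""


class SketchBlock:
    """Hypothetical stand-in for Block."""


class SketchSink:
    """Hypothetical stand-in for SinkBlock."""


def check(blocks: List[Any]) -> None:
    """Validate that blocks form source, then transforms, then sink."""
    if len(blocks) < 2:
        raise IndexError("a workflow needs at least a source and a sink")
    if not isinstance(blocks[0], SketchSource):
        raise ValueError("the first block must be a source")
    if not isinstance(blocks[-1], SketchSink):
        raise ValueError("the last block must be a sink")
    for block in blocks[1:-1]:
        if not isinstance(block, SketchBlock):
            raise ValueError("only transform blocks may sit in between")


def outcome(blocks: List[Any]) -> str:
    """Return 'ok' or the name of the exception raised by check()."""
    try:
        check(blocks)
    except (IndexError, ValueError) as error:
        return type(error).__name__
    return "ok"


valid = outcome([SketchSource(), SketchBlock(), SketchSink()])
too_short = outcome([SketchSource()])
misplaced = outcome([SketchSink(), SketchSource()])
```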

get_required_input() -> dict[str, type]

Get all required inputs from blocks in the workflow.

Returns:

Type Description
dict[str, type]

Dictionary mapping input keys to their expected types. Keys are formatted as '{block_index}.{type}@{name}'.
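The key format `'{block_index}.{type}@{name}'` can be built and taken apart with a pair of helpers. `make_key` and `parse_key` are hypothetical and not part of cmxflow, and the `file` label used for `{type}` is a guess made only for illustration.

```python
from typing import Tuple


def make_key(block_index: int, kind: str, name: str) -> str:
    """Build a key in the documented '{block_index}.{type}@{name}' layout."""
    return f"{block_index}.{kind}@{name}"


def parse_key(key: str) -> Tuple[int, str, str]:
    """Split a key into (block_index, type, name)."""
    # Split on the first '@' so that names containing '.' stay intact.
    prefix, name = key.split("@", 1)
    index, kind = prefix.split(".", 1)
    return int(index), kind, name


key = make_key(2, "file", "receptor.pdb")  # 'file' is an assumed type label
parts = parse_key(key)
```

Splitting on `@` before splitting on `.` keeps a dotted name such as `receptor.pdb` in one piece.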

set_required_input(required_inputs: dict[str, str]) -> None

Set required inputs for blocks in the workflow.

Parameters:

Name Type Description Default
required_inputs dict[str, str]

Dictionary mapping input keys to values. Keys should match those returned by get_required_input().

required

Raises:

Type Description
KeyError

If a required input is missing.

FileNotFoundError

If a required input file does not exist.

forward(input_path: Path | str, output_path: Path | str = '') -> float | None

Execute the workflow pipeline.

Parameters:

Name Type Description Default
input_path Path | str

Path to the input file for the SourceBlock.

required
output_path Path | str

Path to the output file for the SinkBlock.

''

Returns:

Type Description
float | None

Score value if workflow ends with ScoreBlock, None otherwise.

__call__(input_path: Path | str, output_path: Path | str = '') -> Any

Execute the workflow.

Parameters:

Name Type Description Default
input_path Path | str

Path to the input file for the SourceBlock.

required
output_path Path | str

Path to the output file for the SinkBlock (optional).

''

Returns:

Type Description
Any

Output from the final block.

__len__() -> int

Return the number of blocks in the workflow.

__getitem__(index: int) -> BlockBase

Get a block by index.

__repr__() -> str

Generate a visual representation of the workflow.
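The sequential execution described for Workflow, where each block's output feeds the next, can be sketched with plain generator functions. Everything here is hypothetical (`source`, `double`, `keep_large`, `sink`, `run`), and the in-memory source and sink stand in for real file I/O; it is not the library's implementation.

```python
from typing import Any, Callable, Dict, Iterator, List


def source(path: str) -> Iterator[int]:
    """Stand-in source: ignores the path and yields fixed items."""
    yield from [1, 2, 3, 4]


def double(items: Iterator[int]) -> Iterator[int]:
    for item in items:
        yield item * 2


def keep_large(items: Iterator[int]) -> Iterator[int]:
    for item in items:
        if item > 2:
            yield item


written: Dict[str, List[Any]] = {}


def sink(items: Iterator[Any], path: str) -> None:
    """Stand-in sink: records the items in memory instead of a file."""
    written[path] = list(items)


def run(
    source_fn: Callable[[str], Iterator[Any]],
    stages: List[Callable[[Iterator[Any]], Iterator[Any]]],
    sink_fn: Callable[[Iterator[Any], str], None],
    input_path: str,
    output_path: str,
) -> None:
    """Chain the stages lazily, then let the sink drain the stream."""
    stream = source_fn(input_path)
    for stage in stages:
        stream = stage(stream)
    sink_fn(stream, output_path)


run(source, [double, keep_large], sink, "in.txt", "out.txt")
```

Because each stage wraps the previous iterator, no item is processed until the sink starts consuming the stream.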

Parameters

cmxflow.parameter.Parameter(name: str, default: Any)

Bases: ABC

Base class for all parameter types.

Attributes:

Name Type Description
name

Name of the parameter.

Initialize the parameter.

Parameters:

Name Type Description Default
name str

Name of the parameter.

required
default Any

Default value for the parameter.

required

options: Any abstractmethod property

Return the allowed options/range for this parameter.

Returns:

Type Description
Any

For Continuous/Integer: tuple of (low, high) bounds. For Categorical: list of allowed values.

get() -> Any

Get the current parameter value.

Returns:

Type Description
Any

Current value.

set(value: Any) -> None

Set the parameter value.

Parameters:

Name Type Description Default
value Any

New value to set.

required

Raises:

Type Description
ValueError

If the value fails validation via check().

check(value: Any) -> bool abstractmethod

Check if a value is within the specified constraints.

cmxflow.parameter.Continuous(name: str, default: float, low: float, high: float)

Bases: Parameter

Continuous parameter with float values in a range.

Attributes:

Name Type Description
name

Name of the parameter.

low

Lower bound (inclusive).

high

Upper bound (inclusive).

Initialize a continuous parameter.

Parameters:

Name Type Description Default
name str

Name of the parameter.

required
default float

Default parameter value.

required
low float

Lower bound (inclusive).

required
high float

Upper bound (inclusive).

required

options: tuple[float, float] property

Return the (low, high) range.

Returns:

Type Description
tuple[float, float]

Tuple of (low, high) bounds.

check(value: float) -> bool

Check if a value is within the continuous range.

Parameters:

Name Type Description Default
value float

Value to validate.

required

Returns:

Type Description
bool

True if value is a number within [low, high], False otherwise.

cmxflow.parameter.Integer(name: str, default: int, low: int, high: int)

Bases: Parameter

Integer parameter with values in a range.

Attributes:

Name Type Description
name

Name of the parameter.

low

Lower bound (inclusive).

high

Upper bound (inclusive).

Initialize an integer parameter.

Parameters:

Name Type Description Default
name str

Name of the parameter.

required
default int

Default parameter value.

required
low int

Lower bound (inclusive).

required
high int

Upper bound (inclusive).

required

options: tuple[int, int] property

Return the (low, high) range.

Returns:

Type Description
tuple[int, int]

Tuple of (low, high) bounds.

check(value: int) -> bool

Check if a value is within the integer range.

Parameters:

Name Type Description Default
value int

Value to validate.

required

Returns:

Type Description
bool

True if value is an integer within [low, high], False otherwise.

cmxflow.parameter.Categorical(name: str, default: Any, choices: Sequence[Any])

Bases: Parameter

Categorical parameter with discrete choices.

Attributes:

Name Type Description
name

Name of the parameter.

choices

Sequence of allowed values.

Initialize a categorical parameter.

Parameters:

Name Type Description Default
name str

Name of the parameter.

required
default Any

Default parameter value.

required
choices Sequence[Any]

Sequence of allowed values.

required

options: list[Any] property

Return the list of allowed choices.

Returns:

Type Description
list[Any]

List of allowed values.

check(value: Any) -> bool

Check if a value is one of the allowed choices.

Parameters:

Name Type Description Default
value Any

Value to validate.

required

Returns:

Type Description
bool

True if value is in the choices list, False otherwise.
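The `get`/`set`/`check` contract described for the parameter classes can be sketched with stand-ins. `SketchParameter`, `SketchContinuous`, and `SketchCategorical` are hypothetical and do not import cmxflow; details such as the error message and how non-numeric values are detected are assumptions.

```python
from abc import ABC, abstractmethod
from typing import Any, List, Sequence, Tuple


class SketchParameter(ABC):
    """Hypothetical stand-in for the documented Parameter base class."""

    def __init__(self, name: str, default: Any) -> None:
        self.name = name
        self._value = default

    @abstractmethod
    def check(self, value: Any) -> bool:
        """Return True if the value satisfies the constraints."""

    def get(self) -> Any:
        return self._value

    def set(self, value: Any) -> None:
        # Reject values that fail validation, as documented for set().
        if not self.check(value):
            raise ValueError(f"{value!r} is not valid for {self.name!r}")
        self._value = value


class SketchContinuous(SketchParameter):
    def __init__(self, name: str, default: float, low: float, high: float) -> None:
        super().__init__(name, default)
        self.low, self.high = low, high

    @property
    def options(self) -> Tuple[float, float]:
        return (self.low, self.high)

    def check(self, value: Any) -> bool:
        # Both bounds are inclusive.
        return isinstance(value, (int, float)) and self.low <= value <= self.high


class SketchCategorical(SketchParameter):
    def __init__(self, name: str, default: Any, choices: Sequence[Any]) -> None:
        super().__init__(name, default)
        self.choices = choices

    @property
    def options(self) -> List[Any]:
        return list(self.choices)

    def check(self, value: Any) -> bool:
        return value in self.choices


rate = SketchContinuous("rate", default=0.1, low=0.0, high=1.0)
rate.set(1.0)  # accepted: the upper bound is inclusive
try:
    rate.set(1.5)
    rejected = False
except ValueError:
    rejected = True

mode = SketchCategorical("mode", default="fast", choices=("fast", "slow"))
```

After the failed `set(1.5)`, the parameter keeps its last valid value, since validation happens before assignment in this sketch.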