Core
Base classes, workflow orchestration, and parameter types.
Block Base Classes
cmxflow.block.BlockBase(name: str | None = None, input_files: list[str] | None = None, input_text: list[str] | None = None)
Bases: ABC
Base class with shared functionality for all block types.
Attributes:
| Name | Type | Description |
|---|---|---|
| `name` | | Human-readable name for the block. |
| `params` | `dict[str, Continuous \| Categorical \| Integer]` | Dictionary of mutable parameters. |
| `input_files` | `dict[str, Path]` | Mapping of required input file names to their paths. |
Initialize the block.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| `name` | `str \| None` | Optional name for the block. Defaults to class name. | `None` |
| `input_files` | `list[str] \| None` | Optional files that will be surfaced as required input at run time. | `None` |
| `input_text` | `list[str] \| None` | Optional text that will be surfaced as required input at run time. | `None` |
set_inputs(**config: str) -> None
Set inputs if matching required files or text.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| `**config` | `str` | Keyword arguments mapping input names to values. For file inputs, values should be file paths. For text inputs, values should be strings. For params, values should match the parameter type. | `{}` |
get_params() -> dict[str, Any]
Get all mutable parameters for this block.
Returns:
| Type | Description |
|---|---|
| `dict[str, Any]` | Dictionary mapping parameter names to Parameter objects. |
get_param(key: str) -> Any
Get the current value of a mutable parameter.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| `key` | `str` | Name of the parameter to retrieve. | required |
Returns:
| Type | Description |
|---|---|
| `Any` | The current value of the parameter. |
Raises:
| Type | Description |
|---|---|
| `KeyError` | If the parameter name is not registered. |
mutable(*parameters: Continuous | Integer | Categorical) -> None
Register parameters as mutable for optimization.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| `*parameters` | `Continuous \| Integer \| Categorical` | Parameter objects to register as mutable. | `()` |
reset_cache() -> None
Reset any cached state for a new optimization iteration.
Called at the start of each optimization trial to ensure blocks don't retain stale cached data. Override in subclasses that maintain internal caches.
cmxflow.block.Block(name: str | None = None, input_files: list[str] | None = None, input_text: list[str] | None = None)
Bases: BlockBase
Block that transforms items from an iterator.
Subclasses must implement forward to define the transformation.
forward(arg: Any) -> Any
abstractmethod
Define a single unit of work.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| `arg` | `Any` | Input item to transform. | required |
Returns:
| Type | Description |
|---|---|
| `Any` | Transformed item. |
__call__(iter: Iterator[Any]) -> Iterator[Any]
Execute the block on an iterator of items.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| `iter` | `Iterator[Any]` | Iterator of input items to process. | required |
Yields:
| Type | Description |
|---|---|
| `Any` | Transformed items that pass input and output checks. |
check_input(arg: Any) -> bool
Validate an input item before processing.
Override this method to filter out invalid inputs.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| `arg` | `Any` | Input item to validate. | required |
Returns:
| Type | Description |
|---|---|
| `bool` | True if the item should be processed, False to skip. |
check_output(arg: Any) -> bool
Validate an output item before yielding.
Override this method to filter out invalid outputs.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| `arg` | `Any` | Output item to validate. | required |
Returns:
| Type | Description |
|---|---|
| `bool` | True if the item should be yielded, False to discard. |
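The interplay of `check_input`, `forward`, and `check_output` can be pictured with a small stand-in class. This sketch does not import cmxflow: `MiniBlock` and `Doubler` are hypothetical names, and the order of operations inside `__call__` is assumed from the descriptions above rather than taken from the library's source.

```python
from abc import ABC, abstractmethod
from typing import Any, Iterator


class MiniBlock(ABC):
    """Hypothetical stand-in that mimics the documented Block contract."""

    @abstractmethod
    def forward(self, arg: Any) -> Any:
        """Define a single unit of work."""

    def check_input(self, arg: Any) -> bool:
        return True  # accept every input by default

    def check_output(self, arg: Any) -> bool:
        return True  # yield every output by default

    def __call__(self, items: Iterator[Any]) -> Iterator[Any]:
        # Assumed order: validate input, transform, validate output.
        for item in items:
            if not self.check_input(item):
                continue
            result = self.forward(item)
            if self.check_output(result):
                yield result


class Doubler(MiniBlock):
    """Doubles integers, skipping non-integers and results above 10."""

    def check_input(self, arg: Any) -> bool:
        return isinstance(arg, int)

    def forward(self, arg: int) -> int:
        return arg * 2

    def check_output(self, arg: int) -> bool:
        return arg <= 10


doubled = list(Doubler()(iter([1, "x", 4, 9])))
print(doubled)
```

Here `"x"` is rejected by the input check and `9` is dropped by the output check after being doubled to 18, so only the results for `1` and `4` are yielded.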
cmxflow.block.SourceBlock(reader: Callable[[Path], Iterator[Any]])
Bases: BlockBase
Block that produces items from a source file.
Initialize a source block with a reader function.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| `reader` | `Callable[[Path], Iterator[Any]]` | Callable that takes a Path and yields items. | required |
forward(path: Path) -> Iterator[Any]
Read items from the source file.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| `path` | `Path` | Path to the source file. | required |
Yields:
| Type | Description |
|---|---|
| `Any` | Items read from the source file. |
__call__(path: Path) -> Iterator[Any]
Execute the source block.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| `path` | `Path` | Path to the source file. | required |
Returns:
| Type | Description |
|---|---|
| `Iterator[Any]` | Iterator of items from the source file. |
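A reader is any callable matching `Callable[[Path], Iterator[Any]]`. As an illustration, the hypothetical `read_lines` generator below yields the stripped, non-empty lines of a text file; the line-based format and the function name are assumptions made for this example, not part of cmxflow.

```python
import tempfile
from pathlib import Path
from typing import Iterator


def read_lines(path: Path) -> Iterator[str]:
    """Hypothetical reader: yield each non-empty line, stripped."""
    with path.open() as handle:
        for line in handle:
            line = line.strip()
            if line:
                yield line


# Exercise the reader on a temporary file.
with tempfile.TemporaryDirectory() as tmp:
    source = Path(tmp) / "items.txt"
    source.write_text("alpha\n\nbeta\n")
    items = list(read_lines(source))

print(items)
```

Given the constructor signature above, a function of this shape would presumably be passed as `SourceBlock(reader=read_lines)`.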
cmxflow.block.SinkBlock(writer: Callable[[Iterator[Any], Path], None])
Bases: BlockBase
Block that terminates a workflow by writing to a file.
Initialize a sink block with a writer function.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| `writer` | `Callable[[Iterator[Any], Path], None]` | Callable that takes an iterator of items and a destination Path, and writes the items. | required |
forward(iter: Iterator[Any], path: Path) -> None
Write items to the destination file.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| `iter` | `Iterator[Any]` | Iterator of items to write. | required |
| `path` | `Path` | Path to the destination file. | required |
__call__(iter: Iterator[Any], path: Path) -> None
Execute the sink block.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| `iter` | `Iterator[Any]` | Iterator of items to write. | required |
| `path` | `Path` | Path to the destination file. | required |
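A writer mirrors the reader: it is any callable matching `Callable[[Iterator[Any], Path], None]`. The hypothetical `write_lines` function below, which writes one item per line, is only a sketch of that shape; the output format is an assumption.

```python
import tempfile
from pathlib import Path
from typing import Any, Iterator


def write_lines(items: Iterator[Any], path: Path) -> None:
    """Hypothetical writer: write one item per line."""
    with path.open("w") as handle:
        for item in items:
            handle.write(f"{item}\n")


# Exercise the writer on a temporary file and read the result back.
with tempfile.TemporaryDirectory() as tmp:
    destination = Path(tmp) / "out.txt"
    write_lines(iter([1, 2, 3]), destination)
    written = destination.read_text()

print(written)
```

Given the constructor signature above, such a function would presumably be passed as `SinkBlock(writer=write_lines)`.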
cmxflow.block.ScoreBlock(name: str | None = None, input_files: list[str] | None = None, input_text: list[str] | None = None)
Bases: BlockBase
Block with state-dependent behavior for optimization vs normal execution.
During optimization (when uid is provided), computes a score via objective(). During normal execution, passes items through via forward().
Subclasses must implement objective() for optimization scoring. Optionally override forward() to transform items during normal execution.
Initialize the score block.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| `name` | `str \| None` | Optional name for the block. | `None` |
| `input_files` | `list[str] \| None` | Optional files required at run time. | `None` |
| `input_text` | `list[str] \| None` | Optional text required at run time. | `None` |
objective(iter: Iterator[Any]) -> float
abstractmethod
Compute the optimization objective score.
Called only during optimization. Must be implemented by subclasses.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| `iter` | `Iterator[Any]` | Iterator of items to score. | required |
Returns:
| Type | Description |
|---|---|
| `float` | Score value (higher is better when maximizing, lower is better when minimizing). |
forward(item: Any) -> Any
Transform a single item during normal (non-optimization) execution.
Default implementation passes items through unchanged. Override to filter or transform items.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| `item` | `Any` | Input item. | required |
Returns:
| Type | Description |
|---|---|
| `Any` | Transformed item, or None to filter it out. |
__call__(iter: Iterator[Any], uid: tuple[str, ...] | None = None) -> float | Iterator[Any]
Execute the score block.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| `iter` | `Iterator[Any]` | Iterator of input items. | required |
| `uid` | `tuple[str, ...] \| None` | Unique identifier for caching (present during optimization). | `None` |
Returns:
| Type | Description |
|---|---|
| `float \| Iterator[Any]` | If uid is provided (optimization mode): float score. If uid is None (normal mode): Iterator of transformed items. |
Workflow
cmxflow.workflow.Workflow(name: str = 'Workflow')
A workflow that chains multiple blocks together.
Workflows allow dynamic composition of blocks into executable pipelines. Blocks are executed sequentially, with the output of each block passed as input to the next.
Attributes:
| Name | Type | Description |
|---|---|---|
| `name` | | Human-readable name for the workflow. |
| `blocks` | `list[BlockBase]` | List of blocks in execution order. |
Initialize an empty workflow.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| `name` | `str` | Optional name for the workflow. | `'Workflow'` |
reset_cache() -> None
Reset cached state in all blocks for a new optimization iteration.
add(*blocks: Any) -> Workflow
Add one or more blocks to the end of the workflow.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| `*blocks` | `Any` | The block(s) to add. | `()` |
Returns:
| Type | Description |
|---|---|
| `Workflow` | Self for method chaining. |
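Returning `self` from `add` is what makes chained composition possible, and sequential execution means each block consumes the previous block's output. The sketch below illustrates both ideas with a hypothetical `MiniPipeline` class built on plain generator functions; it is not the cmxflow `Workflow` and omits sources, sinks, and validation.

```python
from typing import Any, Callable, Iterable, Iterator

Stage = Callable[[Iterator[Any]], Iterator[Any]]


class MiniPipeline:
    """Hypothetical pipeline that threads an iterator through its stages."""

    def __init__(self) -> None:
        self.stages: list[Stage] = []

    def add(self, *stages: Stage) -> "MiniPipeline":
        self.stages.extend(stages)
        return self  # returning self enables method chaining

    def run(self, items: Iterable[Any]) -> Iterator[Any]:
        stream = iter(items)
        for stage in self.stages:
            # The output of each stage becomes the input of the next.
            stream = stage(stream)
        return stream


def increment(items: Iterator[int]) -> Iterator[int]:
    return (item + 1 for item in items)


def keep_even(items: Iterator[int]) -> Iterator[int]:
    return (item for item in items if item % 2 == 0)


pipeline = MiniPipeline().add(increment).add(keep_even)
result = list(pipeline.run([1, 2, 3, 4]))
print(result)
```

Because the stages are generators, nothing is computed until the final iterator is consumed.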
insert(index: int, block: BlockBase) -> Workflow
clear() -> Workflow
get_params() -> list[Parameter]
Get all mutable parameters from all blocks.
Returns:
| Type | Description |
|---|---|
| `list[Parameter]` | List of parameter objects linked to blocks. |
check() -> None
Validate the workflow structure.
Ensures the workflow has a SourceBlock first, SinkBlock last, and only Block instances in between.
Raises:
| Type | Description |
|---|---|
| `IndexError` | If fewer than 2 blocks are present. |
| `ValueError` | If block types are in incorrect positions. |
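The structural rules above can be expressed as a short validator. This is a sketch under stated assumptions: `Source`, `Middle`, and `Sink` are hypothetical placeholder classes standing in for `SourceBlock`, `Block`, and `SinkBlock`, and `check_structure` follows only the wording of this section, not the library's actual implementation.

```python
class Source:
    """Placeholder for a source block."""


class Middle:
    """Placeholder for a transforming block."""


class Sink:
    """Placeholder for a sink block."""


def check_structure(blocks: list) -> None:
    """Hypothetical validator following the documented rules."""
    if len(blocks) < 2:
        raise IndexError("a workflow needs at least a source and a sink")
    if not isinstance(blocks[0], Source):
        raise ValueError("first block must be a source")
    if not isinstance(blocks[-1], Sink):
        raise ValueError("last block must be a sink")
    for block in blocks[1:-1]:
        if not isinstance(block, Middle):
            raise ValueError("only transforming blocks may sit in between")


check_structure([Source(), Middle(), Sink()])  # valid layout, no exception

# Collect the exception type raised by each invalid layout.
errors = []
for candidate in ([Source()], [Middle(), Sink()], [Source(), Sink(), Sink()]):
    try:
        check_structure(candidate)
    except (IndexError, ValueError) as exc:
        errors.append(type(exc).__name__)
print(errors)
```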
get_required_input() -> dict[str, type]
Get all required inputs from blocks in the workflow.
Returns:
| Type | Description |
|---|---|
| `dict[str, type]` | Dictionary mapping input keys to their expected types. Keys are formatted as '{block_index}.{type}@{name}'. |
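To make the key format concrete, the hypothetical helper below splits a key of the form '{block_index}.{type}@{name}' into its three parts. The helper name and the sample key are illustrative assumptions; in particular, this section does not list the strings that may appear in the `{type}` position, so `file` is only a guess.

```python
def parse_input_key(key: str) -> tuple[int, str, str]:
    """Split '{block_index}.{type}@{name}' into (index, type, name)."""
    # Split on the first '.' only, so names containing dots survive intact.
    index, rest = key.split(".", 1)
    kind, name = rest.split("@", 1)
    return int(index), kind, name


# "2.file@receptor.pdb" is a made-up key used only for illustration.
parsed = parse_input_key("2.file@receptor.pdb")
print(parsed)
```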
set_required_input(required_inputs: dict[str, str]) -> None
Set required inputs for blocks in the workflow.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| `required_inputs` | `dict[str, str]` | Dictionary mapping input keys to values. Keys should match those returned by get_required_input(). | required |
Raises:
| Type | Description |
|---|---|
| `KeyError` | If a required input is missing. |
| `FileNotFoundError` | If a required input file does not exist. |
forward(input_path: Path | str, output_path: Path | str = '') -> float | None
Execute the workflow pipeline.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| `input_path` | `Path \| str` | Path to the input file for the SourceBlock. | required |
| `output_path` | `Path \| str` | Path to the output file for the SinkBlock. | `''` |
Returns:
| Type | Description |
|---|---|
| `float \| None` | Score value if workflow ends with ScoreBlock, None otherwise. |
__call__(input_path: Path | str, output_path: Path | str = '') -> Any
Execute the workflow.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| `input_path` | `Path \| str` | Input path. | required |
| `output_path` | `Path \| str` | Output path (optional). | `''` |
Returns:
| Type | Description |
|---|---|
| `Any` | Output from the final block. |
__len__() -> int
Return the number of blocks in the workflow.
__getitem__(index: int) -> BlockBase
Get a block by index.
__repr__() -> str
Generate a visual representation of the workflow.
Parameters
cmxflow.parameter.Parameter(name: str, default: Any)
Bases: ABC
Base class for all parameter types.
Attributes:
| Name | Type | Description |
|---|---|---|
| `name` | | Name of the parameter. |
Initialize the parameter.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| `name` | `str` | Name of the parameter. | required |
| `default` | `Any` | Default value for the parameter. | required |
options: Any
abstractmethod
property
Return the allowed options/range for this parameter.
Returns:
| Type | Description |
|---|---|
| `Any` | For Continuous/Integer: tuple of (low, high) bounds. For Categorical: list of allowed values. |
get() -> Any
Get the current parameter value.
Returns:
| Type | Description |
|---|---|
| `Any` | Current value. |
set(value: Any) -> None
Set the parameter value.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| `value` | `Any` | New value to set. | required |
Raises:
| Type | Description |
|---|---|
| `ValueError` | If the value fails validation via check(). |
check(value: Any) -> bool
abstractmethod
Check if a value is within the specified constraints.
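The relationship between `get`, `set`, and `check` can be sketched as follows. `MiniParameter` and `MiniChoice` are hypothetical stand-ins written for this example and do not import cmxflow; the private `_value` attribute and the error message are assumptions.

```python
from abc import ABC, abstractmethod
from typing import Any, Sequence


class MiniParameter(ABC):
    """Hypothetical stand-in for the documented Parameter contract."""

    def __init__(self, name: str, default: Any) -> None:
        self.name = name
        self._value = default

    @abstractmethod
    def check(self, value: Any) -> bool:
        """Return True if the value satisfies the constraints."""

    def get(self) -> Any:
        return self._value

    def set(self, value: Any) -> None:
        # Reject values that fail check(), as documented for set().
        if not self.check(value):
            raise ValueError(f"invalid value for {self.name!r}: {value!r}")
        self._value = value


class MiniChoice(MiniParameter):
    """Accepts only values from a fixed sequence of choices."""

    def __init__(self, name: str, default: Any, choices: Sequence[Any]) -> None:
        super().__init__(name, default)
        self.choices = list(choices)

    def check(self, value: Any) -> bool:
        return value in self.choices


solver = MiniChoice("solver", default="fast", choices=["fast", "exact"])
solver.set("exact")
try:
    solver.set("unknown")
    rejected = False
except ValueError:
    rejected = True

print(solver.get(), rejected)
```

The failed `set` leaves the previously accepted value in place, because validation happens before assignment.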
cmxflow.parameter.Continuous(name: str, default: float, low: float, high: float)
Bases: Parameter
Continuous parameter with float values in a range.
Attributes:
| Name | Type | Description |
|---|---|---|
| `name` | | Name of the parameter. |
| `low` | | Lower bound (inclusive). |
| `high` | | Upper bound (inclusive). |
Initialize a continuous parameter.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| `name` | `str` | Name of the parameter. | required |
| `default` | `float` | Default parameter value. | required |
| `low` | `float` | Lower bound (inclusive). | required |
| `high` | `float` | Upper bound (inclusive). | required |
options: tuple[float, float]
property
Return the (low, high) range.
Returns:
| Type | Description |
|---|---|
| `tuple[float, float]` | Tuple of (low, high) bounds. |
check(value: float) -> bool
Check if a value is within the continuous range.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| `value` | `float` | Value to validate. | required |
Returns:
| Type | Description |
|---|---|
| `bool` | True if value is a number within [low, high], False otherwise. |
cmxflow.parameter.Integer(name: str, default: int, low: int, high: int)
Bases: Parameter
Integer parameter with values in a range.
Attributes:
| Name | Type | Description |
|---|---|---|
| `name` | | Name of the parameter. |
| `low` | | Lower bound (inclusive). |
| `high` | | Upper bound (inclusive). |
Initialize an integer parameter.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| `name` | `str` | Name of the parameter. | required |
| `default` | `int` | Default parameter value. | required |
| `low` | `int` | Lower bound (inclusive). | required |
| `high` | `int` | Upper bound (inclusive). | required |
options: tuple[int, int]
property
Return the (low, high) range.
Returns:
| Type | Description |
|---|---|
| `tuple[int, int]` | Tuple of (low, high) bounds. |
check(value: int) -> bool
Check if a value is within the integer range.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| `value` | `int` | Value to validate. | required |
Returns:
| Type | Description |
|---|---|
| `bool` | True if value is an integer within [low, high], False otherwise. |
cmxflow.parameter.Categorical(name: str, default: Any, choices: Sequence[Any])
Bases: Parameter
Categorical parameter with discrete choices.
Attributes:
| Name | Type | Description |
|---|---|---|
| `name` | | Name of the parameter. |
| `choices` | | Sequence of allowed values. |
Initialize a categorical parameter.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| `name` | `str` | Name of the parameter. | required |
| `default` | `Any` | Default parameter value. | required |
| `choices` | `Sequence[Any]` | Sequence of allowed values. | required |
options: list[Any]
property
Return the list of allowed choices.
Returns:
| Type | Description |
|---|---|
| `list[Any]` | List of allowed values. |
check(value: Any) -> bool
Check if a value is one of the allowed choices.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| `value` | `Any` | Value to validate. | required |
Returns:
| Type | Description |
|---|---|
| `bool` | True if value is in the choices list, False otherwise. |
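The three `check` rules in this section (inclusive float range, inclusive integer range, membership in a list) can be compared side by side as plain functions. The function names below are hypothetical, and the decision to reject `bool` values in the numeric checks is an assumption of this sketch; the section does not say how cmxflow treats them.

```python
from numbers import Real
from typing import Any, Sequence


def in_continuous(value: Any, low: float, high: float) -> bool:
    """True if value is a real number within [low, high]."""
    # Excluding bool is an assumption; bool is a subclass of int in Python.
    is_number = isinstance(value, Real) and not isinstance(value, bool)
    return is_number and low <= value <= high


def in_integer(value: Any, low: int, high: int) -> bool:
    """True if value is an int within [low, high]."""
    is_int = isinstance(value, int) and not isinstance(value, bool)
    return is_int and low <= value <= high


def in_choices(value: Any, choices: Sequence[Any]) -> bool:
    """True if value is one of the allowed choices."""
    return value in choices


results = {
    "continuous upper bound": in_continuous(1.0, 0.0, 1.0),
    "continuous out of range": in_continuous(1.5, 0.0, 1.0),
    "integer given a float": in_integer(3.0, 1, 5),
    "integer lower bound": in_integer(1, 1, 5),
    "categorical member": in_choices("adam", ["sgd", "adam"]),
}
for label, outcome in results.items():
    print(f"{label}: {outcome}")
```

Both bounds are inclusive, so a value equal to `low` or `high` passes the numeric checks.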