hadar.workflow package


hadar.workflow.pipeline module

class hadar.workflow.pipeline.RestrictedPlug(inputs: List[str] = None, outputs: List[str] = None)

Bases: hadar.workflow.pipeline.Plug

Plug implementation where the stage expects specific columns to be present.

linkable_to(next) → bool

Determine whether the next stage can be linked to the current one. In this implementation, the plug is linkable only if the inputs of the next stage are present in the outputs of the current stage.


next – other stage to link


True if the current output contains the mandatory columns for the next input, else False

class hadar.workflow.pipeline.FreePlug

Bases: hadar.workflow.pipeline.Plug

Plug implementation for stages that accept any kind of DataFrame, whatever columns are present inside.

linkable_to(other: hadar.workflow.pipeline.Plug) → bool

Determine whether the next stage can be linked to the current one. In this implementation, the plug is always linkable.


other – other stage to link


True in all cases
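The contrast between the two plugs can be sketched with plain Python sets. The classes below are hypothetical re-implementations for illustration, not the hadar source: FreePlug accepts any link, while RestrictedPlug links only when the next stage's required inputs are a subset of the current stage's outputs.

```python
# Illustrative sketch of the two Plug behaviours (assumed, not hadar code).
from typing import List


class FreePlugSketch:
    def linkable_to(self, other) -> bool:
        # Any DataFrame layout is accepted, so linking always succeeds.
        return True


class RestrictedPlugSketch:
    def __init__(self, inputs: List[str] = None, outputs: List[str] = None):
        self.inputs = inputs or []
        self.outputs = outputs or []

    def linkable_to(self, next) -> bool:
        # Linkable only if every column the next stage needs is produced here.
        return set(next.inputs).issubset(self.outputs)


a = RestrictedPlugSketch(inputs=["raw"], outputs=["mean", "max"])
b = RestrictedPlugSketch(inputs=["mean"], outputs=["result"])
c = RestrictedPlugSketch(inputs=["median"], outputs=["result"])

print(a.linkable_to(b))  # True: 'mean' is among a's outputs
print(a.linkable_to(c))  # False: 'median' is missing from a's outputs
```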

class hadar.workflow.pipeline.Stage(plug: hadar.workflow.pipeline.Plug)

Bases: abc.ABC

Abstract class representing a unit of computation. Stages can be added together to create a workflow pipeline.

static build_multi_index(scenarios: Union[List[int], numpy.ndarray], names: List[str])

Create column multi index.

  • scenarios – list of scenarios serial

  • names – names of data types present inside each scenario


multi-index like [(scn, type), …]
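The documented return shape, a multi-index like [(scn, type), …], can be reproduced with pandas directly. The helper below is an illustrative assumption about the output layout, not the hadar implementation:

```python
# Sketch of the column multi-index produced for scenarios x data-type names.
import pandas as pd


def build_multi_index_sketch(scenarios, names):
    # One (scenario, name) pair per combination, scenarios varying slowest.
    return pd.MultiIndex.from_product([scenarios, names])


idx = build_multi_index_sketch([0, 1], ["a", "b"])
print(list(idx))  # [(0, 'a'), (0, 'b'), (1, 'a'), (1, 'b')]
```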

static get_names(timeline: pandas.core.frame.DataFrame) → List[str]
static get_scenarios(timeline: pandas.core.frame.DataFrame) → numpy.ndarray
static standardize_column(timeline: pandas.core.frame.DataFrame) → pandas.core.frame.DataFrame

The timeline must have its first column level for the scenario and its second for the data timeline. Add the 0th scenario index if not present.


timeline – timeline with or without scenario index


timeline with scenario index
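The standardization contract can be sketched as follows: a timeline whose columns lack a scenario level gets wrapped under scenario index 0. The helper below is an illustrative assumption, not the library implementation:

```python
# Sketch: ensure a (scenario, name) column MultiIndex, defaulting to scenario 0.
import pandas as pd


def standardize_column_sketch(timeline: pd.DataFrame) -> pd.DataFrame:
    if isinstance(timeline.columns, pd.MultiIndex):
        return timeline  # already has (scenario, name) columns
    # Prepend the 0th scenario level to every column.
    out = timeline.copy()
    out.columns = pd.MultiIndex.from_product([[0], timeline.columns])
    return out


df = pd.DataFrame({"quantity": [1, 2, 3]})
std = standardize_column_sketch(df)
print(list(std.columns))  # [(0, 'quantity')]
```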

class hadar.workflow.pipeline.FocusStage(plug)

Bases: hadar.workflow.pipeline.Stage, abc.ABC

Stage that applies the same behaviour to every scenario.

class hadar.workflow.pipeline.Drop(names: Union[List[str], str])

Bases: hadar.workflow.pipeline.Stage

Drop columns by name.

class hadar.workflow.pipeline.Rename(**kwargs)

Bases: hadar.workflow.pipeline.Stage

Rename column names.

class hadar.workflow.pipeline.Fault(loss: float, occur_freq: float, downtime_min: int, downtime_max, seed: int = None)

Bases: hadar.workflow.pipeline.FocusStage

Generate a random fault for each scenario.

class hadar.workflow.pipeline.RepeatScenario(n)

Bases: hadar.workflow.pipeline.Stage

Repeat the current scenarios n times.
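Repeating scenarios means a timeline with s scenarios comes out with s * n scenarios. The equivalent numpy operation (an assumed sketch of the behaviour, not the library code) is a tile along the scenario axis:

```python
# Sketch: repeating 2 scenarios 3 times yields 6 scenarios, horizon unchanged.
import numpy as np

data = np.array([[1, 2], [3, 4]])  # 2 scenarios, horizon of 2
repeated = np.tile(data, (3, 1))   # repeat 3 times along the scenario axis
print(repeated.shape)  # (6, 2)
```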

class hadar.workflow.pipeline.ToShuffler(result_name: str)

Bases: hadar.workflow.pipeline.Rename

Connect a pipeline to the shuffler.

class hadar.workflow.pipeline.Pipeline(stages: List[T])

Bases: object

Compute many stages sequentially.

assert_computable(timeline: pandas.core.frame.DataFrame)

Verify that the timeline is computable by the pipeline.


timeline – timeline to check


True if computable, else False

class hadar.workflow.pipeline.Clip(lower: float = None, upper: float = None)

Bases: hadar.workflow.pipeline.Stage

Cut data according to upper and lower boundaries. Same as the np.clip function.
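Since the stage mirrors np.clip, the transformation on a DataFrame can be shown with pandas' own clip (an assumed illustration of the behaviour, not the stage itself):

```python
# Values below `lower` are raised to lower; values above `upper` are capped.
import pandas as pd

df = pd.DataFrame({"load": [-5.0, 50.0, 150.0]})
clipped = df.clip(lower=0.0, upper=100.0)
print(clipped["load"].tolist())  # [0.0, 50.0, 100.0]
```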

hadar.workflow.shuffler module

class hadar.workflow.shuffler.Shuffler(sampler=<built-in method randint of numpy.random.mtrand.RandomState object>)

Bases: object

Receives all data sources, either raw matrices or pipelines. Schedules pipeline generation and shuffles all timelines to create scenarios.

add_data(name: str, data: numpy.ndarray)

Add raw data as a numpy array. If you generate data by pipeline, use add_pipeline instead; it will parallelize computation and manage swap.

  • name – timeline name

  • data – numpy array with shape (scenario, horizon)


self

add_pipeline(name: str, data: pandas.core.frame.DataFrame, pipeline: hadar.workflow.pipeline.Pipeline)

Add data by pipeline and input data for pipeline.

  • name – timeline name

  • data – data to use as pipeline input

  • pipeline – pipeline to generate data




Start pipeline generation and shuffle the results to create scenario sampling.


nb_scn – number of scenarios to sample


class hadar.workflow.shuffler.Timeline(data: numpy.ndarray = None, sampler=<built-in method randint of numpy.random.mtrand.RandomState object>)

Bases: object

Manage data used to generate timelines. Also performs sampling.

compute() → numpy.ndarray

Compute method called before sampling. For Timeline, this just returns the data.


the data given in the constructor

sample(nb) → numpy.ndarray

Perform sampling. Data must be computed before sampling.


nb – number of sampling


scenario matrix with shape (nb, horizon)
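Sampling amounts to picking nb scenario rows at random (with replacement) from the computed (scenario, horizon) matrix; the default sampler shown in the signature is numpy's RandomState.randint. The helper below is an illustrative assumption, not the hadar source:

```python
# Sketch: draw nb random scenario rows from a (scenario, horizon) matrix.
import numpy as np


def sample_sketch(data: np.ndarray, nb: int, seed: int = 42) -> np.ndarray:
    sampler = np.random.RandomState(seed).randint
    rows = sampler(0, data.shape[0], size=nb)  # random scenario indices
    return data[rows]


data = np.arange(6).reshape(3, 2)  # 3 scenarios, horizon of 2
scn = sample_sketch(data, nb=5)
print(scn.shape)  # (5, 2): (nb, horizon) as documented
```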

Module contents