hadar.workflow package

Submodules

hadar.workflow.pipeline module

class hadar.workflow.pipeline.RestrictedPlug(inputs: List[str] = None, outputs: List[str] = None)

Bases: hadar.workflow.pipeline.Plug

Plug implementation for stages that expect specific columns to be present.

linkable_to(next) → bool

Determines whether the next stage can be linked to the current one. In this implementation, the plug is linkable only if the inputs of the next stage are all present in the outputs of the current stage.

Parameters

next – other stage to link

Returns

True if the current stage's outputs contain all columns required by the next stage's inputs, else False
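The rule can be pictured with plain sets (a simplified sketch, not hadar's actual implementation): the link is valid when the next stage's required inputs form a subset of the current stage's outputs.

```python
def linkable_sketch(current_outputs, next_inputs):
    # Linkable only if every column the next stage needs
    # is produced by the current stage.
    return set(next_inputs) <= set(current_outputs)

print(linkable_sketch(['a', 'b', 'c'], ['a', 'b']))  # True
print(linkable_sketch(['a'], ['a', 'b']))            # False
```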

class hadar.workflow.pipeline.FreePlug

Bases: hadar.workflow.pipeline.Plug

Plug implementation for stages that accept any kind of DataFrame, regardless of the columns inside.

linkable_to(other: hadar.workflow.pipeline.Plug) → bool

Determines whether the next stage can be linked to the current one. In this implementation, the plug is always linkable.

Parameters

other – other stage to link

Returns

always True

class hadar.workflow.pipeline.Stage(plug: hadar.workflow.pipeline.Plug)

Bases: abc.ABC

Abstract class representing a unit of computation. Stages can be added together to create a workflow pipeline.

static build_multi_index(scenarios: Union[List[int], numpy.ndarray], names: List[str])

Create column multi index.

Parameters
  • scenarios – list of scenario serials

  • names – names of the data types present inside each scenario

Returns

multi-index like [(scn, type), …]
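The shape of the returned index can be pictured with pandas.MultiIndex.from_product (an illustrative sketch of the result, not hadar's code): every name is repeated under every scenario.

```python
import pandas as pd

# Columns become (scenario, name) pairs, one pair per name per scenario.
scenarios = [0, 1]
names = ['a', 'b']
index = pd.MultiIndex.from_product([scenarios, names])
print(list(index))  # [(0, 'a'), (0, 'b'), (1, 'a'), (1, 'b')]
```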

static get_names(timeline: pandas.core.frame.DataFrame) → List[str]
static get_scenarios(timeline: pandas.core.frame.DataFrame) → numpy.ndarray
static standardize_column(timeline: pandas.core.frame.DataFrame) → pandas.core.frame.DataFrame

Timeline must have the scenario in its first column and the data timeline in its second. Adds the 0th scenario index if it is not present.

Parameters

timeline – timeline with or without scenario index

Returns

timeline with scenario index
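The standardization step can be sketched in pandas (a hypothetical reimplementation for illustration; `standardize_column_sketch` is not hadar's code): a timeline whose columns carry no scenario level gets wrapped under scenario 0.

```python
import pandas as pd

def standardize_column_sketch(timeline):
    # If columns are not already a (scenario, name) MultiIndex,
    # nest them all under the 0th scenario.
    if not isinstance(timeline.columns, pd.MultiIndex):
        timeline = timeline.copy()
        timeline.columns = pd.MultiIndex.from_product([[0], timeline.columns])
    return timeline

df = pd.DataFrame({'quantity': [1, 2, 3]})
out = standardize_column_sketch(df)
print(list(out.columns))  # [(0, 'quantity')]
```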

class hadar.workflow.pipeline.FocusStage(plug)

Bases: hadar.workflow.pipeline.Stage, abc.ABC

Stage that applies the same behaviour to every scenario.

class hadar.workflow.pipeline.Drop(names: Union[List[str], str])

Bases: hadar.workflow.pipeline.Stage

Drop columns by name.

class hadar.workflow.pipeline.Rename(**kwargs)

Bases: hadar.workflow.pipeline.Stage

Rename column names.

class hadar.workflow.pipeline.Fault(loss: float, occur_freq: float, downtime_min: int, downtime_max, seed: int = None)

Bases: hadar.workflow.pipeline.FocusStage

Generate random faults for each scenario.

class hadar.workflow.pipeline.RepeatScenario(n)

Bases: hadar.workflow.pipeline.Stage

Repeat the current scenarios n times.

class hadar.workflow.pipeline.ToShuffler(result_name: str)

Bases: hadar.workflow.pipeline.Rename

Connects a pipeline to a shuffler.

class hadar.workflow.pipeline.Pipeline(stages: List[T])

Bases: object

Compute many stages sequentially.
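The sequential behaviour can be sketched with plain functions standing in for stages (a hypothetical illustration, not hadar's API): each stage transforms the timeline and hands the result to the next.

```python
def run_sequentially(stages, timeline):
    # Apply each stage's transform in order, feeding each
    # output into the next stage.
    for stage in stages:
        timeline = stage(timeline)
    return timeline

# Two toy "stages": add 1 to every value, then clip at 3.
result = run_sequentially([lambda t: [x + 1 for x in t],
                           lambda t: [min(x, 3) for x in t]],
                          [0, 2, 4])
print(result)  # [1, 3, 3]
```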

assert_computable(timeline: pandas.core.frame.DataFrame)

Verify timeline is computable by pipeline.

Parameters

timeline – timeline to check

Returns

True if computable, else False

assert_to_shuffler()
class hadar.workflow.pipeline.Clip(lower: float = None, upper: float = None)

Bases: hadar.workflow.pipeline.Stage

Clip data to upper and lower boundaries, like the np.clip function.
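Since the docs state the behaviour matches np.clip, it can be shown directly with numpy:

```python
import numpy as np

# Values below `lower` are raised to it; values above `upper` are lowered to it.
data = np.array([-5.0, 0.5, 2.0, 10.0])
print(np.clip(data, 0.0, 3.0))  # [0.  0.5 2.  3. ]
```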

hadar.workflow.shuffler module

class hadar.workflow.shuffler.Shuffler(sampler=<built-in method randint of numpy.random.mtrand.RandomState object>)

Bases: object

Receives all data sources, either raw matrices or pipelines. Schedules pipeline generation and shuffles all timelines to create scenarios.

add_data(name: str, data: numpy.ndarray)

Add raw data as a numpy array. If you generate data with a pipeline, use add_pipeline instead; it will parallelize computation and manage swapping.

Parameters
  • name – timeline name

  • data – numpy array with shape (scenario, horizon)

Returns

self

add_pipeline(name: str, data: pandas.core.frame.DataFrame, pipeline: hadar.workflow.pipeline.Pipeline)

Add data generated by a pipeline, along with the input data for that pipeline.

Parameters
  • name – timeline name

  • data – data to use as pipeline input

  • pipeline – pipeline to generate data

Returns

self

shuffle(nb_scn)

Start pipeline generation and shuffle the results to create scenario samples.

Parameters

nb_scn – number of scenarios to sample

Returns

class hadar.workflow.shuffler.Timeline(data: numpy.ndarray = None, sampler=<built-in method randint of numpy.random.mtrand.RandomState object>)

Bases: object

Manages the data used to generate a timeline and performs sampling.

compute() → numpy.ndarray

Compute method called before sampling. For Timeline, it simply returns the data.

Returns

the data given in the constructor

sample(nb) → numpy.ndarray

Perform sampling. Data must be computed beforehand.

Parameters

nb – number of samples

Returns

scenario matrix with shape (nb, horizon)
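Sampling can be sketched as drawing nb scenario row indices at random and stacking the corresponding rows of the (scenario, horizon) matrix (an illustrative reimplementation, not hadar's code; the default randint sampler mirrors the constructor's default):

```python
import numpy as np

def sample_sketch(data, nb, rng=None):
    # data: (scenario, horizon) matrix; draw nb rows with replacement.
    rng = rng or np.random.RandomState(42)
    rows = rng.randint(0, data.shape[0], nb)
    return data[rows]

data = np.arange(12).reshape(3, 4)  # 3 scenarios, horizon of 4
out = sample_sketch(data, nb=5)
print(out.shape)  # (5, 4)
```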

Module contents