Class IterativeOperator

  • All Implemented Interfaces:
    LogicalOperator
    Direct Known Subclasses:
    DecisionTreeLearner, KMeans, LinearRegressionLearner

    public abstract class IterativeOperator
    extends AbstractLogicalOperator
    To be implemented by operations that must make multiple passes over the input data. The framework ensures that the inputs are staged so that the operation can iterate over them. The lifecycle of an iterative operator is as follows:
    1. computeMetadata is called once during graph compilation. This gives operators a chance to validate and declare parallelizability, input metadata, and output metadata.
    2. createIterator is called once during graph execution.
    3. execute is invoked once during graph execution to perform the "body" of the iteration.
    4. finalComposition is invoked once during graph execution to give the operator a chance to output final results.
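    The call order above can be illustrated with a minimal, self-contained sketch. Every type in it (SketchIterator, SketchIterativeOperator, LoggingOperator, LifecycleSketch) is a hypothetical stand-in rather than a DataRush class, and it assumes, purely for illustration, that execute and finalComposition are invoked on the handle returned by createIterator.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for the handle returned by createIterator.
interface SketchIterator {
    void execute(List<String> log);
    void finalComposition(List<String> log);
}

// Hypothetical stand-in for IterativeOperator; the log replaces the real contexts.
abstract class SketchIterativeOperator {
    protected abstract void computeMetadata(List<String> log);
    protected abstract SketchIterator createIterator(List<String> log);
}

// A trivial operator that only records which lifecycle method was called.
class LoggingOperator extends SketchIterativeOperator {
    @Override
    protected void computeMetadata(List<String> log) {
        log.add("computeMetadata");
    }

    @Override
    protected SketchIterator createIterator(List<String> log) {
        log.add("createIterator");
        return new SketchIterator() {
            @Override
            public void execute(List<String> l) {
                l.add("execute");
            }

            @Override
            public void finalComposition(List<String> l) {
                l.add("finalComposition");
            }
        };
    }
}

class LifecycleSketch {
    // Plays the role of the framework: one compilation step, then execution.
    static List<String> run(SketchIterativeOperator op) {
        List<String> log = new ArrayList<>();
        op.computeMetadata(log);                      // 1. graph compilation
        SketchIterator handle = op.createIterator(log); // 2. start of execution
        handle.execute(log);                          // 3. body of the iteration
        handle.finalComposition(log);                 // 4. final results
        return log;
    }

    public static void main(String[] args) {
        // prints [computeMetadata, createIterator, execute, finalComposition]
        System.out.println(run(new LoggingOperator()));
    }
}
```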
    • Constructor Detail

      • IterativeOperator

        public IterativeOperator()
    • Method Detail

      • computeMetadata

        protected abstract void computeMetadata​(IterativeMetadataContext ctx)
        Implementations must adhere to the following contracts.

        General

        Regardless of input ports/output port types, all implementations must do the following:

        1. Validation. Validation of configuration should always be performed first.
        2. Declare operator parallelizability. Implementations must declare by calling IterativeMetadataContext.parallelize(ParallelismStrategy).
        3. Declare output port parallelizability. Implementations must declare by calling IterativeMetadataContext.setOutputParallelizable(com.pervasive.datarush.ports.LogicalPort, boolean).
        4. Declare input port parallelizability. Implementations must declare by calling IterativeMetadataContext.setIterationParallelizable(com.pervasive.datarush.ports.LogicalPort, boolean).
        NOTE: There is a convenience method for performing steps 2-4 for the case where all record ports are parallelizable and where we are to determine parallelism based on source:
        • MetadataUtil#negotiateParallelismBasedOnSourceAssumingParallelizableRecords
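
        The ordering of steps 1-4 can be sketched as follows. RecordingContext is a hypothetical stand-in for IterativeMetadataContext that only records calls; ports are represented by plain strings, the "BASED_ON_SOURCE" strategy name is a placeholder rather than a real ParallelismStrategy value, and the k property is an invented configuration setting.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for IterativeMetadataContext: it only records calls.
class RecordingContext {
    final List<String> calls = new ArrayList<>();

    void parallelize(String strategy) {
        calls.add("parallelize:" + strategy);
    }

    void setOutputParallelizable(String port, boolean parallelizable) {
        calls.add("output:" + port + "=" + parallelizable);
    }

    void setIterationParallelizable(String port, boolean parallelizable) {
        calls.add("iteration:" + port + "=" + parallelizable);
    }
}

// Hypothetical operator with one configuration property, k.
class ClusterSketchOperator {
    private final int k;

    ClusterSketchOperator(int k) {
        this.k = k;
    }

    void computeMetadata(RecordingContext ctx) {
        // Step 1: validate configuration before declaring anything.
        if (k <= 0) {
            throw new IllegalArgumentException("k must be positive, got " + k);
        }
        // Step 2: declare operator parallelizability (placeholder strategy name).
        ctx.parallelize("BASED_ON_SOURCE");
        // Step 3: declare output port parallelizability.
        ctx.setOutputParallelizable("model", false);
        // Step 4: declare input port parallelizability.
        ctx.setIterationParallelizable("input", true);
    }
}
```

        Validating first means that a misconfigured operator fails before any declaration reaches the context.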

        Input record ports

        Implementations with input record ports must declare the following:
        1. Required data ordering: Implementations that have data ordering requirements must declare them by calling RecordPort#setRequiredDataOrdering, otherwise iteration will proceed on an input dataset whose order is undefined.
        2. Required data distribution (only applies to parallelizable input ports): Implementations that have data distribution requirements must declare them by calling RecordPort#setRequiredDataDistribution, otherwise iteration will proceed on an input dataset whose distribution is the unspecified partial distribution.
        Note that if the upstream operator's output distribution/ordering is compatible with the one required, the framework avoids a re-sort/re-distribution, which is generally a significant performance saving.
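
        As an illustration of what "compatible" can mean for orderings, the sketch below uses one notion that follows from how sorting works: data sorted by keys (a, b) is necessarily sorted by (a), so a required ordering that is a prefix of the upstream ordering needs no re-sort. Whether the framework applies exactly this rule is an assumption; OrderingSketch and its methods are hypothetical, and sort keys are modeled as plain strings such as "a ASC".

```java
import java.util.Arrays;
import java.util.List;

class OrderingSketch {
    // True when data sorted by `upstream` is necessarily sorted by `required`,
    // i.e. when `required` is a prefix of `upstream` (same keys, same directions).
    static boolean satisfies(List<String> upstream, List<String> required) {
        if (required.size() > upstream.size()) {
            return false;
        }
        return upstream.subList(0, required.size()).equals(required);
    }

    // A re-sort is needed only when the upstream ordering is not compatible.
    static boolean needsResort(List<String> upstream, List<String> required) {
        return !satisfies(upstream, required);
    }

    public static void main(String[] args) {
        List<String> upstream = Arrays.asList("a ASC", "b ASC");
        System.out.println(needsResort(upstream, Arrays.asList("a ASC"))); // prints false
        System.out.println(needsResort(upstream, Arrays.asList("b ASC"))); // prints true
    }
}
```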

        Output record ports (static metadata)

        Implementations with output record ports must declare the following:
        1. Type: Implementations must declare their output type by calling RecordPort#setType.
        Implementations with output record ports may declare the following:
        1. Output data ordering: Implementations that can make guarantees as to their output ordering may do so by calling RecordPort#setOutputDataOrdering
        2. Output data distribution (only applies to parallelizable output ports): Implementations that can make guarantees as to their output distribution may do so by calling RecordPort#setOutputDataDistribution
        Note that both of these properties are optional; if unspecified, performance may suffer since the framework may unnecessarily re-sort/re-distribute the data.

        Input model ports

        In general, iterative operators tend not to have model input ports, but when they do, there is nothing special to declare for them. Models are implicitly duplicated to all partitions when going from non-parallel to parallel ports.

        Output model ports (static metadata)

        SimpleModelPorts have no associated metadata and therefore there is never any output metadata to declare. PMMLPorts, on the other hand, do have associated metadata. For all PMMLPorts, implementations must declare the following:
        1. pmmlModelSpec: Implementations must declare the PMML model spec by calling PMMLPort.setPMMLModelSpec.

        Output ports with dynamic metadata

        If an output port has dynamic metadata, implementations can declare this by calling IterativeMetadataContext.setOutputMetadataDynamic(com.pervasive.datarush.ports.LogicalPort, boolean). When metadata is dynamic, calls to RecordPort#setType, RecordPort#setOutputDataOrdering, etc. are not allowed, and thus the sections above entitled "Output record ports (static metadata)" and "Output model ports (static metadata)" must be skipped. Note that, if possible, dynamic metadata should be avoided (see IterativeMetadataContext.setOutputMetadataDynamic(com.pervasive.datarush.ports.LogicalPort, boolean)).
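
        The either/or nature of static and dynamic declarations can be sketched as follows. DynamicMetadataSketchContext is a hypothetical stand-in that folds the port's setType into the context and identifies ports by name; the IllegalStateException and the type string are assumptions made for the sketch, not documented framework behavior.

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

// Hypothetical stand-in combining the context and the port's static declarations.
class DynamicMetadataSketchContext {
    private final Set<String> dynamicPorts = new HashSet<>();
    private final Map<String, String> types = new HashMap<>();

    void setOutputMetadataDynamic(String port, boolean dynamic) {
        if (dynamic) {
            dynamicPorts.add(port);
        } else {
            dynamicPorts.remove(port);
        }
    }

    // Static declarations are rejected once a port is marked dynamic
    // (the exception type is an assumption of this sketch).
    void setType(String port, String type) {
        if (dynamicPorts.contains(port)) {
            throw new IllegalStateException("port '" + port + "' has dynamic metadata");
        }
        types.put(port, type);
    }

    String typeOf(String port) {
        return types.get(port);
    }
}

// Hypothetical operator that picks exactly one of the two declaration paths.
class DynamicMetadataSketchOperator {
    private final boolean schemaKnownAtCompileTime;

    DynamicMetadataSketchOperator(boolean schemaKnownAtCompileTime) {
        this.schemaKnownAtCompileTime = schemaKnownAtCompileTime;
    }

    void computeMetadata(DynamicMetadataSketchContext ctx) {
        if (schemaKnownAtCompileTime) {
            // Static path: declare the output type up front (preferred).
            ctx.setType("output", "record(cluster:int)");
        } else {
            // Dynamic path: skip all static output declarations.
            ctx.setOutputMetadataDynamic("output", true);
        }
    }
}
```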

        Parameters:
        ctx - the context
      • createIterator

        protected abstract CompositionIterator createIterator​(MetadataContext ctx)
        Invoked at the start of execution. The implementation is expected to return a handle that is then used for execution.
        Parameters:
        ctx - a context in which the iterative operator can find input port metadata, etc. This information was available in the previous call to computeMetadata(IterativeMetadataContext), but is available here as well so that the iterative operator need not cache any metadata in its instance variables.
        Returns:
        a handle that is used for iteration