- java.lang.Object
-
- com.pervasive.datarush.operators.AbstractLogicalOperator
-
- com.pervasive.datarush.operators.StreamingOperator
-
- com.pervasive.datarush.operators.ExecutableOperator
-
- All Implemented Interfaces:
LogicalOperator
- Direct Known Subclasses:
AbstractExecutableRecordPipeline, AbstractPredictor, AbstractWriteToJDBC.AbstractWriteToJDBCWorker, AssertEqual, AssertMetadata, AssertPredicate, AssertSorted, CalculateNGramFrequency, CalculateWordFrequency, CollectRecords, ConvertARMModel, ConvertTextCase, CountTokens, CrossJoin, DecisionTreePruner, DictionaryFilter, EmitRecords, ExpandTextFrequency, ExpandTextTokens, ExternalRecordSink, ExternalRecordSource, FilterExistJoinProcess, FilterText, FinalizeSQLWorker, GenerateArithmeticSequence, GenerateBagOfWords, GenerateConstant, GenerateRandom, GenerateRepeatingCycle, GetModel, GroupPairsSortedRows, InitializeSQLWorker, LargeGroupDetector, LogRows, MergeFields, MockableExternalRecordSink, MockableExternalRecordSource, ProcessByGroup, PutModel, RemapFields, RunScript, SortedGroupHandler, SumOfSquares, SVMLearner, TextFrequencyFilter, TextStemmer, TextTokenizer, WritePMML, WriteSink
public abstract class ExecutableOperator extends StreamingOperator
ExecutableOperators are the most commonly used operators. Work is performed in the execute(ExecutionContext) method, which generally consists of reading data from the input ports and writing data to the output ports. The general life-cycle is as follows:
- computeMetadata is invoked to compute metadata. This is always performed locally and up-front, prior to execution, so it is the appropriate time to perform any necessary validation.
- cloneForExecution is invoked to clone this object, once for each thread of execution. This guarantees that if the operator modifies any internal state as part of its execution, those changes are not visible to other threads, which provides a safeguard for thread safety.
- execute is invoked on each copy returned by cloneForExecution() in order to execute.
DeriveFields.
public class MyExecutableOp extends ExecutableOperator {

    //declare input port
    private final RecordPort input = newRecordInput("input");
    //declare output port
    private final RecordPort output = newRecordOutput("output");
    //configuration: name of the first input field
    private String inputFieldName1 = "field1";
    //configuration: name of the second input field
    private String inputFieldName2 = "field2";
    //configuration: name of the output field
    private String sumFieldName = "sum";

    public MyExecutableOp() {
    }

    // ... (getters and setters for input, output, inputFieldName1, inputFieldName2, sumFieldName)

    private void validateInput(StreamingMetadataContext ctx) {
        // ... (make sure input fields are of type double, etc.)
    }

    @Override
    protected void computeMetadata(StreamingMetadataContext ctx) {
        validateInput(ctx);
        //we are parallelizable
        ctx.setParallelizable(true);
        //declare output type to be a record of the format
        //[inputFieldName1:double, inputFieldName2:double, sumFieldName:double]
        RecordTokenType outputType = record(
            DOUBLE(inputFieldName1), DOUBLE(inputFieldName2), DOUBLE(sumFieldName));
        output.setType(ctx, outputType);
    }

    @Override
    protected void execute(ExecutionContext ctx) {
        //get handles to input and output
        RecordInput input = this.input.getInput(ctx);
        RecordOutput output = this.output.getOutput(ctx);
        //get handles to input fields
        DoubleValued inputField1 = (DoubleValued) input.getField(this.inputFieldName1);
        DoubleValued inputField2 = (DoubleValued) input.getField(this.inputFieldName2);
        //get handles to output fields
        DoubleSettable outputField1 = (DoubleSettable) output.getField(this.inputFieldName1);
        DoubleSettable outputField2 = (DoubleSettable) output.getField(this.inputFieldName2);
        DoubleSettable sumField = (DoubleSettable) output.getField(this.sumFieldName);
        //step until input is drained
        while (input.stepNext()) {
            double val1 = inputField1.asDouble();
            double val2 = inputField2.asDouble();
            //copy the input fields into two corresponding output fields
            outputField1.set(val1);
            outputField2.set(val2);
            //set the sum field equal to the sum of the inputs
            sumField.set(val1 + val2);
            //push a row of data
            output.push();
        }
        //signal end-of-data on output
        output.pushEndOfData();
    }
}
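The life-cycle described above (validate once, clone once per thread, execute each clone) can be imitated in plain Java without the DataRush runtime. The following is only a minimal sketch under that assumption: ToyOperator, LifecycleSketch, and run are hypothetical names invented for illustration and are not part of the library. The point it shows is that state modified by execute stays private to each copy.

```java
import java.util.ArrayList;
import java.util.List;

public class LifecycleSketch {

    /** Hypothetical stand-in for an operator; NOT a DataRush class. */
    static class ToyOperator {
        final String fieldName;   // configuration, fixed before execution
        long rowsSeen = 0;        // internal state modified during execute

        ToyOperator(String fieldName) {
            this.fieldName = fieldName;
        }

        // Step 1: validate configuration up-front, before any execution.
        void computeMetadata() {
            if (fieldName == null || fieldName.isEmpty()) {
                throw new IllegalStateException("fieldName must be set");
            }
        }

        // Step 2: produce one private copy per thread of execution.
        ToyOperator cloneForExecution() {
            return new ToyOperator(fieldName);
        }

        // Step 3: do the work; only this copy's state is touched.
        void execute(int[] partition) {
            for (int ignored : partition) {
                rowsSeen++;
            }
        }
    }

    /** Runs the three steps, one thread per partition; returns rows seen per copy. */
    static long[] run(ToyOperator original, int[][] partitions) {
        original.computeMetadata();
        List<ToyOperator> copies = new ArrayList<>();
        List<Thread> threads = new ArrayList<>();
        for (int[] partition : partitions) {
            ToyOperator copy = original.cloneForExecution();
            copies.add(copy);
            Thread t = new Thread(() -> copy.execute(partition));
            threads.add(t);
            t.start();
        }
        try {
            for (Thread t : threads) {
                t.join();   // join makes each copy's final state visible here
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while waiting", e);
        }
        long[] counts = new long[copies.size()];
        for (int i = 0; i < counts.length; i++) {
            counts[i] = copies.get(i).rowsSeen;
        }
        return counts;
    }

    public static void main(String[] args) {
        ToyOperator op = new ToyOperator("field1");
        long[] counts = run(op, new int[][] {{1, 2, 3}, {4, 5}});
        System.out.println(counts[0] + " " + counts[1] + " original=" + op.rowsSeen);
    }
}
```

Because each thread works on its own copy, the original operator's rowsSeen is never modified, which mirrors the safeguard the life-cycle is meant to provide.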
-
-
Constructor Summary
ExecutableOperator()
-
Method Summary
protected ExecutableOperator cloneForExecution()
Performs a deep copy of the operator for execution.
protected abstract void execute(ExecutionContext ctx)
Executes the operator.
protected int getNumInputCopies(LogicalPort inputPort)
May be overridden to specify that multiple input copies are needed for a given input port.
protected PortSetting[] getPortSettings(LogicalPort outputPort)
May be overridden to specify port settings for a given output port.
protected boolean handleInactiveOutput(LogicalPort output)
Called when one of our outputs is no longer being read, to perform any cleanup necessary.
-
Methods inherited from class com.pervasive.datarush.operators.StreamingOperator
computeMetadata
-
Methods inherited from class com.pervasive.datarush.operators.AbstractLogicalOperator
disableParallelism, getInputPorts, getOutputPorts, newInput, newInput, newOutput, newRecordInput, newRecordInput, newRecordOutput, notifyError
-
-
-
-
Method Detail
-
getPortSettings
protected PortSetting[] getPortSettings(LogicalPort outputPort)
May be overridden to specify port settings for a given output port.
- Parameters:
outputPort - the output port
- Returns:
the port settings to use. By default this is an empty array.
-
getNumInputCopies
protected int getNumInputCopies(LogicalPort inputPort)
May be overridden to specify that multiple input copies are needed for a given input port. By default this is one. This can be used in rare cases when we must examine multiple positions in the same input stream.
- Parameters:
inputPort - the port
- Returns:
the number of input copies for the port
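As a rough analogy for examining multiple positions in the same input stream, the plain-Java sketch below walks one list with two independent cursors, one running a fixed number of rows ahead of the other. It does not use the DataRush API; TwoCursorSketch and laggedDifferences are hypothetical names, and the two iterators merely stand in for two copies of an input.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;

public class TwoCursorSketch {

    /**
     * Returns, for each value at position i >= lag, the difference between
     * that value and the value at position i - lag.
     */
    static List<Double> laggedDifferences(List<Double> stream, int lag) {
        Iterator<Double> lead = stream.iterator();    // first "copy" of the input
        Iterator<Double> trail = stream.iterator();   // second "copy" of the input

        // Advance the leading cursor so it runs `lag` rows ahead of the trailing one.
        for (int i = 0; i < lag && lead.hasNext(); i++) {
            lead.next();
        }

        List<Double> differences = new ArrayList<>();
        while (lead.hasNext()) {
            // trail is always behind lead, so it still has a value here
            differences.add(lead.next() - trail.next());
        }
        return differences;
    }
}
```

For example, calling laggedDifferences on the values 1, 4, 9, 16 with a lag of 1 compares each value with its predecessor.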
-
handleInactiveOutput
protected boolean handleInactiveOutput(LogicalPort output)
Called when one of our outputs is no longer being read, to perform any cleanup necessary. The default implementation is a no-op; subclasses may wish to override it.
- Parameters:
output - the output that has just gone inactive
- Returns:
false if the operator should terminate when there are no more outputs.
-
cloneForExecution
protected ExecutableOperator cloneForExecution()
Performs a deep copy of the operator for execution. The default implementation uses JSON serialization: we perform a JSON serialization followed by a JSON deserialization. As a best practice, operator implementations should not override this method. If they must override it, they must guarantee that cloneForExecution copies any instance variables that are modified by execute.
- Returns:
a deep copy of this operator
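The idea of copying by serializing and then deserializing can be sketched in plain Java. Note the substitution: the sketch below uses standard java.io object serialization as a stand-in for the JSON round trip, because the library's actual JSON machinery is not shown here. RoundTripCopySketch, ToyConfig, and deepCopy are hypothetical names used only for illustration.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.List;

public class RoundTripCopySketch {

    /** Hypothetical operator-like configuration holder; NOT a DataRush class. */
    static class ToyConfig implements Serializable {
        private static final long serialVersionUID = 1L;
        String sumFieldName = "sum";
        List<String> inputFieldNames = new ArrayList<>();
    }

    /** Deep-copies an object by writing it to bytes and reading it back. */
    @SuppressWarnings("unchecked")
    static <T extends Serializable> T deepCopy(T original) {
        try {
            ByteArrayOutputStream bytes = new ByteArrayOutputStream();
            try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
                out.writeObject(original);
            }
            try (ObjectInputStream in =
                     new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray()))) {
                return (T) in.readObject();
            }
        } catch (IOException | ClassNotFoundException e) {
            throw new IllegalStateException("round trip failed", e);
        }
    }
}
```

Since the copy is rebuilt entirely from the serialized form, nested mutable state such as the list above is not shared with the original, which is the property a hand-written override would also have to preserve.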
-
execute
protected abstract void execute(ExecutionContext ctx)
Executes the operator. Implementations should adhere to the following contracts:
- Following execution, all input ports must be at end-of-data.
- Following execution, all output ports must be at end-of-data.
- Parameters:
ctx - context in which to look up physical ports bound to logical ports
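The two contracts above can be illustrated with a small stand-alone sketch: the input is read until it is exhausted, and the output is explicitly marked as finished before the method returns. This is plain Java written under stated assumptions, not the DataRush port API; ExecuteContractSketch and ToyOutput are hypothetical, and an Iterator stands in for an input port.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;

public class ExecuteContractSketch {

    /** Hypothetical output that records rows and an end-of-data flag. */
    static class ToyOutput {
        final List<Double> rows = new ArrayList<>();
        boolean endOfData = false;

        void push(double value) {
            if (endOfData) {
                throw new IllegalStateException("push after end-of-data");
            }
            rows.add(value);
        }

        void pushEndOfData() {
            endOfData = true;
        }
    }

    /** Sums the two values of each input row and honours both contracts. */
    static void execute(Iterator<double[]> input, ToyOutput output) {
        // Contract 1: step until the input is fully drained.
        while (input.hasNext()) {
            double[] row = input.next();
            output.push(row[0] + row[1]);
        }
        // Contract 2: signal end-of-data on the output before returning.
        output.pushEndOfData();
    }
}
```

An implementation that returns early, for example after finding the first matching row, would still need to drain or otherwise finish its input and close its output to satisfy the same contracts.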
-
-