public abstract class ExecutableOperator extends StreamingOperator
Subclasses implement execute(ExecutionContext), which generally consists of reading data from
 the input ports and writing data to the output ports.
 The general life-cycle is as follows:
 1. computeMetadata is invoked to compute metadata. This is always performed locally and is done up front, prior to execution, so it is the appropriate time to perform any necessary validation.
 2. cloneForExecution is performed to clone this object, once for each thread of execution. This guarantees that if the operator modifies any internal state as part of its execution, those changes are not visible to other threads. This provides a safeguard that guarantees thread safety.
 3. execute is invoked on each copy that was returned by calling cloneForExecution() in order to execute.
 DeriveFields.
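 The life-cycle above can be pictured with a small, framework-free sketch. Everything in it is an assumption made for illustration: SketchOperator, LifecycleSketch, and run are hypothetical names, the thread pool stands in for whatever scheduling the engine actually performs, and none of it uses the real ExecutableOperator API. It only shows the idea that validation happens once on the original instance and that each thread then mutates its own copy.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

public class LifecycleSketch {

    // Hypothetical stand-in for an operator; NOT the real ExecutableOperator API.
    static class SketchOperator {
        private final String fieldName; // configuration, fixed before execution
        private long rowsSeen = 0;      // internal state mutated during execute

        SketchOperator(String fieldName) {
            this.fieldName = fieldName;
        }

        // Phase 1: validate once, up front, on the original instance.
        void computeMetadata() {
            if (fieldName == null || fieldName.isEmpty()) {
                throw new IllegalStateException("fieldName must be set");
            }
        }

        // Phase 2: produce an independent copy for one thread of execution.
        SketchOperator cloneForExecution() {
            return new SketchOperator(fieldName);
        }

        // Phase 3: runs on a copy and touches only that copy's state.
        long execute(int rows) {
            for (int i = 0; i < rows; i++) {
                rowsSeen++;
            }
            return rowsSeen;
        }
    }

    // Runs the three phases and returns the row count seen by each copy.
    static List<Long> run(int threads, int rowsPerThread) throws Exception {
        SketchOperator original = new SketchOperator("field1");
        original.computeMetadata();

        ExecutorService pool = Executors.newFixedThreadPool(threads);
        try {
            List<Future<Long>> futures = new ArrayList<>();
            for (int t = 0; t < threads; t++) {
                SketchOperator copy = original.cloneForExecution();
                Callable<Long> task = () -> copy.execute(rowsPerThread);
                futures.add(pool.submit(task));
            }
            List<Long> counts = new ArrayList<>();
            for (Future<Long> f : futures) {
                counts.add(f.get());
            }
            return counts;
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) throws Exception {
        // Each copy counts only its own rows: prints [1000, 1000, 1000, 1000]
        System.out.println(run(4, 1000));
    }
}
```

 Because rowsSeen lives on each copy, no synchronization is needed in execute. If all threads shared the original instance instead, the unsynchronized increments could interfere with one another.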
 
   public class MyExecutableOp extends ExecutableOperator {
    
       //declare input port
       private final RecordPort input= newRecordInput("input");
       //declare output port
       private final RecordPort output= newRecordOutput("output");
   
       //configuration: name of the first input field
       private String inputFieldName1= "field1";
       //configuration: name of the second input field
       private String inputFieldName2= "field2";
       //configuration: name of the output field
       private String sumFieldName= "sum";
   
       public MyExecutableOp() {
       
       }
       
       // ... (getters and setters for input, output, inputFieldName1, inputFieldName2, sumFieldName)

       private void validateInput(StreamingMetadataContext ctx) {
           // ... (make sure input fields are of type double, etc.)
       }

       @Override
       protected void computeMetadata(StreamingMetadataContext ctx) {
       
           validateInput(ctx);
               
           //we are parallelizable
           ctx.setParallelizable(true);
               
           //declare output type to be a record of the format [inputFieldName1:double, inputFieldName2:double, sumFieldName:double]
           RecordTokenType outputType=
               record( DOUBLE(inputFieldName1),
                       DOUBLE(inputFieldName2),                    
                       DOUBLE(sumFieldName));        
           output.setType(ctx, outputType);                
       }
   
       @Override
       protected void execute(ExecutionContext ctx) {
           //get handles to input and output
           RecordInput input= this.input.getInput(ctx);
           RecordOutput output= this.output.getOutput(ctx);
       
           //get handles to input fields
           DoubleValued inputField1 = (DoubleValued)input.getField(this.inputFieldName1);
           DoubleValued inputField2 = (DoubleValued)input.getField(this.inputFieldName2);
       
           //get handle to output fields
           DoubleSettable outputField1 = (DoubleSettable)output.getField(this.inputFieldName1);
           DoubleSettable outputField2 = (DoubleSettable)output.getField(this.inputFieldName2);
           DoubleSettable sumField = (DoubleSettable)output.getField(this.sumFieldName);
       
           //step until input is drained
           while ( input.stepNext() ) {
               double val1= inputField1.asDouble();
               double val2= inputField2.asDouble();
               //copy the input fields into two corresponding output fields
               outputField1.set(val1);
               outputField2.set(val2);
               //set the sum field equal to the sum of the inputs
               sumField.set(val1+val2);
           
               //push a row of data
               output.push();
           }
       
           //signal end-of-data on output
           output.pushEndOfData();
       } 
   }
| Constructor and Description |
|---|
| ExecutableOperator() |

| Modifier and Type | Method and Description |
|---|---|
| protected ExecutableOperator | cloneForExecution() - Performs a deep copy of the operator for execution. |
| protected abstract void | execute(ExecutionContext ctx) - Executes the operator. |
| protected int | getNumInputCopies(LogicalPort inputPort) - May be overridden to specify that multiple input copies are needed for a given input port. |
| protected PortSetting[] | getPortSettings(LogicalPort outputPort) - May be overridden to specify port settings for a given output port. |
| protected boolean | handleInactiveOutput(LogicalPort output) - Called when one of our outputs is no longer being read, to perform any cleanup necessary. |

Inherited methods: computeMetadata, disableParallelism, getInputPorts, getOutputPorts, newInput, newInput, newOutput, newRecordInput, newRecordInput, newRecordOutput, notifyError

protected PortSetting[] getPortSettings(LogicalPort outputPort)
  outputPort - the output port

protected int getNumInputCopies(LogicalPort inputPort)
  inputPort - the port

protected boolean handleInactiveOutput(LogicalPort output)
  output - the output that has just gone inactive

protected ExecutableOperator cloneForExecution()

protected abstract void execute(ExecutionContext ctx)
  ctx - context in which to look up physical ports bound to logical ports

Copyright © 2020 Actian Corporation. All rights reserved.