All Implemented Interfaces:
LogicalOperator, PipelineOperator<RecordPort>, RecordPipelineOperator

public final class Sort extends AbstractDeferredRecordOperator
Sorts the input data. The sorted data is available on the output port, obtained from getOutput().

Sort ordering is configurable via the property sortKeys. A set of key fields must be provided, optionally including the ordering direction for each key. The specified fields must exist in the input data. If ordering information is omitted, ascending order is assumed. null values sort lower than non-null values under ascending order, higher under descending order.
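The null-ordering rule described above can be illustrated with plain java.util comparators. This is only a sketch of the documented ordering, not the operator's implementation; the class NullOrderingSketch and its members are hypothetical names that do not exist in the library.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Comparator;
import java.util.List;

// Hypothetical illustration class; not part of the Sort operator's API.
final class NullOrderingSketch {
    // Ascending: null is treated as lower than every non-null value.
    static final Comparator<Integer> ASCENDING =
            Comparator.nullsFirst(Comparator.<Integer>naturalOrder());

    // Descending: the exact reverse of ascending, so nulls end up last.
    static final Comparator<Integer> DESCENDING = ASCENDING.reversed();

    // Returns a sorted copy, leaving the caller's list untouched.
    static List<Integer> sorted(List<Integer> values, Comparator<Integer> order) {
        List<Integer> copy = new ArrayList<>(values);
        copy.sort(order);
        return copy;
    }

    public static void main(String[] args) {
        List<Integer> values = Arrays.asList(3, null, 1, 2);
        System.out.println(sorted(values, ASCENDING));   // [null, 1, 2, 3]
        System.out.println(sorted(values, DESCENDING));  // [3, 2, 1, null]
    }
}
```

Because the descending comparator is the reverse of the ascending one, a null compares as the lowest value in both cases; only the direction of the output changes.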

Additional parameters may be set on the sort to fine-tune implementation-specific details. By default, parameter values suitable for the majority of uses are applied.

Parallelism

The sort operator itself performs a partial sort: the data is sorted in parallel across multiple partitions, and each partition is sorted independently. If the source data is already partitioned, the existing partitions will be preserved and sorted. If the source data is not partitioned, the data will be partitioned in a round-robin fashion. Any operation that requires a gather of its input data preserves sort order, so the gathered data represents a total ordering. Consequently, any data sink that writes its output to a single file produces a total ordering.
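The relationship between the partial sort and the order-preserving gather can be sketched with standard collections. The sketch below assumes integer records and models the gather as a k-way merge of sorted partitions; that merge is a standard technique chosen for illustration, not a statement about how the engine implements its gather. The class PartitionedSortSketch and its methods are hypothetical names.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.PriorityQueue;

// Hypothetical illustration class; not part of the Sort operator's API.
final class PartitionedSortSketch {
    // Distributes values across partitions in round-robin order.
    static List<List<Integer>> roundRobin(List<Integer> input, int partitions) {
        List<List<Integer>> parts = new ArrayList<>();
        for (int i = 0; i < partitions; i++) {
            parts.add(new ArrayList<>());
        }
        for (int i = 0; i < input.size(); i++) {
            parts.get(i % partitions).add(input.get(i));
        }
        return parts;
    }

    // Sorts each partition independently: the "partial sort".
    static void sortEach(List<List<Integer>> parts) {
        for (List<Integer> part : parts) {
            Collections.sort(part);
        }
    }

    // Order-preserving gather, modeled as a k-way merge of sorted partitions.
    static List<Integer> gather(List<List<Integer>> parts) {
        // Each queue entry is {value, partition index, position in partition}.
        PriorityQueue<int[]> heads =
                new PriorityQueue<>((a, b) -> Integer.compare(a[0], b[0]));
        for (int p = 0; p < parts.size(); p++) {
            if (!parts.get(p).isEmpty()) {
                heads.add(new int[] {parts.get(p).get(0), p, 0});
            }
        }
        List<Integer> merged = new ArrayList<>();
        while (!heads.isEmpty()) {
            int[] head = heads.poll();
            merged.add(head[0]);
            List<Integer> source = parts.get(head[1]);
            int next = head[2] + 1;
            if (next < source.size()) {
                heads.add(new int[] {source.get(next), head[1], next});
            }
        }
        return merged;
    }

    public static void main(String[] args) {
        List<List<Integer>> parts = roundRobin(Arrays.asList(5, 3, 8, 1, 9, 2), 2);
        sortEach(parts);
        System.out.println(parts);          // [[5, 8, 9], [1, 2, 3]]
        System.out.println(gather(parts));  // [1, 2, 3, 5, 8, 9]
    }
}
```

Neither partition alone is a total ordering of the input, but merging them while respecting each partition's order yields one.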
  • Field Details

    • SORT_BUFFER_SIZE_MIN

      public static final long SORT_BUFFER_SIZE_MIN
      The smallest allowable sort buffer, 128K.
    • DEFAULT_SORT_BUFFER_SIZE

      public static final long DEFAULT_SORT_BUFFER_SIZE
      The default sort buffer, 100M.
    • SORT_BUFFER_SIZE_MAX

      public static final long SORT_BUFFER_SIZE_MAX
      The largest allowable sort buffer, 16G.
    • DEFAULT_IO_BUFFER_SIZE

      public static final int DEFAULT_IO_BUFFER_SIZE
      File I/O buffers default to 64K.
  • Constructor Details

    • Sort

      public Sort()
      The default constructor. Prior to graph compilation, the required property sortKeys must be specified or an exception will be raised.
    • Sort

      public Sort(List<SortKey> keys)
      Creates a new instance of Sort, specifying the minimal set of required parameters.
      Parameters:
      keys - The fields by which to sort.
    • Sort

      public Sort(SortKey... keys)
      Creates a new instance of Sort, specifying the minimal set of required parameters.
      Parameters:
      keys - The fields by which to sort.
    • Sort

      public Sort(String... keys)
      Creates a new instance of Sort, specifying the minimal set of required parameters.
      Parameters:
      keys - The fields by which to sort. Ascending order is assumed.
  • Method Details

    • getInput

      public RecordPort getInput()
      Description copied from interface: PipelineOperator
      Returns the input port
      Specified by:
      getInput in interface PipelineOperator<RecordPort>
      Overrides:
      getInput in class AbstractDeferredRecordOperator
      Returns:
      the input port
    • getOutput

      public RecordPort getOutput()
      Description copied from interface: PipelineOperator
      Returns the output port
      Specified by:
      getOutput in interface PipelineOperator<RecordPort>
      Overrides:
      getOutput in class AbstractDeferredRecordOperator
      Returns:
      the output port
    • setMaxMerge

      public void setMaxMerge(int maxMerge)
      Sets the maximum number of run files to merge at one time. Small values will reduce memory usage during the merge phase, but can force multiple merge passes over the runs. Large values decrease the number of passes, but at a cost of memory; each run being merged will require a buffer of getIOBufferSize() bytes.

      If not set or set to 0, this first defaults to EngineConfig.Sort.getMaxMerge(). If that is also unspecified or set to 0 this defaults to getSortBufferSize()/getIOBufferSize().

      Parameters:
      maxMerge - Maximum number of files to merge
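The fallback chain and the trade-off between merge width and merge passes can be worked through numerically. The sketch below is illustrative only: MaxMergeSketch, resolveMaxMerge, and mergePasses are hypothetical names, the pass count assumes a textbook multi-pass k-way merge, and "100M" and "64K" are assumed to mean 100 * 2^20 and 64 * 2^10 bytes.

```java
// Hypothetical illustration class; not part of the Sort operator's API.
final class MaxMergeSketch {
    // Mirrors the documented fallback chain; 0 means "not set".
    static long resolveMaxMerge(int operatorValue, int engineValue,
                                long sortBufferSize, long ioBufferSize) {
        if (operatorValue != 0) {
            return operatorValue;
        }
        if (engineValue != 0) {
            return engineValue;
        }
        return sortBufferSize / ioBufferSize;
    }

    // Passes needed to reduce `runs` run files to one when at most
    // `maxMerge` runs are merged at a time (assumed textbook merge).
    static int mergePasses(long runs, long maxMerge) {
        if (maxMerge < 2) {
            throw new IllegalArgumentException("maxMerge must be at least 2");
        }
        int passes = 0;
        while (runs > 1) {
            runs = (runs + maxMerge - 1) / maxMerge; // ceiling division
            passes++;
        }
        return passes;
    }

    public static void main(String[] args) {
        long sortBuffer = 100L * 1024 * 1024; // assumed meaning of "100M"
        long ioBuffer = 64L * 1024;           // assumed meaning of "64K"
        System.out.println(resolveMaxMerge(0, 0, sortBuffer, ioBuffer));  // 1600
        System.out.println(resolveMaxMerge(0, 32, sortBuffer, ioBuffer)); // 32
        System.out.println(mergePasses(100, 4));    // 4
        System.out.println(mergePasses(100, 1600)); // 1
    }
}
```

With 100 run files, a merge width of 4 needs four passes (100, 25, 7, 2, 1), while the derived default of 1600 merges everything in a single pass.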
    • getMaxMerge

      public int getMaxMerge()
      Gets the maximum number of intermediate result files which should be merged at one time. A value of 0 indicates that the cap will be derived from the sort and I/O buffer sizes as described in setMaxMerge(int).
      Returns:
      Maximum number of files to merge
    • toString

      public String toString()
      Overrides:
      toString in class Object
    • setSortBuffer

      public void setSortBuffer(String sizeSpecifier)
      Sets an approximate cap on the amount of memory used by the sort. If the data is unable to fit into memory, it will be written to intermediate storage as required.

      Values are supplied as number strings, supporting an optional size suffix. The common suffixes K, M, and G are supported, having the expected meaning; suffixes are case-insensitive. Omitting the suffix indicates the value is in bytes. Values are limited to the range from 1M to 2G. Values outside of this range will be adjusted to the nearest limit.

      If not specified or value is 0, this first defaults to EngineConfig.Sort.getSortBufferSize(). If that is also unspecified or value is 0, this defaults to DEFAULT_SORT_BUFFER_SIZE.

      Parameters:
      sizeSpecifier - a string specifying a byte size. Sizes are specified as positive whole numbers with an optional case-insensitive multiplier suffix.
      Throws:
      com.pervasive.datarush.graphs.physical.InvalidPropertyValueException - if the size specifier cannot be parsed or specifies a negative size.
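To make the size specifier format concrete, the following sketch parses strings such as "64K" or "100m" and clamps the result to a caller-supplied range. It is an approximation written for illustration, not the library's parser: SizeSpecifierSketch is a hypothetical class, the suffixes are assumed to be powers of 1024, and IllegalArgumentException stands in for the library's InvalidPropertyValueException.

```java
import java.util.Locale;

// Hypothetical illustration class; not part of the Sort operator's API.
final class SizeSpecifierSketch {
    // Parses a whole number with an optional case-insensitive K, M, or G suffix.
    static long parse(String spec) {
        String s = spec.trim().toUpperCase(Locale.ROOT);
        if (s.isEmpty()) {
            throw new IllegalArgumentException("empty size specifier");
        }
        long multiplier = 1L; // no suffix: the value is in bytes
        char last = s.charAt(s.length() - 1);
        if (last == 'K') {
            multiplier = 1L << 10; // assumption: K = 1024
        } else if (last == 'M') {
            multiplier = 1L << 20;
        } else if (last == 'G') {
            multiplier = 1L << 30;
        }
        String digits = multiplier == 1L ? s : s.substring(0, s.length() - 1);
        long value;
        try {
            value = Long.parseLong(digits);
        } catch (NumberFormatException e) {
            throw new IllegalArgumentException("cannot parse size specifier: " + spec, e);
        }
        if (value < 0) {
            throw new IllegalArgumentException("negative size: " + spec);
        }
        // Throws ArithmeticException rather than silently overflowing.
        return Math.multiplyExact(value, multiplier);
    }

    // Adjusts an out-of-range value to the nearest limit. A value of 0
    // ("not set") would be resolved by the fallback chain first; not shown.
    static long clamp(long value, long min, long max) {
        return Math.max(min, Math.min(max, value));
    }

    public static void main(String[] args) {
        System.out.println(parse("64K"));   // 65536
        System.out.println(parse("100m"));  // 104857600
        System.out.println(parse("4096"));  // 4096
        System.out.println(clamp(parse("4G"), 1L << 20, 2L << 30)); // 2147483648
    }
}
```

The limits passed to clamp in main are example values only; substitute whichever bounds apply to the property being set.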
    • setSortBufferSize

      public void setSortBufferSize(long size)
      Sets the size of the memory usage target, in bytes, for a sort.
      Parameters:
      size - the approximate memory usage cap for sorting
    • getSortBufferSize

      public long getSortBufferSize()
      Gets the size of the memory usage target, in bytes, for a sort.
      Returns:
      the approximate memory usage cap for sorting
    • setIOBuffer

      public void setIOBuffer(String sizeSpecifier)
      Sets the size of the memory buffers used for I/O operations on run files during sorting. Note that one buffer is required for each run being merged in the merge phase, so this value affects the total memory used during that phase. Refer to setMaxMerge(int) for more details.

      Values are supplied as number strings, supporting an optional size suffix. The common suffixes K, M, and G are supported, having the expected meaning; suffixes are case-insensitive. Omitting the suffix indicates the value is in bytes.

      If not specified or value is 0, this first defaults to EngineConfig.Sort.getIOBufferSize(). If that is also unspecified or value is 0, this defaults to DEFAULT_IO_BUFFER_SIZE.

      Parameters:
      sizeSpecifier - a string specifying a byte size. Sizes are specified as positive whole numbers with an optional case-insensitive multiplier suffix.
      Throws:
      com.pervasive.datarush.graphs.physical.InvalidPropertyValueException - if the size specifier cannot be parsed or specifies a negative size.
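Since one I/O buffer is needed per run being merged, the buffer memory of the merge phase can be estimated as the number of runs merged at once multiplied by the I/O buffer size. The sketch below only does that arithmetic; MergeMemorySketch and mergeBufferBytes are hypothetical names, and "64K" is assumed to mean 64 * 1024 bytes.

```java
// Hypothetical illustration class; not part of the Sort operator's API.
final class MergeMemorySketch {
    // Estimated bytes of I/O buffers held while merging `runsMerged` runs.
    static long mergeBufferBytes(int runsMerged, long ioBufferSize) {
        return Math.multiplyExact((long) runsMerged, ioBufferSize);
    }

    public static void main(String[] args) {
        long ioBuffer = 64L * 1024; // assumed meaning of "64K"
        System.out.println(mergeBufferBytes(16, ioBuffer));   // 1048576
        System.out.println(mergeBufferBytes(1600, ioBuffer)); // 104857600
    }
}
```

Doubling the I/O buffer size doubles this estimate for the same merge width, which is why the two settings are best tuned together.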
    • setIOBufferSize

      public void setIOBufferSize(long size)
      Sets the buffer size, in bytes, used for I/O operations on run files.
      Parameters:
      size - Buffer size used for intermediate file operations
    • getIOBufferSize

      public long getIOBufferSize()
      Gets the buffer size, in bytes, used for I/O operations on run files.
      Returns:
      Buffer size used for intermediate file operations
    • getSortKeys

      public List<SortKey> getSortKeys()
      Returns the list of keys by which the input is sorted.
      Returns:
      the list of sort keys
    • setSortKeys

      public void setSortKeys(List<SortKey> keys)
      Sets the list of keys by which the input is sorted.
      Parameters:
      keys - the sort keys, in order of precedence
    • setSortKeys

      public void setSortKeys(SortKey... keys)
      Sets the list of sort keys by which we are to sort.
      Parameters:
      keys - the list of sort keys by which we are to sort.
    • setSortKeys

      public void setSortKeys(String... keys)
      Sets the list of sort keys by which we are to sort.
      Parameters:
      keys - the list of sort keys by which we are to sort. Ascending order is assumed.
    • computeMetadata

      protected void computeMetadata(StreamingMetadataContext ctx)
      Description copied from class: DeferredCompositeOperator
      Implementations must adhere to all of the contracts specified by StreamingOperator.computeMetadata(com.pervasive.datarush.operators.StreamingMetadataContext). In addition, DeferredCompositeOperators must declare required metadata so as to satisfy requirements of the operators that are added during DeferredCompositeOperator.compose(com.pervasive.datarush.operators.DeferredCompositionContext).
      Specified by:
      computeMetadata in class DeferredCompositeOperator
      Parameters:
      ctx - the context
    • compose

      protected void compose(DeferredCompositionContext ctx)
      Description copied from class: DeferredCompositeOperator
      Compose the body of this operator. Implementations should do the following:
      1. Instantiate and configure sub-operators, adding them to the provided context via the method OperatorComposable.add(O).
      2. Create necessary connections via the method OperatorComposable.connect(P, P). This includes connections from the composite's input ports to sub-operators, connections between sub-operators, and connections from sub-operators' output ports to the composite's output ports.
      Specified by:
      compose in class DeferredCompositeOperator
      Parameters:
      ctx - the context