Class ReadSource

  • All Implemented Interfaces:
    LogicalOperator, RecordSourceOperator, SourceOperator<RecordPort>

    public final class ReadSource
    extends CompositeOperator
    implements RecordSourceOperator
    Reads a data source as a stream of records. The data source provides a sequence of bytes in a format that can be parsed into records, all of which are assumed to share the same logical structure. The mapping between physical and logical structure is encapsulated in a format descriptor, which must be provided.

    This operator is low-level, providing a generalized model for reading files in a distributed fashion. Typically, ReadSource is not used directly in a graph; instead, it is used indirectly through a composite operator, such as one derived from AbstractReader, that provides a more appropriate interface to the end user.

    Parallelized reads are implemented by breaking input files into independently parsed pieces, a process called splitting. Splits are then distributed to available partitions and parsed. When run on a distributed cluster, the reader attempts to assign splits to machines where the I/O will be local, but non-local assignment may occur in order to provide work for all partitions. Distributed execution also assumes that the specified data source is accessible from every machine. If this is not the case, the read must be made non-parallel by using AbstractLogicalOperator.disableParallelism().
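The paragraph above describes locality-aware assignment of splits to partitions. The following is a minimal sketch of one way such an assignment could work, not the reader's actual algorithm: the `Split` and `Partition` records, the `SplitAssigner.assign` method, and the `slack` tolerance are all hypothetical. Each split goes to a partition on a host that stores it locally, unless doing so would leave that partition more than `slack` splits ahead of the least-loaded partition; in that case the split is assigned non-locally so every partition still gets work.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;

// Hypothetical split: a byte range of one file plus the hosts that store it locally.
record Split(String path, long offset, long length, Set<String> localHosts) {}

// Hypothetical partition: one parallel worker running on a given host.
record Partition(int id, String host) {}

final class SplitAssigner {
    // Greedy, locality-aware assignment (illustrative only). A split goes to the
    // least-loaded partition on one of its local hosts, unless that partition is
    // more than `slack` splits ahead of the least-loaded partition overall; then the
    // split is assigned non-locally so that every partition still receives work.
    static Map<Integer, List<Split>> assign(List<Split> splits, List<Partition> partitions, int slack) {
        if (partitions.isEmpty()) {
            throw new IllegalArgumentException("at least one partition is required");
        }
        Map<Integer, List<Split>> plan = new LinkedHashMap<>();
        for (Partition p : partitions) {
            plan.put(p.id(), new ArrayList<>());
        }
        for (Split s : splits) {
            Partition leastLoaded = null; // least-loaded partition on any host
            Partition bestLocal = null;   // least-loaded partition on a local host
            for (Partition p : partitions) {
                int load = plan.get(p.id()).size();
                if (leastLoaded == null || load < plan.get(leastLoaded.id()).size()) {
                    leastLoaded = p;
                }
                if (s.localHosts().contains(p.host())
                        && (bestLocal == null || load < plan.get(bestLocal.id()).size())) {
                    bestLocal = p;
                }
            }
            Partition target = leastLoaded; // fallback: non-local assignment
            if (bestLocal != null
                    && plan.get(bestLocal.id()).size() - plan.get(leastLoaded.id()).size() <= slack) {
                target = bestLocal; // local I/O without starving other partitions
            }
            plan.get(target.id()).add(s);
        }
        return plan;
    }
}
```

With `slack = 0`, a local assignment is made only when it does not unbalance the load; larger values trade balance for more local I/O.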

    Not all formats support splitting; splitting generally requires a way of unambiguously identifying record boundaries. Each format indicates whether it supports splitting. If it does not, each input file is treated as a single split, so reading multiple files can still be parallelized even with a non-splittable format. Some formats support splitting only in an "optimistic" fashion: splits can be handled in most circumstances, but in some edge cases splitting leads to parse failures. For these cases, the reader supports a "pessimistic" mode, which treats the format as non-splittable regardless of what it reports.
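To make the interaction between a format's own splittability and pessimistic mode concrete, here is a sketch under assumed names (`SplitPlanner`, `ByteRange`) and an assumed fixed split size; how the real reader sizes its splits is not described on this page. A non-splittable format, or pessimistic mode, yields exactly one split per file, so parallelism then comes only from reading several files.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical byte range [offset, offset + length) of one input file.
record ByteRange(String path, long offset, long length) {}

final class SplitPlanner {
    // Plans splits for one file (illustrative only). If the format cannot be split,
    // or pessimistic mode is enabled, the whole file becomes a single split;
    // otherwise it is cut into fixed-size ranges, the last of which may be shorter.
    static List<ByteRange> plan(String path, long fileSize, long splitSize,
                                boolean formatSplittable, boolean pessimistic) {
        if (splitSize <= 0) {
            throw new IllegalArgumentException("splitSize must be positive");
        }
        List<ByteRange> splits = new ArrayList<>();
        if (!formatSplittable || pessimistic) {
            splits.add(new ByteRange(path, 0, fileSize));
            return splits;
        }
        for (long off = 0; off < fileSize; off += splitSize) {
            splits.add(new ByteRange(path, off, Math.min(splitSize, fileSize - off)));
        }
        return splits;
    }
}
```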

    The reader makes a best-effort attempt to validate the data source before execution, but cannot always guarantee correctness, depending on the nature of the data source. Validation is done up front to prevent misconfigured graphs from executing; otherwise the reader might not run until a late phase, and a failure at that point could waste a significant amount of work.
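A minimal sketch of the fail-fast idea, using a hypothetical `ReaderConfig` class in which a local file path stands in for the data source and a plain string stands in for the format descriptor. It reports every detectable misconfiguration before any work starts; like the reader's own check, it is only best-effort, since a file that passes validation can still disappear or change before it is read.

```java
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.ArrayList;
import java.util.List;

// Hypothetical reader configuration; not the real ReadSource class.
final class ReaderConfig {
    Path source;   // assumed: a local file path stands in for the data source
    String format; // assumed: a string stands in for the format descriptor

    // Best-effort validation run before execution. It collects every detectable
    // problem up front so that a misconfigured graph fails immediately, but it cannot
    // catch problems that arise later (for example, the file being deleted).
    List<String> validate() {
        List<String> errors = new ArrayList<>();
        if (format == null) {
            errors.add("format is not set");
        }
        if (source == null) {
            errors.add("source is not set");
        } else if (!Files.isReadable(source)) {
            errors.add("source is not readable: " + source);
        }
        return errors;
    }
}
```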

    • Constructor Detail

      • ReadSource

        public ReadSource()
        Creates a reader with no source or format and default settings. Both the source and the format must be set before execution, or an error will be raised.
        See Also:
        setSource(ByteSource), setFormat(DataFormat)
      • ReadSource

        public ReadSource(ByteSource source,
                          DataFormat format)
        Reads the specified source using the given format. Default options are used.
        Parameters:
        source - the source to read
        format - the source data format
    • Method Detail

      • getSource

        public ByteSource getSource()
        Gets the data source for the reader.
        Returns:
        the data source
      • setSource

        public void setSource(ByteSource source)
        Sets the data source for the reader.
        Parameters:
        source - the source to read
      • getFormat

        public DataFormat getFormat()
        Gets the data format for the configured source.
        Returns:
        the data format
      • setFormat

        public void setFormat(DataFormat format)
        Sets the data format for the configured source.
        Parameters:
        format - the source data format
      • getIncludeSourceInfo

        public boolean getIncludeSourceInfo()
        Indicates whether the parsed records should be tagged with additional fields indicating their source.
        Returns:
        true if additional source information is provided with records
        See Also:
        setIncludeSourceInfo(boolean)
      • setIncludeSourceInfo

        public void setIncludeSourceInfo(boolean enabled)
        Controls whether parsed records will be tagged with additional fields indicating how to locate them in their original source. This information can also be used to reconstruct the original source ordering.

        When enabled, output will have three additional fields which, considered as a triple, uniquely identify and order the output.

        • sourcePath, a string naming the original source file from which the record originated. This is NULL if this cannot be determined.
        • splitOffset, a long providing the starting byte offset of the parse split in the file.
        • recordOffset, a long providing the starting offset for the record within the parse split. This value will be in units of bytes or characters as is appropriate for the parsed format. Character offsets are based on the first full character within the split.
        The additional fields will have the names indicated above and will be prepended to the normal output of the reader. If these names collide with existing output fields, they will be renamed using the standard collision handling process. The presence of these fields is not affected by any field filtering specified using setSelectedFields(List).
        Parameters:
        enabled - indicates whether the output should be tagged with source information
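The source-info triple is what makes the original ordering recoverable after a parallel read. The sketch below assumes records have already been materialized into a hypothetical `TaggedRecord` type carrying the three fields; it sorts by `sourcePath`, then `splitOffset`, then `recordOffset`. Within a file this restores source order; across files, this sketch orders lexicographically by path, which may differ from the order in which the files were supplied. Records with an unknown (null) path are placed last.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;

// Hypothetical tagged record: the three source-info fields plus a payload.
record TaggedRecord(String sourcePath, long splitOffset, long recordOffset, String payload) {}

final class SourceOrder {
    // File path first (nulls last), then split start, then record start within the split.
    static final Comparator<TaggedRecord> ORIGINAL_ORDER =
            Comparator.<TaggedRecord, String>comparing(
                            TaggedRecord::sourcePath,
                            Comparator.nullsLast(Comparator.<String>naturalOrder()))
                    .thenComparingLong(TaggedRecord::splitOffset)
                    .thenComparingLong(TaggedRecord::recordOffset);

    // Returns a copy of the parallel output sorted back into source order.
    static List<TaggedRecord> restore(List<TaggedRecord> parallelOutput) {
        List<TaggedRecord> sorted = new ArrayList<>(parallelOutput);
        sorted.sort(ORIGINAL_ORDER);
        return sorted;
    }
}
```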
      • getParseOptions

        public ParsingOptions getParseOptions()
        Gets the parsing options used by the reader.
        Returns:
        the parse options
      • setParseOptions

        public void setParseOptions(ParsingOptions options)
        Sets the parsing options used by the reader.
        Parameters:
        options - the parse options to use
        See Also:
        DataParser
      • getSplitOptions

        public SplitOptions getSplitOptions()
        Gets the split options used by the reader.
        Returns:
        the split options
      • setSplitOptions

        public void setSplitOptions(SplitOptions options)
        Sets the split options used by the reader.
        Parameters:
        options - the split options to use
        See Also:
        DataParser
      • getPessimisticSplitting

        public boolean getPessimisticSplitting()
        Indicates whether pessimistic file splitting is used.
        Returns:
        true if pessimistic splitting is enabled, in which case each input file is read as a single split
      • setPessimisticSplitting

        public void setPessimisticSplitting(boolean enabled)
        Configures whether pessimistic file splitting must be used. By default, this is disabled and splitting is determined by the format.
        Parameters:
        enabled - indicates whether to use pessimistic splitting
      • getReadOnClient

        public boolean getReadOnClient()
        Indicates whether the reader should execute on the client.
        Returns:
        true if the reader will execute on the client, false otherwise.
      • setReadOnClient

        public void setReadOnClient(boolean enabled)
        Sets whether reads are performed by the client or in the cluster. By default, reads are performed in the cluster, if executed in a distributed context.

        This normally only needs to be set if the source is not accessible from the cluster; for example, when the file resides only on the client.

        Parameters:
        enabled - whether to perform reads on the client
      • compose

        protected void compose(CompositionContext ctx)
        Description copied from class: CompositeOperator
        Compose the body of this operator. Implementations should do the following:
        1. Perform any validation of configuration, input types, etc.
        2. Instantiate and configure sub-operators, adding them to the provided context via the method OperatorComposable.add(O)
        3. Create necessary connections via the method OperatorComposable.connect(P, P). This includes connections from the composite's input ports to sub-operators, connections between sub-operators, and connections from sub-operators' output ports to the composite's output ports
        Specified by:
        compose in class CompositeOperator
        Parameters:
        ctx - the context
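The three compose steps above can be illustrated with a toy, self-contained model. Everything in it (`Port`, `Connection`, `SubOperator`, `Context`, `ToyReader`) is hypothetical and only mimics the shape of `OperatorComposable.add` and `connect`; it does not show how ReadSource actually composes its sub-operators, which this page does not describe.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Objects;

// Hypothetical stand-ins for ports, connections, operators, and the composition context.
final class Port {
    final String name;
    Port(String name) { this.name = name; }
}

record Connection(Port from, Port to) {}

final class SubOperator {
    final String name;
    final Port input = new Port("input");
    final Port output = new Port("output");
    SubOperator(String name) { this.name = name; }
}

final class Context {
    final List<SubOperator> operators = new ArrayList<>();
    final List<Connection> connections = new ArrayList<>();

    // Mimics the shape of OperatorComposable.add: registers and returns the operator.
    SubOperator add(SubOperator op) {
        operators.add(op);
        return op;
    }

    // Mimics the shape of OperatorComposable.connect: records a port-to-port link.
    void connect(Port from, Port to) {
        connections.add(new Connection(from, to));
    }
}

// A toy composite with no input ports and one output port, loosely shaped like a reader.
final class ToyReader {
    String source;
    final Port output = new Port("output");

    void compose(Context ctx) {
        // 1. Validate configuration before building anything.
        Objects.requireNonNull(source, "source must be set");
        // 2. Instantiate sub-operators and add them to the context.
        SubOperator splitter = ctx.add(new SubOperator("split " + source));
        SubOperator parser = ctx.add(new SubOperator("parse"));
        // 3. Connect sub-operators to each other and to the composite's output port.
        ctx.connect(splitter.output, parser.input);
        ctx.connect(parser.output, output);
    }
}
```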