Class AbstractReader

  • All Implemented Interfaces:
    LogicalOperator, RecordSourceOperator, SourceOperator<RecordPort>
    Direct Known Subclasses:
    AbstractTextReader, ReadAvro, ReadMDF, ReadORC, ReadParquet, ReadStagingDataset

    public abstract class AbstractReader
    extends CompositeOperator
    implements RecordSourceOperator
    A generic reader of byte data representing a stream of records. The reader encompasses the basic attributes any such reader should have:
    • a source of bytes, as identified by a ByteSource. This is most often a file or files, so convenience methods for specifying the source as a file are provided.
    • common parsing properties, such as record fields to omit in the output and on-error behavior.
    • common I/O tunables, such as buffer sizes and splitting behavior.
    An AbstractReader wraps a ReadSource operator. Implementations provide an interface specific to a particular class of files (delimited text files, for example), hiding the more complex model used to describe data parsing in general. The composition structure is the same for all readers; only the DataFormat used differs between implementations.
    • Field Detail

      • output

        protected final RecordPort output
        The output port of the read operator
      • options

        protected final ParsingOptions options
        Container for options related to record parsing
    • Constructor Detail

      • AbstractReader

        protected AbstractReader()
        Reads an empty source with default settings. The source must be set before execution or an error will be raised.
        See Also:
        setSource(ByteSource)
      • AbstractReader

        protected AbstractReader(String pattern)
        Reads all paths matching the specified pattern using default options. Any matching path which is a directory is replaced with all files in the directory; this expansion is not recursive.
        Parameters:
        pattern - a path-matching pattern
        See Also:
        FileClient#matchPaths(String)
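        The non-recursive expansion rule described above can be illustrated with java.nio.file. This is a hypothetical sketch, not FileClient#matchPaths: it assumes a glob matched against file names within a single directory, whereas the real pattern syntax and matching scope are not documented here. PatternExpansion and expand are illustrative names only.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.file.DirectoryStream;
import java.nio.file.FileSystems;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.PathMatcher;
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper illustrating the documented expansion rule; it is not FileClient.
final class PatternExpansion {
    private PatternExpansion() {}

    // Matches entries of dir by file name against a glob, then replaces each matching
    // directory with the regular files directly inside it (one level, no recursion).
    static List<Path> expand(Path dir, String glob) {
        PathMatcher matcher = FileSystems.getDefault().getPathMatcher("glob:" + glob);
        List<Path> result = new ArrayList<>();
        try (DirectoryStream<Path> entries = Files.newDirectoryStream(dir)) {
            for (Path entry : entries) {
                if (!matcher.matches(entry.getFileName())) {
                    continue; // not selected by the pattern
                }
                if (Files.isDirectory(entry)) {
                    try (DirectoryStream<Path> children = Files.newDirectoryStream(entry)) {
                        for (Path child : children) {
                            if (Files.isRegularFile(child)) {
                                result.add(child); // sub-directories are skipped, not descended
                            }
                        }
                    }
                } else {
                    result.add(entry);
                }
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        result.sort(null); // natural Path order, for a deterministic result
        return result;
    }
}
```

        With a directory named data.csv that contains c.csv and a sub-directory nested/, the pattern *.csv yields data.csv/c.csv but nothing from nested/.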
      • AbstractReader

        protected AbstractReader(Path path)
        Reads the file specified by the path. If the path refers to a directory, all files in the directory are read; this read is not recursive into sub-directories.
        Parameters:
        path - the path to read
      • AbstractReader

        protected AbstractReader(ByteSource source)
        Reads the specified data source using default options.
        Parameters:
        source - the data source to read
    • Method Detail

      • getSource

        public ByteSource getSource()
        Gets the source being read.
        Returns:
        the data source to read
      • setSource

        public void setSource(String pattern)
        Sets the data source to all paths matching the specified pattern. Any matching path which is a directory is replaced with all files in the directory; this expansion is not recursive.
        Parameters:
        pattern - a path-matching pattern
        See Also:
        FileClient#matchPaths(String)
      • setSource

        public void setSource(Path path)
        Sets the data source to the specified path. If the path refers to a directory, all files in the directory are read; this read is not recursive into sub-directories.
        Parameters:
        path - the path to read
      • setSource

        public void setSource(ByteSource source)
        Sets the data source to the specified source.
        Parameters:
        source - the data source to read
      • getSplitOptions

        public SplitOptions getSplitOptions()
        Gets the configuration used in determining how to break the source into splits.
        Returns:
        the options used when generating splits for the source
      • setSplitOptions

        public void setSplitOptions(SplitOptions options)
        Sets the configuration used in determining how to break the source into splits.

        This is an advanced option provided for performance tuning. Normally, it is not necessary to configure these settings.

        Parameters:
        options - the options to use when generating splits for the source
      • getPessimisticSplitting

        public boolean getPessimisticSplitting()
        Indicates whether pessimistic file splitting is used.
        Returns:
        true if pessimistic splitting is used, false otherwise
      • setPessimisticSplitting

        public void setPessimisticSplitting(boolean enabled)
        Configures whether pessimistic file splitting must be used. By default, this is disabled.
        Parameters:
        enabled - indicates whether to use pessimistic splitting
        See Also:
        ReadSource
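        How splits are actually computed is governed by SplitOptions and ReadSource, whose rules are not given here. The following hypothetical sketch assumes fixed-size byte ranges, and assumes that pessimistic splitting treats each file as a single split; SplitSketch, computeSplits and splitSize are illustrative names only.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical illustration of byte-range splitting; the real SplitOptions logic may differ.
final class SplitSketch {
    private SplitSketch() {}

    // Returns {offset, length} pairs that together cover [0, fileLength).
    static List<long[]> computeSplits(long fileLength, long splitSize, boolean pessimistic) {
        if (splitSize <= 0) {
            throw new IllegalArgumentException("splitSize must be positive");
        }
        List<long[]> splits = new ArrayList<>();
        if (pessimistic || fileLength <= splitSize) {
            // Assumption: pessimistic splitting reads the whole file as one split.
            splits.add(new long[] {0L, fileLength});
            return splits;
        }
        for (long offset = 0; offset < fileLength; offset += splitSize) {
            // The last split may be shorter than splitSize.
            splits.add(new long[] {offset, Math.min(splitSize, fileLength - offset)});
        }
        return splits;
    }
}
```

        A text parser that starts in the middle of a file typically has to resynchronize on a record boundary first, which is one reason a reader may prefer not to split a file at all.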
      • getParseOptions

        public ParsingOptions getParseOptions()
        Gets the parsing options used by the reader.
        Returns:
        the parse options
      • setParseOptions

        public void setParseOptions(ParsingOptions options)
        Sets the parsing options used by the reader. This sets all parse options at once.
        Parameters:
        options - the parse options to use
        See Also:
        DataParser
      • getSelectedFields

        public List<String> getSelectedFields()
        Gets the list of record fields to read.
        Returns:
        the fields which will be read.
      • setSelectedFields

        public void setSelectedFields(String... fields)
        Sets the list of record fields to read. If only a subset of the fields is needed, parsing only those fields can be more efficient. Only fields in this list will appear in the output records. An empty list indicates all fields should be parsed; this is the default setting.
        Parameters:
        fields - the record fields to read
      • setSelectedFields

        public void setSelectedFields(List<String> fields)
        Sets the list of record fields to read. If only a subset of the fields is needed, parsing only those fields can be more efficient. Only fields in this list will appear in the output records. An empty list indicates all fields should be parsed; this is the default setting.
        Parameters:
        fields - the record fields to read
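        The selection semantics (an empty list keeps every field; otherwise only listed fields are output) can be modeled over a single record. This sketch represents a record as a hypothetical Map and, as an assumption, keeps fields in their original record order rather than selection order. It filters after parsing, so it does not capture the efficiency gain of skipping unselected fields during parsing.

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical projection helper; not part of the reader API.
final class FieldSelection {
    private FieldSelection() {}

    // An empty selection keeps every field; otherwise only selected fields survive,
    // in the record's original field order (an assumption of this sketch).
    static Map<String, Object> project(Map<String, Object> record, List<String> selected) {
        if (selected.isEmpty()) {
            return new LinkedHashMap<>(record);
        }
        Map<String, Object> out = new LinkedHashMap<>();
        for (Map.Entry<String, Object> field : record.entrySet()) {
            if (selected.contains(field.getKey())) {
                out.put(field.getKey(), field.getValue());
            }
        }
        return out;
    }
}
```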
      • getIncludeSourceInfo

        public boolean getIncludeSourceInfo()
        Indicates whether the parsed records should be tagged with additional fields indicating their source.
        Returns:
        true if additional source information is provided with records
        See Also:
        setIncludeSourceInfo(boolean)
      • setIncludeSourceInfo

        public void setIncludeSourceInfo(boolean enabled)
        Controls whether parsed records will be tagged with additional fields indicating how to locate them in their original source. This information can also be used to reconstruct the original source ordering.

        When enabled, output will have three additional fields which, considered as a triple, uniquely identify and order the output.

        • sourcePath, a string naming the original source file from which the record originated. This is NULL if the source file cannot be determined.
        • splitOffset, a long providing the starting byte offset of the parse split in the file.
        • recordOffset, a long providing the starting offset for the record within the parse split. This value will be in units of bytes or characters, as appropriate for the parsed format. Character offsets are based on the first full character within the split.
        The additional fields will have the names indicated above and will be prepended to the normal output of the reader. If these names collide with existing output fields, they will be renamed using the standard collision handling process. The presence of these fields is not affected by any field filtering specified using setSelectedFields(List).
        Parameters:
        enabled - indicates whether the output should be tagged with source information
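        Because the three source fields order the output, sorting on them restores source order after a parallel read. The TaggedRecord class is hypothetical, and placing records with a null sourcePath first is an assumption. Comparing recordOffset is only meaningful within a split, which is why splitOffset is compared before it.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;

// Hypothetical record shape mirroring the sourcePath/splitOffset/recordOffset fields.
final class TaggedRecord {
    final String sourcePath; // null when the source file cannot be determined
    final long splitOffset;  // starting byte offset of the parse split
    final long recordOffset; // offset of the record within its split
    final String payload;

    TaggedRecord(String sourcePath, long splitOffset, long recordOffset, String payload) {
        this.sourcePath = sourcePath;
        this.splitOffset = splitOffset;
        this.recordOffset = recordOffset;
        this.payload = payload;
    }
}

final class SourceOrder {
    private SourceOrder() {}

    // File, then split, then position in split; null paths sorting first is an assumption.
    static final Comparator<TaggedRecord> BY_SOURCE =
            Comparator.comparing((TaggedRecord r) -> r.sourcePath,
                            Comparator.nullsFirst(Comparator.<String>naturalOrder()))
                    .thenComparingLong(r -> r.splitOffset)
                    .thenComparingLong(r -> r.recordOffset);

    // Returns a sorted copy; the input list is left unchanged.
    static List<TaggedRecord> restoreOrder(List<TaggedRecord> records) {
        List<TaggedRecord> sorted = new ArrayList<>(records);
        sorted.sort(BY_SOURCE);
        return sorted;
    }
}
```

        Note that sorting on sourcePath orders files lexically, which is not necessarily the order in which the files were listed; the triple guarantees source order within each file.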
      • getMissingFieldAction

        public ParseErrorAction getMissingFieldAction()
        Gets how fields that are declared in the schema but not found when parsing the record are handled.
        Returns:
        the action to take on missing fields
      • setMissingFieldAction

        public void setMissingFieldAction(ParseErrorAction action)
        Sets how to handle fields declared in the schema, but not found when parsing the record. If the configured action does not discard the record, the missing fields will be null-valued in the output. By default, this setting is ParseErrorAction.WARN.
        Parameters:
        action - the action to take on missing fields
      • getExtraFieldAction

        public ParseErrorAction getExtraFieldAction()
        Gets how fields that are found when parsing the record but not declared in the schema are handled.
        Returns:
        the action to take on extra fields
      • setExtraFieldAction

        public void setExtraFieldAction(ParseErrorAction action)
        Sets how to handle fields found when parsing the record, but not declared in the schema. By default, this setting is ParseErrorAction.WARN.
        Parameters:
        action - the action to take on extra fields
      • getFieldErrorAction

        public ParseErrorAction getFieldErrorAction()
        Gets how fields which cannot be parsed are handled.
        Returns:
        the action to take on field errors
      • setFieldErrorAction

        public void setFieldErrorAction(ParseErrorAction action)
        Sets how to handle fields which cannot be parsed. If the configured action does not discard the record, the fields which could not be parsed will be null-valued in the output. By default, this setting is ParseErrorAction.WARN.
        Parameters:
        action - the action to take on field errors
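        The shared rule for these actions (a record that is not discarded carries null values for the affected fields) is sketched below for missing fields. ErrorAction and its constants are hypothetical stand-ins; the actual ParseErrorAction constants are not listed on this page, so only the WARN behavior is taken from the text above.

```java
import java.util.Arrays;
import java.util.List;

// Hypothetical model of missing-field handling; not the reader's implementation.
final class MissingFieldHandling {
    private MissingFieldHandling() {}

    // Hypothetical stand-in for ParseErrorAction; the real constants may differ.
    enum ErrorAction { IGNORE, WARN, DISCARD, FAIL }

    // Pads a parsed row to the schema width. Returns null when the record is discarded.
    static String[] completeRow(String[] parsed, int schemaWidth, ErrorAction action, List<String> warnings) {
        if (parsed.length >= schemaWidth) {
            return parsed; // nothing missing; extra fields fall under a separate action
        }
        switch (action) {
            case FAIL:
                throw new IllegalStateException(
                        "record has " + parsed.length + " of " + schemaWidth + " fields");
            case DISCARD:
                return null;
            case WARN:
                warnings.add("missing " + (schemaWidth - parsed.length) + " field(s)");
                break;
            case IGNORE:
                break;
        }
        // Record kept: Arrays.copyOf pads the missing positions with null.
        return Arrays.copyOf(parsed, schemaWidth);
    }
}
```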
      • getRecordWarningThreshold

        public int getRecordWarningThreshold()
        Gets the maximum number of records allowed to have parse warnings.
        Returns:
        the limit on the number of records in error
      • setRecordWarningThreshold

        public void setRecordWarningThreshold(int limit)
        Configures the maximum number of records which can have parse warnings before failing. Only records which have an error whose configured action raises a warning count towards this limit. A record is only counted once towards this limit, regardless of the number of warnings.

        By default, this limit is 100. Setting the limit to 0 means there is no restriction on the number of warnings.

        This limit is applied per split. Therefore, a file as a whole may be allowed more warnings than the limit, depending on how it is split.

        Parameters:
        limit - the number of records with warnings allowed
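        The counting rules above (one count per record regardless of how many warnings it has, 0 meaning unlimited, a separate budget per split) can be sketched as a small tracker. WarningBudget is a hypothetical name, and failing on the first record that exceeds the limit is an assumption about exactly when the reader fails.

```java
// Hypothetical per-split tracker; counts records with warnings, not individual warnings.
final class WarningBudget {
    private final int limit; // 0 means unlimited
    private int recordsWithWarnings;

    WarningBudget(int limit) {
        if (limit < 0) {
            throw new IllegalArgumentException("limit must be >= 0");
        }
        this.limit = limit;
    }

    // Called once per parsed record with the number of warnings raised for it.
    void recordParsed(int warningCount) {
        if (warningCount == 0) {
            return;
        }
        recordsWithWarnings++; // a record counts once, however many warnings it has
        if (limit != 0 && recordsWithWarnings > limit) {
            throw new IllegalStateException(
                    "more than " + limit + " records with warnings in this split");
        }
    }

    int recordsWithWarnings() {
        return recordsWithWarnings;
    }
}
```

        Since each split would get its own WarningBudget, a file read as three splits with a limit of 100 could contain up to 300 records with warnings without failing.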
      • getFieldLengthThreshold

        public int getFieldLengthThreshold()
        Gets the maximum length allowed for a field value before it is considered an error.
        Returns:
        the maximum field value length allowed
      • setFieldLengthThreshold

        public void setFieldLengthThreshold(int limit)
        Configures the maximum length allowed for a field value before it is considered an error. Long fields can be a sign of a misconfigured reader. When this limit is reached, the parser will fail the current field and attempt to restart on the next field.

        By default, this limit is 1M. Setting the limit to 0 means there is no restriction on field length.

        Parameters:
        limit - the maximum field value length allowed
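        The recovery behavior (fail the oversized field, then restart at the next field) can be sketched with a simple delimiter splitter. This is hypothetical: it ignores quoting and escapes, treats a length strictly greater than the limit as the error condition, and reports errors by field index; BoundedFieldSplitter is an illustrative name.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical splitter; real readers handle quoting, escapes and encodings.
final class BoundedFieldSplitter {
    private BoundedFieldSplitter() {}

    // Splits line on delimiter. A field longer than limit becomes null, its index is
    // added to errorFields, and scanning resumes at the next field. limit == 0 disables the check.
    static List<String> split(String line, char delimiter, int limit, List<Integer> errorFields) {
        List<String> fields = new ArrayList<>();
        StringBuilder current = new StringBuilder();
        boolean overflow = false;
        int index = 0;
        for (int i = 0; i <= line.length(); i++) {
            boolean endOfField = i == line.length() || line.charAt(i) == delimiter;
            if (endOfField) {
                if (overflow) {
                    fields.add(null);
                    errorFields.add(index);
                } else {
                    fields.add(current.toString());
                }
                current.setLength(0);
                overflow = false;
                index++;
            } else if (!overflow) {
                current.append(line.charAt(i));
                if (limit != 0 && current.length() > limit) {
                    overflow = true;      // fail this field...
                    current.setLength(0); // ...and skip ahead to the next delimiter
                }
            }
        }
        return fields;
    }
}
```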
      • getReadBuffer

        public int getReadBuffer()
        Gets the size of the I/O buffer, in bytes, to use for reads.
        Returns:
        the size of the read buffer
      • setReadBuffer

        public void setReadBuffer(int size)
        Sets the size of the I/O buffer, in bytes, to use for reads. The default size is 64K.
        Parameters:
        size - the size of the read buffer
      • getReadOnClient

        public boolean getReadOnClient()
        Indicates whether the reader should execute on the client.
        Returns:
        true if the reader will execute on the client, false otherwise.
      • setReadOnClient

        public void setReadOnClient(boolean enabled)
        Sets whether reads are performed by the client or in the cluster. By default, reads are performed in the cluster when executing in a distributed context.

        This normally only needs to be set if the source is not accessible from the cluster; for example, when the file resides only on the client. When executing in pseudo-distributed mode, this setting has no effect.

        If the intent is to suppress parallel reads (to guarantee record ordering matches the source, for instance), use AbstractLogicalOperator.disableParallelism() instead.

        Parameters:
        enabled - whether to read on client
      • getUseMetadata

        public boolean getUseMetadata()
        Indicates whether discovered metadata should be used to override the graph settings.
        Returns:
        true if discovered metadata should be used, false otherwise.
      • setUseMetadata

        public void setUseMetadata(boolean useMetadata)
        Sets whether discovered metadata should be used to override the graph settings. By default, this is false, since not all readers support metadata.
        Parameters:
        useMetadata - whether discovered metadata should be used
      • computeFormat

        protected abstract DataFormat computeFormat(CompositionContext ctx)
        Determines the data format for the source. The returned format is used during composition to construct a ReadSource operator. If an implementation supports schema discovery, it must be performed in this method.
        Parameters:
        ctx - the composition context for the current invocation of compose(CompositionContext)
        Returns:
        the source format to use
      • compose

        protected final void compose(CompositionContext ctx)
        Composes a reader for the source, using configured options and a derived format.
        Specified by:
        compose in class CompositeOperator
        Parameters:
        ctx - the context
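        The division of labor between computeFormat and compose follows the template method pattern: the composition steps are fixed and final, and subclasses supply only the format. The miniature below is hypothetical (MiniReader, MiniFormat and MiniLineReader are not part of the API) and replaces ReadSource composition with a direct parse of an in-memory string. Its missing-source check mirrors the note on the default constructor that an unset source raises an error at execution.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical stand-in for DataFormat: turns raw source text into records.
interface MiniFormat {
    List<String> parse(String text);
}

// Hypothetical miniature of the reader's template-method structure; not the DataFlow API.
abstract class MiniReader {
    private String source; // unset until setSource is called

    void setSource(String source) {
        this.source = source;
    }

    // The only step subclasses customize, analogous to computeFormat(CompositionContext).
    protected abstract MiniFormat computeFormat();

    // Fixed for every reader, analogous to the final compose(CompositionContext).
    final List<String> compose() {
        if (source == null) {
            throw new IllegalStateException("source must be set before execution");
        }
        MiniFormat format = computeFormat(); // schema discovery would happen here
        return new ArrayList<>(format.parse(source));
    }
}

// One concrete reader: records are newline-separated lines.
final class MiniLineReader extends MiniReader {
    @Override
    protected MiniFormat computeFormat() {
        return text -> Arrays.asList(text.split("\n"));
    }
}
```

        Making compose final keeps every subclass on the same composition path, so a new reader only has to describe its format.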