- java.lang.Object
-
- com.pervasive.datarush.operators.AbstractLogicalOperator
-
- com.pervasive.datarush.operators.CompositeOperator
-
- com.pervasive.datarush.operators.io.AbstractReader
-
- All Implemented Interfaces:
LogicalOperator, RecordSourceOperator, SourceOperator<RecordPort>
- Direct Known Subclasses:
AbstractTextReader, ReadAvro, ReadMDF, ReadORC, ReadParquet, ReadStagingDataset
public abstract class AbstractReader extends CompositeOperator implements RecordSourceOperator
A generic reader of byte data representing a stream of records. The reader encompasses the basic attributes any such reader should have:
- a source of bytes, as identified by a ByteSource. This is most often a file or files, so convenience methods for specifying the source as a file are provided.
- common parsing properties, such as record fields to omit in the output and on-error behavior.
- common I/O tunables, such as buffer sizes and splitting behavior.
AbstractReader wraps a ReadSource operator. Implementations provide an interface specific to an appropriate class of files (delimited text files, for example), hiding the more complex model used to describe data parsing in general. The composition structure is the same for all readers; only the DataFormat used differs between implementations.
-
-
Field Summary
Fields
protected ParsingOptions options
Container for options related to record parsing
protected RecordPort output
The output port of the read operator
-
Constructor Summary
Constructors
protected AbstractReader()
Reads an empty source with default settings.
protected AbstractReader(Path path)
Reads the file specified by the path.
protected AbstractReader(ByteSource source)
Reads the specified data source using default options.
protected AbstractReader(String pattern)
Reads all paths matching the specified pattern using default options.
-
Method Summary
Methods
protected void compose(CompositionContext ctx)
Composes a reader for the source, using configured options and a derived format.
protected abstract DataFormat computeFormat(CompositionContext ctx)
Determines the data format for the source.
ParseErrorAction getExtraFieldAction()
Gets how fields found when parsing the record, but not declared in the schema are handled.
ParseErrorAction getFieldErrorAction()
Gets how fields which cannot be parsed are handled.
int getFieldLengthThreshold()
Gets the maximum length allowed for a field value before it is considered an error.
boolean getIncludeSourceInfo()
Indicates whether the parsed records should be tagged with additional fields indicating their source.
ParseErrorAction getMissingFieldAction()
Gets how fields declared in the schema, but not found when parsing the record are handled.
RecordPort getOutput()
Gets the record port providing the records read from the data source.
ParsingOptions getParseOptions()
Gets the parsing options used by the reader.
boolean getPessimisticSplitting()
Indicates whether pessimistic file splitting is used.
int getReadBuffer()
Gets the size of the I/O buffer, in bytes, to use for reads.
boolean getReadOnClient()
Indicates whether the reader should execute on the client.
int getRecordWarningThreshold()
Gets the maximum number of records allowed to have parse warnings.
List<String> getSelectedFields()
Gets the list of record fields to read.
ByteSource getSource()
Gets the source being read.
SplitOptions getSplitOptions()
Gets the configuration used in determining how to break the source into splits.
boolean getUseMetadata()
Indicates whether discovered metadata should be used to override the graph settings.
void setExtraFieldAction(ParseErrorAction action)
Sets how to handle fields found when parsing the record, but not declared in the schema.
void setFieldErrorAction(ParseErrorAction action)
Sets how to handle fields which cannot be parsed.
void setFieldLengthThreshold(int limit)
Configures the maximum length allowed for a field value before it is considered an error.
void setIncludeSourceInfo(boolean enabled)
Controls whether parsed records will be tagged with additional fields indicating how to locate them in their original source.
void setMissingFieldAction(ParseErrorAction action)
Sets how to handle fields declared in the schema, but not found when parsing the record.
void setParseErrorAction(ParseErrorAction action)
Sets how to handle all parsing errors.
void setParseOptions(ParsingOptions options)
Sets the parsing options used by the reader.
void setPessimisticSplitting(boolean enabled)
Configures whether pessimistic file splitting must be used.
void setReadBuffer(int size)
Sets the size of the I/O buffer, in bytes, to use for reads.
void setReadOnClient(boolean enabled)
Sets whether reads are performed by the client or in the cluster.
void setRecordWarningThreshold(int limit)
Configures the maximum number of records which can have parse warnings before failing.
void setSelectedFields(String... fields)
Sets the list of record fields to read.
void setSelectedFields(List<String> fields)
Sets the list of record fields to read.
void setSource(Path path)
Sets the data source to the specified path.
void setSource(ByteSource source)
Sets the data source to the specified source.
void setSource(String pattern)
Sets the data source to all paths matching the specified pattern.
void setSplitOptions(SplitOptions options)
Sets the configuration used in determining how to break the source into splits.
void setUseMetadata(boolean useMetadata)
Sets whether discovered metadata should be used to override the graph settings.
-
Methods inherited from class com.pervasive.datarush.operators.AbstractLogicalOperator
disableParallelism, getInputPorts, getOutputPorts, newInput, newInput, newOutput, newRecordInput, newRecordInput, newRecordOutput, notifyError
-
Methods inherited from class java.lang.Object
clone, equals, finalize, getClass, hashCode, notify, notifyAll, toString, wait, wait, wait
-
Methods inherited from interface com.pervasive.datarush.operators.LogicalOperator
disableParallelism, getInputPorts, getOutputPorts
-
-
-
-
Field Detail
-
output
protected final RecordPort output
The output port of the read operator
-
options
protected final ParsingOptions options
Container for options related to record parsing
-
-
Constructor Detail
-
AbstractReader
protected AbstractReader()
Reads an empty source with default settings. The source must be set before execution or an error will be raised.
See Also:
setSource(ByteSource)
-
AbstractReader
protected AbstractReader(String pattern)
Reads all paths matching the specified pattern using default options. Any matching path which is a directory is replaced with all files in the directory; this expansion is not recursive.
Parameters:
pattern - a path-matching pattern
See Also:
FileClient#matchPaths(String)
-
AbstractReader
protected AbstractReader(Path path)
Reads the file specified by the path. If the path refers to a directory, all files in the directory are read; this read is not recursive into sub-directories.
Parameters:
path - the path to read
-
AbstractReader
protected AbstractReader(ByteSource source)
Reads the specified data source using default options.
Parameters:
source - the data source to read
-
-
Method Detail
-
getOutput
public RecordPort getOutput()
Gets the record port providing the records read from the data source.
Specified by:
getOutput in interface RecordSourceOperator
Specified by:
getOutput in interface SourceOperator<RecordPort>
Returns:
the flow of records produced by a sequential read of the data source
-
getSource
public ByteSource getSource()
Gets the source being read.
Returns:
the data source to read
-
setSource
public void setSource(String pattern)
Sets the data source to all paths matching the specified pattern. Any matching path which is a directory is replaced with all files in the directory; this expansion is not recursive.
Parameters:
pattern - a path-matching pattern
See Also:
FileClient#matchPaths(String)
-
setSource
public void setSource(Path path)
Sets the data source to the specified path. If the path refers to a directory, all files in the directory are read; this read is not recursive into sub-directories.
Parameters:
path - the path to read
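The non-recursive directory expansion described for the path and pattern forms of setSource can be pictured with standard java.nio.file calls. This is a minimal sketch, not the library's implementation: DirectoryExpansionSketch and expand are hypothetical names, and the code uses java.nio.file.Path rather than the DataRush Path type.

```java
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.Stream;

// Illustrative only: models "a directory is replaced with its files, not recursively".
class DirectoryExpansionSketch {

    // Returns the path itself if it is not a directory; otherwise the regular
    // files directly inside it. Sub-directories are skipped, not descended into.
    static List<Path> expand(Path path) throws IOException {
        if (!Files.isDirectory(path)) {
            return List.of(path);
        }
        try (Stream<Path> children = Files.list(path)) {
            return children
                    .filter(Files::isRegularFile)
                    .sorted()
                    .collect(Collectors.toList());
        }
    }

    public static void main(String[] args) throws IOException {
        Path dir = Files.createTempDirectory("reader-sketch");
        Files.createFile(dir.resolve("a.txt"));
        Path nested = Files.createDirectory(dir.resolve("nested"));
        Files.createFile(nested.resolve("b.txt"));
        // Only a.txt is listed; nested/b.txt is never reached.
        System.out.println(expand(dir));
    }
}
```

With this reading, files in sub-directories have to be named explicitly, or matched by a pattern, if they are meant to be read.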
-
setSource
public void setSource(ByteSource source)
Sets the data source to the specified source.
Parameters:
source - the data source to read
-
getSplitOptions
public SplitOptions getSplitOptions()
Gets the configuration used in determining how to break the source into splits.
Returns:
the options used when generating splits for the source
-
setSplitOptions
public void setSplitOptions(SplitOptions options)
Sets the configuration used in determining how to break the source into splits.
This is an advanced option provided for performance tuning. Normally, it is not necessary to configure these settings.
Parameters:
options - the options to use when generating splits for the source
-
getPessimisticSplitting
public boolean getPessimisticSplitting()
Indicates whether pessimistic file splitting is used.
Returns:
whether input sources can be broken into splits
-
setPessimisticSplitting
public void setPessimisticSplitting(boolean enabled)
Configures whether pessimistic file splitting must be used. By default, this is disabled.
Parameters:
enabled - indicates whether to use pessimistic splitting
See Also:
ReadSource
-
getParseOptions
public ParsingOptions getParseOptions()
Gets the parsing options used by the reader.
Returns:
the parse options
-
setParseOptions
public void setParseOptions(ParsingOptions options)
Sets the parsing options used by the reader. This sets all parse options at once.
Parameters:
options - the parse options to use
See Also:
DataParser
-
getSelectedFields
public List<String> getSelectedFields()
Gets the list of record fields to read.
Returns:
the fields which will be read.
-
setSelectedFields
public void setSelectedFields(String... fields)
Sets the list of record fields to read. If only a subset of fields is desired, it can be more efficient to parse only those fields. Only fields in this list will be in the output records. An empty list indicates all fields should be parsed; this is the default setting.
Parameters:
fields - the record fields to read
-
setSelectedFields
public void setSelectedFields(List<String> fields)
Sets the list of record fields to read. If only a subset of fields is desired, it can be more efficient to parse only those fields. Only fields in this list will be in the output records. An empty list indicates all fields should be parsed; this is the default setting.
Parameters:
fields - the record fields to read
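The selection rule (an empty list keeps every field, a non-empty list keeps only the named ones) can be sketched over a plain map. FieldSelectionSketch and select are hypothetical names, records are modelled as Map<String, Object> purely for illustration, and keeping the source field order in the output is an assumption of this sketch, not documented behavior.

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Illustrative only: a record is modelled as an ordered map of field name to value.
class FieldSelectionSketch {

    // An empty selection means "keep every field"; otherwise only the listed
    // fields survive. Output keeps the record's own field order (an assumption).
    static Map<String, Object> select(Map<String, Object> record, List<String> selected) {
        if (selected.isEmpty()) {
            return new LinkedHashMap<>(record);
        }
        Map<String, Object> out = new LinkedHashMap<>();
        for (Map.Entry<String, Object> field : record.entrySet()) {
            if (selected.contains(field.getKey())) {
                out.put(field.getKey(), field.getValue());
            }
        }
        return out;
    }

    public static void main(String[] args) {
        Map<String, Object> record = new LinkedHashMap<>();
        record.put("id", 1);
        record.put("name", "x");
        record.put("city", "y");
        System.out.println(select(record, List.of("city", "id")));
        System.out.println(select(record, List.of()));
    }
}
```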
-
getIncludeSourceInfo
public boolean getIncludeSourceInfo()
Indicates whether the parsed records should be tagged with additional fields indicating their source.
Returns:
true if additional source information is provided with records
See Also:
setIncludeSourceInfo(boolean)
-
setIncludeSourceInfo
public void setIncludeSourceInfo(boolean enabled)
Controls whether parsed records will be tagged with additional fields indicating how to locate them in their original source. This information can also be used to reconstruct the original source ordering.
When enabled, output will have three additional fields which, considered as a triple, uniquely identify and order the output.
- sourcePath, a string naming the original source file from which the record originated. This is NULL if this cannot be determined.
- splitOffset, a long providing the starting byte offset of the parse split in the file.
- recordOffset, a long providing the starting offset for the record within the parse split. This value will be in units of bytes or characters as is appropriate for the parsed format. Character offsets are based on the first full character within the split.
setSelectedFields(List).
Parameters:
enabled - indicates whether the output should be tagged with source information
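Since the three tagging fields order the output when taken as a triple, restoring source order amounts to a lexicographic sort on (sourcePath, splitOffset, recordOffset). The sketch below assumes a hypothetical Tagged holder class instead of DataRush record types. Sorting null paths first, and comparing paths by string order, are choices made for this illustration only.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;

// Illustrative only: restores source order from the three tagging fields.
class SourceOrderSketch {

    // Hypothetical holder for the tagging fields plus some record payload.
    static final class Tagged {
        final String sourcePath;   // may be null when the source is unknown
        final long splitOffset;    // byte offset of the split within the file
        final long recordOffset;   // offset of the record within the split
        final String payload;

        Tagged(String sourcePath, long splitOffset, long recordOffset, String payload) {
            this.sourcePath = sourcePath;
            this.splitOffset = splitOffset;
            this.recordOffset = recordOffset;
            this.payload = payload;
        }
    }

    // Compare by path, then split offset, then record offset.
    static final Comparator<Tagged> SOURCE_ORDER =
            Comparator.comparing((Tagged t) -> t.sourcePath,
                            Comparator.nullsFirst(Comparator.<String>naturalOrder()))
                    .thenComparingLong(t -> t.splitOffset)
                    .thenComparingLong(t -> t.recordOffset);

    // Returns a sorted copy; the input list is left untouched.
    static List<Tagged> restore(List<Tagged> records) {
        List<Tagged> copy = new ArrayList<>(records);
        copy.sort(SOURCE_ORDER);
        return copy;
    }
}
```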
-
getMissingFieldAction
public ParseErrorAction getMissingFieldAction()
Gets how fields declared in the schema, but not found when parsing the record are handled.
Returns:
the action to take on missing fields
-
setMissingFieldAction
public void setMissingFieldAction(ParseErrorAction action)
Sets how to handle fields declared in the schema, but not found when parsing the record. If the configured action does not discard the record, the missing fields will be null-valued in the output. By default, this setting is ParseErrorAction.WARN.
Parameters:
action - the action to take on missing fields
-
getExtraFieldAction
public ParseErrorAction getExtraFieldAction()
Gets how fields found when parsing the record, but not declared in the schema are handled.
Returns:
the action to take on extra fields
-
setExtraFieldAction
public void setExtraFieldAction(ParseErrorAction action)
Sets how to handle fields found when parsing the record, but not declared in the schema. If the configured action does not discard the record, the missing fields will be null-valued in the output. By default, this setting is ParseErrorAction.WARN.
Parameters:
action - the action to take on extra fields
-
getFieldErrorAction
public ParseErrorAction getFieldErrorAction()
Gets how fields which cannot be parsed are handled.
Returns:
the action to take on field errors
-
setFieldErrorAction
public void setFieldErrorAction(ParseErrorAction action)
Sets how to handle fields which cannot be parsed. If the configured action does not discard the record, the missing fields will be null-valued in the output. By default, this setting is ParseErrorAction.WARN.
Parameters:
action - the action to take on field errors
-
setParseErrorAction
public void setParseErrorAction(ParseErrorAction action)
Sets how to handle all parsing errors. This method is a convenience method for setting all individual classes of errors at once.
Parameters:
action - the action to take on parse error
See Also:
setMissingFieldAction(ParseErrorAction), setExtraFieldAction(ParseErrorAction), setFieldErrorAction(ParseErrorAction)
-
getRecordWarningThreshold
public int getRecordWarningThreshold()
Gets the maximum number of records allowed to have parse warnings.
Returns:
the limit on the number of records in error
-
setRecordWarningThreshold
public void setRecordWarningThreshold(int limit)
Configures the maximum number of records which can have parse warnings before failing. Only records which have an error whose configured action raises a warning count towards this limit. A record is only counted once towards this limit, regardless of the number of warnings.
By default, this limit is 100. Setting the limit to 0 means there is no restriction on the number of warnings.
This limit is applied per-split. Therefore, it is possible that a file in total may be allowed more warnings than the limit, depending on how it is split.
Parameters:
limit - the number of records with warnings allowed
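The counting rules above (each record counts once, 0 disables the limit, the limit is applied per split) can be sketched with one counter object per split. SplitWarningCounter is a hypothetical class, and treating the split as failed only once the count exceeds the limit is an assumption. The text does not say whether reaching the limit or exceeding it triggers the failure.

```java
// Illustrative only: one instance models the warning budget of a single split.
class SplitWarningCounter {
    private final int limit;          // 0 means unlimited
    private int recordsWithWarnings;  // number of records that raised at least one warning

    SplitWarningCounter(int limit) {
        this.limit = limit;
    }

    // Call once per parsed record with the number of warnings it raised.
    // Returns false once this split has gone over its limit.
    boolean accept(int warningsOnRecord) {
        if (warningsOnRecord > 0) {
            recordsWithWarnings++;    // counted once, however many warnings there were
        }
        return limit == 0 || recordsWithWarnings <= limit;
    }

    public static void main(String[] args) {
        // Two splits of the same file, each with its own budget of 2:
        // four records with warnings pass in total, twice the configured limit.
        SplitWarningCounter first = new SplitWarningCounter(2);
        SplitWarningCounter second = new SplitWarningCounter(2);
        boolean ok = first.accept(1) && first.accept(3) && second.accept(1) && second.accept(2);
        System.out.println(ok);
    }
}
```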
-
getFieldLengthThreshold
public int getFieldLengthThreshold()
Gets the maximum length allowed for a field value before it is considered an error.
Returns:
the maximum field value length allowed
-
setFieldLengthThreshold
public void setFieldLengthThreshold(int limit)
Configures the maximum length allowed for a field value before it is considered an error. Long fields can be a sign of a misconfigured reader. When this limit is reached, the parser will fail the current field and attempt to restart on the next field.
By default, this limit is 1M. Setting the limit to 0 means there is no restriction on the length of a field value.
Parameters:
limit - the maximum field value length allowed
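One way to picture the threshold is a per-field length check that fails only the offending field and carries on with the next one. FieldLengthSketch and applyThreshold are hypothetical names, and the sketch rests on three assumptions: length is measured in characters, a failed field becomes null (as it would under an action that does not discard the record), and a limit of 0 switches the check off.

```java
import java.util.ArrayList;
import java.util.List;

// Illustrative only: fails over-long fields individually and keeps the rest.
class FieldLengthSketch {

    // Returns the field values with any over-long value replaced by null.
    // A limit of 0 is treated as "no restriction".
    static List<String> applyThreshold(List<String> fields, int limit) {
        List<String> out = new ArrayList<>(fields.size());
        for (String value : fields) {
            boolean tooLong = limit > 0 && value != null && value.length() > limit;
            out.add(tooLong ? null : value);
        }
        return out;
    }
}
```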
-
getReadBuffer
public int getReadBuffer()
Gets the size of the I/O buffer, in bytes, to use for reads.
Returns:
the size of the read buffer
-
setReadBuffer
public void setReadBuffer(int size)
Sets the size of the I/O buffer, in bytes, to use for reads. The default size is 64K.
Parameters:
size - the size of the read buffer
-
getReadOnClient
public boolean getReadOnClient()
Indicates whether the reader should execute on the client.
Returns:
true if the reader will execute on the client, false otherwise.
-
setReadOnClient
public void setReadOnClient(boolean enabled)
Sets whether reads are performed by the client or in the cluster. By default, reads are performed in the cluster, if executed in a distributed context.
This normally only needs to be set if the source is not accessible from the cluster; for example, when the file resides only on the client. When executing in pseudo-distributed mode, this setting has no effect.
If the intent is to suppress parallel reads (to guarantee record ordering matches the source, for instance), use AbstractLogicalOperator.disableParallelism() instead.
Parameters:
enabled - whether to read on client
-
getUseMetadata
public boolean getUseMetadata()
Indicates whether discovered metadata should be used to override the graph settings.
Returns:
true if discovered metadata should be used, false otherwise.
-
setUseMetadata
public void setUseMetadata(boolean useMetadata)
Sets whether discovered metadata should be used to override the graph settings. By default this is set to false, since not all readers may actually support metadata.
Parameters:
useMetadata - whether discovered metadata should be used
-
computeFormat
protected abstract DataFormat computeFormat(CompositionContext ctx)
Determines the data format for the source. The returned format is used during composition to construct a ReadSource operator. If an implementation supports schema discovery, it must be performed in this method.
Parameters:
ctx - the composition context for the current invocation of compose(CompositionContext)
Returns:
the source format to use
-
compose
protected final void compose(CompositionContext ctx)
Composes a reader for the source, using configured options and a derived format.
Specified by:
compose in class CompositeOperator
Parameters:
ctx - the context
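The relationship between the final compose method and the abstract computeFormat method follows the template-method pattern: the base class fixes the composition steps and subclasses supply only the format. The sketch below uses stand-in types (FormatSketch, ReaderSketch, DelimitedReaderSketch) that are hypothetical and far simpler than the DataRush classes. Raising the missing-source error inside compose is an assumption about when that check happens.

```java
// Stand-in for a data format; not the DataRush DataFormat type.
interface FormatSketch {
    String name();
}

// Stand-in base reader: composition is fixed, only the format varies.
abstract class ReaderSketch {
    private String source;

    void setSource(String source) {
        this.source = source;
    }

    // Subclasses decide the format; schema discovery would belong here.
    protected abstract FormatSketch computeFormat();

    // final, so every reader is composed the same way around its format.
    protected final String compose() {
        if (source == null) {
            throw new IllegalStateException("source must be set before execution");
        }
        FormatSketch format = computeFormat();
        return "ReadSource[source=" + source + ", format=" + format.name() + "]";
    }
}

// A concrete reader only has to describe its format.
class DelimitedReaderSketch extends ReaderSketch {
    @Override
    protected FormatSketch computeFormat() {
        return () -> "delimited-text";
    }
}
```

A new reader in this model overrides one method and inherits source handling and composition unchanged, which mirrors the statement that only the DataFormat differs between implementations.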
-
-