java.lang.Object
    com.pervasive.datarush.operators.AbstractLogicalOperator
        com.pervasive.datarush.operators.CompositeOperator
            com.pervasive.datarush.operators.io.ReadSource
- All Implemented Interfaces:
LogicalOperator, RecordSourceOperator, SourceOperator<RecordPort>
public final class ReadSource extends CompositeOperator implements RecordSourceOperator
Reads a data source as a stream of records. The data source provides a sequence of bytes in some format that can be parsed into records, all assumed to be identical in logical structure. The mapping between physical and logical structure is encapsulated in a format descriptor, which must be provided.
This operator is low-level, providing a generalized model for reading files in a distributed fashion. Typically, ReadSource is not used directly in a graph; instead it is used indirectly through a composite operator, such as one derived from AbstractReader, that provides a more appropriate interface to the end user.
Parallelized reads are implemented by breaking input files into independently parsed pieces, a process called splitting. Splits are then distributed to available partitions and parsed. When run on a distributed cluster, the reader attempts to assign splits to machines where the I/O will be local, but non-local assignments may occur in order to provide work for all partitions. Distributed execution also assumes that the specified data source is accessible from every machine. If this is not the case, the read must be made non-parallel by using AbstractLogicalOperator.disableParallelism().
Not all formats support splitting; splitting generally requires a way of unambiguously identifying record boundaries. Each format indicates whether it supports splitting. If it does not, each input file is treated as a single split, so reading multiple files can still be parallelized even with a non-splittable format. Some formats support splitting only in an "optimistic" fashion: in most circumstances splits can be handled, but in some edge cases splitting leads to parse failures. For these cases, the reader supports a "pessimistic" mode, which treats the format as non-splittable regardless of what it reports.
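The splitting scheme described above can be illustrated with a small, self-contained sketch. It is not DataRush code: the class SplitSketch, its Split record, and the fixed split size are hypothetical, and the sketch assumes a simple format whose records end with '\n', so a parser starting at an arbitrary byte offset can find the next record boundary unambiguously. Each split owns the records that start inside its byte range, and the last record of a split may extend past the end of that range. In pessimistic mode the whole file becomes a single split.

```java
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch of splitting; not the DataRush implementation.
// Assumes a format whose records are terminated by '\n'.
final class SplitSketch {

    /** A half-open byte range [start, end) of one input file. */
    record Split(int start, int end) {}

    /** Breaks a file into splits of at most splitSize bytes, or one split if pessimistic. */
    static List<Split> planSplits(int fileLength, int splitSize, boolean pessimistic) {
        List<Split> splits = new ArrayList<>();
        if (pessimistic || fileLength <= splitSize) {
            splits.add(new Split(0, fileLength)); // the whole file is parsed as one piece
            return splits;
        }
        for (int start = 0; start < fileLength; start += splitSize) {
            splits.add(new Split(start, Math.min(start + splitSize, fileLength)));
        }
        return splits;
    }

    /** Parses the records owned by one split: those that start inside [start, end). */
    static List<String> parseSplit(byte[] data, Split split) {
        List<String> records = new ArrayList<>();
        int pos = split.start();
        // A record starts at offset 0 or just after a '\n'. If this split begins mid-record,
        // skip ahead to the next record start; the partial record belongs to the previous split.
        while (pos > 0 && pos < data.length && data[pos - 1] != '\n') {
            pos++;
        }
        // Read every record that starts before the end of the split. The last one may extend
        // past split.end(), so the terminator search is bounded by the data, not the split.
        while (pos < split.end() && pos < data.length) {
            int terminator = pos;
            while (terminator < data.length && data[terminator] != '\n') {
                terminator++;
            }
            records.add(new String(data, pos, terminator - pos, StandardCharsets.UTF_8));
            pos = terminator + 1; // skip the '\n'
        }
        return records;
    }
}
```

Concatenating the output of parseSplit over the planned splits yields every record exactly once. For a format such as quoted CSV, where a newline may appear inside a field, this resynchronization step can land mid-record; that is the kind of edge case in which optimistic splitting fails and pessimistic mode is the safer choice.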
The reader makes a best-effort attempt to validate the data source before execution, but cannot always guarantee correctness, depending on the nature of the data source. This is done to prevent misconfigured graphs from executing in cases where the reader might not run until a late phase, when a failure could cause a significant amount of work to be lost.
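The locality-aware distribution of splits described earlier can be sketched as a greedy assignment. This is a hypothetical illustration, not the reader's actual scheduler: the LocalityAssigner class, the one-host-per-partition model, and the "fair share" cap of ceil(splits / partitions) are all assumptions. A split goes to the least-loaded partition on a host that stores it, unless that partition already holds its fair share; in that case the split is assigned non-locally to the least-loaded partition overall, so that every partition receives work.

```java
import java.util.List;
import java.util.Set;

// Hypothetical greedy, locality-preferring split assignment; not the DataRush scheduler.
final class LocalityAssigner {
    /** A split together with the hosts that hold its bytes locally. */
    record Split(String name, Set<String> hosts) {}

    /** A partition running on a single host. */
    record Partition(int id, String host) {}

    /** Returns, for each split (by index), the id of the partition it is assigned to. */
    static int[] assign(List<Split> splits, List<Partition> partitions) {
        int[] load = new int[partitions.size()];
        // Assumed fair share per partition: ceil(splits / partitions).
        int cap = (splits.size() + partitions.size() - 1) / partitions.size();
        int[] result = new int[splits.size()];
        for (int i = 0; i < splits.size(); i++) {
            Split split = splits.get(i);
            int best = -1;
            // First choice: the least-loaded local partition that is still under its fair share.
            for (int p = 0; p < partitions.size(); p++) {
                boolean local = split.hosts().contains(partitions.get(p).host());
                if (local && load[p] < cap && (best < 0 || load[p] < load[best])) {
                    best = p;
                }
            }
            // Fallback: non-local assignment to the least-loaded partition overall.
            if (best < 0) {
                for (int p = 0; p < partitions.size(); p++) {
                    if (best < 0 || load[p] < load[best]) {
                        best = p;
                    }
                }
            }
            load[best]++;
            result[i] = partitions.get(best).id();
        }
        return result;
    }
}
```

With two partitions on hostA and hostB and four splits, three of which are stored on hostA, the third hostA split exceeds hostA's fair share of two and is placed non-locally on hostB, giving each partition two splits.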
-
Constructor Summary
ReadSource()
    Reads an empty source with default settings.
ReadSource(ByteSource source, DataFormat format)
    Reads the specified source using the given format.
-
Method Summary
protected void compose(CompositionContext ctx)
    Compose the body of this operator.
DataFormat getFormat()
    Gets the data format for the configured source.
boolean getIncludeSourceInfo()
    Indicates whether the parsed records should be tagged with additional fields indicating their source.
RecordPort getOutput()
    Gets the record port providing the records read from the data source.
ParsingOptions getParseOptions()
    Gets the parsing options used by the reader.
boolean getPessimisticSplitting()
    Indicates whether pessimistic file splitting is used.
boolean getReadOnClient()
    Indicates whether the reader should execute on the client.
ByteSource getSource()
    Gets the data source for the reader.
SplitOptions getSplitOptions()
    Gets the split options used by the reader.
void setFormat(DataFormat format)
    Sets the data format for the configured source.
void setIncludeSourceInfo(boolean enabled)
    Controls whether parsed records will be tagged with additional fields indicating how to locate them in their original source.
void setParseOptions(ParsingOptions options)
    Sets the parsing options used by the reader.
void setPessimisticSplitting(boolean enabled)
    Configures whether pessimistic file splitting must be used.
void setReadOnClient(boolean enabled)
    Sets whether reads are performed by the client or in the cluster.
void setSource(ByteSource source)
    Sets the data source for the reader.
void setSplitOptions(SplitOptions options)
    Sets the split options used by the reader.
-
Methods inherited from class com.pervasive.datarush.operators.AbstractLogicalOperator
disableParallelism, getInputPorts, getOutputPorts, newInput, newInput, newOutput, newRecordInput, newRecordInput, newRecordOutput, notifyError
-
Methods inherited from class java.lang.Object
clone, equals, finalize, getClass, hashCode, notify, notifyAll, toString, wait, wait, wait
-
Methods inherited from interface com.pervasive.datarush.operators.LogicalOperator
disableParallelism, getInputPorts, getOutputPorts
-
Constructor Detail
-
ReadSource
public ReadSource()
Reads an empty source with default settings. Both the source and format must be set before execution or an error will be raised.
- See Also:
setSource(ByteSource), setFormat(DataFormat)
-
ReadSource
public ReadSource(ByteSource source, DataFormat format)
Reads the specified source using the given format. Default options are used.
- Parameters:
source - the source to read
format - the source data format
-
Method Detail
-
getOutput
public RecordPort getOutput()
Gets the record port providing the records read from the data source.
- Specified by:
getOutput in interface RecordSourceOperator
- Specified by:
getOutput in interface SourceOperator<RecordPort>
- Returns:
the flow of records produced by a sequential read of the data source
-
getSource
public ByteSource getSource()
Gets the data source for the reader.
- Returns:
the read source
-
setSource
public void setSource(ByteSource source)
Sets the data source for the reader.
- Parameters:
source - the source to read
-
getFormat
public DataFormat getFormat()
Gets the data format for the configured source.
- Returns:
the data format
-
setFormat
public void setFormat(DataFormat format)
Sets the data format for the configured source.
- Parameters:
format - the source data format
-
getIncludeSourceInfo
public boolean getIncludeSourceInfo()
Indicates whether the parsed records should be tagged with additional fields indicating their source.
- Returns:
true if additional source information is provided with records
- See Also:
setIncludeSourceInfo(boolean)
-
setIncludeSourceInfo
public void setIncludeSourceInfo(boolean enabled)
Controls whether parsed records will be tagged with additional fields indicating how to locate them in their original source. This information can also be used to reconstruct the original source ordering.
When enabled, the output will have three additional fields which, considered as a triple, uniquely identify and order the output:
- sourcePath, a string naming the original source file from which the record originated. This is NULL if it cannot be determined.
- splitOffset, a long providing the starting byte offset of the parse split in the file.
- recordOffset, a long providing the starting offset of the record within the parse split. This value is in units of bytes or characters, as appropriate for the parsed format. Character offsets are based on the first full character within the split.
#setSelectedFields(List).
- Parameters:
enabled - indicates whether the output should be tagged with source information
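Because the triple (sourcePath, splitOffset, recordOffset) identifies and orders each record, sorting on it restores source order within each file: splits are non-overlapping byte ranges ordered by splitOffset, and recordOffset orders records inside a split. The sketch below is hypothetical: TaggedRecord stands in for a parsed record carrying the three fields, files are ordered by path name, and records with a NULL sourcePath are placed first. Those ordering choices are assumptions, not documented behavior.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;

// Hypothetical illustration of ordering by the source-info triple.
final class SourceOrder {
    /** Stand-in for a record tagged with the three source fields plus its payload. */
    record TaggedRecord(String sourcePath, long splitOffset, long recordOffset, String value) {}

    /** Orders by path (NULL paths first, an arbitrary choice), then split, then record offset. */
    static final Comparator<TaggedRecord> SOURCE_ORDER =
            Comparator.<TaggedRecord, String>comparing(TaggedRecord::sourcePath,
                            Comparator.nullsFirst(Comparator.<String>naturalOrder()))
                    .thenComparingLong(TaggedRecord::splitOffset)
                    .thenComparingLong(TaggedRecord::recordOffset);

    /** Returns a copy of the records sorted back into source order. */
    static List<TaggedRecord> restoreOrder(List<TaggedRecord> records) {
        List<TaggedRecord> sorted = new ArrayList<>(records);
        sorted.sort(SOURCE_ORDER);
        return sorted;
    }
}
```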
-
getParseOptions
public ParsingOptions getParseOptions()
Gets the parsing options used by the reader.
- Returns:
the parse options
-
setParseOptions
public void setParseOptions(ParsingOptions options)
Sets the parsing options used by the reader.
- Parameters:
options - the parse options to use
- See Also:
DataParser
-
getSplitOptions
public SplitOptions getSplitOptions()
Gets the split options used by the reader.
- Returns:
the split options
-
setSplitOptions
public void setSplitOptions(SplitOptions options)
Sets the split options used by the reader.
- Parameters:
options - the split options to use
- See Also:
DataParser
-
getPessimisticSplitting
public boolean getPessimisticSplitting()
Indicates whether pessimistic file splitting is used.
- Returns:
true if pessimistic splitting is enabled, in which case each input file is read as a single split regardless of what the format reports
-
setPessimisticSplitting
public void setPessimisticSplitting(boolean enabled)
Configures whether pessimistic file splitting must be used. By default, this is disabled and splitting is determined by the format.
- Parameters:
enabled - indicates whether to use pessimistic splitting
-
getReadOnClient
public boolean getReadOnClient()
Indicates whether the reader should execute on the client.
- Returns:
true if the reader will execute on the client, false otherwise.
-
setReadOnClient
public void setReadOnClient(boolean enabled)
Sets whether reads are performed by the client or in the cluster. By default, reads are performed in the cluster when executing in a distributed context. This normally only needs to be set if the source is not accessible from the cluster; for example, when the file resides only on the client.
- Parameters:
enabled - whether to perform reads on the client
-
compose
protected void compose(CompositionContext ctx)
Description copied from class: CompositeOperator
Compose the body of this operator. Implementations should do the following:
- Perform any validation of configuration, input types, etc.
- Instantiate and configure sub-operators, adding them to the provided context via the method OperatorComposable.add(O).
- Create necessary connections via the method OperatorComposable.connect(P, P). This includes connections from the composite's input ports to sub-operators, connections between sub-operators, and connections from sub-operators' output ports to the composite's output ports.
- Specified by:
compose in class CompositeOperator
- Parameters:
ctx - the context
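The three composition steps listed above can be modeled in a few lines. None of the types below are DataRush classes: Context, Port, ToyReader, and the splitter/parser decomposition are hypothetical stand-ins chosen only to show the order validate, add, connect. They do not describe how ReadSource is actually composed.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical model of the compose() pattern; none of these types exist in DataRush.
final class ComposeSketch {
    /** A named port belonging to an operator. */
    record Port(String owner, String name) {}

    /** A directed edge from a producing port to a consuming port. */
    record Connection(Port from, Port to) {}

    /** Stand-in for a composition context: records added operators and connections. */
    static final class Context {
        final List<String> operators = new ArrayList<>();
        final List<Connection> connections = new ArrayList<>();

        String add(String operator) {
            operators.add(operator);
            return operator;
        }

        void connect(Port from, Port to) {
            connections.add(new Connection(from, to));
        }
    }

    /** Toy composite source with one output port and no inputs. */
    static final class ToyReader {
        String source;
        String format;
        final Port output = new Port("ToyReader", "output");

        void compose(Context ctx) {
            // Step 1: validate configuration before building anything.
            if (source == null || format == null) {
                throw new IllegalStateException("source and format must be set before execution");
            }
            // Step 2: instantiate sub-operators and add them to the context.
            String splitter = ctx.add("Splitter(" + source + ")");
            String parser = ctx.add("Parser(" + format + ")");
            // Step 3: connect sub-operators to each other, then to the composite's output.
            ctx.connect(new Port(splitter, "splits"), new Port(parser, "input"));
            ctx.connect(new Port(parser, "output"), output);
        }
    }
}
```

Failing fast in step 1 mirrors the behavior documented for the no-argument constructor, where an unset source or format raises an error before execution.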