public class MatrixSink extends MatrixOperatorBase<Void>
Loads data from the input data stream into Actian Matrix. Graphs containing this operator cannot be run standalone. They must be serialized to JSON as a LogicalGraphWithConfig and deployed to a Matrix database. Execution is triggered by a Matrix SQL UDF invocation that provides all required configuration settings and posts the graph to a cluster for execution.
The target schema is defined as a reference to an existing database table, passed as a named parameter to the Matrix SQL UDF invocation. The names and types of the input port fields must match those of the target schema, and every input port field must be part of the target schema. Target schema columns with no corresponding input port field are set to the default value (null) for their database type.
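The schema rules above can be sketched as a standalone check. This is a hypothetical illustration, not the MatrixSink implementation: schemas are modeled as ordered maps from column name to type name, names and types are compared as exact strings (an assumption), and the class and method names are invented for this example.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical sketch of the documented schema rules; not part of the DataFlow API. */
public class SchemaCheck {

    /**
     * Rejects input fields that are missing from the target table or whose type differs,
     * then returns the target columns that no input field feeds (these get their default, null).
     */
    static List<String> columnsDefaultedToNull(Map<String, String> inputFields,
                                               Map<String, String> targetColumns) {
        for (Map.Entry<String, String> field : inputFields.entrySet()) {
            String targetType = targetColumns.get(field.getKey());
            if (targetType == null) {
                throw new IllegalArgumentException(
                    "Input field not in target schema: " + field.getKey());
            }
            if (!targetType.equals(field.getValue())) {
                throw new IllegalArgumentException("Type mismatch for " + field.getKey()
                    + ": input " + field.getValue() + ", target " + targetType);
            }
        }
        List<String> defaulted = new ArrayList<>();
        for (String column : targetColumns.keySet()) {
            if (!inputFields.containsKey(column)) {
                defaulted.add(column);
            }
        }
        return defaulted;
    }

    public static void main(String[] args) {
        Map<String, String> target = new LinkedHashMap<>();
        target.put("id", "INTEGER");
        target.put("name", "VARCHAR");
        target.put("created", "TIMESTAMP");

        Map<String, String> input = new LinkedHashMap<>();
        input.put("id", "INTEGER");
        input.put("name", "VARCHAR");

        // Prints [created]: that column receives its default (null) value.
        System.out.println(columnsDefaultedToNull(input, target));
    }
}
```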
The Actian Matrix database supports the concept of slices. A slice represents a unit of parallelism within the database. Each node within a database instance normally supports multiple slices. The loader operator communicates directly with slices to transfer data into the target database. The number of connections made into the database is determined by the parallelism level of the loader operator. The input data is not repartitioned to match the number of target slices. If the parallelism (partition count) of the loader operator is less than the number of database slices, some slices will be left idle during the load process. To repartition the data to match the number of slices in the target database, explicitly set the parallelism level of the loader operator to match the number of database slices.
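The effect of the loader's parallelism on slice usage can be illustrated with a toy model. The documentation only states that a partition count below the slice count leaves some slices idle; the round-robin mapping of partition i to slice i % slices used here is an assumption made for illustration, and the class and method names are hypothetical.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical model of loader partitions connecting to slices; not the actual MatrixSink logic. */
public class SliceUtilization {

    /** Assumed mapping: each loader partition opens one connection to slice (partition % slices). */
    static int[] assignPartitions(int partitions, int slices) {
        if (partitions <= 0 || slices <= 0) {
            throw new IllegalArgumentException("partitions and slices must be positive");
        }
        int[] sliceForPartition = new int[partitions];
        for (int p = 0; p < partitions; p++) {
            sliceForPartition[p] = p % slices;
        }
        return sliceForPartition;
    }

    /** Returns the slices that receive no connection, and therefore no data, under this model. */
    static List<Integer> idleSlices(int partitions, int slices) {
        boolean[] used = new boolean[slices];
        for (int s : assignPartitions(partitions, slices)) {
            used[s] = true;
        }
        List<Integer> idle = new ArrayList<>();
        for (int s = 0; s < slices; s++) {
            if (!used[s]) {
                idle.add(s);
            }
        }
        return idle;
    }

    public static void main(String[] args) {
        // 4 loader partitions against an 8-slice database: slices 4 to 7 stay idle.
        System.out.println(idleSlices(4, 8)); // [4, 5, 6, 7]
        // Setting the parallelism to the slice count keeps every slice busy.
        System.out.println(idleSlices(8, 8)); // []
    }
}
```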
Network communication between this Dataflow operator and the Matrix database uses TCP/IP sockets. The properties startPortRange and endPortRange can be used to limit the range of TCP/IP ports used for this communication. The ports within this range must be available for two-way communication; in an environment with a firewall, they must be opened in the firewall configuration. The default port range is 40000 (inclusive) to 41000 (exclusive).
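The half-open port range described above can be modeled with a small helper. This is a hypothetical sketch that is not part of the DataFlow API: it shows that the default range holds 1000 usable ports and how a server socket could be bound to the first free port in the range using java.net.ServerSocket. Binding successfully only proves the port is free locally; firewall rules must still allow the traffic.

```java
import java.io.IOException;
import java.net.ServerSocket;

/** Hypothetical helper for a half-open TCP port range [start, end); not part of the DataFlow API. */
public class PortRange {
    final int start; // inclusive
    final int end;   // exclusive, so 65536 allows port 65535

    PortRange(int start, int end) {
        if (start < 1 || end > 65536 || start >= end) {
            throw new IllegalArgumentException("invalid port range: " + start + "-" + end);
        }
        this.start = start;
        this.end = end;
    }

    /** Number of usable ports; the end port itself is excluded. */
    int size() {
        return end - start;
    }

    boolean contains(int port) {
        return port >= start && port < end;
    }

    /** Binds a server socket to the first free port in the range, or throws if none is free. */
    ServerSocket bindFirstAvailable() throws IOException {
        for (int port = start; port < end; port++) {
            try {
                return new ServerSocket(port);
            } catch (IOException inUse) {
                // Port is taken or not permitted; try the next one.
            }
        }
        throw new IOException("no free port in " + start + "-" + end);
    }

    public static void main(String[] args) throws IOException {
        PortRange defaults = new PortRange(40000, 41000);
        System.out.println(defaults.size());          // 1000
        System.out.println(defaults.contains(41000)); // false
        try (ServerSocket socket = defaults.bindFirstAvailable()) {
            System.out.println("listening on " + socket.getLocalPort());
        }
    }
}
```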
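Nested classes inherited from class MatrixOperatorBase: MatrixOperatorBase.Pair<A,B>, MatrixOperatorBase.Workers<E>
Fields inherited from class MatrixOperatorBase: END_PORT_RANGE, host, logFrequency, logger, maxConnections, retryCount, START_PORT_RANGE, timeoutInterval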
Constructor and Description |
---|
MatrixSink() |
Modifier and Type | Method and Description |
---|---|
protected MatrixOperatorBase.Workers<Void> | createWorkers(CompositionContext ctx, RecordTokenType inputType, com.pervasive.datarush.operators.io.paraccel.TableInfo tableInfo, List<String> columns, Map<String,String> columnMap, int startPortRange, int endPortRange, int dataWorkerParallelism) |
String | getEndPort(): End of the IP port range to be used for the slice registration server socket. |
String | getMatrixPort(): The server port on the Matrix instance listening for messages from this operator. |
String | getSliceInfo(): A comma-separated list of Matrix slice specs in the form |
String | getStartPort(): Start of the IP port range to be used for the slice registration server socket. |
String | getTableSchema(): A comma-separated list of column specs in the form |
protected MatrixOperatorBase.Pair<Integer,Integer> | portRange() |
protected Map<String,String> | renameMapping() |
protected MatrixOperatorBase.Pair<com.pervasive.datarush.operators.io.paraccel.TableInfo,List<com.pervasive.datarush.operators.io.paraccel.SliceIdentifier>> | schemaAndSliceInfo() |
void | setEndPort(String endPort): Sets the end of the IP port range to be used for the slice registration server socket. |
void | setMatrixPort(String matrixPort): Sets the server port on the Matrix instance listening for messages from this operator. |
void | setSliceInfo(String sliceInfo): Takes a comma-separated list of Matrix slice specs in the form |
void | setStartPort(String startPort): Sets the start of the IP port range to be used for the slice registration server socket. |
void | setTableSchema(String tableSchema): Takes a comma-separated list of column specs in the form |
protected boolean | skipCompose() |
protected void | validateExtensions() |
Methods inherited from class MatrixOperatorBase: compose, getHost, getInput, getMaxConnections, setHost, setMaxConnections, validatePortRange
Methods inherited from other superclasses: disableParallelism, getInputPorts, getOutputPorts, newInput, newInput, newOutput, newRecordInput, newRecordInput, newRecordOutput, notifyError
Methods inherited from class java.lang.Object: clone, equals, finalize, getClass, hashCode, notify, notifyAll, toString, wait, wait, wait
Methods inherited from implemented interfaces: disableParallelism, getInputPorts, getOutputPorts
public String getMatrixPort()
public void setMatrixPort(String matrixPort)
public String getTableSchema()
public void setTableSchema(String tableSchema)
public String getSliceInfo()
public void setSliceInfo(String sliceInfo)
public String getStartPort()
public void setStartPort(String startPort)
public String getEndPort()
public void setEndPort(String endPort)
protected void validateExtensions()
Overrides or implements: validateExtensions in class MatrixOperatorBase<Void>
protected MatrixOperatorBase.Pair<com.pervasive.datarush.operators.io.paraccel.TableInfo,List<com.pervasive.datarush.operators.io.paraccel.SliceIdentifier>> schemaAndSliceInfo()
Overrides or implements: schemaAndSliceInfo in class MatrixOperatorBase<Void>
protected MatrixOperatorBase.Workers<Void> createWorkers(CompositionContext ctx, RecordTokenType inputType, com.pervasive.datarush.operators.io.paraccel.TableInfo tableInfo, List<String> columns, Map<String,String> columnMap, int startPortRange, int endPortRange, int dataWorkerParallelism)
Overrides or implements: createWorkers in class MatrixOperatorBase<Void>
protected Map<String,String> renameMapping()
Overrides or implements: renameMapping in class MatrixOperatorBase<Void>
protected boolean skipCompose()
Overrides or implements: skipCompose in class MatrixOperatorBase<Void>
protected MatrixOperatorBase.Pair<Integer,Integer> portRange()
Overrides or implements: portRange in class MatrixOperatorBase<Void>
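The port properties are set as strings (setStartPort, setEndPort), while portRange() returns a Pair<Integer,Integer>. How the operator converts between the two is not documented here; the following is a hypothetical sketch, assuming blank settings fall back to the documented defaults of 40000 (inclusive) and 41000 (exclusive). The class and method names are invented, and an int array stands in for MatrixOperatorBase.Pair.

```java
/**
 * Hypothetical sketch, not the MatrixSink implementation: resolves string-valued
 * start/end port settings into an integer range, falling back to the documented defaults.
 */
public class PortSettings {
    // Documented defaults: 40000 (inclusive) to 41000 (exclusive).
    static final int DEFAULT_START = 40000;
    static final int DEFAULT_END = 41000;

    /** Parses one port setting; null or blank means "use the default". */
    static int parsePort(String value, int defaultPort) {
        if (value == null || value.trim().isEmpty()) {
            return defaultPort;
        }
        // Throws NumberFormatException (an IllegalArgumentException) for non-numeric input.
        int port = Integer.parseInt(value.trim());
        if (port < 1 || port > 65535) {
            throw new IllegalArgumentException("port out of range: " + port);
        }
        return port;
    }

    /** Returns {start, end}, rejecting ranges that contain no ports. */
    static int[] resolve(String startPort, String endPort) {
        int start = parsePort(startPort, DEFAULT_START);
        int end = parsePort(endPort, DEFAULT_END);
        if (start >= end) {
            throw new IllegalArgumentException(
                "start port must be below end port: " + start + " >= " + end);
        }
        return new int[] {start, end};
    }

    public static void main(String[] args) {
        int[] defaults = resolve(null, null);
        System.out.println(defaults[0] + "-" + defaults[1]); // 40000-41000
        int[] custom = resolve("45000", "45100");
        System.out.println(custom[0] + "-" + custom[1]);     // 45000-45100
    }
}
```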
Copyright © 2021 Actian Corporation. All rights reserved.