public class LoadMatrix extends MatrixOperatorBase<com.pervasive.datarush.operators.io.paraccel.ODIInvoker>
setTable(String)
method. A mapping of input data source fields into target table columns
can be provided. Any unmapped input fields are ignored. Unmapped table columns are not loaded and must
be nullable. If field mapping is not provided, input fields are mapped to output columns by position.
The Actian Matrix database supports the concept of slices. A slice represents a unit of parallelism within the database. Each node within a database instance normally supports multiple slices. The loader operator communicates directly with slices to transfer data into the target database. The number of connections made into the database is determined by the parallelism level of the loader operator. The input data is not repartitioned to match the number of target slices. If the parallelism (partition count) of the loader operator is less than the number of database slices, some slices will be left idle during the load process. To repartition the data to match the number of slices in the target database, explicitly set the parallelism level of the loader operator to match the number of database slices.
A component of the data loader executes within the database. This in-database component supports
tracing for debugging. The tracing level can be set to values 0 to 9. By default, the trace level
is set to zero. To increase the amount of tracing produced in the database by the loader, set the
trace level to a positive, non-zero value. The setTraceLevel(int)
method is used to set
the trace level.
Network communication between this Dataflow operator and the Matrix database is accomplished using
TCP/IP sockets. The properties startPortRange
and endPortRange
can be
used to limit the range of TCP/IP ports utilized for this communication. The ports within this
range will need to be available for two way communication. Within an environment containing
a firewall, the ports will need to be "opened" within the firewall configuration. The default
port range is from 40000
to 41000
(exclusive).
A network connection for data transfer is created for each parallel data worker. To limit the number of network
connections used, limit the parallelism of the LoadMatrix
operator. A limit on connections can be
explicitly set on the operator using the maxConnections
operator property. Reference
the <@link MatrixOperatorBase.setMaxConnections(int)
> property setter.
Note that modifying the parallelism level of the LoadMatrix
operator either via the operator
setting or the setMaxConnections
method may incur the overhead of data redistribution to match
the changed parallelism level. To prevent redistribution, set the overall parallelism of the DataFlow job
to limit the number of data connections used into Matrix.
MatrixOperatorBase.Pair<A,B>, MatrixOperatorBase.Workers<E>
END_PORT_RANGE, host, logFrequency, logger, maxConnections, retryCount, START_PORT_RANGE, timeoutInterval
Constructor and Description |
---|
LoadMatrix() |
Modifier and Type | Method and Description |
---|---|
protected MatrixOperatorBase.Workers<com.pervasive.datarush.operators.io.paraccel.ODIInvoker> |
createWorkers(CompositionContext ctx,
RecordTokenType inputType,
com.pervasive.datarush.operators.io.paraccel.TableInfo tableInfo,
List<String> columns,
Map<String,String> columnMap,
int startPortRange,
int endPortRange,
int dataWorkerParallelism) |
String |
getDatabase()
Get the database name.
|
int |
getEndPortRange()
Get the end of the IP port range.
|
String |
getFinalizeTableSQL()
Retrieves the SQL statement to execute after processing all the records.
|
String |
getInitializeTableSQL()
Retrieves the SQL statement to execute before processing any records.
|
int |
getLogFrequency()
Get the log frequency property.
|
String |
getPassword()
Get the password.
|
int |
getPort()
Get the Matrix lead node port number.
|
Map<String,String> |
getRenameMapping()
Get the source to target field name mapping.
|
int |
getRetryCount()
Get the retry count property.
|
int |
getStartPortRange()
Get the start of the IP port range.
|
String |
getTable()
Get the target table name.
|
int |
getTimeoutInterval()
Get the communication timeout interval (in seconds).
|
int |
getTraceLevel()
Get the current trace level.
|
String |
getUser()
Get the user account name.
|
protected MatrixOperatorBase.Pair<Integer,Integer> |
portRange() |
protected Map<String,String> |
renameMapping() |
protected MatrixOperatorBase.Pair<com.pervasive.datarush.operators.io.paraccel.TableInfo,List<com.pervasive.datarush.operators.io.paraccel.SliceIdentifier>> |
schemaAndSliceInfo() |
void |
setDatabase(String databaseName)
Set the database name.
|
void |
setEndPortRange(int endPortRange)
Set the end of the IP port range to use for network connections to and from the
target Matrix database.
|
void |
setFinalizeTableSQL(String finalizeTableSQL)
Sets the SQL statement to execute after processing all the records.
|
void |
setInitializeTableSQL(String initializeTableSQL)
Sets the SQL statement to execute before processing any records.
|
void |
setLogFrequency(int logFrequency)
Set the frequency in units of rows of data that progress is reported.
|
void |
setPassword(String password)
Set the user's password.
|
void |
setPort(int port)
Set the Matrix leader node port number.
|
void |
setRenameMapping(Map<String,String> renameMapping)
Set the source to target field name mapping.
|
void |
setRetryCount(int retryCount)
Set the number of times communication requests to the MPP database system are retried before
being considered an error.
|
void |
setStartPortRange(int startPortRange)
Set the start of the IP port range to use for network connections to and from the
Matrix database.
|
void |
setTable(String tableName)
Set the name of the table to load.
|
void |
setTimeoutInterval(int timeoutInterval)
Set the timeout interval (in seconds) for communication requests the target MPP database system.
|
void |
setTraceLevel(int traceLevel)
Set the trace level used by the query UDF function invoked to load the data in the
target database.
|
void |
setUser(String userName)
Set the user name.
|
protected boolean |
skipCompose() |
protected void |
validateExtensions() |
compose, getHost, getInput, getMaxConnections, setHost, setMaxConnections, validatePortRange
disableParallelism, getInputPorts, getOutputPorts, newInput, newInput, newOutput, newRecordInput, newRecordInput, newRecordOutput, notifyError
clone, equals, finalize, getClass, hashCode, notify, notifyAll, toString, wait, wait, wait
disableParallelism, getInputPorts, getOutputPorts
public String getDatabase()
public void setDatabase(String databaseName)
databaseName
- database namepublic String getInitializeTableSQL()
public void setInitializeTableSQL(String initializeTableSQL)
CREATE TABLE
statement to create the table.
This statement is executed only once, regardless of #partitionCount
.
initializeTableSQL
- the SQL statement to execute before processing any recordspublic String getFinalizeTableSQL()
public void setFinalizeTableSQL(String finalizeTableSQL)
CREATE INDEX
statement.
This statement is executed only once, regardless of #partitionCount
.
finalizeTableSQL
- the SQL statement to execute after processing all the recordspublic String getUser()
public void setUser(String userName)
userName
- user account namepublic String getPassword()
public void setPassword(String password)
password
- public int getPort()
public void setPort(int port)
port
- port numberpublic int getStartPortRange()
public void setStartPortRange(int startPortRange)
startPortRange
- start point of the IP port rangepublic int getEndPortRange()
public void setEndPortRange(int endPortRange)
endPortRange
- end point of IP port rangepublic int getLogFrequency()
public void setLogFrequency(int logFrequency)
logFrequency
- the count of rows use as the frequency to log progresspublic int getRetryCount()
public void setRetryCount(int retryCount)
retryCount
- number of times to retry communication requestspublic int getTimeoutInterval()
public void setTimeoutInterval(int timeoutInterval)
timeoutInterval
- timeout interval in secondsprotected void validateExtensions()
validateExtensions
in class MatrixOperatorBase<com.pervasive.datarush.operators.io.paraccel.ODIInvoker>
protected MatrixOperatorBase.Pair<com.pervasive.datarush.operators.io.paraccel.TableInfo,List<com.pervasive.datarush.operators.io.paraccel.SliceIdentifier>> schemaAndSliceInfo()
schemaAndSliceInfo
in class MatrixOperatorBase<com.pervasive.datarush.operators.io.paraccel.ODIInvoker>
protected MatrixOperatorBase.Workers<com.pervasive.datarush.operators.io.paraccel.ODIInvoker> createWorkers(CompositionContext ctx, RecordTokenType inputType, com.pervasive.datarush.operators.io.paraccel.TableInfo tableInfo, List<String> columns, Map<String,String> columnMap, int startPortRange, int endPortRange, int dataWorkerParallelism)
createWorkers
in class MatrixOperatorBase<com.pervasive.datarush.operators.io.paraccel.ODIInvoker>
protected Map<String,String> renameMapping()
renameMapping
in class MatrixOperatorBase<com.pervasive.datarush.operators.io.paraccel.ODIInvoker>
public Map<String,String> getRenameMapping()
public void setRenameMapping(Map<String,String> renameMapping)
This is an optional property. If not provided, the input fields are mapped to the target database table by schema order.
renameMapping
- source to target field name mappingpublic String getTable()
public void setTable(String tableName)
tableName
- target table name.protected boolean skipCompose()
skipCompose
in class MatrixOperatorBase<com.pervasive.datarush.operators.io.paraccel.ODIInvoker>
public int getTraceLevel()
public void setTraceLevel(int traceLevel)
The trace logs can be viewed in the database by querying the stl_udf_trace system table.
traceLevel
- trace level (from 0 - 9)protected MatrixOperatorBase.Pair<Integer,Integer> portRange()
portRange
in class MatrixOperatorBase<com.pervasive.datarush.operators.io.paraccel.ODIInvoker>
Copyright © 2021 Actian Corporation. All rights reserved.