All Implemented Interfaces:
LogicalOperator, RecordSinkOperator, SinkOperator<RecordPort>

public class WriteAvro extends AbstractWriter
Writes data using Apache Avro format.

If appending to an existing target, data will be encoded using the schema stored with the target. If writing to a new file (including an append to a non-existent target) or overwriting an existing target, the supplied schema, if any, will be used to encode the data. If none was provided, a schema will be generated from the input data type, as described later.
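
The selection rule above can be sketched as a small decision function. This is only an illustration of the rule as described here, not the operator's implementation; the class, enum, and parameter names are hypothetical.

```java
// Hypothetical sketch: decides which schema encodes the output,
// following the rule described above. None of these names come from the library.
class SchemaChoice {
    enum Source { STORED_WITH_TARGET, SUPPLIED, GENERATED_FROM_INPUT }

    static Source choose(boolean appending, boolean targetExists, boolean schemaSupplied) {
        // Appending to an existing target always uses the schema stored with it.
        if (appending && targetExists) {
            return Source.STORED_WITH_TARGET;
        }
        // New file, append to a non-existent target, or overwrite:
        // use the supplied schema if there is one, otherwise generate one.
        return schemaSupplied ? Source.SUPPLIED : Source.GENERATED_FROM_INPUT;
    }

    public static void main(String[] args) {
        System.out.println(choose(true, true, true));
        System.out.println(choose(true, false, true));
        System.out.println(choose(false, true, false));
    }
}
```

Note that a supplied schema is ignored only in the first case, which matches the behavior documented for setSchema.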

Input fields are mapped to the output schema by name. Any input fields not mapped in the schema are dropped. Any schema fields not in the input are implicitly null-valued. In this case, if the schema does not allow NULL for the field, an error will be raised.
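
As a rough illustration of mapping by name, the sketch below projects one input row onto a list of schema field names. It is a simplified model with hypothetical names: rows are plain maps, and the null check is done per row, whereas the operator may detect a missing non-nullable field earlier.

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;

// Hypothetical sketch of name-based field mapping; not the operator's code.
class NameMapping {
    /**
     * Builds the output row in schema order.
     * schemaFields: field names declared by the schema.
     * nullableFields: the subset of schema fields that allow NULL.
     */
    static Map<String, Object> project(Map<String, Object> input,
                                       List<String> schemaFields,
                                       Set<String> nullableFields) {
        Map<String, Object> out = new LinkedHashMap<>();
        for (String name : schemaFields) {
            Object value = input.get(name); // null when the input lacks this field
            if (value == null && !nullableFields.contains(name)) {
                throw new IllegalStateException(
                    "Field '" + name + "' is null but the schema does not allow NULL");
            }
            out.put(name, value);
        }
        // Input fields that the schema does not name are never copied, so they are dropped.
        return out;
    }
}
```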

Input fields must map to a supported Avro type. An error will be raised if no valid mapping exists. Furthermore, during execution, if an input field is null-valued but the target field is not nullable (that is, it is not a UNION including a NULL type), the graph will fail. Valid source field types for primitive Avro types are shown in the table below.

Target Avro Type    Source DataRush Types
BOOLEAN             BOOLEAN
BYTES               BINARY
DOUBLE              DOUBLE, FLOAT, LONG, INT
FIXED               BINARY
FLOAT               FLOAT, LONG, INT
LONG                LONG, INT
INT                 INT
STRING              STRING
Note: if mapping BINARY to FIXED, any record that has a binary value of the wrong number of bytes will cause the graph to fail.
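
The table and the FIXED note can be expressed as a lookup plus a length check. This is a minimal sketch that uses plain strings for type names; the class and method names are hypothetical and are not part of the library.

```java
import java.util.List;
import java.util.Map;

// Hypothetical sketch of the primitive mapping table above.
class PrimitiveMapping {
    // Target Avro type -> DataRush source types that may be written to it.
    static final Map<String, List<String>> VALID_SOURCES = Map.of(
        "BOOLEAN", List.of("BOOLEAN"),
        "BYTES", List.of("BINARY"),
        "DOUBLE", List.of("DOUBLE", "FLOAT", "LONG", "INT"),
        "FIXED", List.of("BINARY"),
        "FLOAT", List.of("FLOAT", "LONG", "INT"),
        "LONG", List.of("LONG", "INT"),
        "INT", List.of("INT"),
        "STRING", List.of("STRING"));

    /** True when the table lists sourceType as a valid source for avroType. */
    static boolean canMap(String sourceType, String avroType) {
        return VALID_SOURCES.getOrDefault(avroType, List.of()).contains(sourceType);
    }

    /** BINARY to FIXED: every value must have exactly the declared size. */
    static void checkFixed(byte[] value, int fixedSize) {
        if (value.length != fixedSize) {
            throw new IllegalArgumentException(
                "Expected " + fixedSize + " bytes but got " + value.length);
        }
    }
}
```

Every mapping in the table widens or preserves the numeric type; for example, INT can be written to LONG, but LONG cannot be written to INT.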

Mapping to complex Avro datatypes is handled as follows:

  • RECORD data is allowed only at the top level, and only if the individual fields can be mapped from the matching input fields. Nested records are not currently allowed, except for Avro RECORD representations of DataRush DATE, TIME, and TIMESTAMP types as described later.
  • ENUM data can be assigned from DataRush STRING fields. If during execution the input field takes on a value which is not in the enumerated list of symbols, the graph will fail.
  • UNION data is allowed as long as one of the branches is a valid mapping from the matching input field.
  • ARRAY and MAP data in Avro is not currently supported.
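
The ENUM and UNION rules above amount to simple membership checks. The sketch below models them with plain strings and a caller-supplied predicate; all names are hypothetical, and the branch predicate stands in for whatever per-type mapping check applies.

```java
import java.util.List;
import java.util.Set;
import java.util.function.Predicate;

// Hypothetical sketch of the ENUM and UNION rules; not the operator's code.
class ComplexTargetChecks {
    /** ENUM: the runtime value must be one of the schema's symbols. */
    static String checkEnumSymbol(String value, Set<String> symbols) {
        if (!symbols.contains(value)) {
            throw new IllegalArgumentException("'" + value + "' is not an ENUM symbol");
        }
        return value;
    }

    /** UNION: acceptable when at least one branch is a valid mapping for the input field. */
    static boolean unionAccepts(List<String> branches, Predicate<String> isValidBranch) {
        return branches.stream().anyMatch(isValidBranch);
    }

    /** A target field is nullable only when it is a UNION with a NULL branch. */
    static boolean isNullable(List<String> unionBranches) {
        return unionBranches.contains("NULL");
    }
}
```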

As noted previously, a schema will be generated if necessary. The generated schema is an Avro RECORD consisting of fields in the same order as the input record, having the same names. Field names are cleansed to be valid Avro field names using AvroSchemaUtils.cleanseName(String). If this cleansing results in a name collision, an error is raised. Each field in the generated schema will have a UNION type including NULL and the appropriate Avro schema type based on the input type as listed below:

  • BOOLEAN, DOUBLE, FLOAT, LONG, and INT are mapped to the Avro primitive type of the same name.
  • STRING is mapped differently based on the presence of a domain on the source field. If no domain is specified, it is mapped to the STRING primitive type. If a domain is specified, it is mapped to an ENUM having the same set of symbols as the domain.
  • BINARY is mapped to the BYTES primitive type.
  • NUMERIC is mapped to the DOUBLE primitive type; this may result in loss of precision.
  • CHAR is mapped to the STRING primitive type.
  • DATE is mapped to a nested RECORD having one field epochDays of type LONG. The value of this field is the same as DateValued#asEpochDays().
  • TIME is mapped to a nested RECORD having one field dayMillis of type INT. The value of this field is the same as TimeValued#asDayMillis().
  • TIMESTAMP is mapped to a nested RECORD having three fields: epochSecs of type LONG, subsecNanos of type INT, and offsetSecs of type INT. The values of these fields are the same as the values with the same names in TimestampValued.
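
To make the shape of a generated schema concrete, the sketch below builds Avro schema JSON for the scalar cases in the list above. It rests on several assumptions: the record name "GeneratedRecord" and the placement of "null" first in each UNION are hypothetical, name cleansing is skipped, and the ENUM, DATE, TIME, and TIMESTAMP cases are left out for brevity.

```java
import java.util.Map;
import java.util.StringJoiner;

// Hypothetical sketch of schema generation for scalar input types only.
class GeneratedSchemaSketch {
    /** Avro type JSON for one input type, before it is wrapped in a UNION with null. */
    static String avroType(String inputType) {
        switch (inputType) {
            case "BOOLEAN": return "\"boolean\"";
            case "DOUBLE":  return "\"double\"";
            case "FLOAT":   return "\"float\"";
            case "LONG":    return "\"long\"";
            case "INT":     return "\"int\"";
            case "STRING":  return "\"string\""; // no domain; a domain would yield an ENUM
            case "CHAR":    return "\"string\"";
            case "BINARY":  return "\"bytes\"";
            case "NUMERIC": return "\"double\""; // may lose precision
            default:
                throw new IllegalArgumentException("No mapping sketched for " + inputType);
        }
    }

    /** inputFields maps field name to input type; iteration order is the field order. */
    static String generate(Map<String, String> inputFields) {
        StringJoiner fields = new StringJoiner(",");
        for (Map.Entry<String, String> f : inputFields.entrySet()) {
            // Every generated field is nullable: a UNION of null and the mapped type.
            fields.add("{\"name\":\"" + f.getKey() + "\",\"type\":[\"null\","
                + avroType(f.getValue()) + "]}");
        }
        // "GeneratedRecord" is a placeholder; the real generated name is not documented here.
        return "{\"type\":\"record\",\"name\":\"GeneratedRecord\",\"fields\":[" + fields + "]}";
    }
}
```

Passing an order-preserving map such as LinkedHashMap keeps the fields in the same order as the input record, as the description requires.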
  • Constructor Details

    • WriteAvro

      public WriteAvro()
      Writes an empty target with default settings. The target must be set before execution or an error will be raised.
    • WriteAvro

      public WriteAvro(boolean includeDoneSignal)
      Writes an empty target with default settings, optionally providing a port for signaling completion of the write. The target must be set before execution or an error will be raised.
      Parameters:
      includeDoneSignal - indicates whether a done signal port should be created
    • WriteAvro

      public WriteAvro(String path, WriteMode mode)
      Writes to the specified path in the given mode.

      If the writer is parallelized, this is interpreted as a directory in which each partition will write a fragment of the entire input stream. Otherwise, it is interpreted as the file to write.

      Parameters:
      path - the path to which to write
      mode - how to handle existing files
    • WriteAvro

      public WriteAvro(Path path, WriteMode mode)
      Writes to the specified path in the given mode.

      If the writer is parallelized, this is interpreted as a directory in which each partition will write a fragment of the entire input stream. Otherwise, it is interpreted as the file to write.

      Parameters:
      path - the path to which to write
      mode - how to handle existing files
    • WriteAvro

      public WriteAvro(ByteSink target, WriteMode mode)
      Writes to the specified target sink in the given mode.

      The writer can only be parallelized if the sink is fragmentable. In this case, each partition will be written as an independent sink. Otherwise, the writer will run non-parallel.

      Parameters:
      target - the sink to which to write
      mode - how to handle an existing sink
  • Method Details

    • getSchema

      public org.apache.avro.Schema getSchema()
      Gets the currently specified schema for encoding the output.
      Returns:
      the schema to use when writing
    • setSchema

      public void setSchema(org.apache.avro.Schema schema)
      Sets the schema to use to encode the output. If appending to an existing target, this value is ignored and the schema stored with the target is used.

      A schema does not have to be provided, in which case one will automatically be generated from the input data.

      Parameters:
      schema - the Avro schema to use when writing
    • setSchema

      public void setSchema(String schemaFile) throws IOException
      Sets the schema to use to encode the output with the JSON-serialized schema in the specified file.
      Parameters:
      schemaFile - the name of the file containing the serialized Avro schema
      Throws:
      IOException
    • getCompression

      public Compression getCompression()
      Gets the currently specified compression method for writing data blocks.
      Returns:
      the compression method to use when writing
    • setCompression

      public void setCompression(Compression codec)
      Sets the compression method to use on data blocks. If appending to an existing target, this is ignored and the compression method of the target is used.
      Parameters:
      codec - the compression to use in the output file
    • computeFormat

      protected DataFormat computeFormat(CompositionContext ctx)
      Description copied from class: AbstractWriter
      Determines the data format for the target. The returned format is used during composition to construct a WriteSink operator. If an implementation supports schema discovery, it must be performed in this method.
      Specified by:
      computeFormat in class AbstractWriter
      Parameters:
      ctx - the composition context for the current invocation of AbstractWriter.compose(CompositionContext)
      Returns:
      the target format to use