output_format_avro_rows_in_file

This commit is contained in:
Ilya Golshtein 2021-10-13 11:19:37 +03:00
parent a651c1bb67
commit d90302aa3b
7 changed files with 129 additions and 15 deletions

View File

@ -572,8 +572,6 @@ class IColumn;
M(Bool, input_format_values_accurate_types_of_literals, true, "For Values format: when parsing and interpreting expressions using template, check actual type of literal to avoid possible overflow and precision issues.", 0) \
M(Bool, input_format_avro_allow_missing_fields, false, "For Avro/AvroConfluent format: when field is not found in schema use default value instead of error", 0) \
M(URI, format_avro_schema_registry_url, "", "For AvroConfluent format: Confluent Schema Registry URL.", 0) \
M(String, output_format_avro_string_column_pattern, "", "For Avro format: regexp of String columns to select as AVRO string.", 0) \
\
M(Bool, output_format_json_quote_64bit_integers, true, "Controls quoting of 64-bit integers in JSON output format.", 0) \
\
M(Bool, output_format_json_quote_denormals, false, "Enables '+nan', '-nan', '+inf', '-inf' outputs in JSON output format.", 0) \
@ -590,6 +588,8 @@ class IColumn;
M(UInt64, output_format_parquet_row_group_size, 1000000, "Row group size in rows.", 0) \
M(String, output_format_avro_codec, "", "Compression codec used for output. Possible values: 'null', 'deflate', 'snappy'.", 0) \
M(UInt64, output_format_avro_sync_interval, 16 * 1024, "Sync interval in bytes.", 0) \
M(String, output_format_avro_string_column_pattern, "", "For Avro format: regexp of String columns to select as AVRO string.", 0) \
M(UInt64, output_format_avro_rows_in_file, 1000000, "Max rows in a file (if permitted by storage)", 0) \
M(Bool, output_format_tsv_crlf_end_of_line, false, "If it is set true, end of line in TSV format will be \\r\\n instead of \\n.", 0) \
M(String, output_format_csv_null_representation, "\\N", "Custom NULL representation in CSV format", 0) \
M(String, output_format_tsv_null_representation, "\\N", "Custom NULL representation in TSV format", 0) \

View File

@ -53,6 +53,7 @@ FormatSettings getFormatSettings(ContextPtr context, const Settings & settings)
format_settings.avro.output_sync_interval = settings.output_format_avro_sync_interval;
format_settings.avro.schema_registry_url = settings.format_avro_schema_registry_url.toString();
format_settings.avro.string_column_pattern = settings.output_format_avro_string_column_pattern.toString();
format_settings.avro.output_rows_in_file = settings.output_format_avro_rows_in_file;
format_settings.csv.allow_double_quotes = settings.format_csv_allow_double_quotes;
format_settings.csv.allow_single_quotes = settings.format_csv_allow_single_quotes;
format_settings.csv.crlf_end_of_line = settings.output_format_csv_crlf_end_of_line;

View File

@ -64,6 +64,7 @@ struct FormatSettings
UInt64 output_sync_interval = 16 * 1024;
bool allow_missing_fields = false;
String string_column_pattern;
UInt64 output_rows_in_file = 1000000;
} avro;
struct CSV

View File

@ -43,6 +43,8 @@
#include <re2/re2.h>
#include <base/logger_useful.h>
namespace DB
{
namespace ErrorCodes
@ -93,7 +95,7 @@ public:
virtual void backup(size_t len) override { out.position() -= len; }
virtual uint64_t byteCount() const override { return out.count(); }
virtual void flush() override { out.next(); }
virtual void flush() override { /* out.next(); */}
private:
WriteBuffer & out;
@ -385,12 +387,8 @@ AvroRowOutputFormat::AvroRowOutputFormat(
WriteBuffer & out_, const Block & header_, const RowOutputFormatParams & params_, const FormatSettings & settings_)
: IRowOutputFormat(header_, out_, params_)
, settings(settings_)
, params(params_)
, serializer(header_.getColumnsWithTypeAndName(), std::make_unique<AvroSerializerTraits>(settings))
, file_writer(
std::make_unique<OutputStreamWriteBufferAdapter>(out_),
serializer.getSchema(),
settings.avro.output_sync_interval,
getCodec(settings.avro.output_codec))
{
}
@ -398,19 +396,71 @@ AvroRowOutputFormat::~AvroRowOutputFormat() = default;
void AvroRowOutputFormat::writePrefix()
{
file_writer.syncIfNeeded();
file_writer_ptr = std::make_unique<avro::DataFileWriterBase>(
std::make_unique<OutputStreamWriteBufferAdapter>(out),
serializer.getSchema(),
settings.avro.output_sync_interval,
getCodec(settings.avro.output_codec));
file_writer_ptr->syncIfNeeded();
}
void AvroRowOutputFormat::write(const Columns & columns, size_t row_num)
{
file_writer.syncIfNeeded();
serializer.serializeRow(columns, row_num, file_writer.encoder());
file_writer.incr();
file_writer_ptr->syncIfNeeded();
serializer.serializeRow(columns, row_num, file_writer_ptr->encoder());
file_writer_ptr->incr();
}
void AvroRowOutputFormat::writeSuffix()
{
file_writer.close();
file_writer_ptr.reset();
}
void AvroRowOutputFormat::consume(DB::Chunk chunk)
{
LOG_TRACE(&Poco::Logger::get("AvroBlockOutputFormat"), "top of consume");
if (params.callback)
consumeImplCallback(std::move(chunk));
else
consumeImpl(std::move(chunk));
}
void AvroRowOutputFormat::consumeImpl(DB::Chunk chunk)
{
auto num_rows = chunk.getNumRows();
const auto & columns = chunk.getColumns();
writePrefix();
for (size_t row = 0; row < num_rows; ++row)
write(columns, row);
first_row = false;
}
void AvroRowOutputFormat::consumeImplCallback(DB::Chunk chunk)
{
auto num_rows = chunk.getNumRows();
const auto & columns = chunk.getColumns();
for (size_t row = 0; row < num_rows;)
{
writePrefix();
for (size_t row_in_file = 0;
row_in_file < settings.avro.output_rows_in_file && row < num_rows;
++row, ++row_in_file)
{
write(columns, row);
}
file_writer_ptr->flush();
writeSuffix();
params.callback(columns, num_rows);
first_row = false;
}
}
void registerOutputFormatProcessorAvro(FormatFactory & factory)

View File

@ -49,6 +49,7 @@ public:
AvroRowOutputFormat(WriteBuffer & out_, const Block & header_, const RowOutputFormatParams & params_, const FormatSettings & settings_);
virtual ~AvroRowOutputFormat() override;
void consume(Chunk) override;
String getName() const override { return "AvroRowOutputFormat"; }
void write(const Columns & columns, size_t row_num) override;
void writeField(const IColumn &, const ISerialization &, size_t) override {}
@ -57,8 +58,13 @@ public:
private:
FormatSettings settings;
Params params;
AvroSerializer serializer;
avro::DataFileWriterBase file_writer;
std::unique_ptr<avro::DataFileWriterBase> file_writer_ptr;
void consumeImpl(Chunk);
void consumeImplCallback(Chunk);
};
}

View File

@ -3,6 +3,8 @@
#include "Columns/ColumnString.h"
#include "Columns/ColumnsNumber.h"
#include <base/logger_useful.h>
namespace DB
{
WriteBufferToKafkaProducer::WriteBufferToKafkaProducer(
@ -54,13 +56,14 @@ WriteBufferToKafkaProducer::~WriteBufferToKafkaProducer()
void WriteBufferToKafkaProducer::countRow(const Columns & columns, size_t current_row)
{
LOG_TRACE(&Poco::Logger::get("WriteBufferToKafkaProducer"), "top of countRow");
if (++rows % max_rows == 0)
{
const std::string & last_chunk = chunks.back();
size_t last_chunk_size = offset();
// if last character of last chunk is delimiter - we don't need it
if (delim && last_chunk[last_chunk_size - 1] == delim)
if (last_chunk_size && delim && last_chunk[last_chunk_size - 1] == delim)
--last_chunk_size;
std::string payload;
@ -76,6 +79,8 @@ void WriteBufferToKafkaProducer::countRow(const Columns & columns, size_t curren
cppkafka::MessageBuilder builder(topic);
builder.payload(payload);
LOG_TRACE(&Poco::Logger::get("WriteBufferToKafkaProducer"), "payload size {}", payload.size());
// Note: if it will be few rows per message - it will take the value from last row of block
if (key_column_index)
{
@ -116,6 +121,7 @@ void WriteBufferToKafkaProducer::countRow(const Columns & columns, size_t curren
void WriteBufferToKafkaProducer::flush()
{
LOG_TRACE(&Poco::Logger::get("WriteBufferToKafkaProducer"), "flush");
// For unknown reason we may hit some internal timeout when inserting for the first time.
while (true)
{
@ -136,11 +142,13 @@ void WriteBufferToKafkaProducer::flush()
void WriteBufferToKafkaProducer::nextImpl()
{
LOG_TRACE(&Poco::Logger::get("WriteBufferToKafkaProducer"), "nextImpl");
addChunk();
}
void WriteBufferToKafkaProducer::addChunk()
{
LOG_TRACE(&Poco::Logger::get("WriteBufferToKafkaProducer"), "addChunk");
chunks.push_back(std::string());
chunks.back().resize(chunk_size);
set(chunks.back().data(), chunk_size);
@ -148,6 +156,7 @@ void WriteBufferToKafkaProducer::addChunk()
void WriteBufferToKafkaProducer::reinitializeChunks()
{
LOG_TRACE(&Poco::Logger::get("WriteBufferToKafkaProducer"), "reinitializeChunks");
rows = 0;
chunks.clear();
/// We cannot leave the buffer in the undefined state (i.e. without any

View File

@ -8,6 +8,7 @@ import logging
import io
import string
import ast
import math
import avro.schema
import avro.io
@ -1829,6 +1830,52 @@ def test_kafka_produce_key_timestamp(kafka_cluster):
kafka_delete_topic(admin_client, topic_name)
def test_kafka_produce_consume_avro(kafka_cluster):
admin_client = KafkaAdminClient(bootstrap_servers="localhost:{}".format(kafka_cluster.kafka_port))
topic_name = "insert_avro"
kafka_create_topic(admin_client, topic_name)
num_rows = 75
instance.query('''
DROP TABLE IF EXISTS test.view;
DROP TABLE IF EXISTS test.kafka;
DROP TABLE IF EXISTS test.kafka_writer;
CREATE TABLE test.kafka_writer (key UInt64, value UInt64)
ENGINE = Kafka
SETTINGS kafka_broker_list = 'kafka1:19092',
kafka_topic_list = 'avro',
kafka_group_name = 'avro',
kafka_format = 'Avro';
CREATE TABLE test.kafka (key UInt64, value UInt64)
ENGINE = Kafka
SETTINGS kafka_broker_list = 'kafka1:19092',
kafka_topic_list = 'avro',
kafka_group_name = 'avro',
kafka_format = 'Avro';
CREATE MATERIALIZED VIEW test.view Engine=Log AS
SELECT key, value FROM test.kafka;
''')
instance.query("INSERT INTO test.kafka_writer select number*10 as key, number*100 as value from numbers({num_rows}) SETTINGS output_format_avro_rows_in_file = 7".format(num_rows=num_rows))
instance.wait_for_log_line("Committed offset {offset}".format(offset=math.ceil(num_rows/7)))
expected_num_rows = instance.query("SELECT COUNT(1) FROM test.view", ignore_error=True)
assert (int(expected_num_rows) == num_rows)
expected_max_key = instance.query("SELECT max(key) FROM test.view", ignore_error=True)
assert (int(expected_max_key) == (num_rows - 1) * 10)
kafka_delete_topic(admin_client, topic_name)
def test_kafka_flush_by_time(kafka_cluster):
admin_client = KafkaAdminClient(bootstrap_servers="localhost:{}".format(kafka_cluster.kafka_port))
topic_name = "flush_by_time"