Merge pull request #30351 from arenadata/ADQM-335

output_format_avro_rows_in_file
This commit is contained in:
Kruglov Pavel 2021-11-02 12:25:27 +03:00 committed by GitHub
commit 901ebcede6
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
10 changed files with 237 additions and 45 deletions

View File

@ -577,10 +577,8 @@ class IColumn;
M(Bool, input_format_values_accurate_types_of_literals, true, "For Values format: when parsing and interpreting expressions using template, check actual type of literal to avoid possible overflow and precision issues.", 0) \
M(Bool, input_format_avro_allow_missing_fields, false, "For Avro/AvroConfluent format: when field is not found in schema use default value instead of error", 0) \
M(URI, format_avro_schema_registry_url, "", "For AvroConfluent format: Confluent Schema Registry URL.", 0) \
M(String, output_format_avro_string_column_pattern, "", "For Avro format: regexp of String columns to select as AVRO string.", 0) \
\
M(Bool, output_format_json_quote_64bit_integers, true, "Controls quoting of 64-bit integers in JSON output format.", 0) \
\
M(Bool, output_format_json_quote_denormals, false, "Enables '+nan', '-nan', '+inf', '-inf' outputs in JSON output format.", 0) \
\
M(Bool, output_format_json_escape_forward_slashes, true, "Controls escaping forward slashes for string outputs in JSON output format. This is intended for compatibility with JavaScript. Don't confuse with backslashes that are always escaped.", 0) \
@ -595,6 +593,8 @@ class IColumn;
M(UInt64, output_format_parquet_row_group_size, 1000000, "Row group size in rows.", 0) \
M(String, output_format_avro_codec, "", "Compression codec used for output. Possible values: 'null', 'deflate', 'snappy'.", 0) \
M(UInt64, output_format_avro_sync_interval, 16 * 1024, "Sync interval in bytes.", 0) \
M(String, output_format_avro_string_column_pattern, "", "For Avro format: regexp of String columns to select as AVRO string.", 0) \
M(UInt64, output_format_avro_rows_in_file, 1000000, "Max rows in a file (if permitted by storage)", 0) \
M(Bool, output_format_tsv_crlf_end_of_line, false, "If it is set true, end of line in TSV format will be \\r\\n instead of \\n.", 0) \
M(String, output_format_csv_null_representation, "\\N", "Custom NULL representation in CSV format", 0) \
M(String, output_format_tsv_null_representation, "\\N", "Custom NULL representation in TSV format", 0) \

View File

@ -52,6 +52,7 @@ FormatSettings getFormatSettings(ContextPtr context, const Settings & settings)
format_settings.avro.output_sync_interval = settings.output_format_avro_sync_interval;
format_settings.avro.schema_registry_url = settings.format_avro_schema_registry_url.toString();
format_settings.avro.string_column_pattern = settings.output_format_avro_string_column_pattern.toString();
format_settings.avro.output_rows_in_file = settings.output_format_avro_rows_in_file;
format_settings.csv.allow_double_quotes = settings.format_csv_allow_double_quotes;
format_settings.csv.allow_single_quotes = settings.format_csv_allow_single_quotes;
format_settings.csv.crlf_end_of_line = settings.output_format_csv_crlf_end_of_line;

View File

@ -66,6 +66,7 @@ struct FormatSettings
UInt64 output_sync_interval = 16 * 1024;
bool allow_missing_fields = false;
String string_column_pattern;
UInt64 output_rows_in_file = 1000000;
} avro;
struct CSV

View File

@ -23,9 +23,18 @@ class WriteBuffer;
*/
class IRowOutputFormat : public IOutputFormat
{
public:
using Params = RowOutputFormatParams;
private:
bool prefix_written = false;
bool suffix_written = false;
protected:
DataTypes types;
Serializations serializations;
Params params;
bool first_row = true;
void consume(Chunk chunk) override;
@ -33,9 +42,23 @@ protected:
void consumeExtremes(Chunk chunk) override;
void finalize() override;
public:
using Params = RowOutputFormatParams;
void writePrefixIfNot()
{
if (!prefix_written)
writePrefix();
prefix_written = true;
}
void writeSuffixIfNot()
{
if (!suffix_written)
writeSuffix();
suffix_written = true;
}
public:
IRowOutputFormat(const Block & header, WriteBuffer & out_, const Params & params_);
/** Write a row.
@ -63,28 +86,6 @@ public:
virtual void writeAfterExtremes() {}
virtual void writeLastSuffix() {} /// Write something after resultset, totals end extremes.
private:
bool prefix_written = false;
bool suffix_written = false;
Params params;
void writePrefixIfNot()
{
if (!prefix_written)
writePrefix();
prefix_written = true;
}
void writeSuffixIfNot()
{
if (!suffix_written)
writeSuffix();
suffix_written = true;
}
};
}

View File

@ -93,7 +93,7 @@ public:
virtual void backup(size_t len) override { out.position() -= len; }
virtual uint64_t byteCount() const override { return out.count(); }
virtual void flush() override { out.next(); }
virtual void flush() override { }
private:
WriteBuffer & out;
@ -386,11 +386,6 @@ AvroRowOutputFormat::AvroRowOutputFormat(
: IRowOutputFormat(header_, out_, params_)
, settings(settings_)
, serializer(header_.getColumnsWithTypeAndName(), std::make_unique<AvroSerializerTraits>(settings))
, file_writer(
std::make_unique<OutputStreamWriteBufferAdapter>(out_),
serializer.getSchema(),
settings.avro.output_sync_interval,
getCodec(settings.avro.output_codec))
{
}
@ -398,19 +393,73 @@ AvroRowOutputFormat::~AvroRowOutputFormat() = default;
void AvroRowOutputFormat::writePrefix()
{
file_writer.syncIfNeeded();
// we have to recreate avro::DataFileWriterBase object due to its interface limitations
file_writer_ptr = std::make_unique<avro::DataFileWriterBase>(
std::make_unique<OutputStreamWriteBufferAdapter>(out),
serializer.getSchema(),
settings.avro.output_sync_interval,
getCodec(settings.avro.output_codec));
file_writer_ptr->syncIfNeeded();
}
void AvroRowOutputFormat::write(const Columns & columns, size_t row_num)
{
file_writer.syncIfNeeded();
serializer.serializeRow(columns, row_num, file_writer.encoder());
file_writer.incr();
file_writer_ptr->syncIfNeeded();
serializer.serializeRow(columns, row_num, file_writer_ptr->encoder());
file_writer_ptr->incr();
}
void AvroRowOutputFormat::writeSuffix()
{
file_writer.close();
file_writer_ptr.reset();
}
void AvroRowOutputFormat::consume(DB::Chunk chunk)
{
if (params.callback)
consumeImplWithCallback(std::move(chunk));
else
consumeImpl(std::move(chunk));
}
void AvroRowOutputFormat::consumeImpl(DB::Chunk chunk)
{
auto num_rows = chunk.getNumRows();
const auto & columns = chunk.getColumns();
writePrefixIfNot();
for (size_t row = 0; row < num_rows; ++row)
{
write(columns, row);
}
}
void AvroRowOutputFormat::consumeImplWithCallback(DB::Chunk chunk)
{
auto num_rows = chunk.getNumRows();
const auto & columns = chunk.getColumns();
for (size_t row = 0; row < num_rows;)
{
size_t current_row = row;
/// used by WriteBufferToKafkaProducer to obtain auxiliary data
/// from the starting row of a file
writePrefix();
for (size_t row_in_file = 0;
row_in_file < settings.avro.output_rows_in_file && row < num_rows;
++row, ++row_in_file)
{
write(columns, row);
}
file_writer_ptr->flush();
writeSuffix();
params.callback(columns, current_row);
}
}
void registerOutputFormatAvro(FormatFactory & factory)

View File

@ -49,6 +49,7 @@ public:
AvroRowOutputFormat(WriteBuffer & out_, const Block & header_, const RowOutputFormatParams & params_, const FormatSettings & settings_);
virtual ~AvroRowOutputFormat() override;
void consume(Chunk) override;
String getName() const override { return "AvroRowOutputFormat"; }
void write(const Columns & columns, size_t row_num) override;
void writeField(const IColumn &, const ISerialization &, size_t) override {}
@ -58,7 +59,11 @@ public:
private:
FormatSettings settings;
AvroSerializer serializer;
avro::DataFileWriterBase file_writer;
std::unique_ptr<avro::DataFileWriterBase> file_writer_ptr;
void consumeImpl(Chunk);
void consumeImplWithCallback(Chunk);
};
}

View File

@ -53,14 +53,13 @@ WriteBufferToKafkaProducer::~WriteBufferToKafkaProducer()
void WriteBufferToKafkaProducer::countRow(const Columns & columns, size_t current_row)
{
if (++rows % max_rows == 0)
{
const std::string & last_chunk = chunks.back();
size_t last_chunk_size = offset();
// if last character of last chunk is delimiter - we don't need it
if (delim && last_chunk[last_chunk_size - 1] == delim)
if (last_chunk_size && delim && last_chunk[last_chunk_size - 1] == delim)
--last_chunk_size;
std::string payload;

View File

@ -89,7 +89,7 @@ void WriteBufferToRabbitMQProducer::countRow()
const std::string & last_chunk = chunks.back();
size_t last_chunk_size = offset();
if (delim && last_chunk[last_chunk_size - 1] == delim)
if (last_chunk_size && delim && last_chunk[last_chunk_size - 1] == delim)
--last_chunk_size;
std::string payload;

View File

@ -8,6 +8,7 @@ import logging
import io
import string
import ast
import math
import avro.schema
import avro.io
@ -119,17 +120,20 @@ def kafka_produce(kafka_cluster, topic, messages, timestamp=None, retries=15):
def kafka_producer_send_heartbeat_msg(max_retries=50):
kafka_produce(kafka_cluster, 'test_heartbeat_topic', ['test'], retries=max_retries)
def kafka_consume(kafka_cluster, topic):
def kafka_consume(kafka_cluster, topic, needDecode = True, timestamp = 0):
consumer = KafkaConsumer(bootstrap_servers="localhost:{}".format(kafka_cluster.kafka_port), auto_offset_reset="earliest")
consumer.subscribe(topics=(topic))
for toppar, messages in list(consumer.poll(5000).items()):
if toppar.topic == topic:
for message in messages:
yield message.value.decode()
assert timestamp == 0 or message.timestamp / 1000 == timestamp
if needDecode:
yield message.value.decode()
else:
yield message.value
consumer.unsubscribe()
consumer.close()
def kafka_produce_protobuf_messages(kafka_cluster, topic, start_index, num_messages):
data = b''
for i in range(start_index, start_index + num_messages):
@ -681,6 +685,16 @@ def kafka_check_result(result, check=False, ref_file='test_kafka_json.reference'
return TSV(result) == TSV(reference)
def decode_avro(message):
b = io.BytesIO(message)
ret = avro.datafile.DataFileReader(b, avro.io.DatumReader())
output = io.StringIO()
for record in ret:
print(record, file=output)
return output.getvalue()
# https://stackoverflow.com/a/57692111/1555175
def describe_consumer_group(kafka_cluster, name):
client = BrokerConnection('localhost', kafka_cluster.kafka_port, socket.AF_INET)
@ -1829,6 +1843,86 @@ def test_kafka_produce_key_timestamp(kafka_cluster):
kafka_delete_topic(admin_client, topic_name)
def test_kafka_insert_avro(kafka_cluster):
instance.query('''
DROP TABLE IF EXISTS test.kafka;
CREATE TABLE test.kafka (key UInt64, value UInt64, _timestamp DateTime('UTC'))
ENGINE = Kafka
SETTINGS kafka_broker_list = 'kafka1:19092',
kafka_topic_list = 'avro1',
kafka_group_name = 'avro1',
kafka_format = 'Avro';
''')
instance.query("INSERT INTO test.kafka select number*10 as key, number*100 as value, 1636505534 as _timestamp from numbers(4) SETTINGS output_format_avro_rows_in_file = 2, output_format_avro_codec = 'deflate'")
messages = []
while True:
messages.extend(kafka_consume(kafka_cluster, 'avro1', needDecode = False, timestamp = 1636505534))
if len(messages) == 2:
break
result = ''
for a_message in messages:
result += decode_avro(a_message) + '\n'
expected_result = """{'key': 0, 'value': 0, '_timestamp': 1636505534}
{'key': 10, 'value': 100, '_timestamp': 1636505534}
{'key': 20, 'value': 200, '_timestamp': 1636505534}
{'key': 30, 'value': 300, '_timestamp': 1636505534}
"""
assert (result == expected_result)
def test_kafka_produce_consume_avro(kafka_cluster):
admin_client = KafkaAdminClient(bootstrap_servers="localhost:{}".format(kafka_cluster.kafka_port))
topic_name = "insert_avro"
kafka_create_topic(admin_client, topic_name)
num_rows = 75
instance.query('''
DROP TABLE IF EXISTS test.view;
DROP TABLE IF EXISTS test.kafka;
DROP TABLE IF EXISTS test.kafka_writer;
CREATE TABLE test.kafka_writer (key UInt64, value UInt64)
ENGINE = Kafka
SETTINGS kafka_broker_list = 'kafka1:19092',
kafka_topic_list = 'avro',
kafka_group_name = 'avro',
kafka_format = 'Avro';
CREATE TABLE test.kafka (key UInt64, value UInt64)
ENGINE = Kafka
SETTINGS kafka_broker_list = 'kafka1:19092',
kafka_topic_list = 'avro',
kafka_group_name = 'avro',
kafka_format = 'Avro';
CREATE MATERIALIZED VIEW test.view Engine=Log AS
SELECT key, value FROM test.kafka;
''')
instance.query("INSERT INTO test.kafka_writer select number*10 as key, number*100 as value from numbers({num_rows}) SETTINGS output_format_avro_rows_in_file = 7".format(num_rows=num_rows))
instance.wait_for_log_line("Committed offset {offset}".format(offset=math.ceil(num_rows/7)))
expected_num_rows = instance.query("SELECT COUNT(1) FROM test.view", ignore_error=True)
assert (int(expected_num_rows) == num_rows)
expected_max_key = instance.query("SELECT max(key) FROM test.view", ignore_error=True)
assert (int(expected_max_key) == (num_rows - 1) * 10)
kafka_delete_topic(admin_client, topic_name)
def test_kafka_flush_by_time(kafka_cluster):
admin_client = KafkaAdminClient(bootstrap_servers="localhost:{}".format(kafka_cluster.kafka_port))
topic_name = "flush_by_time"

View File

@ -6,6 +6,7 @@ import threading
import logging
import time
from random import randrange
import math
import pika
import pytest
@ -250,7 +251,7 @@ def test_rabbitmq_macros(rabbitmq_cluster):
for i in range(50):
message += json.dumps({'key': i, 'value': i}) + '\n'
channel.basic_publish(exchange='macro', routing_key='', body=message)
connection.close()
time.sleep(1)
@ -2027,6 +2028,47 @@ def test_rabbitmq_queue_consume(rabbitmq_cluster):
instance.query('DROP TABLE test.rabbitmq_queue')
def test_rabbitmq_produce_consume_avro(rabbitmq_cluster):
num_rows = 75
instance.query('''
DROP TABLE IF EXISTS test.view;
DROP TABLE IF EXISTS test.rabbit;
DROP TABLE IF EXISTS test.rabbit_writer;
CREATE TABLE test.rabbit_writer (key UInt64, value UInt64)
ENGINE = RabbitMQ
SETTINGS rabbitmq_host_port = 'rabbitmq1:5672',
rabbitmq_format = 'Avro',
rabbitmq_exchange_name = 'avro',
rabbitmq_exchange_type = 'direct',
rabbitmq_routing_key_list = 'avro';
CREATE TABLE test.rabbit (key UInt64, value UInt64)
ENGINE = RabbitMQ
SETTINGS rabbitmq_host_port = 'rabbitmq1:5672',
rabbitmq_format = 'Avro',
rabbitmq_exchange_name = 'avro',
rabbitmq_exchange_type = 'direct',
rabbitmq_routing_key_list = 'avro';
CREATE MATERIALIZED VIEW test.view Engine=Log AS
SELECT key, value FROM test.rabbit;
''')
instance.query("INSERT INTO test.rabbit_writer select number*10 as key, number*100 as value from numbers({num_rows}) SETTINGS output_format_avro_rows_in_file = 7".format(num_rows=num_rows))
# Ideally we should wait for an event
time.sleep(3)
expected_num_rows = instance.query("SELECT COUNT(1) FROM test.view", ignore_error=True)
assert (int(expected_num_rows) == num_rows)
expected_max_key = instance.query("SELECT max(key) FROM test.view", ignore_error=True)
assert (int(expected_max_key) == (num_rows - 1) * 10)
def test_rabbitmq_bad_args(rabbitmq_cluster):
credentials = pika.PlainCredentials('root', 'clickhouse')
parameters = pika.ConnectionParameters(rabbitmq_cluster.rabbitmq_ip, rabbitmq_cluster.rabbitmq_port, '/', credentials)