From d90302aa3b818f727d4c548af38e0dfd9b207844 Mon Sep 17 00:00:00 2001 From: Ilya Golshtein Date: Wed, 13 Oct 2021 11:19:37 +0300 Subject: [PATCH 1/6] output_format_avro_rows_in_file --- src/Core/Settings.h | 4 +- src/Formats/FormatFactory.cpp | 1 + src/Formats/FormatSettings.h | 1 + .../Formats/Impl/AvroRowOutputFormat.cpp | 72 ++++++++++++++++--- .../Formats/Impl/AvroRowOutputFormat.h | 8 ++- .../Kafka/WriteBufferToKafkaProducer.cpp | 11 ++- tests/integration/test_storage_kafka/test.py | 47 ++++++++++++ 7 files changed, 129 insertions(+), 15 deletions(-) diff --git a/src/Core/Settings.h b/src/Core/Settings.h index f55f10c0267..2b1aa5bf837 100644 --- a/src/Core/Settings.h +++ b/src/Core/Settings.h @@ -572,8 +572,6 @@ class IColumn; M(Bool, input_format_values_accurate_types_of_literals, true, "For Values format: when parsing and interpreting expressions using template, check actual type of literal to avoid possible overflow and precision issues.", 0) \ M(Bool, input_format_avro_allow_missing_fields, false, "For Avro/AvroConfluent format: when field is not found in schema use default value instead of error", 0) \ M(URI, format_avro_schema_registry_url, "", "For AvroConfluent format: Confluent Schema Registry URL.", 0) \ - M(String, output_format_avro_string_column_pattern, "", "For Avro format: regexp of String columns to select as AVRO string.", 0) \ - \ M(Bool, output_format_json_quote_64bit_integers, true, "Controls quoting of 64-bit integers in JSON output format.", 0) \ \ M(Bool, output_format_json_quote_denormals, false, "Enables '+nan', '-nan', '+inf', '-inf' outputs in JSON output format.", 0) \ @@ -590,6 +588,8 @@ class IColumn; M(UInt64, output_format_parquet_row_group_size, 1000000, "Row group size in rows.", 0) \ M(String, output_format_avro_codec, "", "Compression codec used for output. 
Possible values: 'null', 'deflate', 'snappy'.", 0) \ M(UInt64, output_format_avro_sync_interval, 16 * 1024, "Sync interval in bytes.", 0) \ + M(String, output_format_avro_string_column_pattern, "", "For Avro format: regexp of String columns to select as AVRO string.", 0) \ + M(UInt64, output_format_avro_rows_in_file, 1000000, "Max rows in a file (if permitted by storage)", 0) \ M(Bool, output_format_tsv_crlf_end_of_line, false, "If it is set true, end of line in TSV format will be \\r\\n instead of \\n.", 0) \ M(String, output_format_csv_null_representation, "\\N", "Custom NULL representation in CSV format", 0) \ M(String, output_format_tsv_null_representation, "\\N", "Custom NULL representation in TSV format", 0) \ diff --git a/src/Formats/FormatFactory.cpp b/src/Formats/FormatFactory.cpp index 9901081d7dd..1115933c1ac 100644 --- a/src/Formats/FormatFactory.cpp +++ b/src/Formats/FormatFactory.cpp @@ -53,6 +53,7 @@ FormatSettings getFormatSettings(ContextPtr context, const Settings & settings) format_settings.avro.output_sync_interval = settings.output_format_avro_sync_interval; format_settings.avro.schema_registry_url = settings.format_avro_schema_registry_url.toString(); format_settings.avro.string_column_pattern = settings.output_format_avro_string_column_pattern.toString(); + format_settings.avro.output_rows_in_file = settings.output_format_avro_rows_in_file; format_settings.csv.allow_double_quotes = settings.format_csv_allow_double_quotes; format_settings.csv.allow_single_quotes = settings.format_csv_allow_single_quotes; format_settings.csv.crlf_end_of_line = settings.output_format_csv_crlf_end_of_line; diff --git a/src/Formats/FormatSettings.h b/src/Formats/FormatSettings.h index 8c894c77e82..fadb95efd50 100644 --- a/src/Formats/FormatSettings.h +++ b/src/Formats/FormatSettings.h @@ -64,6 +64,7 @@ struct FormatSettings UInt64 output_sync_interval = 16 * 1024; bool allow_missing_fields = false; String string_column_pattern; + UInt64 output_rows_in_file = 1000000; } avro; struct CSV diff --git a/src/Processors/Formats/Impl/AvroRowOutputFormat.cpp b/src/Processors/Formats/Impl/AvroRowOutputFormat.cpp index 24b231e9ea8..9035c9f8627 100644 --- a/src/Processors/Formats/Impl/AvroRowOutputFormat.cpp +++ b/src/Processors/Formats/Impl/AvroRowOutputFormat.cpp @@ -43,6 +43,8 @@ #include +#include <common/logger_useful.h> + namespace DB { namespace ErrorCodes @@ -93,7 +95,7 @@ public: virtual void backup(size_t len) override { out.position() -= len; } virtual uint64_t byteCount() const override { return out.count(); } - virtual void flush() override { out.next(); } + virtual void flush() override { /* out.next(); */} private: WriteBuffer & out; @@ -385,12 +387,8 @@ AvroRowOutputFormat::AvroRowOutputFormat( WriteBuffer & out_, const Block & header_, const RowOutputFormatParams & params_, const FormatSettings & settings_) : IRowOutputFormat(header_, out_, params_) , settings(settings_) + , params(params_) , serializer(header_.getColumnsWithTypeAndName(), std::make_unique<AvroSerializerTraits>(settings)) - , file_writer( - std::make_unique<OutputStreamWriteBufferAdapter>(out_), - serializer.getSchema(), - settings.avro.output_sync_interval, - getCodec(settings.avro.output_codec)) { } @@ -398,19 +396,71 @@ AvroRowOutputFormat::~AvroRowOutputFormat() = default; void AvroRowOutputFormat::writePrefix() { - file_writer.syncIfNeeded(); + file_writer_ptr = std::make_unique<avro::DataFileWriterBase>( + std::make_unique<OutputStreamWriteBufferAdapter>(out), + serializer.getSchema(), + settings.avro.output_sync_interval, + getCodec(settings.avro.output_codec)); + + file_writer_ptr->syncIfNeeded(); } void AvroRowOutputFormat::write(const
Columns & columns, size_t row_num) { - file_writer.syncIfNeeded(); - serializer.serializeRow(columns, row_num, file_writer.encoder()); - file_writer.incr(); + file_writer_ptr->syncIfNeeded(); + serializer.serializeRow(columns, row_num, file_writer_ptr->encoder()); + file_writer_ptr->incr(); } void AvroRowOutputFormat::writeSuffix() { - file_writer.close(); + file_writer_ptr.reset(); +} + +void AvroRowOutputFormat::consume(DB::Chunk chunk) +{ + LOG_TRACE(&Poco::Logger::get("AvroBlockOutputFormat"), "top of consume"); + + if (params.callback) + consumeImplCallback(std::move(chunk)); + else + consumeImpl(std::move(chunk)); +} + +void AvroRowOutputFormat::consumeImpl(DB::Chunk chunk) +{ + auto num_rows = chunk.getNumRows(); + const auto & columns = chunk.getColumns(); + + writePrefix(); + for (size_t row = 0; row < num_rows; ++row) + write(columns, row); + + first_row = false; +} + +void AvroRowOutputFormat::consumeImplCallback(DB::Chunk chunk) +{ + auto num_rows = chunk.getNumRows(); + const auto & columns = chunk.getColumns(); + + for (size_t row = 0; row < num_rows;) + { + writePrefix(); + for (size_t row_in_file = 0; + row_in_file < settings.avro.output_rows_in_file && row < num_rows; + ++row, ++row_in_file) + { + write(columns, row); + } + + + file_writer_ptr->flush(); + writeSuffix(); + + params.callback(columns, num_rows); + first_row = false; + } } void registerOutputFormatProcessorAvro(FormatFactory & factory) diff --git a/src/Processors/Formats/Impl/AvroRowOutputFormat.h b/src/Processors/Formats/Impl/AvroRowOutputFormat.h index c807736071e..af4125d831a 100644 --- a/src/Processors/Formats/Impl/AvroRowOutputFormat.h +++ b/src/Processors/Formats/Impl/AvroRowOutputFormat.h @@ -49,6 +49,7 @@ public: AvroRowOutputFormat(WriteBuffer & out_, const Block & header_, const RowOutputFormatParams & params_, const FormatSettings & settings_); virtual ~AvroRowOutputFormat() override; + void consume(Chunk) override; String getName() const override { return "AvroRowOutputFormat"; } void write(const Columns & columns, size_t row_num) override; void writeField(const IColumn &, const ISerialization &, size_t) override {} @@ -57,8 +58,13 @@ public: private: FormatSettings settings; + Params params; AvroSerializer serializer; - avro::DataFileWriterBase file_writer; + std::unique_ptr<avro::DataFileWriterBase> file_writer_ptr; + + void consumeImpl(Chunk); + void consumeImplCallback(Chunk); + }; } diff --git a/src/Storages/Kafka/WriteBufferToKafkaProducer.cpp b/src/Storages/Kafka/WriteBufferToKafkaProducer.cpp index 34ab48e501d..d574c32b6e0 100644 --- a/src/Storages/Kafka/WriteBufferToKafkaProducer.cpp +++ b/src/Storages/Kafka/WriteBufferToKafkaProducer.cpp @@ -3,6 +3,8 @@ #include "Columns/ColumnString.h" #include "Columns/ColumnsNumber.h" +#include <common/logger_useful.h> + namespace DB { WriteBufferToKafkaProducer::WriteBufferToKafkaProducer( @@ -54,13 +56,14 @@ WriteBufferToKafkaProducer::~WriteBufferToKafkaProducer() void WriteBufferToKafkaProducer::countRow(const Columns & columns, size_t current_row) { + LOG_TRACE(&Poco::Logger::get("WriteBufferToKafkaProducer"), "top of countRow"); if (++rows % max_rows == 0) { const std::string & last_chunk = chunks.back(); size_t last_chunk_size = offset(); // if last character of last chunk is delimiter - we don't need it - if (delim && last_chunk[last_chunk_size - 1] == delim) + if (last_chunk_size && delim && last_chunk[last_chunk_size - 1] == delim) --last_chunk_size; std::string payload; @@ -76,6 +79,8 @@ void WriteBufferToKafkaProducer::countRow(const Columns & columns, size_t curren cppkafka::MessageBuilder
builder(topic); builder.payload(payload); + LOG_TRACE(&Poco::Logger::get("WriteBufferToKafkaProducer"), "payload size {}", payload.size()); + // Note: if it will be few rows per message - it will take the value from last row of block if (key_column_index) { @@ -116,6 +121,7 @@ void WriteBufferToKafkaProducer::countRow(const Columns & columns, size_t curren void WriteBufferToKafkaProducer::flush() { + LOG_TRACE(&Poco::Logger::get("WriteBufferToKafkaProducer"), "flush"); // For unknown reason we may hit some internal timeout when inserting for the first time. while (true) { @@ -136,11 +142,13 @@ void WriteBufferToKafkaProducer::flush() void WriteBufferToKafkaProducer::nextImpl() { + LOG_TRACE(&Poco::Logger::get("WriteBufferToKafkaProducer"), "nextImpl"); addChunk(); } void WriteBufferToKafkaProducer::addChunk() { + LOG_TRACE(&Poco::Logger::get("WriteBufferToKafkaProducer"), "addChunk"); chunks.push_back(std::string()); chunks.back().resize(chunk_size); set(chunks.back().data(), chunk_size); @@ -148,6 +156,7 @@ void WriteBufferToKafkaProducer::addChunk() void WriteBufferToKafkaProducer::reinitializeChunks() { + LOG_TRACE(&Poco::Logger::get("WriteBufferToKafkaProducer"), "reinitializeChunks"); rows = 0; chunks.clear(); /// We cannot leave the buffer in the undefined state (i.e. without any diff --git a/tests/integration/test_storage_kafka/test.py b/tests/integration/test_storage_kafka/test.py index 6106966e5b7..e01f4bf146e 100644 --- a/tests/integration/test_storage_kafka/test.py +++ b/tests/integration/test_storage_kafka/test.py @@ -8,6 +8,7 @@ import logging import io import string import ast +import math import avro.schema import avro.io @@ -1829,6 +1830,52 @@ def test_kafka_produce_key_timestamp(kafka_cluster): kafka_delete_topic(admin_client, topic_name) +def test_kafka_produce_consume_avro(kafka_cluster): + + admin_client = KafkaAdminClient(bootstrap_servers="localhost:{}".format(kafka_cluster.kafka_port)) + + topic_name = "insert_avro" + kafka_create_topic(admin_client, topic_name) + + num_rows = 75 + + instance.query(''' + DROP TABLE IF EXISTS test.view; + DROP TABLE IF EXISTS test.kafka; + DROP TABLE IF EXISTS test.kafka_writer; + + CREATE TABLE test.kafka_writer (key UInt64, value UInt64) + ENGINE = Kafka + SETTINGS kafka_broker_list = 'kafka1:19092', + kafka_topic_list = 'avro', + kafka_group_name = 'avro', + kafka_format = 'Avro'; + + + CREATE TABLE test.kafka (key UInt64, value UInt64) + ENGINE = Kafka + SETTINGS kafka_broker_list = 'kafka1:19092', + kafka_topic_list = 'avro', + kafka_group_name = 'avro', + kafka_format = 'Avro'; + + CREATE MATERIALIZED VIEW test.view Engine=Log AS + SELECT key, value FROM test.kafka; + ''') + + instance.query("INSERT INTO test.kafka_writer select number*10 as key, number*100 as value from numbers({num_rows}) SETTINGS output_format_avro_rows_in_file = 7".format(num_rows=num_rows)) + + instance.wait_for_log_line("Committed offset {offset}".format(offset=math.ceil(num_rows/7))) + + expected_num_rows = instance.query("SELECT COUNT(1) FROM test.view", ignore_error=True) + assert (int(expected_num_rows) == num_rows) + + expected_max_key = instance.query("SELECT max(key) FROM test.view", ignore_error=True) + assert (int(expected_max_key) == (num_rows - 1) * 10) + + kafka_delete_topic(admin_client, topic_name) + + def test_kafka_flush_by_time(kafka_cluster): admin_client = KafkaAdminClient(bootstrap_servers="localhost:{}".format(kafka_cluster.kafka_port)) topic_name = "flush_by_time" From 9f9f4a561cc7ee20f419ba7b390ddac77b98f499 Mon Sep 17 00:00:00 2001 
From: Ilya Golshtein Date: Tue, 19 Oct 2021 23:26:55 +0300 Subject: [PATCH 2/6] output_format_avro_rows_in_file - fixes --- src/Core/Settings.h | 2 +- src/Processors/Formats/IRowOutputFormat.h | 38 +++++++++---------- .../Formats/Impl/AvroRowOutputFormat.cpp | 17 ++++----- .../Formats/Impl/AvroRowOutputFormat.h | 2 +- 4 files changed, 29 insertions(+), 30 deletions(-) diff --git a/src/Core/Settings.h b/src/Core/Settings.h index 2b1aa5bf837..5785769fed9 100644 --- a/src/Core/Settings.h +++ b/src/Core/Settings.h @@ -572,8 +572,8 @@ class IColumn; M(Bool, input_format_values_accurate_types_of_literals, true, "For Values format: when parsing and interpreting expressions using template, check actual type of literal to avoid possible overflow and precision issues.", 0) \ M(Bool, input_format_avro_allow_missing_fields, false, "For Avro/AvroConfluent format: when field is not found in schema use default value instead of error", 0) \ M(URI, format_avro_schema_registry_url, "", "For AvroConfluent format: Confluent Schema Registry URL.", 0) \ - M(Bool, output_format_json_quote_64bit_integers, true, "Controls quoting of 64-bit integers in JSON output format.", 0) \ \ + M(Bool, output_format_json_quote_64bit_integers, true, "Controls quoting of 64-bit integers in JSON output format.", 0) \ M(Bool, output_format_json_quote_denormals, false, "Enables '+nan', '-nan', '+inf', '-inf' outputs in JSON output format.", 0) \ \ M(Bool, output_format_json_escape_forward_slashes, true, "Controls escaping forward slashes for string outputs in JSON output format. This is intended for compatibility with JavaScript. Don't confuse with backslashes that are always escaped.", 0) \ diff --git a/src/Processors/Formats/IRowOutputFormat.h b/src/Processors/Formats/IRowOutputFormat.h index c35d93b6133..cb9021d9e95 100644 --- a/src/Processors/Formats/IRowOutputFormat.h +++ b/src/Processors/Formats/IRowOutputFormat.h @@ -33,6 +33,25 @@ protected: void consumeExtremes(Chunk chunk) override; void finalize() override; + bool prefix_written = false; + bool suffix_written = false; + + void writePrefixIfNot() + { + if (!prefix_written) + writePrefix(); + + prefix_written = true; + } + + void writeSuffixIfNot() + { + if (!suffix_written) + writeSuffix(); + + suffix_written = true; + } + public: using Params = RowOutputFormatParams; @@ -64,27 +83,8 @@ public: virtual void writeLastSuffix() {} /// Write something after resultset, totals end extremes. 
private: - bool prefix_written = false; - bool suffix_written = false; - Params params; - void writePrefixIfNot() - { - if (!prefix_written) - writePrefix(); - - prefix_written = true; - } - - void writeSuffixIfNot() - { - if (!suffix_written) - writeSuffix(); - - suffix_written = true; - } - }; } diff --git a/src/Processors/Formats/Impl/AvroRowOutputFormat.cpp b/src/Processors/Formats/Impl/AvroRowOutputFormat.cpp index 9035c9f8627..73b8b561a00 100644 --- a/src/Processors/Formats/Impl/AvroRowOutputFormat.cpp +++ b/src/Processors/Formats/Impl/AvroRowOutputFormat.cpp @@ -43,8 +43,6 @@ #include -#include <common/logger_useful.h> - namespace DB { namespace ErrorCodes @@ -95,7 +93,7 @@ public: virtual void backup(size_t len) override { out.position() -= len; } virtual uint64_t byteCount() const override { return out.count(); } - virtual void flush() override { /* out.next(); */} + virtual void flush() override { } private: WriteBuffer & out; @@ -396,6 +394,7 @@ AvroRowOutputFormat::~AvroRowOutputFormat() = default; void AvroRowOutputFormat::writePrefix() { + // we have to recreate avro::DataFileWriterBase object due to its interface limitations file_writer_ptr = std::make_unique<avro::DataFileWriterBase>( std::make_unique<OutputStreamWriteBufferAdapter>(out), serializer.getSchema(), settings.avro.output_sync_interval, getCodec(settings.avro.output_codec)); @@ -419,10 +418,8 @@ void AvroRowOutputFormat::writeSuffix() void AvroRowOutputFormat::consume(DB::Chunk chunk) { - LOG_TRACE(&Poco::Logger::get("AvroBlockOutputFormat"), "top of consume"); - if (params.callback) - consumeImplCallback(std::move(chunk)); + consumeImplWithCallback(std::move(chunk)); else consumeImpl(std::move(chunk)); } @@ -432,14 +429,16 @@ void AvroRowOutputFormat::consumeImpl(DB::Chunk chunk) auto num_rows = chunk.getNumRows(); const auto & columns = chunk.getColumns(); - writePrefix(); + writePrefixIfNot(); for (size_t row = 0; row < num_rows; ++row) + { write(columns, row); + first_row = false; + } - first_row = false; } -void AvroRowOutputFormat::consumeImplCallback(DB::Chunk chunk) +void AvroRowOutputFormat::consumeImplWithCallback(DB::Chunk chunk) { auto num_rows = chunk.getNumRows(); const auto & columns = chunk.getColumns(); diff --git a/src/Processors/Formats/Impl/AvroRowOutputFormat.h b/src/Processors/Formats/Impl/AvroRowOutputFormat.h index af4125d831a..7515946df5d 100644 --- a/src/Processors/Formats/Impl/AvroRowOutputFormat.h +++ b/src/Processors/Formats/Impl/AvroRowOutputFormat.h @@ -63,7 +63,7 @@ private: std::unique_ptr<avro::DataFileWriterBase> file_writer_ptr; void consumeImpl(Chunk); - void consumeImplCallback(Chunk); + void consumeImplWithCallback(Chunk); }; From c8e5a67c0dcbd5c95b9ff33488e25482dd29a279 Mon Sep 17 00:00:00 2001 From: Ilya Golshtein Date: Wed, 20 Oct 2021 00:40:14 +0300 Subject: [PATCH 3/6] output_format_avro_rows_in_file tested against RabbitMQ --- .../integration/test_storage_rabbitmq/test.py | 44 ++++++++++++++++++- 1 file changed, 43 insertions(+), 1 deletion(-) diff --git a/tests/integration/test_storage_rabbitmq/test.py b/tests/integration/test_storage_rabbitmq/test.py index 36d63588386..1c27c95d2ab 100644 --- a/tests/integration/test_storage_rabbitmq/test.py +++ b/tests/integration/test_storage_rabbitmq/test.py @@ -6,6 +6,7 @@ import threading import logging import time from random import randrange +import math import pika import pytest @@ -250,7 +251,7 @@ def test_rabbitmq_macros(rabbitmq_cluster): for i in range(50): message += json.dumps({'key': i, 'value': i}) + '\n' channel.basic_publish(exchange='macro', routing_key='', body=message) - + connection.close() time.sleep(1) @@ -2027,6 +2028,47 @@ def test_rabbitmq_queue_consume(rabbitmq_cluster):
instance.query('DROP TABLE test.rabbitmq_queue') +def test_rabbitmq_produce_consume_avro(rabbitmq_cluster): + num_rows = 75 + + instance.query(''' + DROP TABLE IF EXISTS test.view; + DROP TABLE IF EXISTS test.rabbit; + DROP TABLE IF EXISTS test.rabbit_writer; + + CREATE TABLE test.rabbit_writer (key UInt64, value UInt64) + ENGINE = RabbitMQ + SETTINGS rabbitmq_host_port = 'rabbitmq1:5672', + rabbitmq_format = 'Avro', + rabbitmq_exchange_name = 'avro', + rabbitmq_exchange_type = 'direct', + rabbitmq_routing_key_list = 'avro'; + + CREATE TABLE test.rabbit (key UInt64, value UInt64) + ENGINE = RabbitMQ + SETTINGS rabbitmq_host_port = 'rabbitmq1:5672', + rabbitmq_format = 'Avro', + rabbitmq_exchange_name = 'avro', + rabbitmq_exchange_type = 'direct', + rabbitmq_routing_key_list = 'avro'; + + CREATE MATERIALIZED VIEW test.view Engine=Log AS + SELECT key, value FROM test.rabbit; + ''') + + instance.query("INSERT INTO test.rabbit_writer select number*10 as key, number*100 as value from numbers({num_rows}) SETTINGS output_format_avro_rows_in_file = 7".format(num_rows=num_rows)) + + + # Ideally we should wait for an event + time.sleep(3) + + expected_num_rows = instance.query("SELECT COUNT(1) FROM test.view", ignore_error=True) + assert (int(expected_num_rows) == num_rows) + + expected_max_key = instance.query("SELECT max(key) FROM test.view", ignore_error=True) + assert (int(expected_max_key) == (num_rows - 1) * 10) + + def test_rabbitmq_bad_args(rabbitmq_cluster): credentials = pika.PlainCredentials('root', 'clickhouse') parameters = pika.ConnectionParameters(rabbitmq_cluster.rabbitmq_ip, rabbitmq_cluster.rabbitmq_port, '/', credentials) From 82f33151e764a91f970c1deec777f783bc3f07d2 Mon Sep 17 00:00:00 2001 From: Ilya Golshtein Date: Wed, 20 Oct 2021 23:47:50 +0300 Subject: [PATCH 4/6] output_format_avro_rows_in_file fixes per code review --- src/Formats/FormatSettings.h | 2 +- src/Processors/Formats/IRowOutputFormat.h | 17 +++++++++-------- .../Formats/Impl/AvroRowOutputFormat.cpp | 3 --- .../Formats/Impl/AvroRowOutputFormat.h | 1 - .../Kafka/WriteBufferToKafkaProducer.cpp | 10 ---------- .../RabbitMQ/WriteBufferToRabbitMQProducer.cpp | 2 +- 6 files changed, 11 insertions(+), 24 deletions(-) diff --git a/src/Formats/FormatSettings.h b/src/Formats/FormatSettings.h index fadb95efd50..d39c1ba05fa 100644 --- a/src/Formats/FormatSettings.h +++ b/src/Formats/FormatSettings.h @@ -64,7 +64,7 @@ struct FormatSettings UInt64 output_sync_interval = 16 * 1024; bool allow_missing_fields = false; String string_column_pattern; - UInt64 output_rows_in_file = 1000000; + UInt64 output_rows_in_file = 1; } avro; struct CSV diff --git a/src/Processors/Formats/IRowOutputFormat.h b/src/Processors/Formats/IRowOutputFormat.h index cb9021d9e95..18575419cd0 100644 --- a/src/Processors/Formats/IRowOutputFormat.h +++ b/src/Processors/Formats/IRowOutputFormat.h @@ -23,9 +23,18 @@ class WriteBuffer; */ class IRowOutputFormat : public IOutputFormat { +public: + using Params = RowOutputFormatParams; + +private: + bool prefix_written = false; + bool suffix_written = false; + protected: DataTypes types; Serializations serializations; + Params params; + bool first_row = true; void consume(Chunk chunk) override; @@ -33,9 +42,6 @@ protected: void consumeExtremes(Chunk chunk) override; void finalize() override; - bool prefix_written = false; - bool suffix_written = false; - void writePrefixIfNot() { if (!prefix_written) @@ -53,8 +59,6 @@ protected: } public: - using Params = RowOutputFormatParams; - IRowOutputFormat(const Block & 
header, WriteBuffer & out_, const Params & params_); /** Write a row. @@ -82,9 +86,6 @@ public: virtual void writeAfterExtremes() {} virtual void writeLastSuffix() {} /// Write something after resultset, totals end extremes. -private: - Params params; - }; } diff --git a/src/Processors/Formats/Impl/AvroRowOutputFormat.cpp b/src/Processors/Formats/Impl/AvroRowOutputFormat.cpp index 73b8b561a00..e5845003ca2 100644 --- a/src/Processors/Formats/Impl/AvroRowOutputFormat.cpp +++ b/src/Processors/Formats/Impl/AvroRowOutputFormat.cpp @@ -385,7 +385,6 @@ AvroRowOutputFormat::AvroRowOutputFormat( WriteBuffer & out_, const Block & header_, const RowOutputFormatParams & params_, const FormatSettings & settings_) : IRowOutputFormat(header_, out_, params_) , settings(settings_) - , params(params_) , serializer(header_.getColumnsWithTypeAndName(), std::make_unique<AvroSerializerTraits>(settings)) { } @@ -433,7 +432,6 @@ void AvroRowOutputFormat::consumeImpl(DB::Chunk chunk) for (size_t row = 0; row < num_rows; ++row) { write(columns, row); - first_row = false; } } @@ -458,7 +456,6 @@ void AvroRowOutputFormat::consumeImplWithCallback(DB::Chunk chunk) writeSuffix(); params.callback(columns, num_rows); - first_row = false; } } diff --git a/src/Processors/Formats/Impl/AvroRowOutputFormat.h b/src/Processors/Formats/Impl/AvroRowOutputFormat.h index 7515946df5d..a3e8493f757 100644 --- a/src/Processors/Formats/Impl/AvroRowOutputFormat.h +++ b/src/Processors/Formats/Impl/AvroRowOutputFormat.h @@ -58,7 +58,6 @@ public: private: FormatSettings settings; - Params params; AvroSerializer serializer; std::unique_ptr<avro::DataFileWriterBase> file_writer_ptr; diff --git a/src/Storages/Kafka/WriteBufferToKafkaProducer.cpp b/src/Storages/Kafka/WriteBufferToKafkaProducer.cpp index d574c32b6e0..7b736e95d25 100644 --- a/src/Storages/Kafka/WriteBufferToKafkaProducer.cpp +++ b/src/Storages/Kafka/WriteBufferToKafkaProducer.cpp @@ -3,8 +3,6 @@ #include "Columns/ColumnString.h" #include "Columns/ColumnsNumber.h" -#include <common/logger_useful.h> - namespace DB { WriteBufferToKafkaProducer::WriteBufferToKafkaProducer( @@ -55,8 +53,6 @@ WriteBufferToKafkaProducer::~WriteBufferToKafkaProducer() void WriteBufferToKafkaProducer::countRow(const Columns & columns, size_t current_row) { - - LOG_TRACE(&Poco::Logger::get("WriteBufferToKafkaProducer"), "top of countRow"); if (++rows % max_rows == 0) { const std::string & last_chunk = chunks.back(); @@ -79,8 +75,6 @@ void WriteBufferToKafkaProducer::countRow(const Columns & columns, size_t curren cppkafka::MessageBuilder builder(topic); builder.payload(payload); - LOG_TRACE(&Poco::Logger::get("WriteBufferToKafkaProducer"), "payload size {}", payload.size()); - // Note: if it will be few rows per message - it will take the value from last row of block if (key_column_index) { @@ -121,7 +115,6 @@ void WriteBufferToKafkaProducer::countRow(const Columns & columns, size_t curren void WriteBufferToKafkaProducer::flush() { - LOG_TRACE(&Poco::Logger::get("WriteBufferToKafkaProducer"), "flush"); // For unknown reason we may hit some internal timeout when inserting for the first time.
while (true) { @@ -142,13 +135,11 @@ void WriteBufferToKafkaProducer::flush() void WriteBufferToKafkaProducer::nextImpl() { - LOG_TRACE(&Poco::Logger::get("WriteBufferToKafkaProducer"), "nextImpl"); addChunk(); } void WriteBufferToKafkaProducer::addChunk() { - LOG_TRACE(&Poco::Logger::get("WriteBufferToKafkaProducer"), "addChunk"); chunks.push_back(std::string()); chunks.back().resize(chunk_size); set(chunks.back().data(), chunk_size); @@ -156,7 +147,6 @@ void WriteBufferToKafkaProducer::addChunk() void WriteBufferToKafkaProducer::reinitializeChunks() { - LOG_TRACE(&Poco::Logger::get("WriteBufferToKafkaProducer"), "reinitializeChunks"); rows = 0; chunks.clear(); /// We cannot leave the buffer in the undefined state (i.e. without any diff --git a/src/Storages/RabbitMQ/WriteBufferToRabbitMQProducer.cpp b/src/Storages/RabbitMQ/WriteBufferToRabbitMQProducer.cpp index 1929a103414..5dce82e3a2e 100644 --- a/src/Storages/RabbitMQ/WriteBufferToRabbitMQProducer.cpp +++ b/src/Storages/RabbitMQ/WriteBufferToRabbitMQProducer.cpp @@ -88,7 +88,7 @@ void WriteBufferToRabbitMQProducer::countRow() const std::string & last_chunk = chunks.back(); size_t last_chunk_size = offset(); - if (delim && last_chunk[last_chunk_size - 1] == delim) + if (last_chunk_size && delim && last_chunk[last_chunk_size - 1] == delim) --last_chunk_size; std::string payload; From 551a1065c195acd18f319ce4ce5b3c52c2e763aa Mon Sep 17 00:00:00 2001 From: Ilya Golshtein Date: Thu, 21 Oct 2021 14:19:25 +0300 Subject: [PATCH 5/6] output_format_avro_rows_in_file default is 1000000 --- src/Formats/FormatSettings.h | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/Formats/FormatSettings.h b/src/Formats/FormatSettings.h index d39c1ba05fa..fadb95efd50 100644 --- a/src/Formats/FormatSettings.h +++ b/src/Formats/FormatSettings.h @@ -64,7 +64,7 @@ struct FormatSettings UInt64 output_sync_interval = 16 * 1024; bool allow_missing_fields = false; String string_column_pattern; - UInt64 output_rows_in_file = 1; + UInt64 output_rows_in_file = 1000000; } avro; struct CSV From 8efa1743cf6f56845195488b1f2094717c99b33e Mon Sep 17 00:00:00 2001 From: Ilya Golshtein Date: Fri, 29 Oct 2021 16:47:49 +0300 Subject: [PATCH 6/6] output_format_avro_rows_in_file fix and test for _timestamp --- .../Formats/Impl/AvroRowOutputFormat.cpp | 7 ++- tests/integration/test_storage_kafka/test.py | 53 +++++++++++++++++-- 2 files changed, 55 insertions(+), 5 deletions(-) diff --git a/src/Processors/Formats/Impl/AvroRowOutputFormat.cpp b/src/Processors/Formats/Impl/AvroRowOutputFormat.cpp index e5845003ca2..7838da157b1 100644 --- a/src/Processors/Formats/Impl/AvroRowOutputFormat.cpp +++ b/src/Processors/Formats/Impl/AvroRowOutputFormat.cpp @@ -443,6 +443,10 @@ void AvroRowOutputFormat::consumeImplWithCallback(DB::Chunk chunk) for (size_t row = 0; row < num_rows;) { + size_t current_row = row; + /// used by WriteBufferToKafkaProducer to obtain auxiliary data + /// from the starting row of a file + writePrefix(); for (size_t row_in_file = 0; row_in_file < settings.avro.output_rows_in_file && row < num_rows; @@ -451,11 +455,10 @@ void AvroRowOutputFormat::consumeImplWithCallback(DB::Chunk chunk) write(columns, row); } - file_writer_ptr->flush(); writeSuffix(); - params.callback(columns, num_rows); + params.callback(columns, current_row); } } diff --git a/tests/integration/test_storage_kafka/test.py b/tests/integration/test_storage_kafka/test.py index e01f4bf146e..c2b61b26ba1 100644 --- a/tests/integration/test_storage_kafka/test.py +++ 
b/tests/integration/test_storage_kafka/test.py @@ -120,17 +120,20 @@ def kafka_produce(kafka_cluster, topic, messages, timestamp=None, retries=15): def kafka_producer_send_heartbeat_msg(max_retries=50): kafka_produce(kafka_cluster, 'test_heartbeat_topic', ['test'], retries=max_retries) -def kafka_consume(kafka_cluster, topic): +def kafka_consume(kafka_cluster, topic, needDecode = True, timestamp = 0): consumer = KafkaConsumer(bootstrap_servers="localhost:{}".format(kafka_cluster.kafka_port), auto_offset_reset="earliest") consumer.subscribe(topics=(topic)) for toppar, messages in list(consumer.poll(5000).items()): if toppar.topic == topic: for message in messages: - yield message.value.decode() + assert timestamp == 0 or message.timestamp / 1000 == timestamp + if needDecode: + yield message.value.decode() + else: + yield message.value consumer.unsubscribe() consumer.close() - def kafka_produce_protobuf_messages(kafka_cluster, topic, start_index, num_messages): data = b'' for i in range(start_index, start_index + num_messages): @@ -682,6 +685,16 @@ def kafka_check_result(result, check=False, ref_file='test_kafka_json.reference' return TSV(result) == TSV(reference) +def decode_avro(message): + b = io.BytesIO(message) + ret = avro.datafile.DataFileReader(b, avro.io.DatumReader()) + + output = io.StringIO() + for record in ret: + print(record, file=output) + return output.getvalue() + + # https://stackoverflow.com/a/57692111/1555175 def describe_consumer_group(kafka_cluster, name): client = BrokerConnection('localhost', kafka_cluster.kafka_port, socket.AF_INET) @@ -1830,6 +1843,40 @@ def test_kafka_produce_key_timestamp(kafka_cluster): kafka_delete_topic(admin_client, topic_name) +def test_kafka_insert_avro(kafka_cluster): + instance.query(''' + DROP TABLE IF EXISTS test.kafka; + CREATE TABLE test.kafka (key UInt64, value UInt64, _timestamp DateTime('UTC')) + ENGINE = Kafka + SETTINGS kafka_broker_list = 'kafka1:19092', + kafka_topic_list = 'avro1', + kafka_group_name = 'avro1', + kafka_format = 'Avro'; + ''') + + + instance.query("INSERT INTO test.kafka select number*10 as key, number*100 as value, 1636505534 as _timestamp from numbers(4) SETTINGS output_format_avro_rows_in_file = 2, output_format_avro_codec = 'deflate'") + + messages = [] + while True: + messages.extend(kafka_consume(kafka_cluster, 'avro1', needDecode = False, timestamp = 1636505534)) + if len(messages) == 2: + break + + result = '' + for a_message in messages: + result += decode_avro(a_message) + '\n' + + expected_result = """{'key': 0, 'value': 0, '_timestamp': 1636505534} +{'key': 10, 'value': 100, '_timestamp': 1636505534} + +{'key': 20, 'value': 200, '_timestamp': 1636505534} +{'key': 30, 'value': 300, '_timestamp': 1636505534} + +""" + assert (result == expected_result) + + def test_kafka_produce_consume_avro(kafka_cluster): admin_client = KafkaAdminClient(bootstrap_servers="localhost:{}".format(kafka_cluster.kafka_port))
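
A note on the wire format, as a minimal sketch rather than part of the patches: with output_format_avro_rows_in_file = N, consumeImplWithCallback wraps every batch of at most N rows in a complete Avro Object Container File (header, schema, data blocks, sync marker), and the callback hands each finished file to the storage as one Kafka/RabbitMQ message. The Python below mimics that splitting with the same avro library the tests use; the record schema and the helper name are assumptions for illustration, and older avro-python3 spells the parser avro.schema.Parse instead of avro.schema.parse.

import io
import math

import avro.schema
from avro.datafile import DataFileWriter
from avro.io import DatumWriter

# Hypothetical schema matching the (key UInt64, value UInt64) test tables.
SCHEMA = avro.schema.parse(
    '{"type": "record", "name": "row", "fields": ['
    '{"name": "key", "type": "long"}, {"name": "value", "type": "long"}]}')

def split_into_avro_files(rows, rows_in_file):
    # One self-contained container file per batch of <= rows_in_file rows,
    # mirroring the writePrefix()/writeSuffix() pair around each batch.
    payloads = []
    for start in range(0, len(rows), rows_in_file):
        buf = io.BytesIO()
        writer = DataFileWriter(buf, DatumWriter(), SCHEMA)
        for row in rows[start:start + rows_in_file]:
            writer.append(row)
        writer.flush()
        payloads.append(buf.getvalue())  # take the bytes before close() closes buf
        writer.close()
    return payloads

rows = [{'key': i * 10, 'value': i * 100} for i in range(75)]
payloads = split_into_avro_files(rows, 7)
# 75 rows at 7 per file give ceil(75/7) == 11 messages, which is exactly the
# committed offset the Kafka test waits for.
assert len(payloads) == math.ceil(75 / 7)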
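The consuming side, sketched under the same assumptions (broker address and topic are placeholders; the kafka-python and avro calls are the ones the integration tests already use): each message payload decodes independently, which is why the tests can count whole files and individual rows separately.

import io

import avro.datafile
import avro.io
from kafka import KafkaConsumer

def read_avro_rows(bootstrap_servers, topic):
    # Every message is a complete Avro container file, so each payload is
    # decoded on its own; no reader state is shared between messages.
    consumer = KafkaConsumer(bootstrap_servers=bootstrap_servers,
                             auto_offset_reset='earliest')
    consumer.subscribe(topics=(topic,))
    rows = []
    for _, messages in list(consumer.poll(5000).items()):
        for message in messages:
            reader = avro.datafile.DataFileReader(io.BytesIO(message.value),
                                                  avro.io.DatumReader())
            rows.extend(reader)  # records come back as dicts, e.g. {'key': 0, 'value': 0}
    consumer.close()
    return rows

# e.g. read_avro_rows('localhost:9092', 'insert_avro') would return the 75
# inserted rows spread over 11 container files.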