output_format_avro_rows_in_file fixes per code review

This commit is contained in:
Ilya Golshtein 2021-10-20 23:47:50 +03:00
parent c8e5a67c0d
commit 82f33151e7
6 changed files with 11 additions and 24 deletions

View File

@ -64,7 +64,7 @@ struct FormatSettings
UInt64 output_sync_interval = 16 * 1024;
bool allow_missing_fields = false;
String string_column_pattern;
UInt64 output_rows_in_file = 1000000;
UInt64 output_rows_in_file = 1;
} avro;
struct CSV

View File

@ -23,9 +23,18 @@ class WriteBuffer;
*/
class IRowOutputFormat : public IOutputFormat
{
public:
using Params = RowOutputFormatParams;
private:
bool prefix_written = false;
bool suffix_written = false;
protected:
DataTypes types;
Serializations serializations;
Params params;
bool first_row = true;
void consume(Chunk chunk) override;
@ -33,9 +42,6 @@ protected:
void consumeExtremes(Chunk chunk) override;
void finalize() override;
bool prefix_written = false;
bool suffix_written = false;
void writePrefixIfNot()
{
if (!prefix_written)
@ -53,8 +59,6 @@ protected:
}
public:
using Params = RowOutputFormatParams;
IRowOutputFormat(const Block & header, WriteBuffer & out_, const Params & params_);
/** Write a row.
@ -82,9 +86,6 @@ public:
virtual void writeAfterExtremes() {}
virtual void writeLastSuffix() {} /// Write something after resultset, totals end extremes.
private:
Params params;
};
}

View File

@ -385,7 +385,6 @@ AvroRowOutputFormat::AvroRowOutputFormat(
WriteBuffer & out_, const Block & header_, const RowOutputFormatParams & params_, const FormatSettings & settings_)
: IRowOutputFormat(header_, out_, params_)
, settings(settings_)
, params(params_)
, serializer(header_.getColumnsWithTypeAndName(), std::make_unique<AvroSerializerTraits>(settings))
{
}
@ -433,7 +432,6 @@ void AvroRowOutputFormat::consumeImpl(DB::Chunk chunk)
for (size_t row = 0; row < num_rows; ++row)
{
write(columns, row);
first_row = false;
}
}
@ -458,7 +456,6 @@ void AvroRowOutputFormat::consumeImplWithCallback(DB::Chunk chunk)
writeSuffix();
params.callback(columns, num_rows);
first_row = false;
}
}

View File

@ -58,7 +58,6 @@ public:
private:
FormatSettings settings;
Params params;
AvroSerializer serializer;
std::unique_ptr<avro::DataFileWriterBase> file_writer_ptr;

View File

@ -3,8 +3,6 @@
#include "Columns/ColumnString.h"
#include "Columns/ColumnsNumber.h"
#include <base/logger_useful.h>
namespace DB
{
WriteBufferToKafkaProducer::WriteBufferToKafkaProducer(
@ -55,8 +53,6 @@ WriteBufferToKafkaProducer::~WriteBufferToKafkaProducer()
void WriteBufferToKafkaProducer::countRow(const Columns & columns, size_t current_row)
{
LOG_TRACE(&Poco::Logger::get("WriteBufferToKafkaProducer"), "top of countRow");
if (++rows % max_rows == 0)
{
const std::string & last_chunk = chunks.back();
@ -79,8 +75,6 @@ void WriteBufferToKafkaProducer::countRow(const Columns & columns, size_t curren
cppkafka::MessageBuilder builder(topic);
builder.payload(payload);
LOG_TRACE(&Poco::Logger::get("WriteBufferToKafkaProducer"), "payload size {}", payload.size());
// Note: if it will be few rows per message - it will take the value from last row of block
if (key_column_index)
{
@ -121,7 +115,6 @@ void WriteBufferToKafkaProducer::countRow(const Columns & columns, size_t curren
void WriteBufferToKafkaProducer::flush()
{
LOG_TRACE(&Poco::Logger::get("WriteBufferToKafkaProducer"), "flush");
// For unknown reason we may hit some internal timeout when inserting for the first time.
while (true)
{
@ -142,13 +135,11 @@ void WriteBufferToKafkaProducer::flush()
void WriteBufferToKafkaProducer::nextImpl()
{
LOG_TRACE(&Poco::Logger::get("WriteBufferToKafkaProducer"), "nextImpl");
addChunk();
}
void WriteBufferToKafkaProducer::addChunk()
{
LOG_TRACE(&Poco::Logger::get("WriteBufferToKafkaProducer"), "addChunk");
chunks.push_back(std::string());
chunks.back().resize(chunk_size);
set(chunks.back().data(), chunk_size);
@ -156,7 +147,6 @@ void WriteBufferToKafkaProducer::addChunk()
void WriteBufferToKafkaProducer::reinitializeChunks()
{
LOG_TRACE(&Poco::Logger::get("WriteBufferToKafkaProducer"), "reinitializeChunks");
rows = 0;
chunks.clear();
/// We cannot leave the buffer in the undefined state (i.e. without any

View File

@ -88,7 +88,7 @@ void WriteBufferToRabbitMQProducer::countRow()
const std::string & last_chunk = chunks.back();
size_t last_chunk_size = offset();
if (delim && last_chunk[last_chunk_size - 1] == delim)
if (last_chunk_size && delim && last_chunk[last_chunk_size - 1] == delim)
--last_chunk_size;
std::string payload;