Few more CR fixes

This commit is contained in:
Mikhail Filimonov 2020-10-12 15:05:40 +02:00
parent 7fb329713a
commit 87a8ba63f7
No known key found for this signature in database
GPG Key ID: 6E49C2E9AF1220BE
6 changed files with 15 additions and 7 deletions

View File

@ -507,6 +507,7 @@ namespace ErrorCodes
extern const int CANNOT_CREATE_RABBITMQ_QUEUE_BINDING = 541;
extern const int CANNOT_REMOVE_RABBITMQ_EXCHANGE = 542;
extern const int UNKNOWN_MYSQL_DATATYPES_SUPPORT_LEVEL = 543;
extern const int NO_ROW_DELIMITER = 544;
extern const int KEEPER_EXCEPTION = 999;
extern const int POCO_EXCEPTION = 1000;

View File

@ -23,10 +23,11 @@ void IRowOutputFormat::consume(DB::Chunk chunk)
writeRowBetweenDelimiter();
write(columns, row);
first_row = false;
if (params.callback)
params.callback(columns, row);
first_row = false;
}
}

View File

@ -14,7 +14,7 @@ namespace DB
{
namespace ErrorCodes
{
extern const int TOO_MANY_ROWS;
extern const int NO_ROW_DELIMITER;
}
@ -36,7 +36,7 @@ void ProtobufRowOutputFormat::write(const Columns & columns, size_t row_num)
{
if (throw_on_multiple_rows_undelimited && !first_row)
{
throw Exception("The ProtobufSingle format can't be used to write multiple rows because this format doesn't have any row delimiter.", ErrorCodes::TOO_MANY_ROWS);
throw Exception("The ProtobufSingle format can't be used to write multiple rows because this format doesn't have any row delimiter.", ErrorCodes::NO_ROW_DELIMITER);
}
writer.startMessage();

View File

@ -32,7 +32,13 @@ void KafkaBlockOutputStream::writePrefix()
if (!buffer)
throw Exception("Failed to create Kafka producer!", ErrorCodes::CANNOT_CREATE_IO_BUFFER);
child = FormatFactory::instance().getOutput(storage.getFormatName(), *buffer, getHeader(), *context, [this](const Columns & columns, size_t row){ buffer->countRow(columns, row); }, true);
child = FormatFactory::instance().getOutput(
storage.getFormatName(), *buffer, getHeader(), *context, [this](const Columns & columns, size_t row)
{
buffer->countRow(columns, row);
},
/* ignore_no_row_delimiter = */ true
);
}
void KafkaBlockOutputStream::write(const Block & block)

View File

@ -47,7 +47,7 @@ void RabbitMQBlockOutputStream::writePrefix()
{
buffer->countRow();
},
true
/* ignore_no_row_delimiter = */ true
);
}

View File

@ -70,8 +70,8 @@ SELECT * FROM out_persons_00825 ORDER BY name LIMIT 1 FORMAT ProtobufSingle SETT
SELECT 'SQUARES->';
SELECT * FROM out_squares_00825 ORDER BY number LIMIT 1 FORMAT ProtobufSingle SETTINGS format_schema = '$CURDIR/00825_protobuf_format:NumberAndSquare';
-- Code: 158, e.displayText() = DB::Exception: ProtobufSingle can output only single row at a time.
SELECT * FROM out_persons_00825 ORDER BY name FORMAT ProtobufSingle SETTINGS format_schema = '$CURDIR/00825_protobuf_format:Person'; -- { clientError 158 }
-- Code: 544, e.displayText() = DB::Exception: The ProtobufSingle format can't be used to write multiple rows because this format doesn't have any row delimiter.
SELECT * FROM out_persons_00825 ORDER BY name FORMAT ProtobufSingle SETTINGS format_schema = '$CURDIR/00825_protobuf_format:Person'; -- { clientError 544 }
DROP TABLE IF EXISTS out_persons_00825;
DROP TABLE IF EXISTS out_squares_00825;