Small refactoring in formats

This commit is contained in:
avogar 2021-11-02 16:40:41 +03:00
parent 12e7c6ec96
commit 2dd4393ca1
73 changed files with 245 additions and 290 deletions

View File

@ -391,7 +391,7 @@ void ClientBase::initBlockOutputStream(const Block & block, ASTPtr parsed_query)
output_format = global_context->getOutputFormat(
current_format, out_file_buf ? *out_file_buf : *out_buf, block);
output_format->doWritePrefix();
output_format->setAutoFlush();
}
}
@ -671,7 +671,7 @@ void ClientBase::onEndOfStream()
progress_indication.clearProgressOutput();
if (output_format)
output_format->doWriteSuffix();
output_format->finish();
resetOutput();

View File

@ -77,6 +77,8 @@ public:
if (!dynamic_cast<IRowOutputFormat *>(out.get()))
throw Exception(ErrorCodes::BAD_ARGUMENTS, "Cannot turn rows into a {} format strings. {} function supports only row output formats", format_name, getName());
/// Don't write prefix if any.
out->doNotWritePrefix();
out->write(arg_columns);
return col_str;
}

View File

@ -65,11 +65,7 @@ static Chunk prepareTotals(Chunk chunk)
void IOutputFormat::work()
{
if (!prefix_written)
{
doWritePrefix();
prefix_written = true;
}
writePrefixIfNot();
if (finished && !finalized)
{
@ -110,10 +106,17 @@ void IOutputFormat::flush()
void IOutputFormat::write(const Block & block)
{
writePrefixIfNot();
consume(Chunk(block.getColumns(), block.rows()));
if (auto_flush)
flush();
}
void IOutputFormat::finish()
{
writePrefixIfNot();
finalize();
}
}

View File

@ -25,28 +25,6 @@ class IOutputFormat : public IProcessor
public:
enum PortKind { Main = 0, Totals = 1, Extremes = 2 };
protected:
WriteBuffer & out;
Chunk current_chunk;
PortKind current_block_kind = PortKind::Main;
bool has_input = false;
bool finished = false;
bool finalized = false;
/// Flush data on each consumed chunk. This is intended for interactive applications to output data as soon as it's ready.
bool auto_flush = false;
RowsBeforeLimitCounterPtr rows_before_limit_counter;
friend class ParallelFormattingOutputFormat;
virtual void consume(Chunk) = 0;
virtual void consumeTotals(Chunk) {}
virtual void consumeExtremes(Chunk) {}
virtual void finalize() {}
public:
IOutputFormat(const Block & header_, WriteBuffer & out_);
Status prepare() override;
@ -77,8 +55,7 @@ public:
void write(const Block & block);
virtual void doWritePrefix() {}
virtual void doWriteSuffix() { finalize(); }
virtual void finish();
virtual bool expectMaterializedColumns() const { return true; }
@ -88,11 +65,43 @@ public:
size_t getResultRows() const { return result_rows; }
size_t getResultBytes() const { return result_bytes; }
void doNotWritePrefix() { need_write_prefix = false; }
protected:
friend class ParallelFormattingOutputFormat;
virtual void consume(Chunk) = 0;
virtual void consumeTotals(Chunk) {}
virtual void consumeExtremes(Chunk) {}
virtual void finalize() {}
virtual void writePrefix() {}
void writePrefixIfNot()
{
if (need_write_prefix)
{
writePrefix();
need_write_prefix = false;
}
}
WriteBuffer & out;
Chunk current_chunk;
PortKind current_block_kind = PortKind::Main;
bool has_input = false;
bool finished = false;
bool finalized = false;
/// Flush data on each consumed chunk. This is intended for interactive applications to output data as soon as it's ready.
bool auto_flush = false;
bool need_write_prefix = true;
RowsBeforeLimitCounterPtr rows_before_limit_counter;
private:
/// Counters for consumed chunks. Are used for QueryLog.
size_t result_rows = 0;
size_t result_bytes = 0;
bool prefix_written = false;
};
}

View File

@ -22,8 +22,6 @@ IRowOutputFormat::IRowOutputFormat(const Block & header, WriteBuffer & out_, con
void IRowOutputFormat::consume(DB::Chunk chunk)
{
writePrefixIfNot();
auto num_rows = chunk.getNumRows();
const auto & columns = chunk.getColumns();
@ -43,7 +41,6 @@ void IRowOutputFormat::consume(DB::Chunk chunk)
void IRowOutputFormat::consumeTotals(DB::Chunk chunk)
{
writePrefixIfNot();
writeSuffixIfNot();
auto num_rows = chunk.getNumRows();
@ -59,7 +56,6 @@ void IRowOutputFormat::consumeTotals(DB::Chunk chunk)
void IRowOutputFormat::consumeExtremes(DB::Chunk chunk)
{
writePrefixIfNot();
writeSuffixIfNot();
auto num_rows = chunk.getNumRows();
@ -76,7 +72,6 @@ void IRowOutputFormat::consumeExtremes(DB::Chunk chunk)
void IRowOutputFormat::finalize()
{
writePrefixIfNot();
writeSuffixIfNot();
writeLastSuffix();
}

View File

@ -26,41 +26,13 @@ class IRowOutputFormat : public IOutputFormat
public:
using Params = RowOutputFormatParams;
private:
bool prefix_written = false;
bool suffix_written = false;
protected:
DataTypes types;
Serializations serializations;
Params params;
bool first_row = true;
IRowOutputFormat(const Block & header, WriteBuffer & out_, const Params & params_);
void consume(Chunk chunk) override;
void consumeTotals(Chunk chunk) override;
void consumeExtremes(Chunk chunk) override;
void finalize() override;
void writePrefixIfNot()
{
if (!prefix_written)
writePrefix();
prefix_written = true;
}
void writeSuffixIfNot()
{
if (!suffix_written)
writeSuffix();
suffix_written = true;
}
public:
IRowOutputFormat(const Block & header, WriteBuffer & out_, const Params & params_);
/** Write a row.
* Default implementation calls methods to write single values and delimiters
* (except delimiter between rows (writeRowBetweenDelimiter())).
@ -78,7 +50,7 @@ public:
virtual void writeRowStartDelimiter() {} /// delimiter before each row
virtual void writeRowEndDelimiter() {} /// delimiter after each row
virtual void writeRowBetweenDelimiter() {} /// delimiter between rows
virtual void writePrefix() {} /// delimiter before resultset
virtual void writePrefix() override {} /// delimiter before resultset
virtual void writeSuffix() {} /// delimiter after resultset
virtual void writeBeforeTotals() {}
virtual void writeAfterTotals() {}
@ -86,6 +58,22 @@ public:
virtual void writeAfterExtremes() {}
virtual void writeLastSuffix() {} /// Write something after resultset, totals end extremes.
DataTypes types;
Serializations serializations;
Params params;
bool first_row = true;
private:
void writeSuffixIfNot()
{
if (!suffix_written)
writeSuffix();
suffix_written = true;
}
bool suffix_written = false;
};
}

View File

@ -24,10 +24,9 @@ public:
String getName() const override { return "ArrowBlockInputFormat"; }
protected:
private:
Chunk generate() override;
private:
// Whether to use ArrowStream format
bool stream;
// This field is only used for ArrowStream format

View File

@ -21,19 +21,20 @@ public:
ArrowBlockOutputFormat(WriteBuffer & out_, const Block & header_, bool stream_, const FormatSettings & format_settings_);
String getName() const override { return "ArrowBlockOutputFormat"; }
void consume(Chunk) override;
void finalize() override;
String getContentType() const override { return "application/octet-stream"; }
private:
void consume(Chunk) override;
void finalize() override;
void prepareWriter(const std::shared_ptr<arrow::Schema> & schema);
bool stream;
const FormatSettings format_settings;
std::shared_ptr<ArrowBufferedOutputStream> arrow_ostream;
std::shared_ptr<arrow::ipc::RecordBatchWriter> writer;
std::unique_ptr<CHColumnToArrowColumn> ch_column_to_arrow_column;
void prepareWriter(const std::shared_ptr<arrow::Schema> & schema);
};
}

View File

@ -107,12 +107,13 @@ class AvroRowInputFormat : public IRowInputFormat
{
public:
AvroRowInputFormat(const Block & header_, ReadBuffer & in_, Params params_, const FormatSettings & format_settings_);
bool readRow(MutableColumns & columns, RowReadExtension & ext) override;
void readPrefix() override;
String getName() const override { return "AvroRowInputFormat"; }
private:
bool readRow(MutableColumns & columns, RowReadExtension & ext) override;
void readPrefix() override;
std::unique_ptr<avro::DataFileReaderBase> file_reader_ptr;
std::unique_ptr<AvroDeserializer> deserializer_ptr;
bool allow_missing_fields;
@ -128,14 +129,16 @@ class AvroConfluentRowInputFormat : public IRowInputFormat
{
public:
AvroConfluentRowInputFormat(const Block & header_, ReadBuffer & in_, Params params_, const FormatSettings & format_settings_);
virtual bool readRow(MutableColumns & columns, RowReadExtension & ext) override;
String getName() const override { return "AvroConfluentRowInputFormat"; }
class SchemaRegistry;
protected:
private:
virtual bool readRow(MutableColumns & columns, RowReadExtension & ext) override;
bool allowSyncAfterError() const override { return true; }
void syncAfterError() override;
private:
std::shared_ptr<SchemaRegistry> schema_registry;
using SchemaId = uint32_t;
std::unordered_map<SchemaId, AvroDeserializer> deserializer_cache;

View File

@ -428,7 +428,6 @@ void AvroRowOutputFormat::consumeImpl(DB::Chunk chunk)
auto num_rows = chunk.getNumRows();
const auto & columns = chunk.getColumns();
writePrefixIfNot();
for (size_t row = 0; row < num_rows; ++row)
{
write(columns, row);
@ -447,7 +446,7 @@ void AvroRowOutputFormat::consumeImplWithCallback(DB::Chunk chunk)
/// used by WriteBufferToKafkaProducer to obtain auxiliary data
/// from the starting row of a file
writePrefix();
writePrefixIfNot();
for (size_t row_in_file = 0;
row_in_file < settings.avro.output_rows_in_file && row < num_rows;
++row, ++row_in_file)
@ -457,6 +456,7 @@ void AvroRowOutputFormat::consumeImplWithCallback(DB::Chunk chunk)
file_writer_ptr->flush();
writeSuffix();
need_write_prefix = true;
params.callback(columns, current_row);
}

View File

@ -51,12 +51,13 @@ public:
void consume(Chunk) override;
String getName() const override { return "AvroRowOutputFormat"; }
private:
void write(const Columns & columns, size_t row_num) override;
void writeField(const IColumn &, const ISerialization &, size_t) override {}
virtual void writePrefix() override;
virtual void writeSuffix() override;
private:
FormatSettings settings;
AvroSerializer serializer;
std::unique_ptr<avro::DataFileWriterBase> file_writer_ptr;

View File

@ -21,12 +21,12 @@ public:
String getName() const override { return "BinaryRowOutputFormat"; }
String getContentType() const override { return "application/octet-stream"; }
private:
void writeField(const IColumn & column, const ISerialization & serialization, size_t row_num) override;
void writePrefix() override;
String getContentType() const override { return "application/octet-stream"; }
protected:
bool with_names;
bool with_types;
};

View File

@ -25,12 +25,13 @@ public:
String getName() const override { return "CSVRowInputFormat"; }
private:
bool allowSyncAfterError() const override { return true; }
void syncAfterError() override;
private:
bool parseFieldDelimiterWithDiagnosticInfo(WriteBuffer & out) override;
bool parseRowEndWithDiagnosticInfo(WriteBuffer & out) override;
bool isGarbageAfterField(size_t, ReadBuffer::Position pos) override
{
return *pos != '\n' && *pos != '\r' && *pos != format_settings.csv.delimiter && *pos != ' ' && *pos != '\t';

View File

@ -31,7 +31,7 @@ void CSVRowOutputFormat::writeLine(const std::vector<String> & values)
}
}
void CSVRowOutputFormat::doWritePrefix()
void CSVRowOutputFormat::writePrefix()
{
const auto & sample = getPort(PortKind::Main).getHeader();

View File

@ -24,14 +24,6 @@ public:
String getName() const override { return "CSVRowOutputFormat"; }
void writeField(const IColumn & column, const ISerialization & serialization, size_t row_num) override;
void writeFieldDelimiter() override;
void writeRowEndDelimiter() override;
void writeBeforeTotals() override;
void writeBeforeExtremes() override;
void doWritePrefix() override;
/// https://www.iana.org/assignments/media-types/text/csv
String getContentType() const override
{
@ -39,6 +31,13 @@ public:
}
private:
void writeField(const IColumn & column, const ISerialization & serialization, size_t row_num) override;
void writeFieldDelimiter() override;
void writeRowEndDelimiter() override;
void writeBeforeTotals() override;
void writeBeforeExtremes() override;
void writePrefix() override;
void writeLine(const std::vector<String> & values);
bool with_names;

View File

@ -26,9 +26,9 @@ public:
String getName() const override { return "CapnProtoRowInputFormat"; }
private:
bool readRow(MutableColumns & columns, RowReadExtension &) override;
private:
kj::Array<capnp::word> readMessage();
std::shared_ptr<CapnProtoSchemaParser> parser;

View File

@ -35,11 +35,11 @@ public:
String getName() const override { return "CapnProtoRowOutputFormat"; }
private:
void write(const Columns & columns, size_t row_num) override;
void writeField(const IColumn &, const ISerialization &, size_t) override { }
private:
Names column_names;
DataTypes column_types;
capnp::StructSchema schema;

View File

@ -18,14 +18,15 @@ class JSONAsStringRowInputFormat : public IRowInputFormat
public:
JSONAsStringRowInputFormat(const Block & header_, ReadBuffer & in_, Params params_);
bool readRow(MutableColumns & columns, RowReadExtension & ext) override;
String getName() const override { return "JSONAsStringRowInputFormat"; }
void resetParser() override;
private:
bool readRow(MutableColumns & columns, RowReadExtension & ext) override;
void readPrefix() override;
void readSuffix() override;
private:
void readJSONObject(IColumn & column);
PeekableReadBuffer buf;

View File

@ -31,10 +31,10 @@ public:
String getName() const override { return "JSONCompactEachRowRowInputFormat"; }
private:
bool allowSyncAfterError() const override { return true; }
void syncAfterError() override;
private:
bool parseRowStartWithDiagnosticInfo(WriteBuffer & out) override;
bool parseFieldDelimiterWithDiagnosticInfo(WriteBuffer & out) override;
bool parseRowEndWithDiagnosticInfo(WriteBuffer & out) override;

View File

@ -81,7 +81,7 @@ void JSONCompactEachRowRowOutputFormat::writeLine(const std::vector<String> & va
writeRowEndDelimiter();
}
void JSONCompactEachRowRowOutputFormat::doWritePrefix()
void JSONCompactEachRowRowOutputFormat::writePrefix()
{
const auto & header = getPort(PortKind::Main).getHeader();

View File

@ -26,7 +26,8 @@ public:
String getName() const override { return "JSONCompactEachRowRowOutputFormat"; }
void doWritePrefix() override;
private:
void writePrefix() override;
void writeTotals(const Columns & columns, size_t row_num) override;
@ -35,12 +36,10 @@ public:
void writeRowStartDelimiter() override;
void writeRowEndDelimiter() override;
protected:
void consumeTotals(Chunk) override;
/// No extremes.
void consumeExtremes(Chunk) override {}
private:
void writeLine(const std::vector<String> & values);
FormatSettings settings;

View File

@ -25,6 +25,7 @@ public:
String getName() const override { return "JSONCompactRowOutputFormat"; }
private:
void writeField(const IColumn & column, const ISerialization & serialization, size_t row_num) override;
void writeFieldDelimiter() override;
void writeRowStartDelimiter() override;
@ -33,7 +34,6 @@ public:
void writeBeforeTotals() override;
void writeAfterTotals() override;
protected:
void writeExtremesElement(const char * title, const Columns & columns, size_t row_num) override;
void writeTotalsField(const IColumn & column, const ISerialization & serialization, size_t row_num) override

View File

@ -28,16 +28,16 @@ public:
bool yield_strings_);
String getName() const override { return "JSONEachRowRowInputFormat"; }
void resetParser() override;
private:
void readPrefix() override;
void readSuffix() override;
bool readRow(MutableColumns & columns, RowReadExtension & ext) override;
bool allowSyncAfterError() const override { return true; }
void syncAfterError() override;
void resetParser() override;
private:
const String & columnName(size_t i) const;
size_t columnIndex(const StringRef & name, size_t key_index);
bool advanceToNextKey(size_t key_index);

View File

@ -23,6 +23,7 @@ public:
String getName() const override { return "JSONEachRowRowOutputFormat"; }
protected:
void writeField(const IColumn & column, const ISerialization & serialization, size_t row_num) override;
void writeFieldDelimiter() override;
void writeRowStartDelimiter() override;
@ -31,7 +32,6 @@ public:
void writePrefix() override;
void writeSuffix() override;
protected:
/// No totals and extremes.
void consumeTotals(Chunk) override {}
void consumeExtremes(Chunk) override {}

View File

@ -9,11 +9,12 @@ class JSONEachRowWithProgressRowOutputFormat : public JSONEachRowRowOutputFormat
public:
using JSONEachRowRowOutputFormat::JSONEachRowRowOutputFormat;
void writeRowStartDelimiter() override;
void writeRowEndDelimiter() override;
void onProgress(const Progress & value) override;
private:
void writeRowStartDelimiter() override;
void writeRowEndDelimiter() override;
Progress progress;
};

View File

@ -25,6 +25,25 @@ public:
String getName() const override { return "JSONRowOutputFormat"; }
void onProgress(const Progress & value) override;
String getContentType() const override { return "application/json; charset=UTF-8"; }
void flush() override
{
ostr->next();
if (validating_ostr)
out.next();
}
void setRowsBeforeLimit(size_t rows_before_limit_) override
{
applied_limit = true;
rows_before_limit = rows_before_limit_;
}
protected:
void writeField(const IColumn & column, const ISerialization & serialization, size_t row_num) override;
void writeFieldDelimiter() override;
void writeRowStartDelimiter() override;
@ -44,25 +63,6 @@ public:
void writeLastSuffix() override;
void flush() override
{
ostr->next();
if (validating_ostr)
out.next();
}
void setRowsBeforeLimit(size_t rows_before_limit_) override
{
applied_limit = true;
rows_before_limit = rows_before_limit_;
}
void onProgress(const Progress & value) override;
String getContentType() const override { return "application/json; charset=UTF-8"; }
protected:
virtual void writeTotalsField(const IColumn & column, const ISerialization & serialization, size_t row_num);
virtual void writeExtremesElement(const char * title, const Columns & columns, size_t row_num);
virtual void writeTotalsFieldDelimiter() { writeFieldDelimiter(); }
@ -70,7 +70,6 @@ protected:
void writeRowsBeforeLimitAtLeast();
void writeStatistics();
std::unique_ptr<WriteBuffer> validating_ostr; /// Validates UTF-8 sequences, replaces bad sequences with replacement character.
WriteBuffer * ostr;

View File

@ -17,11 +17,12 @@ class LineAsStringRowInputFormat : public IRowInputFormat
public:
LineAsStringRowInputFormat(const Block & header_, ReadBuffer & in_, Params params_);
bool readRow(MutableColumns & columns, RowReadExtension & ext) override;
String getName() const override { return "LineAsStringRowInputFormat"; }
void resetParser() override;
private:
bool readRow(MutableColumns & columns, RowReadExtension & ext) override;
void readLineObject(IColumn & column);
};

View File

@ -14,6 +14,9 @@ class MarkdownRowOutputFormat : public IRowOutputFormat
public:
MarkdownRowOutputFormat(WriteBuffer & out_, const Block & header_, const RowOutputFormatParams & params_, const FormatSettings & format_settings_);
String getName() const override { return "MarkdownRowOutputFormat"; }
private:
/// Write higher part of markdown table like this:
/// |columnName1|columnName2|...|columnNameN|
/// |:-:|:-:|...|:-:|
@ -29,9 +32,7 @@ public:
void writeRowEndDelimiter() override ;
void writeField(const IColumn & column, const ISerialization & serialization, size_t row_num) override;
String getName() const override { return "MarkdownRowOutputFormat"; }
protected:
const FormatSettings format_settings;
};

View File

@ -59,11 +59,12 @@ class MsgPackRowInputFormat : public IRowInputFormat
public:
MsgPackRowInputFormat(const Block & header_, ReadBuffer & in_, Params params_);
bool readRow(MutableColumns & columns, RowReadExtension & ext) override;
String getName() const override { return "MagPackRowInputFormat"; }
void resetParser() override;
private:
bool readRow(MutableColumns & columns, RowReadExtension & ext) override;
bool readObject();
PeekableReadBuffer buf;

View File

@ -22,11 +22,11 @@ public:
String getName() const override { return "MsgPackRowOutputFormat"; }
private:
void write(const Columns & columns, size_t row_num) override;
void writeField(const IColumn &, const ISerialization &, size_t) override {}
void serializeField(const IColumn & column, DataTypePtr data_type, size_t row_num);
private:
msgpack::packer<DB::WriteBuffer> packer;
};

View File

@ -31,13 +31,8 @@ void MySQLOutputFormat::setContext(ContextPtr context_)
context = context_;
}
void MySQLOutputFormat::initialize()
void MySQLOutputFormat::writePrefix()
{
if (initialized)
return;
initialized = true;
const auto & header = getPort(PortKind::Main).getHeader();
data_types = header.getDataTypes();
@ -66,8 +61,6 @@ void MySQLOutputFormat::initialize()
void MySQLOutputFormat::consume(Chunk chunk)
{
initialize();
for (size_t i = 0; i < chunk.getNumRows(); i++)
{
ProtocolText::ResultSetRow row_packet(serializations, chunk.getColumns(), i);

View File

@ -26,15 +26,13 @@ public:
void setContext(ContextPtr context_);
void consume(Chunk) override;
void finalize() override;
void flush() override;
void doWritePrefix() override { initialize(); }
private:
void initialize();
void consume(Chunk) override;
void finalize() override;
void writePrefix() override;
bool initialized = false;
uint32_t client_capabilities = 0;
uint8_t * sequence_id = nullptr;
uint8_t dummy_sequence_id = 0;

View File

@ -64,21 +64,14 @@ void ODBCDriver2BlockOutputFormat::write(Chunk chunk, PortKind port_kind)
void ODBCDriver2BlockOutputFormat::consume(Chunk chunk)
{
writePrefixIfNot();
write(std::move(chunk), PortKind::Main);
}
void ODBCDriver2BlockOutputFormat::consumeTotals(Chunk chunk)
{
writePrefixIfNot();
write(std::move(chunk), PortKind::Totals);
}
void ODBCDriver2BlockOutputFormat::finalize()
{
writePrefixIfNot();
}
void ODBCDriver2BlockOutputFormat::writePrefix()
{
const auto & header = getPort(PortKind::Main).getHeader();

View File

@ -24,30 +24,20 @@ public:
String getName() const override { return "ODBCDriver2BlockOutputFormat"; }
void consume(Chunk) override;
void consumeTotals(Chunk) override;
void finalize() override;
std::string getContentType() const override
{
return "application/octet-stream";
}
private:
void consume(Chunk) override;
void consumeTotals(Chunk) override;
void writePrefix() override;
const FormatSettings format_settings;
bool prefix_written = false;
void writePrefixIfNot()
{
if (!prefix_written)
writePrefix();
prefix_written = true;
}
void writeRow(const Serializations & serializations, const Columns & columns, size_t row_idx, std::string & buffer);
void write(Chunk chunk, PortKind port_kind);
void writePrefix();
};

View File

@ -37,10 +37,11 @@ public:
ORCBlockOutputFormat(WriteBuffer & out_, const Block & header_, const FormatSettings & format_settings_);
String getName() const override { return "ORCBlockOutputFormat"; }
private:
void consume(Chunk chunk) override;
void finalize() override;
private:
ORC_UNIQUE_PTR<orc::Type> getORCType(const DataTypePtr & type, const std::string & column_name);
/// ConvertFunc is needed for type UInt8, because firstly UInt8 (char8_t) must be

View File

@ -171,7 +171,7 @@ namespace DB
{
case ProcessingUnitType::START :
{
formatter->doWritePrefix();
formatter->writePrefix();
break;
}
case ProcessingUnitType::PLAIN :
@ -191,7 +191,7 @@ namespace DB
}
case ProcessingUnitType::FINALIZE :
{
formatter->doWriteSuffix();
formatter->finalize();
break;
}
}

View File

@ -95,7 +95,7 @@ public:
need_flush = true;
}
void doWritePrefix() override
void writePrefix() override
{
addChunk(Chunk{}, ProcessingUnitType::START, /*can_throw_exception*/ true);
}
@ -114,7 +114,7 @@ public:
return internal_formatter_creator(buffer)->getContentType();
}
protected:
private:
void consume(Chunk chunk) override final
{
addChunk(std::move(chunk), ProcessingUnitType::PLAIN, /*can_throw_exception*/ true);
@ -132,7 +132,6 @@ protected:
void finalize() override;
private:
InternalFormatterCreator internal_formatter_creator;
/// Status to synchronize multiple threads.

View File

@ -117,7 +117,7 @@ public:
String getName() const override final { return "ParallelParsingBlockInputFormat"; }
protected:
private:
Chunk generate() override final;
@ -137,8 +137,6 @@ protected:
finishAndWait();
}
private:
class InternalParser
{
public:

View File

@ -23,13 +23,11 @@ public:
String getName() const override { return "ParquetBlockInputFormat"; }
protected:
private:
Chunk generate() override;
private:
void prepareReader();
private:
std::unique_ptr<parquet::arrow::FileReader> file_reader;
int row_group_total = 0;
// indices of columns to read from Parquet file

View File

@ -30,12 +30,13 @@ public:
ParquetBlockOutputFormat(WriteBuffer & out_, const Block & header_, const FormatSettings & format_settings_);
String getName() const override { return "ParquetBlockOutputFormat"; }
void consume(Chunk) override;
void finalize() override;
String getContentType() const override { return "application/octet-stream"; }
private:
void consume(Chunk) override;
void finalize() override;
const FormatSettings format_settings;
std::unique_ptr<parquet::arrow::FileWriter> file_writer;

View File

@ -11,12 +11,8 @@ PostgreSQLOutputFormat::PostgreSQLOutputFormat(WriteBuffer & out_, const Block &
{
}
void PostgreSQLOutputFormat::doWritePrefix()
void PostgreSQLOutputFormat::writePrefix()
{
if (initialized)
return;
initialized = true;
const auto & header = getPort(PortKind::Main).getHeader();
auto data_types = header.getDataTypes();
@ -37,8 +33,6 @@ void PostgreSQLOutputFormat::doWritePrefix()
void PostgreSQLOutputFormat::consume(Chunk chunk)
{
doWritePrefix();
for (size_t i = 0; i != chunk.getNumRows(); ++i)
{
const Columns & columns = chunk.getColumns();
@ -61,8 +55,6 @@ void PostgreSQLOutputFormat::consume(Chunk chunk)
}
}
void PostgreSQLOutputFormat::finalize() {}
void PostgreSQLOutputFormat::flush()
{
message_transport.flush();

View File

@ -17,13 +17,11 @@ public:
String getName() const override {return "PostgreSQLOutputFormat";}
void doWritePrefix() override;
void consume(Chunk) override;
void finalize() override;
void flush() override;
private:
bool initialized = false;
void writePrefix() override;
void consume(Chunk) override;
FormatSettings format_settings;
PostgreSQLProtocol::Messaging::MessageTransport message_transport;

View File

@ -22,13 +22,13 @@ public:
String getName() const override { return "PrettyBlockOutputFormat"; }
protected:
void consume(Chunk) override;
void consumeTotals(Chunk) override;
void consumeExtremes(Chunk) override;
void finalize() override;
protected:
size_t total_rows = 0;
size_t terminal_width = 0;
bool suffix_written = false;

View File

@ -16,7 +16,7 @@ public:
PrettyCompactBlockOutputFormat(WriteBuffer & out_, const Block & header, const FormatSettings & format_settings_, bool mono_block_);
String getName() const override { return "PrettyCompactBlockOutputFormat"; }
protected:
private:
void write(const Chunk & chunk, PortKind port_kind) override;
void writeHeader(const Block & block, const Widths & max_widths, const Widths & name_widths);
void writeBottom(const Widths & max_widths);
@ -28,7 +28,6 @@ protected:
const WidthsPerColumn & widths,
const Widths & max_widths);
private:
bool mono_block;
/// For mono_block == true only
Chunk mono_chunk;

View File

@ -16,7 +16,7 @@ public:
String getName() const override { return "PrettySpaceBlockOutputFormat"; }
protected:
private:
void write(const Chunk & chunk, PortKind port_kind) override;
void writeSuffix() override;
};

View File

@ -32,11 +32,11 @@ public:
String getName() const override { return "ProtobufRowInputFormat"; }
private:
bool readRow(MutableColumns & columns, RowReadExtension &) override;
bool allowSyncAfterError() const override;
void syncAfterError() override;
private:
std::unique_ptr<ProtobufReader> reader;
std::vector<size_t> missing_column_indices;
std::unique_ptr<ProtobufSerializer> serializer;

View File

@ -39,11 +39,12 @@ public:
String getName() const override { return "ProtobufRowOutputFormat"; }
void write(const Columns & columns, size_t row_num) override;
void writeField(const IColumn &, const ISerialization &, size_t) override {}
std::string getContentType() const override { return "application/octet-stream"; }
private:
void write(const Columns & columns, size_t row_num) override;
void writeField(const IColumn &, const ISerialization &, size_t) override {}
std::unique_ptr<ProtobufWriter> writer;
std::unique_ptr<ProtobufSerializer> serializer;
const bool allow_multiple_rows;

View File

@ -16,8 +16,10 @@ class RawBLOBRowInputFormat : public IRowInputFormat
public:
RawBLOBRowInputFormat(const Block & header_, ReadBuffer & in_, Params params_);
bool readRow(MutableColumns & columns, RowReadExtension &) override;
String getName() const override { return "RawBLOBRowInputFormat"; }
private:
bool readRow(MutableColumns & columns, RowReadExtension &) override;
};
}

View File

@ -34,6 +34,7 @@ public:
String getName() const override { return "RawBLOBRowOutputFormat"; }
private:
void writeField(const IColumn & column, const ISerialization &, size_t row_num) override;
};

View File

@ -30,11 +30,11 @@ public:
RegexpRowInputFormat(ReadBuffer & in_, const Block & header_, Params params_, const FormatSettings & format_settings_);
String getName() const override { return "RegexpRowInputFormat"; }
bool readRow(MutableColumns & columns, RowReadExtension & ext) override;
void resetParser() override;
private:
bool readRow(MutableColumns & columns, RowReadExtension & ext) override;
bool readField(size_t index, MutableColumns & columns);
void readFieldsFromMatch(MutableColumns & columns, RowReadExtension & ext);
static ColumnFormat stringToFormat(const String & format);

View File

@ -27,14 +27,14 @@ public:
String getName() const override { return "TSKVRowInputFormat"; }
void resetParser() override;
private:
void readPrefix() override;
bool readRow(MutableColumns & columns, RowReadExtension &) override;
bool allowSyncAfterError() const override { return true; }
void syncAfterError() override;
void resetParser() override;
private:
const FormatSettings format_settings;
/// Buffer for the read from the stream the field name. Used when you have to copy it.

View File

@ -18,10 +18,10 @@ public:
String getName() const override { return "TSKVRowOutputFormat"; }
private:
void writeField(const IColumn & column, const ISerialization & serialization, size_t row_num) override;
void writeRowEndDelimiter() override;
protected:
NamesAndTypes fields;
size_t field_number = 0;
};

View File

@ -21,12 +21,10 @@ public:
String getName() const override { return "TabSeparatedRowInputFormat"; }
private:
bool allowSyncAfterError() const override { return true; }
void syncAfterError() override;
private:
bool is_raw;
bool readField(IColumn & column, const DataTypePtr & type,
const SerializationPtr & serialization, bool is_last_file_column, const String & column_name) override;
@ -48,6 +46,8 @@ private:
bool parseFieldDelimiterWithDiagnosticInfo(WriteBuffer & out) override;
bool parseRowEndWithDiagnosticInfo(WriteBuffer & out) override;
bool isGarbageAfterField(size_t, ReadBuffer::Position pos) override { return *pos != '\n' && *pos != '\t'; }
bool is_raw;
};
}

View File

@ -30,7 +30,7 @@ void TabSeparatedRowOutputFormat::writeLine(const std::vector<String> & values)
}
}
void TabSeparatedRowOutputFormat::doWritePrefix()
void TabSeparatedRowOutputFormat::writePrefix()
{
const auto & header = getPort(PortKind::Main).getHeader();

View File

@ -29,24 +29,22 @@ public:
String getName() const override { return "TabSeparatedRowOutputFormat"; }
/// https://www.iana.org/assignments/media-types/text/tab-separated-values
String getContentType() const override { return "text/tab-separated-values; charset=UTF-8"; }
protected:
void writeField(const IColumn & column, const ISerialization & serialization, size_t row_num) override;
void writeFieldDelimiter() override;
void writeRowEndDelimiter() override;
void writeBeforeTotals() override;
void writeBeforeExtremes() override;
void doWritePrefix() override;
/// https://www.iana.org/assignments/media-types/text/tab-separated-values
String getContentType() const override { return "text/tab-separated-values; charset=UTF-8"; }
private:
void writePrefix() override;
void writeLine(const std::vector<String> & values);
bool with_names;
bool with_types;
bool is_raw;
protected:
const FormatSettings format_settings;
};

View File

@ -147,8 +147,6 @@ template <typename U, typename V> void TemplateBlockOutputFormat::writeValue(U v
void TemplateBlockOutputFormat::consume(Chunk chunk)
{
doWritePrefix();
size_t rows = chunk.getNumRows();
for (size_t i = 0; i < rows; ++i)
@ -161,13 +159,9 @@ void TemplateBlockOutputFormat::consume(Chunk chunk)
}
}
void TemplateBlockOutputFormat::doWritePrefix()
void TemplateBlockOutputFormat::writePrefix()
{
if (need_write_prefix)
{
writeString(format.delimiters.front(), out);
need_write_prefix = false;
}
writeString(format.delimiters.front(), out);
}
void TemplateBlockOutputFormat::finalize()
@ -175,8 +169,6 @@ void TemplateBlockOutputFormat::finalize()
if (finalized)
return;
doWritePrefix();
size_t parts = format.format_idx_to_column_idx.size();
for (size_t i = 0; i < parts; ++i)

View File

@ -20,8 +20,6 @@ public:
String getName() const override { return "TemplateBlockOutputFormat"; }
void doWritePrefix() override;
void setRowsBeforeLimit(size_t rows_before_limit_) override { rows_before_limit = rows_before_limit_; rows_before_limit_set = true; }
void onProgress(const Progress & progress_) override { progress.incrementPiecewiseAtomically(progress_); }
@ -40,7 +38,8 @@ public:
static ResultsetPart stringToResultsetPart(const String & part);
protected:
private:
void writePrefix() override;
void consume(Chunk chunk) override;
void consumeTotals(Chunk chunk) override { totals = std::move(chunk); }
void consumeExtremes(Chunk chunk) override { extremes = std::move(chunk); }
@ -50,7 +49,6 @@ protected:
void serializeField(const IColumn & column, const ISerialization & serialization, size_t row_num, ColumnFormat format);
template <typename U, typename V> void writeValue(U value, ColumnFormat col_format);
protected:
const FormatSettings settings;
Serializations serializations;
@ -65,7 +63,6 @@ protected:
Stopwatch watch;
size_t row_count = 0;
bool need_write_prefix = true;
std::string row_between_delimiter;
};

View File

@ -22,6 +22,9 @@ public:
String getName() const override { return "TemplateRowInputFormat"; }
void resetParser() override;
private:
bool readRow(MutableColumns & columns, RowReadExtension & extra) override;
void readPrefix() override;
@ -29,9 +32,6 @@ public:
bool allowSyncAfterError() const override;
void syncAfterError() override;
void resetParser() override;
private:
bool deserializeField(const DataTypePtr & type,
const SerializationPtr & serialization, IColumn & column, size_t file_column);
@ -51,7 +51,6 @@ private:
void skipToNextDelimiterOrEof(const String & delimiter);
private:
PeekableReadBuffer buf;
const DataTypes data_types;

View File

@ -19,13 +19,13 @@ public:
String getName() const override { return "ValuesRowOutputFormat"; }
private:
void writeField(const IColumn & column, const ISerialization & serialization, size_t row_num) override;
void writeFieldDelimiter() override;
void writeRowStartDelimiter() override;
void writeRowEndDelimiter() override;
void writeRowBetweenDelimiter() override;
private:
const FormatSettings format_settings;
};

View File

@ -22,6 +22,7 @@ public:
String getName() const override { return "VerticalRowOutputFormat"; }
private:
void writeField(const IColumn & column, const ISerialization & serialization, size_t row_num) override;
void writeRowStartDelimiter() override;
void writeRowBetweenDelimiter() override;
@ -34,8 +35,7 @@ public:
void writeBeforeTotals() override;
void writeBeforeExtremes() override;
protected:
virtual void writeValue(const IColumn & column, const ISerialization & serialization, size_t row_num) const;
void writeValue(const IColumn & column, const ISerialization & serialization, size_t row_num) const;
/// For totals and extremes.
void writeSpecialRow(const Columns & columns, size_t row_num, const char * title);

View File

@ -20,6 +20,7 @@ public:
String getName() const override { return "XMLRowOutputFormat"; }
private:
void writeField(const IColumn & column, const ISerialization & serialization, size_t row_num) override;
void writeRowStartDelimiter() override;
void writeRowEndDelimiter() override;
@ -54,7 +55,6 @@ public:
String getContentType() const override { return "application/xml; charset=UTF-8"; }
protected:
void writeExtremesElement(const char * title, const Columns & columns, size_t row_num);
void writeRowsBeforeLimitAtLeast();
void writeStatistics();

View File

@ -30,8 +30,6 @@ public:
const Params & params_,
bool with_names_, bool with_types_, const FormatSettings & format_settings_);
bool readRow(MutableColumns & columns, RowReadExtension & ext) override;
void readPrefix() override;
void resetParser() override;
protected:
@ -67,6 +65,9 @@ protected:
DataTypes data_types;
private:
bool readRow(MutableColumns & columns, RowReadExtension & ext) override;
void readPrefix() override;
bool parseRowAndPrintDiagnosticInfo(MutableColumns & columns, WriteBuffer & out) override;
void tryDeserializeField(const DataTypePtr & type, IColumn & column, size_t file_column) override;

View File

@ -1090,7 +1090,6 @@ namespace
write_buffer.emplace(*result.mutable_output());
output_format_processor = query_context->getOutputFormat(output_format, *write_buffer, header);
output_format_processor->doWritePrefix();
Stopwatch after_send_progress;
/// Unless the input() function is used we are not going to receive input data anymore.
@ -1169,7 +1168,7 @@ namespace
executor->execute();
}
output_format_processor->doWriteSuffix();
output_format_processor->finish();
}
void Call::finishQuery()
@ -1380,9 +1379,8 @@ namespace
WriteBufferFromString buf{*result.mutable_totals()};
auto format = query_context->getOutputFormat(output_format, buf, totals);
format->doWritePrefix();
format->write(materializeBlock(totals));
format->doWriteSuffix();
format->finish();
}
void Call::addExtremesToResult(const Block & extremes)
@ -1392,9 +1390,8 @@ namespace
WriteBufferFromString buf{*result.mutable_extremes()};
auto format = query_context->getOutputFormat(output_format, buf, extremes);
format->doWritePrefix();
format->write(materializeBlock(extremes));
format->doWriteSuffix();
format->finish();
}
void Call::addProfileInfoToResult(const ProfileInfo & info)

View File

@ -210,11 +210,6 @@ public:
void consume(Chunk chunk) override
{
if (is_first_chunk)
{
writer->doWritePrefix();
is_first_chunk = false;
}
writer->write(getHeader().cloneWithColumns(chunk.detachColumns()));
}
@ -222,7 +217,7 @@ public:
{
try
{
writer->doWriteSuffix();
writer->finish();
writer->flush();
write_buf->sync();
write_buf->finalize();
@ -237,7 +232,6 @@ public:
private:
std::unique_ptr<WriteBuffer> write_buf;
OutputFormatPtr writer;
bool is_first_chunk = true;
};

View File

@ -42,7 +42,7 @@ void KafkaSink::consume(Chunk chunk)
void KafkaSink::onFinish()
{
if (format)
format->doWriteSuffix();
format->finish();
//flush();
if (buffer)

View File

@ -49,7 +49,7 @@ void RabbitMQSink::consume(Chunk chunk)
void RabbitMQSink::onFinish()
{
format->doWriteSuffix();
format->finish();
if (buffer)
buffer->updateMaxWait();

View File

@ -621,26 +621,21 @@ public:
naked_buffer = std::make_unique<WriteBufferFromFile>(paths[0], DBMS_DEFAULT_BUFFER_SIZE, flags);
}
/// In case of CSVWithNames we have already written prefix.
if (naked_buffer->size())
prefix_written = true;
/// In case of formats with prefixes if file is not empty we have already written prefix.
bool do_not_write_prefix = naked_buffer->size();
write_buf = wrapWriteBufferWithCompressionMethod(std::move(naked_buffer), compression_method, 3);
writer = FormatFactory::instance().getOutputFormatParallelIfPossible(format_name,
*write_buf, metadata_snapshot->getSampleBlock(), context,
{}, format_settings);
if (do_not_write_prefix)
writer->doNotWritePrefix();
}
String getName() const override { return "StorageFileSink"; }
void onStart() override
{
if (!prefix_written)
writer->doWritePrefix();
prefix_written = true;
}
void consume(Chunk chunk) override
{
writer->write(getHeader().cloneWithColumns(chunk.detachColumns()));
@ -648,7 +643,7 @@ public:
void onFinish() override
{
writer->doWriteSuffix();
writer->finish();
}
// void flush() override
@ -662,7 +657,6 @@ private:
std::unique_ptr<WriteBuffer> write_buf;
OutputFormatPtr writer;
bool prefix_written{false};
int table_fd;
bool use_table_fd;

View File

@ -321,11 +321,6 @@ public:
void consume(Chunk chunk) override
{
if (is_first_chunk)
{
writer->doWritePrefix();
is_first_chunk = false;
}
writer->write(getHeader().cloneWithColumns(chunk.detachColumns()));
}
@ -333,7 +328,7 @@ public:
{
try
{
writer->doWriteSuffix();
writer->finish();
writer->flush();
write_buf->finalize();
}
@ -350,7 +345,6 @@ private:
std::optional<FormatSettings> format_settings;
std::unique_ptr<WriteBuffer> write_buf;
OutputFormatPtr writer;
bool is_first_chunk = true;
};

View File

@ -226,18 +226,12 @@ StorageURLSink::StorageURLSink(
void StorageURLSink::consume(Chunk chunk)
{
if (is_first_chunk)
{
writer->doWritePrefix();
is_first_chunk = false;
}
writer->write(getHeader().cloneWithColumns(chunk.detachColumns()));
}
void StorageURLSink::onFinish()
{
writer->doWriteSuffix();
writer->finish();
writer->flush();
write_buf->finalize();
}

View File

@ -110,8 +110,6 @@ public:
private:
std::unique_ptr<WriteBuffer> write_buf;
OutputFormatPtr writer;
bool is_first_chunk = true;
};
class StorageURL : public shared_ptr_helper<StorageURL>, public IStorageURLBase

View File

@ -1,5 +1,9 @@
select * from numbers(100) settings max_result_rows = 1; -- { serverError 396 }
select * from numbers(100) FORMAT JSON settings max_result_rows = 1; -- { serverError 396 }
select * from numbers(100) FORMAT TSVWithNamesAndTypes settings max_result_rows = 1; -- { serverError 396 }
select * from numbers(100) FORMAT CSVWithNamesAndTypes settings max_result_rows = 1; -- { serverError 396 }
select * from numbers(100) FORMAT JSONCompactEachRowWithNamesAndTypes settings max_result_rows = 1; -- { serverError 396 }
select * from numbers(100) FORMAT XML settings max_result_rows = 1; -- { serverError 396 }
SET max_result_rows = 1;
select * from numbers(10); -- { serverError 396 }

View File

@ -0,0 +1,20 @@
0\t1970-01-01\n
1\t1970-01-02\n
2\t1970-01-03\n
3\t1970-01-04\n
4\t1970-01-05\n
0,"1970-01-01"\n
1,"1970-01-02"\n
2,"1970-01-03"\n
3,"1970-01-04"\n
4,"1970-01-05"\n
["0", "1970-01-01"]\n
["1", "1970-01-02"]\n
["2", "1970-01-03"]\n
["3", "1970-01-04"]\n
["4", "1970-01-05"]\n
\t\t<row>\n\t\t\t<number>0</number>\n\t\t\t<field>1970-01-01</field>\n\t\t</row>\n
\t\t<row>\n\t\t\t<number>1</number>\n\t\t\t<field>1970-01-02</field>\n\t\t</row>\n
\t\t<row>\n\t\t\t<number>2</number>\n\t\t\t<field>1970-01-03</field>\n\t\t</row>\n
\t\t<row>\n\t\t\t<number>3</number>\n\t\t\t<field>1970-01-04</field>\n\t\t</row>\n
\t\t<row>\n\t\t\t<number>4</number>\n\t\t\t<field>1970-01-05</field>\n\t\t</row>\n

View File

@ -0,0 +1,5 @@
select formatRow('TSVWithNamesAndTypes', number, toDate(number)) from numbers(5);
select formatRow('CSVWithNamesAndTypes', number, toDate(number)) from numbers(5);
select formatRow('JSONCompactEachRowWithNamesAndTypes', number, toDate(number)) from numbers(5);
select formatRow('XML', number, toDate(number)) from numbers(5);