diff --git a/src/Client/ClientBase.cpp b/src/Client/ClientBase.cpp index ebc6c7d3107..880c867b053 100644 --- a/src/Client/ClientBase.cpp +++ b/src/Client/ClientBase.cpp @@ -391,7 +391,7 @@ void ClientBase::initBlockOutputStream(const Block & block, ASTPtr parsed_query) output_format = global_context->getOutputFormat( current_format, out_file_buf ? *out_file_buf : *out_buf, block); - output_format->doWritePrefix(); + output_format->setAutoFlush(); } } @@ -671,7 +671,7 @@ void ClientBase::onEndOfStream() progress_indication.clearProgressOutput(); if (output_format) - output_format->doWriteSuffix(); + output_format->finish(); resetOutput(); diff --git a/src/Functions/formatRow.cpp b/src/Functions/formatRow.cpp index 3f9d3e782d7..ccd013123d7 100644 --- a/src/Functions/formatRow.cpp +++ b/src/Functions/formatRow.cpp @@ -77,6 +77,8 @@ public: if (!dynamic_cast(out.get())) throw Exception(ErrorCodes::BAD_ARGUMENTS, "Cannot turn rows into a {} format strings. {} function supports only row output formats", format_name, getName()); + /// Don't write prefix if any. + out->doNotWritePrefix(); out->write(arg_columns); return col_str; } diff --git a/src/Processors/Formats/IOutputFormat.cpp b/src/Processors/Formats/IOutputFormat.cpp index 0f94622b7c6..a0b39f20b96 100644 --- a/src/Processors/Formats/IOutputFormat.cpp +++ b/src/Processors/Formats/IOutputFormat.cpp @@ -65,11 +65,7 @@ static Chunk prepareTotals(Chunk chunk) void IOutputFormat::work() { - if (!prefix_written) - { - doWritePrefix(); - prefix_written = true; - } + writePrefixIfNot(); if (finished && !finalized) { @@ -110,10 +106,17 @@ void IOutputFormat::flush() void IOutputFormat::write(const Block & block) { + writePrefixIfNot(); consume(Chunk(block.getColumns(), block.rows())); if (auto_flush) flush(); } +void IOutputFormat::finish() +{ + writePrefixIfNot(); + finalize(); +} + } diff --git a/src/Processors/Formats/IOutputFormat.h b/src/Processors/Formats/IOutputFormat.h index ba4dcee6f70..225971a8fe6 100644 --- a/src/Processors/Formats/IOutputFormat.h +++ b/src/Processors/Formats/IOutputFormat.h @@ -25,28 +25,6 @@ class IOutputFormat : public IProcessor public: enum PortKind { Main = 0, Totals = 1, Extremes = 2 }; -protected: - WriteBuffer & out; - - Chunk current_chunk; - PortKind current_block_kind = PortKind::Main; - bool has_input = false; - bool finished = false; - bool finalized = false; - - /// Flush data on each consumed chunk. This is intended for interactive applications to output data as soon as it's ready. - bool auto_flush = false; - - RowsBeforeLimitCounterPtr rows_before_limit_counter; - - friend class ParallelFormattingOutputFormat; - - virtual void consume(Chunk) = 0; - virtual void consumeTotals(Chunk) {} - virtual void consumeExtremes(Chunk) {} - virtual void finalize() {} - -public: IOutputFormat(const Block & header_, WriteBuffer & out_); Status prepare() override; @@ -77,8 +55,7 @@ public: void write(const Block & block); - virtual void doWritePrefix() {} - virtual void doWriteSuffix() { finalize(); } + virtual void finish(); virtual bool expectMaterializedColumns() const { return true; } @@ -88,11 +65,43 @@ public: size_t getResultRows() const { return result_rows; } size_t getResultBytes() const { return result_bytes; } + void doNotWritePrefix() { need_write_prefix = false; } + +protected: + friend class ParallelFormattingOutputFormat; + + virtual void consume(Chunk) = 0; + virtual void consumeTotals(Chunk) {} + virtual void consumeExtremes(Chunk) {} + virtual void finalize() {} + virtual void writePrefix() {} + + void writePrefixIfNot() + { + if (need_write_prefix) + { + writePrefix(); + need_write_prefix = false; + } + } + + WriteBuffer & out; + + Chunk current_chunk; + PortKind current_block_kind = PortKind::Main; + bool has_input = false; + bool finished = false; + bool finalized = false; + + /// Flush data on each consumed chunk. This is intended for interactive applications to output data as soon as it's ready. + bool auto_flush = false; + bool need_write_prefix = true; + + RowsBeforeLimitCounterPtr rows_before_limit_counter; + private: /// Counters for consumed chunks. Are used for QueryLog. size_t result_rows = 0; size_t result_bytes = 0; - - bool prefix_written = false; }; } diff --git a/src/Processors/Formats/IRowOutputFormat.cpp b/src/Processors/Formats/IRowOutputFormat.cpp index 6b7a9a46eaa..410d2d07d35 100644 --- a/src/Processors/Formats/IRowOutputFormat.cpp +++ b/src/Processors/Formats/IRowOutputFormat.cpp @@ -22,8 +22,6 @@ IRowOutputFormat::IRowOutputFormat(const Block & header, WriteBuffer & out_, con void IRowOutputFormat::consume(DB::Chunk chunk) { - writePrefixIfNot(); - auto num_rows = chunk.getNumRows(); const auto & columns = chunk.getColumns(); @@ -43,7 +41,6 @@ void IRowOutputFormat::consume(DB::Chunk chunk) void IRowOutputFormat::consumeTotals(DB::Chunk chunk) { - writePrefixIfNot(); writeSuffixIfNot(); auto num_rows = chunk.getNumRows(); @@ -59,7 +56,6 @@ void IRowOutputFormat::consumeTotals(DB::Chunk chunk) void IRowOutputFormat::consumeExtremes(DB::Chunk chunk) { - writePrefixIfNot(); writeSuffixIfNot(); auto num_rows = chunk.getNumRows(); @@ -76,7 +72,6 @@ void IRowOutputFormat::consumeExtremes(DB::Chunk chunk) void IRowOutputFormat::finalize() { - writePrefixIfNot(); writeSuffixIfNot(); writeLastSuffix(); } diff --git a/src/Processors/Formats/IRowOutputFormat.h b/src/Processors/Formats/IRowOutputFormat.h index 18575419cd0..2e658c452f6 100644 --- a/src/Processors/Formats/IRowOutputFormat.h +++ b/src/Processors/Formats/IRowOutputFormat.h @@ -26,41 +26,13 @@ class IRowOutputFormat : public IOutputFormat public: using Params = RowOutputFormatParams; -private: - bool prefix_written = false; - bool suffix_written = false; - protected: - DataTypes types; - Serializations serializations; - Params params; - - bool first_row = true; - + IRowOutputFormat(const Block & header, WriteBuffer & out_, const Params & params_); void consume(Chunk chunk) override; void consumeTotals(Chunk chunk) override; void consumeExtremes(Chunk chunk) override; void finalize() override; - void writePrefixIfNot() - { - if (!prefix_written) - writePrefix(); - - prefix_written = true; - } - - void writeSuffixIfNot() - { - if (!suffix_written) - writeSuffix(); - - suffix_written = true; - } - -public: - IRowOutputFormat(const Block & header, WriteBuffer & out_, const Params & params_); - /** Write a row. * Default implementation calls methods to write single values and delimiters * (except delimiter between rows (writeRowBetweenDelimiter())). @@ -78,7 +50,7 @@ public: virtual void writeRowStartDelimiter() {} /// delimiter before each row virtual void writeRowEndDelimiter() {} /// delimiter after each row virtual void writeRowBetweenDelimiter() {} /// delimiter between rows - virtual void writePrefix() {} /// delimiter before resultset + virtual void writePrefix() override {} /// delimiter before resultset virtual void writeSuffix() {} /// delimiter after resultset virtual void writeBeforeTotals() {} virtual void writeAfterTotals() {} @@ -86,6 +58,22 @@ public: virtual void writeAfterExtremes() {} virtual void writeLastSuffix() {} /// Write something after resultset, totals end extremes. + DataTypes types; + Serializations serializations; + Params params; + + bool first_row = true; + +private: + void writeSuffixIfNot() + { + if (!suffix_written) + writeSuffix(); + + suffix_written = true; + } + + bool suffix_written = false; }; } diff --git a/src/Processors/Formats/Impl/ArrowBlockInputFormat.h b/src/Processors/Formats/Impl/ArrowBlockInputFormat.h index 1136937e65b..44e18e3f852 100644 --- a/src/Processors/Formats/Impl/ArrowBlockInputFormat.h +++ b/src/Processors/Formats/Impl/ArrowBlockInputFormat.h @@ -24,10 +24,9 @@ public: String getName() const override { return "ArrowBlockInputFormat"; } -protected: +private: Chunk generate() override; -private: // Whether to use ArrowStream format bool stream; // This field is only used for ArrowStream format diff --git a/src/Processors/Formats/Impl/ArrowBlockOutputFormat.h b/src/Processors/Formats/Impl/ArrowBlockOutputFormat.h index 154292d838f..4534575e735 100644 --- a/src/Processors/Formats/Impl/ArrowBlockOutputFormat.h +++ b/src/Processors/Formats/Impl/ArrowBlockOutputFormat.h @@ -21,19 +21,20 @@ public: ArrowBlockOutputFormat(WriteBuffer & out_, const Block & header_, bool stream_, const FormatSettings & format_settings_); String getName() const override { return "ArrowBlockOutputFormat"; } - void consume(Chunk) override; - void finalize() override; String getContentType() const override { return "application/octet-stream"; } private: + void consume(Chunk) override; + void finalize() override; + + void prepareWriter(const std::shared_ptr & schema); + bool stream; const FormatSettings format_settings; std::shared_ptr arrow_ostream; std::shared_ptr writer; std::unique_ptr ch_column_to_arrow_column; - - void prepareWriter(const std::shared_ptr & schema); }; } diff --git a/src/Processors/Formats/Impl/AvroRowInputFormat.h b/src/Processors/Formats/Impl/AvroRowInputFormat.h index 5617b4a7661..2de11178e96 100644 --- a/src/Processors/Formats/Impl/AvroRowInputFormat.h +++ b/src/Processors/Formats/Impl/AvroRowInputFormat.h @@ -107,12 +107,13 @@ class AvroRowInputFormat : public IRowInputFormat { public: AvroRowInputFormat(const Block & header_, ReadBuffer & in_, Params params_, const FormatSettings & format_settings_); - bool readRow(MutableColumns & columns, RowReadExtension & ext) override; - void readPrefix() override; String getName() const override { return "AvroRowInputFormat"; } private: + bool readRow(MutableColumns & columns, RowReadExtension & ext) override; + void readPrefix() override; + std::unique_ptr file_reader_ptr; std::unique_ptr deserializer_ptr; bool allow_missing_fields; @@ -128,14 +129,16 @@ class AvroConfluentRowInputFormat : public IRowInputFormat { public: AvroConfluentRowInputFormat(const Block & header_, ReadBuffer & in_, Params params_, const FormatSettings & format_settings_); - virtual bool readRow(MutableColumns & columns, RowReadExtension & ext) override; String getName() const override { return "AvroConfluentRowInputFormat"; } class SchemaRegistry; -protected: + +private: + virtual bool readRow(MutableColumns & columns, RowReadExtension & ext) override; + bool allowSyncAfterError() const override { return true; } void syncAfterError() override; -private: + std::shared_ptr schema_registry; using SchemaId = uint32_t; std::unordered_map deserializer_cache; diff --git a/src/Processors/Formats/Impl/AvroRowOutputFormat.cpp b/src/Processors/Formats/Impl/AvroRowOutputFormat.cpp index 467738f49e8..fd7b2404c77 100644 --- a/src/Processors/Formats/Impl/AvroRowOutputFormat.cpp +++ b/src/Processors/Formats/Impl/AvroRowOutputFormat.cpp @@ -428,7 +428,6 @@ void AvroRowOutputFormat::consumeImpl(DB::Chunk chunk) auto num_rows = chunk.getNumRows(); const auto & columns = chunk.getColumns(); - writePrefixIfNot(); for (size_t row = 0; row < num_rows; ++row) { write(columns, row); @@ -447,7 +446,7 @@ void AvroRowOutputFormat::consumeImplWithCallback(DB::Chunk chunk) /// used by WriteBufferToKafkaProducer to obtain auxiliary data /// from the starting row of a file - writePrefix(); + writePrefixIfNot(); for (size_t row_in_file = 0; row_in_file < settings.avro.output_rows_in_file && row < num_rows; ++row, ++row_in_file) @@ -457,6 +456,7 @@ void AvroRowOutputFormat::consumeImplWithCallback(DB::Chunk chunk) file_writer_ptr->flush(); writeSuffix(); + need_write_prefix = true; params.callback(columns, current_row); } diff --git a/src/Processors/Formats/Impl/AvroRowOutputFormat.h b/src/Processors/Formats/Impl/AvroRowOutputFormat.h index a3e8493f757..b5583406cb8 100644 --- a/src/Processors/Formats/Impl/AvroRowOutputFormat.h +++ b/src/Processors/Formats/Impl/AvroRowOutputFormat.h @@ -51,12 +51,13 @@ public: void consume(Chunk) override; String getName() const override { return "AvroRowOutputFormat"; } + +private: void write(const Columns & columns, size_t row_num) override; void writeField(const IColumn &, const ISerialization &, size_t) override {} virtual void writePrefix() override; virtual void writeSuffix() override; -private: FormatSettings settings; AvroSerializer serializer; std::unique_ptr file_writer_ptr; diff --git a/src/Processors/Formats/Impl/BinaryRowOutputFormat.h b/src/Processors/Formats/Impl/BinaryRowOutputFormat.h index 36a62098b75..0edfd4bfcf8 100644 --- a/src/Processors/Formats/Impl/BinaryRowOutputFormat.h +++ b/src/Processors/Formats/Impl/BinaryRowOutputFormat.h @@ -21,12 +21,12 @@ public: String getName() const override { return "BinaryRowOutputFormat"; } + String getContentType() const override { return "application/octet-stream"; } + +private: void writeField(const IColumn & column, const ISerialization & serialization, size_t row_num) override; void writePrefix() override; - String getContentType() const override { return "application/octet-stream"; } - -protected: bool with_names; bool with_types; }; diff --git a/src/Processors/Formats/Impl/CSVRowInputFormat.h b/src/Processors/Formats/Impl/CSVRowInputFormat.h index f239464485a..d7c557b58d8 100644 --- a/src/Processors/Formats/Impl/CSVRowInputFormat.h +++ b/src/Processors/Formats/Impl/CSVRowInputFormat.h @@ -25,12 +25,13 @@ public: String getName() const override { return "CSVRowInputFormat"; } +private: bool allowSyncAfterError() const override { return true; } void syncAfterError() override; -private: bool parseFieldDelimiterWithDiagnosticInfo(WriteBuffer & out) override; bool parseRowEndWithDiagnosticInfo(WriteBuffer & out) override; + bool isGarbageAfterField(size_t, ReadBuffer::Position pos) override { return *pos != '\n' && *pos != '\r' && *pos != format_settings.csv.delimiter && *pos != ' ' && *pos != '\t'; diff --git a/src/Processors/Formats/Impl/CSVRowOutputFormat.cpp b/src/Processors/Formats/Impl/CSVRowOutputFormat.cpp index b300928e569..790994cb240 100644 --- a/src/Processors/Formats/Impl/CSVRowOutputFormat.cpp +++ b/src/Processors/Formats/Impl/CSVRowOutputFormat.cpp @@ -31,7 +31,7 @@ void CSVRowOutputFormat::writeLine(const std::vector & values) } } -void CSVRowOutputFormat::doWritePrefix() +void CSVRowOutputFormat::writePrefix() { const auto & sample = getPort(PortKind::Main).getHeader(); diff --git a/src/Processors/Formats/Impl/CSVRowOutputFormat.h b/src/Processors/Formats/Impl/CSVRowOutputFormat.h index 7f5d90203ea..dd9c2179f19 100644 --- a/src/Processors/Formats/Impl/CSVRowOutputFormat.h +++ b/src/Processors/Formats/Impl/CSVRowOutputFormat.h @@ -24,14 +24,6 @@ public: String getName() const override { return "CSVRowOutputFormat"; } - void writeField(const IColumn & column, const ISerialization & serialization, size_t row_num) override; - void writeFieldDelimiter() override; - void writeRowEndDelimiter() override; - void writeBeforeTotals() override; - void writeBeforeExtremes() override; - - void doWritePrefix() override; - /// https://www.iana.org/assignments/media-types/text/csv String getContentType() const override { @@ -39,6 +31,13 @@ public: } private: + void writeField(const IColumn & column, const ISerialization & serialization, size_t row_num) override; + void writeFieldDelimiter() override; + void writeRowEndDelimiter() override; + void writeBeforeTotals() override; + void writeBeforeExtremes() override; + + void writePrefix() override; void writeLine(const std::vector & values); bool with_names; diff --git a/src/Processors/Formats/Impl/CapnProtoRowInputFormat.h b/src/Processors/Formats/Impl/CapnProtoRowInputFormat.h index fc30cf11237..4c0f34d70a3 100644 --- a/src/Processors/Formats/Impl/CapnProtoRowInputFormat.h +++ b/src/Processors/Formats/Impl/CapnProtoRowInputFormat.h @@ -26,9 +26,9 @@ public: String getName() const override { return "CapnProtoRowInputFormat"; } +private: bool readRow(MutableColumns & columns, RowReadExtension &) override; -private: kj::Array readMessage(); std::shared_ptr parser; diff --git a/src/Processors/Formats/Impl/CapnProtoRowOutputFormat.h b/src/Processors/Formats/Impl/CapnProtoRowOutputFormat.h index 0f321071d62..6e27426f2cc 100644 --- a/src/Processors/Formats/Impl/CapnProtoRowOutputFormat.h +++ b/src/Processors/Formats/Impl/CapnProtoRowOutputFormat.h @@ -35,11 +35,11 @@ public: String getName() const override { return "CapnProtoRowOutputFormat"; } +private: void write(const Columns & columns, size_t row_num) override; void writeField(const IColumn &, const ISerialization &, size_t) override { } -private: Names column_names; DataTypes column_types; capnp::StructSchema schema; diff --git a/src/Processors/Formats/Impl/JSONAsStringRowInputFormat.h b/src/Processors/Formats/Impl/JSONAsStringRowInputFormat.h index c70d9efb178..96ad60b3fab 100644 --- a/src/Processors/Formats/Impl/JSONAsStringRowInputFormat.h +++ b/src/Processors/Formats/Impl/JSONAsStringRowInputFormat.h @@ -18,14 +18,15 @@ class JSONAsStringRowInputFormat : public IRowInputFormat public: JSONAsStringRowInputFormat(const Block & header_, ReadBuffer & in_, Params params_); - bool readRow(MutableColumns & columns, RowReadExtension & ext) override; String getName() const override { return "JSONAsStringRowInputFormat"; } void resetParser() override; +private: + bool readRow(MutableColumns & columns, RowReadExtension & ext) override; + void readPrefix() override; void readSuffix() override; -private: void readJSONObject(IColumn & column); PeekableReadBuffer buf; diff --git a/src/Processors/Formats/Impl/JSONCompactEachRowRowInputFormat.h b/src/Processors/Formats/Impl/JSONCompactEachRowRowInputFormat.h index 373eb04f06c..e01a4f49b30 100644 --- a/src/Processors/Formats/Impl/JSONCompactEachRowRowInputFormat.h +++ b/src/Processors/Formats/Impl/JSONCompactEachRowRowInputFormat.h @@ -31,10 +31,10 @@ public: String getName() const override { return "JSONCompactEachRowRowInputFormat"; } +private: bool allowSyncAfterError() const override { return true; } void syncAfterError() override; -private: bool parseRowStartWithDiagnosticInfo(WriteBuffer & out) override; bool parseFieldDelimiterWithDiagnosticInfo(WriteBuffer & out) override; bool parseRowEndWithDiagnosticInfo(WriteBuffer & out) override; diff --git a/src/Processors/Formats/Impl/JSONCompactEachRowRowOutputFormat.cpp b/src/Processors/Formats/Impl/JSONCompactEachRowRowOutputFormat.cpp index cdff7ff2070..c4645e0d63d 100644 --- a/src/Processors/Formats/Impl/JSONCompactEachRowRowOutputFormat.cpp +++ b/src/Processors/Formats/Impl/JSONCompactEachRowRowOutputFormat.cpp @@ -81,7 +81,7 @@ void JSONCompactEachRowRowOutputFormat::writeLine(const std::vector & va writeRowEndDelimiter(); } -void JSONCompactEachRowRowOutputFormat::doWritePrefix() +void JSONCompactEachRowRowOutputFormat::writePrefix() { const auto & header = getPort(PortKind::Main).getHeader(); diff --git a/src/Processors/Formats/Impl/JSONCompactEachRowRowOutputFormat.h b/src/Processors/Formats/Impl/JSONCompactEachRowRowOutputFormat.h index aa12ba7e809..6cb78bab49d 100644 --- a/src/Processors/Formats/Impl/JSONCompactEachRowRowOutputFormat.h +++ b/src/Processors/Formats/Impl/JSONCompactEachRowRowOutputFormat.h @@ -26,7 +26,8 @@ public: String getName() const override { return "JSONCompactEachRowRowOutputFormat"; } - void doWritePrefix() override; +private: + void writePrefix() override; void writeTotals(const Columns & columns, size_t row_num) override; @@ -35,12 +36,10 @@ public: void writeRowStartDelimiter() override; void writeRowEndDelimiter() override; -protected: void consumeTotals(Chunk) override; /// No extremes. void consumeExtremes(Chunk) override {} -private: void writeLine(const std::vector & values); FormatSettings settings; diff --git a/src/Processors/Formats/Impl/JSONCompactRowOutputFormat.h b/src/Processors/Formats/Impl/JSONCompactRowOutputFormat.h index 9bb433c50b1..961bd569d39 100644 --- a/src/Processors/Formats/Impl/JSONCompactRowOutputFormat.h +++ b/src/Processors/Formats/Impl/JSONCompactRowOutputFormat.h @@ -25,6 +25,7 @@ public: String getName() const override { return "JSONCompactRowOutputFormat"; } +private: void writeField(const IColumn & column, const ISerialization & serialization, size_t row_num) override; void writeFieldDelimiter() override; void writeRowStartDelimiter() override; @@ -33,7 +34,6 @@ public: void writeBeforeTotals() override; void writeAfterTotals() override; -protected: void writeExtremesElement(const char * title, const Columns & columns, size_t row_num) override; void writeTotalsField(const IColumn & column, const ISerialization & serialization, size_t row_num) override diff --git a/src/Processors/Formats/Impl/JSONEachRowRowInputFormat.h b/src/Processors/Formats/Impl/JSONEachRowRowInputFormat.h index 29a6ce6ecb8..9810f2dc765 100644 --- a/src/Processors/Formats/Impl/JSONEachRowRowInputFormat.h +++ b/src/Processors/Formats/Impl/JSONEachRowRowInputFormat.h @@ -28,16 +28,16 @@ public: bool yield_strings_); String getName() const override { return "JSONEachRowRowInputFormat"; } + void resetParser() override; +private: void readPrefix() override; void readSuffix() override; bool readRow(MutableColumns & columns, RowReadExtension & ext) override; bool allowSyncAfterError() const override { return true; } void syncAfterError() override; - void resetParser() override; -private: const String & columnName(size_t i) const; size_t columnIndex(const StringRef & name, size_t key_index); bool advanceToNextKey(size_t key_index); diff --git a/src/Processors/Formats/Impl/JSONEachRowRowOutputFormat.h b/src/Processors/Formats/Impl/JSONEachRowRowOutputFormat.h index 10b15f3e7b2..23fb506c220 100644 --- a/src/Processors/Formats/Impl/JSONEachRowRowOutputFormat.h +++ b/src/Processors/Formats/Impl/JSONEachRowRowOutputFormat.h @@ -23,6 +23,7 @@ public: String getName() const override { return "JSONEachRowRowOutputFormat"; } +protected: void writeField(const IColumn & column, const ISerialization & serialization, size_t row_num) override; void writeFieldDelimiter() override; void writeRowStartDelimiter() override; @@ -31,7 +32,6 @@ public: void writePrefix() override; void writeSuffix() override; -protected: /// No totals and extremes. void consumeTotals(Chunk) override {} void consumeExtremes(Chunk) override {} diff --git a/src/Processors/Formats/Impl/JSONEachRowWithProgressRowOutputFormat.h b/src/Processors/Formats/Impl/JSONEachRowWithProgressRowOutputFormat.h index 3062d664199..3eac61c4109 100644 --- a/src/Processors/Formats/Impl/JSONEachRowWithProgressRowOutputFormat.h +++ b/src/Processors/Formats/Impl/JSONEachRowWithProgressRowOutputFormat.h @@ -9,11 +9,12 @@ class JSONEachRowWithProgressRowOutputFormat : public JSONEachRowRowOutputFormat public: using JSONEachRowRowOutputFormat::JSONEachRowRowOutputFormat; - void writeRowStartDelimiter() override; - void writeRowEndDelimiter() override; void onProgress(const Progress & value) override; private: + void writeRowStartDelimiter() override; + void writeRowEndDelimiter() override; + Progress progress; }; diff --git a/src/Processors/Formats/Impl/JSONRowOutputFormat.h b/src/Processors/Formats/Impl/JSONRowOutputFormat.h index 75d4aa5d201..757cdf01a35 100644 --- a/src/Processors/Formats/Impl/JSONRowOutputFormat.h +++ b/src/Processors/Formats/Impl/JSONRowOutputFormat.h @@ -25,6 +25,25 @@ public: String getName() const override { return "JSONRowOutputFormat"; } + void onProgress(const Progress & value) override; + + String getContentType() const override { return "application/json; charset=UTF-8"; } + + void flush() override + { + ostr->next(); + + if (validating_ostr) + out.next(); + } + + void setRowsBeforeLimit(size_t rows_before_limit_) override + { + applied_limit = true; + rows_before_limit = rows_before_limit_; + } + +protected: void writeField(const IColumn & column, const ISerialization & serialization, size_t row_num) override; void writeFieldDelimiter() override; void writeRowStartDelimiter() override; @@ -44,25 +63,6 @@ public: void writeLastSuffix() override; - void flush() override - { - ostr->next(); - - if (validating_ostr) - out.next(); - } - - void setRowsBeforeLimit(size_t rows_before_limit_) override - { - applied_limit = true; - rows_before_limit = rows_before_limit_; - } - - void onProgress(const Progress & value) override; - - String getContentType() const override { return "application/json; charset=UTF-8"; } - -protected: virtual void writeTotalsField(const IColumn & column, const ISerialization & serialization, size_t row_num); virtual void writeExtremesElement(const char * title, const Columns & columns, size_t row_num); virtual void writeTotalsFieldDelimiter() { writeFieldDelimiter(); } @@ -70,7 +70,6 @@ protected: void writeRowsBeforeLimitAtLeast(); void writeStatistics(); - std::unique_ptr validating_ostr; /// Validates UTF-8 sequences, replaces bad sequences with replacement character. WriteBuffer * ostr; diff --git a/src/Processors/Formats/Impl/LineAsStringRowInputFormat.h b/src/Processors/Formats/Impl/LineAsStringRowInputFormat.h index 7c0187bc3ff..1a6c6247558 100644 --- a/src/Processors/Formats/Impl/LineAsStringRowInputFormat.h +++ b/src/Processors/Formats/Impl/LineAsStringRowInputFormat.h @@ -17,11 +17,12 @@ class LineAsStringRowInputFormat : public IRowInputFormat public: LineAsStringRowInputFormat(const Block & header_, ReadBuffer & in_, Params params_); - bool readRow(MutableColumns & columns, RowReadExtension & ext) override; String getName() const override { return "LineAsStringRowInputFormat"; } void resetParser() override; private: + bool readRow(MutableColumns & columns, RowReadExtension & ext) override; + void readLineObject(IColumn & column); }; diff --git a/src/Processors/Formats/Impl/MarkdownRowOutputFormat.h b/src/Processors/Formats/Impl/MarkdownRowOutputFormat.h index 0b2a4dd0b23..7a2aaf86f7d 100644 --- a/src/Processors/Formats/Impl/MarkdownRowOutputFormat.h +++ b/src/Processors/Formats/Impl/MarkdownRowOutputFormat.h @@ -14,6 +14,9 @@ class MarkdownRowOutputFormat : public IRowOutputFormat public: MarkdownRowOutputFormat(WriteBuffer & out_, const Block & header_, const RowOutputFormatParams & params_, const FormatSettings & format_settings_); + String getName() const override { return "MarkdownRowOutputFormat"; } + +private: /// Write higher part of markdown table like this: /// |columnName1|columnName2|...|columnNameN| /// |:-:|:-:|...|:-:| @@ -29,9 +32,7 @@ public: void writeRowEndDelimiter() override ; void writeField(const IColumn & column, const ISerialization & serialization, size_t row_num) override; - String getName() const override { return "MarkdownRowOutputFormat"; } -protected: const FormatSettings format_settings; }; diff --git a/src/Processors/Formats/Impl/MsgPackRowInputFormat.h b/src/Processors/Formats/Impl/MsgPackRowInputFormat.h index fa5c2e74584..d2d500a4480 100644 --- a/src/Processors/Formats/Impl/MsgPackRowInputFormat.h +++ b/src/Processors/Formats/Impl/MsgPackRowInputFormat.h @@ -59,11 +59,12 @@ class MsgPackRowInputFormat : public IRowInputFormat public: MsgPackRowInputFormat(const Block & header_, ReadBuffer & in_, Params params_); - bool readRow(MutableColumns & columns, RowReadExtension & ext) override; String getName() const override { return "MagPackRowInputFormat"; } void resetParser() override; private: + bool readRow(MutableColumns & columns, RowReadExtension & ext) override; + bool readObject(); PeekableReadBuffer buf; diff --git a/src/Processors/Formats/Impl/MsgPackRowOutputFormat.h b/src/Processors/Formats/Impl/MsgPackRowOutputFormat.h index 2766eb144e4..17d055818e9 100644 --- a/src/Processors/Formats/Impl/MsgPackRowOutputFormat.h +++ b/src/Processors/Formats/Impl/MsgPackRowOutputFormat.h @@ -22,11 +22,11 @@ public: String getName() const override { return "MsgPackRowOutputFormat"; } +private: void write(const Columns & columns, size_t row_num) override; void writeField(const IColumn &, const ISerialization &, size_t) override {} void serializeField(const IColumn & column, DataTypePtr data_type, size_t row_num); -private: msgpack::packer packer; }; diff --git a/src/Processors/Formats/Impl/MySQLOutputFormat.cpp b/src/Processors/Formats/Impl/MySQLOutputFormat.cpp index 0b366244611..cdbac9f06fb 100644 --- a/src/Processors/Formats/Impl/MySQLOutputFormat.cpp +++ b/src/Processors/Formats/Impl/MySQLOutputFormat.cpp @@ -31,13 +31,8 @@ void MySQLOutputFormat::setContext(ContextPtr context_) context = context_; } -void MySQLOutputFormat::initialize() +void MySQLOutputFormat::writePrefix() { - if (initialized) - return; - - initialized = true; - const auto & header = getPort(PortKind::Main).getHeader(); data_types = header.getDataTypes(); @@ -66,8 +61,6 @@ void MySQLOutputFormat::initialize() void MySQLOutputFormat::consume(Chunk chunk) { - initialize(); - for (size_t i = 0; i < chunk.getNumRows(); i++) { ProtocolText::ResultSetRow row_packet(serializations, chunk.getColumns(), i); diff --git a/src/Processors/Formats/Impl/MySQLOutputFormat.h b/src/Processors/Formats/Impl/MySQLOutputFormat.h index a8e1ada3d6a..6f4337486e2 100644 --- a/src/Processors/Formats/Impl/MySQLOutputFormat.h +++ b/src/Processors/Formats/Impl/MySQLOutputFormat.h @@ -26,15 +26,13 @@ public: void setContext(ContextPtr context_); - void consume(Chunk) override; - void finalize() override; void flush() override; - void doWritePrefix() override { initialize(); } private: - void initialize(); + void consume(Chunk) override; + void finalize() override; + void writePrefix() override; - bool initialized = false; uint32_t client_capabilities = 0; uint8_t * sequence_id = nullptr; uint8_t dummy_sequence_id = 0; diff --git a/src/Processors/Formats/Impl/ODBCDriver2BlockOutputFormat.cpp b/src/Processors/Formats/Impl/ODBCDriver2BlockOutputFormat.cpp index 0e486715c98..a82285c1c19 100644 --- a/src/Processors/Formats/Impl/ODBCDriver2BlockOutputFormat.cpp +++ b/src/Processors/Formats/Impl/ODBCDriver2BlockOutputFormat.cpp @@ -64,21 +64,14 @@ void ODBCDriver2BlockOutputFormat::write(Chunk chunk, PortKind port_kind) void ODBCDriver2BlockOutputFormat::consume(Chunk chunk) { - writePrefixIfNot(); write(std::move(chunk), PortKind::Main); } void ODBCDriver2BlockOutputFormat::consumeTotals(Chunk chunk) { - writePrefixIfNot(); write(std::move(chunk), PortKind::Totals); } -void ODBCDriver2BlockOutputFormat::finalize() -{ - writePrefixIfNot(); -} - void ODBCDriver2BlockOutputFormat::writePrefix() { const auto & header = getPort(PortKind::Main).getHeader(); diff --git a/src/Processors/Formats/Impl/ODBCDriver2BlockOutputFormat.h b/src/Processors/Formats/Impl/ODBCDriver2BlockOutputFormat.h index 4545e429cc2..de6ea22dfd7 100644 --- a/src/Processors/Formats/Impl/ODBCDriver2BlockOutputFormat.h +++ b/src/Processors/Formats/Impl/ODBCDriver2BlockOutputFormat.h @@ -24,30 +24,20 @@ public: String getName() const override { return "ODBCDriver2BlockOutputFormat"; } - void consume(Chunk) override; - void consumeTotals(Chunk) override; - void finalize() override; - std::string getContentType() const override { return "application/octet-stream"; } private: + void consume(Chunk) override; + void consumeTotals(Chunk) override; + void writePrefix() override; + const FormatSettings format_settings; - bool prefix_written = false; - - void writePrefixIfNot() - { - if (!prefix_written) - writePrefix(); - - prefix_written = true; - } void writeRow(const Serializations & serializations, const Columns & columns, size_t row_idx, std::string & buffer); void write(Chunk chunk, PortKind port_kind); - void writePrefix(); }; diff --git a/src/Processors/Formats/Impl/ORCBlockOutputFormat.h b/src/Processors/Formats/Impl/ORCBlockOutputFormat.h index c131d724450..fcdc0e9c6d6 100644 --- a/src/Processors/Formats/Impl/ORCBlockOutputFormat.h +++ b/src/Processors/Formats/Impl/ORCBlockOutputFormat.h @@ -37,10 +37,11 @@ public: ORCBlockOutputFormat(WriteBuffer & out_, const Block & header_, const FormatSettings & format_settings_); String getName() const override { return "ORCBlockOutputFormat"; } + +private: void consume(Chunk chunk) override; void finalize() override; -private: ORC_UNIQUE_PTR getORCType(const DataTypePtr & type, const std::string & column_name); /// ConvertFunc is needed for type UInt8, because firstly UInt8 (char8_t) must be diff --git a/src/Processors/Formats/Impl/ParallelFormattingOutputFormat.cpp b/src/Processors/Formats/Impl/ParallelFormattingOutputFormat.cpp index f7723e3f1d2..c7b7a4d2814 100644 --- a/src/Processors/Formats/Impl/ParallelFormattingOutputFormat.cpp +++ b/src/Processors/Formats/Impl/ParallelFormattingOutputFormat.cpp @@ -171,7 +171,7 @@ namespace DB { case ProcessingUnitType::START : { - formatter->doWritePrefix(); + formatter->writePrefix(); break; } case ProcessingUnitType::PLAIN : @@ -191,7 +191,7 @@ namespace DB } case ProcessingUnitType::FINALIZE : { - formatter->doWriteSuffix(); + formatter->finalize(); break; } } diff --git a/src/Processors/Formats/Impl/ParallelFormattingOutputFormat.h b/src/Processors/Formats/Impl/ParallelFormattingOutputFormat.h index 65f4a329505..0f8bff9b7d9 100644 --- a/src/Processors/Formats/Impl/ParallelFormattingOutputFormat.h +++ b/src/Processors/Formats/Impl/ParallelFormattingOutputFormat.h @@ -95,7 +95,7 @@ public: need_flush = true; } - void doWritePrefix() override + void writePrefix() override { addChunk(Chunk{}, ProcessingUnitType::START, /*can_throw_exception*/ true); } @@ -114,7 +114,7 @@ public: return internal_formatter_creator(buffer)->getContentType(); } -protected: +private: void consume(Chunk chunk) override final { addChunk(std::move(chunk), ProcessingUnitType::PLAIN, /*can_throw_exception*/ true); @@ -132,7 +132,6 @@ protected: void finalize() override; -private: InternalFormatterCreator internal_formatter_creator; /// Status to synchronize multiple threads. diff --git a/src/Processors/Formats/Impl/ParallelParsingInputFormat.h b/src/Processors/Formats/Impl/ParallelParsingInputFormat.h index 3355b7334dc..264beba8589 100644 --- a/src/Processors/Formats/Impl/ParallelParsingInputFormat.h +++ b/src/Processors/Formats/Impl/ParallelParsingInputFormat.h @@ -117,7 +117,7 @@ public: String getName() const override final { return "ParallelParsingBlockInputFormat"; } -protected: +private: Chunk generate() override final; @@ -137,8 +137,6 @@ protected: finishAndWait(); } -private: - class InternalParser { public: diff --git a/src/Processors/Formats/Impl/ParquetBlockInputFormat.h b/src/Processors/Formats/Impl/ParquetBlockInputFormat.h index bae380f2c80..472aec66da3 100644 --- a/src/Processors/Formats/Impl/ParquetBlockInputFormat.h +++ b/src/Processors/Formats/Impl/ParquetBlockInputFormat.h @@ -23,13 +23,11 @@ public: String getName() const override { return "ParquetBlockInputFormat"; } -protected: +private: Chunk generate() override; -private: void prepareReader(); -private: std::unique_ptr file_reader; int row_group_total = 0; // indices of columns to read from Parquet file diff --git a/src/Processors/Formats/Impl/ParquetBlockOutputFormat.h b/src/Processors/Formats/Impl/ParquetBlockOutputFormat.h index 80d11b540b1..f58db3bd202 100644 --- a/src/Processors/Formats/Impl/ParquetBlockOutputFormat.h +++ b/src/Processors/Formats/Impl/ParquetBlockOutputFormat.h @@ -30,12 +30,13 @@ public: ParquetBlockOutputFormat(WriteBuffer & out_, const Block & header_, const FormatSettings & format_settings_); String getName() const override { return "ParquetBlockOutputFormat"; } - void consume(Chunk) override; - void finalize() override; String getContentType() const override { return "application/octet-stream"; } private: + void consume(Chunk) override; + void finalize() override; + const FormatSettings format_settings; std::unique_ptr file_writer; diff --git a/src/Processors/Formats/Impl/PostgreSQLOutputFormat.cpp b/src/Processors/Formats/Impl/PostgreSQLOutputFormat.cpp index a5f92e41da4..f46488fd0a8 100644 --- a/src/Processors/Formats/Impl/PostgreSQLOutputFormat.cpp +++ b/src/Processors/Formats/Impl/PostgreSQLOutputFormat.cpp @@ -11,12 +11,8 @@ PostgreSQLOutputFormat::PostgreSQLOutputFormat(WriteBuffer & out_, const Block & { } -void PostgreSQLOutputFormat::doWritePrefix() +void PostgreSQLOutputFormat::writePrefix() { - if (initialized) - return; - - initialized = true; const auto & header = getPort(PortKind::Main).getHeader(); auto data_types = header.getDataTypes(); @@ -37,8 +33,6 @@ void PostgreSQLOutputFormat::doWritePrefix() void PostgreSQLOutputFormat::consume(Chunk chunk) { - doWritePrefix(); - for (size_t i = 0; i != chunk.getNumRows(); ++i) { const Columns & columns = chunk.getColumns(); @@ -61,8 +55,6 @@ void PostgreSQLOutputFormat::consume(Chunk chunk) } } -void PostgreSQLOutputFormat::finalize() {} - void PostgreSQLOutputFormat::flush() { message_transport.flush(); diff --git a/src/Processors/Formats/Impl/PostgreSQLOutputFormat.h b/src/Processors/Formats/Impl/PostgreSQLOutputFormat.h index 257fbdff341..f5fd55530b9 100644 --- a/src/Processors/Formats/Impl/PostgreSQLOutputFormat.h +++ b/src/Processors/Formats/Impl/PostgreSQLOutputFormat.h @@ -17,13 +17,11 @@ public: String getName() const override {return "PostgreSQLOutputFormat";} - void doWritePrefix() override; - void consume(Chunk) override; - void finalize() override; void flush() override; private: - bool initialized = false; + void writePrefix() override; + void consume(Chunk) override; FormatSettings format_settings; PostgreSQLProtocol::Messaging::MessageTransport message_transport; diff --git a/src/Processors/Formats/Impl/PrettyBlockOutputFormat.h b/src/Processors/Formats/Impl/PrettyBlockOutputFormat.h index 02b438d2571..8d317277251 100644 --- a/src/Processors/Formats/Impl/PrettyBlockOutputFormat.h +++ b/src/Processors/Formats/Impl/PrettyBlockOutputFormat.h @@ -22,13 +22,13 @@ public: String getName() const override { return "PrettyBlockOutputFormat"; } +protected: void consume(Chunk) override; void consumeTotals(Chunk) override; void consumeExtremes(Chunk) override; void finalize() override; -protected: size_t total_rows = 0; size_t terminal_width = 0; bool suffix_written = false; diff --git a/src/Processors/Formats/Impl/PrettyCompactBlockOutputFormat.h b/src/Processors/Formats/Impl/PrettyCompactBlockOutputFormat.h index 96344397a0c..1779a20e122 100644 --- a/src/Processors/Formats/Impl/PrettyCompactBlockOutputFormat.h +++ b/src/Processors/Formats/Impl/PrettyCompactBlockOutputFormat.h @@ -16,7 +16,7 @@ public: PrettyCompactBlockOutputFormat(WriteBuffer & out_, const Block & header, const FormatSettings & format_settings_, bool mono_block_); String getName() const override { return "PrettyCompactBlockOutputFormat"; } -protected: +private: void write(const Chunk & chunk, PortKind port_kind) override; void writeHeader(const Block & block, const Widths & max_widths, const Widths & name_widths); void writeBottom(const Widths & max_widths); @@ -28,7 +28,6 @@ protected: const WidthsPerColumn & widths, const Widths & max_widths); -private: bool mono_block; /// For mono_block == true only Chunk mono_chunk; diff --git a/src/Processors/Formats/Impl/PrettySpaceBlockOutputFormat.h b/src/Processors/Formats/Impl/PrettySpaceBlockOutputFormat.h index a041d324fd3..b3090497783 100644 --- a/src/Processors/Formats/Impl/PrettySpaceBlockOutputFormat.h +++ b/src/Processors/Formats/Impl/PrettySpaceBlockOutputFormat.h @@ -16,7 +16,7 @@ public: String getName() const override { return "PrettySpaceBlockOutputFormat"; } -protected: +private: void write(const Chunk & chunk, PortKind port_kind) override; void writeSuffix() override; }; diff --git a/src/Processors/Formats/Impl/ProtobufRowInputFormat.h b/src/Processors/Formats/Impl/ProtobufRowInputFormat.h index 553a2dfd7f0..6f465e3f0b8 100644 --- a/src/Processors/Formats/Impl/ProtobufRowInputFormat.h +++ b/src/Processors/Formats/Impl/ProtobufRowInputFormat.h @@ -32,11 +32,11 @@ public: String getName() const override { return "ProtobufRowInputFormat"; } +private: bool readRow(MutableColumns & columns, RowReadExtension &) override; bool allowSyncAfterError() const override; void syncAfterError() override; -private: std::unique_ptr reader; std::vector missing_column_indices; std::unique_ptr serializer; diff --git a/src/Processors/Formats/Impl/ProtobufRowOutputFormat.h b/src/Processors/Formats/Impl/ProtobufRowOutputFormat.h index 5323aa56323..97b727842a7 100644 --- a/src/Processors/Formats/Impl/ProtobufRowOutputFormat.h +++ b/src/Processors/Formats/Impl/ProtobufRowOutputFormat.h @@ -39,11 +39,12 @@ public: String getName() const override { return "ProtobufRowOutputFormat"; } - void write(const Columns & columns, size_t row_num) override; - void writeField(const IColumn &, const ISerialization &, size_t) override {} std::string getContentType() const override { return "application/octet-stream"; } private: + void write(const Columns & columns, size_t row_num) override; + void writeField(const IColumn &, const ISerialization &, size_t) override {} + std::unique_ptr writer; std::unique_ptr serializer; const bool allow_multiple_rows; diff --git a/src/Processors/Formats/Impl/RawBLOBRowInputFormat.h b/src/Processors/Formats/Impl/RawBLOBRowInputFormat.h index fd2c849687a..343af9f4068 100644 --- a/src/Processors/Formats/Impl/RawBLOBRowInputFormat.h +++ b/src/Processors/Formats/Impl/RawBLOBRowInputFormat.h @@ -16,8 +16,10 @@ class RawBLOBRowInputFormat : public IRowInputFormat public: RawBLOBRowInputFormat(const Block & header_, ReadBuffer & in_, Params params_); - bool readRow(MutableColumns & columns, RowReadExtension &) override; String getName() const override { return "RawBLOBRowInputFormat"; } + +private: + bool readRow(MutableColumns & columns, RowReadExtension &) override; }; } diff --git a/src/Processors/Formats/Impl/RawBLOBRowOutputFormat.h b/src/Processors/Formats/Impl/RawBLOBRowOutputFormat.h index 7a29c62e4d8..2c34595c1a4 100644 --- a/src/Processors/Formats/Impl/RawBLOBRowOutputFormat.h +++ b/src/Processors/Formats/Impl/RawBLOBRowOutputFormat.h @@ -34,6 +34,7 @@ public: String getName() const override { return "RawBLOBRowOutputFormat"; } +private: void writeField(const IColumn & column, const ISerialization &, size_t row_num) override; }; diff --git a/src/Processors/Formats/Impl/RegexpRowInputFormat.h b/src/Processors/Formats/Impl/RegexpRowInputFormat.h index 0cd8778e499..f8923d9c217 100644 --- a/src/Processors/Formats/Impl/RegexpRowInputFormat.h +++ b/src/Processors/Formats/Impl/RegexpRowInputFormat.h @@ -30,11 +30,11 @@ public: RegexpRowInputFormat(ReadBuffer & in_, const Block & header_, Params params_, const FormatSettings & format_settings_); String getName() const override { return "RegexpRowInputFormat"; } - - bool readRow(MutableColumns & columns, RowReadExtension & ext) override; void resetParser() override; private: + bool readRow(MutableColumns & columns, RowReadExtension & ext) override; + bool readField(size_t index, MutableColumns & columns); void readFieldsFromMatch(MutableColumns & columns, RowReadExtension & ext); static ColumnFormat stringToFormat(const String & format); diff --git a/src/Processors/Formats/Impl/TSKVRowInputFormat.h b/src/Processors/Formats/Impl/TSKVRowInputFormat.h index bc537158d9b..7d732bae691 100644 --- a/src/Processors/Formats/Impl/TSKVRowInputFormat.h +++ b/src/Processors/Formats/Impl/TSKVRowInputFormat.h @@ -27,14 +27,14 @@ public: String getName() const override { return "TSKVRowInputFormat"; } + void resetParser() override; + +private: void readPrefix() override; bool readRow(MutableColumns & columns, RowReadExtension &) override; bool allowSyncAfterError() const override { return true; } void syncAfterError() override; - void resetParser() override; - -private: const FormatSettings format_settings; /// Buffer for the read from the stream the field name. Used when you have to copy it. diff --git a/src/Processors/Formats/Impl/TSKVRowOutputFormat.h b/src/Processors/Formats/Impl/TSKVRowOutputFormat.h index 24c4e5ca866..980e36c7e25 100644 --- a/src/Processors/Formats/Impl/TSKVRowOutputFormat.h +++ b/src/Processors/Formats/Impl/TSKVRowOutputFormat.h @@ -18,10 +18,10 @@ public: String getName() const override { return "TSKVRowOutputFormat"; } +private: void writeField(const IColumn & column, const ISerialization & serialization, size_t row_num) override; void writeRowEndDelimiter() override; -protected: NamesAndTypes fields; size_t field_number = 0; }; diff --git a/src/Processors/Formats/Impl/TabSeparatedRowInputFormat.h b/src/Processors/Formats/Impl/TabSeparatedRowInputFormat.h index 11a788bc900..6e2e283e792 100644 --- a/src/Processors/Formats/Impl/TabSeparatedRowInputFormat.h +++ b/src/Processors/Formats/Impl/TabSeparatedRowInputFormat.h @@ -21,12 +21,10 @@ public: String getName() const override { return "TabSeparatedRowInputFormat"; } +private: bool allowSyncAfterError() const override { return true; } void syncAfterError() override; -private: - bool is_raw; - bool readField(IColumn & column, const DataTypePtr & type, const SerializationPtr & serialization, bool is_last_file_column, const String & column_name) override; @@ -48,6 +46,8 @@ private: bool parseFieldDelimiterWithDiagnosticInfo(WriteBuffer & out) override; bool parseRowEndWithDiagnosticInfo(WriteBuffer & out) override; bool isGarbageAfterField(size_t, ReadBuffer::Position pos) override { return *pos != '\n' && *pos != '\t'; } + + bool is_raw; }; } diff --git a/src/Processors/Formats/Impl/TabSeparatedRowOutputFormat.cpp b/src/Processors/Formats/Impl/TabSeparatedRowOutputFormat.cpp index df0c19ad409..5d87f5a0b14 100644 --- a/src/Processors/Formats/Impl/TabSeparatedRowOutputFormat.cpp +++ b/src/Processors/Formats/Impl/TabSeparatedRowOutputFormat.cpp @@ -30,7 +30,7 @@ void TabSeparatedRowOutputFormat::writeLine(const std::vector & values) } } -void TabSeparatedRowOutputFormat::doWritePrefix() +void TabSeparatedRowOutputFormat::writePrefix() { const auto & header = getPort(PortKind::Main).getHeader(); diff --git a/src/Processors/Formats/Impl/TabSeparatedRowOutputFormat.h b/src/Processors/Formats/Impl/TabSeparatedRowOutputFormat.h index 7dcc6529f1c..eeada54d74e 100644 --- a/src/Processors/Formats/Impl/TabSeparatedRowOutputFormat.h +++ b/src/Processors/Formats/Impl/TabSeparatedRowOutputFormat.h @@ -29,24 +29,22 @@ public: String getName() const override { return "TabSeparatedRowOutputFormat"; } + /// https://www.iana.org/assignments/media-types/text/tab-separated-values + String getContentType() const override { return "text/tab-separated-values; charset=UTF-8"; } + +protected: void writeField(const IColumn & column, const ISerialization & serialization, size_t row_num) override; void writeFieldDelimiter() override; void writeRowEndDelimiter() override; void writeBeforeTotals() override; void writeBeforeExtremes() override; - void doWritePrefix() override; - - /// https://www.iana.org/assignments/media-types/text/tab-separated-values - String getContentType() const override { return "text/tab-separated-values; charset=UTF-8"; } - -private: + void writePrefix() override; void writeLine(const std::vector & values); + bool with_names; bool with_types; bool is_raw; - -protected: const FormatSettings format_settings; }; diff --git a/src/Processors/Formats/Impl/TemplateBlockOutputFormat.cpp b/src/Processors/Formats/Impl/TemplateBlockOutputFormat.cpp index db5db4701a9..dae3398a36c 100644 --- a/src/Processors/Formats/Impl/TemplateBlockOutputFormat.cpp +++ b/src/Processors/Formats/Impl/TemplateBlockOutputFormat.cpp @@ -147,8 +147,6 @@ template void TemplateBlockOutputFormat::writeValue(U v void TemplateBlockOutputFormat::consume(Chunk chunk) { - doWritePrefix(); - size_t rows = chunk.getNumRows(); for (size_t i = 0; i < rows; ++i) @@ -161,13 +159,9 @@ void TemplateBlockOutputFormat::consume(Chunk chunk) } } -void TemplateBlockOutputFormat::doWritePrefix() +void TemplateBlockOutputFormat::writePrefix() { - if (need_write_prefix) - { - writeString(format.delimiters.front(), out); - need_write_prefix = false; - } + writeString(format.delimiters.front(), out); } void TemplateBlockOutputFormat::finalize() @@ -175,8 +169,6 @@ void TemplateBlockOutputFormat::finalize() if (finalized) return; - doWritePrefix(); - size_t parts = format.format_idx_to_column_idx.size(); for (size_t i = 0; i < parts; ++i) diff --git a/src/Processors/Formats/Impl/TemplateBlockOutputFormat.h b/src/Processors/Formats/Impl/TemplateBlockOutputFormat.h index 0d41b8888d4..224a8d20176 100644 --- a/src/Processors/Formats/Impl/TemplateBlockOutputFormat.h +++ b/src/Processors/Formats/Impl/TemplateBlockOutputFormat.h @@ -20,8 +20,6 @@ public: String getName() const override { return "TemplateBlockOutputFormat"; } - void doWritePrefix() override; - void setRowsBeforeLimit(size_t rows_before_limit_) override { rows_before_limit = rows_before_limit_; rows_before_limit_set = true; } void onProgress(const Progress & progress_) override { progress.incrementPiecewiseAtomically(progress_); } @@ -40,7 +38,8 @@ public: static ResultsetPart stringToResultsetPart(const String & part); -protected: +private: + void writePrefix() override; void consume(Chunk chunk) override; void consumeTotals(Chunk chunk) override { totals = std::move(chunk); } void consumeExtremes(Chunk chunk) override { extremes = std::move(chunk); } @@ -50,7 +49,6 @@ protected: void serializeField(const IColumn & column, const ISerialization & serialization, size_t row_num, ColumnFormat format); template void writeValue(U value, ColumnFormat col_format); -protected: const FormatSettings settings; Serializations serializations; @@ -65,7 +63,6 @@ protected: Stopwatch watch; size_t row_count = 0; - bool need_write_prefix = true; std::string row_between_delimiter; }; diff --git a/src/Processors/Formats/Impl/TemplateRowInputFormat.h b/src/Processors/Formats/Impl/TemplateRowInputFormat.h index 322f8570ab7..428bef40fcf 100644 --- a/src/Processors/Formats/Impl/TemplateRowInputFormat.h +++ b/src/Processors/Formats/Impl/TemplateRowInputFormat.h @@ -22,6 +22,9 @@ public: String getName() const override { return "TemplateRowInputFormat"; } + void resetParser() override; + +private: bool readRow(MutableColumns & columns, RowReadExtension & extra) override; void readPrefix() override; @@ -29,9 +32,6 @@ public: bool allowSyncAfterError() const override; void syncAfterError() override; - void resetParser() override; - -private: bool deserializeField(const DataTypePtr & type, const SerializationPtr & serialization, IColumn & column, size_t file_column); @@ -51,7 +51,6 @@ private: void skipToNextDelimiterOrEof(const String & delimiter); -private: PeekableReadBuffer buf; const DataTypes data_types; diff --git a/src/Processors/Formats/Impl/ValuesRowOutputFormat.h b/src/Processors/Formats/Impl/ValuesRowOutputFormat.h index 493ce458b1e..8d89854d43c 100644 --- a/src/Processors/Formats/Impl/ValuesRowOutputFormat.h +++ b/src/Processors/Formats/Impl/ValuesRowOutputFormat.h @@ -19,13 +19,13 @@ public: String getName() const override { return "ValuesRowOutputFormat"; } +private: void writeField(const IColumn & column, const ISerialization & serialization, size_t row_num) override; void writeFieldDelimiter() override; void writeRowStartDelimiter() override; void writeRowEndDelimiter() override; void writeRowBetweenDelimiter() override; -private: const FormatSettings format_settings; }; diff --git a/src/Processors/Formats/Impl/VerticalRowOutputFormat.h b/src/Processors/Formats/Impl/VerticalRowOutputFormat.h index 9e89f677f87..075c943cd76 100644 --- a/src/Processors/Formats/Impl/VerticalRowOutputFormat.h +++ b/src/Processors/Formats/Impl/VerticalRowOutputFormat.h @@ -22,6 +22,7 @@ public: String getName() const override { return "VerticalRowOutputFormat"; } +private: void writeField(const IColumn & column, const ISerialization & serialization, size_t row_num) override; void writeRowStartDelimiter() override; void writeRowBetweenDelimiter() override; @@ -34,8 +35,7 @@ public: void writeBeforeTotals() override; void writeBeforeExtremes() override; -protected: - virtual void writeValue(const IColumn & column, const ISerialization & serialization, size_t row_num) const; + void writeValue(const IColumn & column, const ISerialization & serialization, size_t row_num) const; /// For totals and extremes. void writeSpecialRow(const Columns & columns, size_t row_num, const char * title); diff --git a/src/Processors/Formats/Impl/XMLRowOutputFormat.h b/src/Processors/Formats/Impl/XMLRowOutputFormat.h index 8ca4721c459..7f08fc12bd1 100644 --- a/src/Processors/Formats/Impl/XMLRowOutputFormat.h +++ b/src/Processors/Formats/Impl/XMLRowOutputFormat.h @@ -20,6 +20,7 @@ public: String getName() const override { return "XMLRowOutputFormat"; } +private: void writeField(const IColumn & column, const ISerialization & serialization, size_t row_num) override; void writeRowStartDelimiter() override; void writeRowEndDelimiter() override; @@ -54,7 +55,6 @@ public: String getContentType() const override { return "application/xml; charset=UTF-8"; } -protected: void writeExtremesElement(const char * title, const Columns & columns, size_t row_num); void writeRowsBeforeLimitAtLeast(); void writeStatistics(); diff --git a/src/Processors/Formats/RowInputFormatWithNamesAndTypes.h b/src/Processors/Formats/RowInputFormatWithNamesAndTypes.h index 0fd83238f5f..1cd349b8183 100644 --- a/src/Processors/Formats/RowInputFormatWithNamesAndTypes.h +++ b/src/Processors/Formats/RowInputFormatWithNamesAndTypes.h @@ -30,8 +30,6 @@ public: const Params & params_, bool with_names_, bool with_types_, const FormatSettings & format_settings_); - bool readRow(MutableColumns & columns, RowReadExtension & ext) override; - void readPrefix() override; void resetParser() override; protected: @@ -67,6 +65,9 @@ protected: DataTypes data_types; private: + bool readRow(MutableColumns & columns, RowReadExtension & ext) override; + void readPrefix() override; + bool parseRowAndPrintDiagnosticInfo(MutableColumns & columns, WriteBuffer & out) override; void tryDeserializeField(const DataTypePtr & type, IColumn & column, size_t file_column) override; diff --git a/src/Server/GRPCServer.cpp b/src/Server/GRPCServer.cpp index ba2644e0fba..7d68482ed78 100644 --- a/src/Server/GRPCServer.cpp +++ b/src/Server/GRPCServer.cpp @@ -1090,7 +1090,6 @@ namespace write_buffer.emplace(*result.mutable_output()); output_format_processor = query_context->getOutputFormat(output_format, *write_buffer, header); - output_format_processor->doWritePrefix(); Stopwatch after_send_progress; /// Unless the input() function is used we are not going to receive input data anymore. @@ -1169,7 +1168,7 @@ namespace executor->execute(); } - output_format_processor->doWriteSuffix(); + output_format_processor->finish(); } void Call::finishQuery() @@ -1380,9 +1379,8 @@ namespace WriteBufferFromString buf{*result.mutable_totals()}; auto format = query_context->getOutputFormat(output_format, buf, totals); - format->doWritePrefix(); format->write(materializeBlock(totals)); - format->doWriteSuffix(); + format->finish(); } void Call::addExtremesToResult(const Block & extremes) @@ -1392,9 +1390,8 @@ namespace WriteBufferFromString buf{*result.mutable_extremes()}; auto format = query_context->getOutputFormat(output_format, buf, extremes); - format->doWritePrefix(); format->write(materializeBlock(extremes)); - format->doWriteSuffix(); + format->finish(); } void Call::addProfileInfoToResult(const ProfileInfo & info) diff --git a/src/Storages/HDFS/StorageHDFS.cpp b/src/Storages/HDFS/StorageHDFS.cpp index b46668a233b..e2db5109585 100644 --- a/src/Storages/HDFS/StorageHDFS.cpp +++ b/src/Storages/HDFS/StorageHDFS.cpp @@ -210,11 +210,6 @@ public: void consume(Chunk chunk) override { - if (is_first_chunk) - { - writer->doWritePrefix(); - is_first_chunk = false; - } writer->write(getHeader().cloneWithColumns(chunk.detachColumns())); } @@ -222,7 +217,7 @@ public: { try { - writer->doWriteSuffix(); + writer->finish(); writer->flush(); write_buf->sync(); write_buf->finalize(); @@ -237,7 +232,6 @@ public: private: std::unique_ptr write_buf; OutputFormatPtr writer; - bool is_first_chunk = true; }; diff --git a/src/Storages/Kafka/KafkaBlockOutputStream.cpp b/src/Storages/Kafka/KafkaBlockOutputStream.cpp index d3c51fef9b7..79163ff8d17 100644 --- a/src/Storages/Kafka/KafkaBlockOutputStream.cpp +++ b/src/Storages/Kafka/KafkaBlockOutputStream.cpp @@ -42,7 +42,7 @@ void KafkaSink::consume(Chunk chunk) void KafkaSink::onFinish() { if (format) - format->doWriteSuffix(); + format->finish(); //flush(); if (buffer) diff --git a/src/Storages/RabbitMQ/RabbitMQSink.cpp b/src/Storages/RabbitMQ/RabbitMQSink.cpp index 2b8d5ab3810..fc3dccb57fa 100644 --- a/src/Storages/RabbitMQ/RabbitMQSink.cpp +++ b/src/Storages/RabbitMQ/RabbitMQSink.cpp @@ -49,7 +49,7 @@ void RabbitMQSink::consume(Chunk chunk) void RabbitMQSink::onFinish() { - format->doWriteSuffix(); + format->finish(); if (buffer) buffer->updateMaxWait(); diff --git a/src/Storages/StorageFile.cpp b/src/Storages/StorageFile.cpp index 4a1eac2a39e..af46e2d3ba1 100644 --- a/src/Storages/StorageFile.cpp +++ b/src/Storages/StorageFile.cpp @@ -621,26 +621,21 @@ public: naked_buffer = std::make_unique(paths[0], DBMS_DEFAULT_BUFFER_SIZE, flags); } - /// In case of CSVWithNames we have already written prefix. - if (naked_buffer->size()) - prefix_written = true; + /// In case of formats with prefixes if file is not empty we have already written prefix. + bool do_not_write_prefix = naked_buffer->size(); write_buf = wrapWriteBufferWithCompressionMethod(std::move(naked_buffer), compression_method, 3); writer = FormatFactory::instance().getOutputFormatParallelIfPossible(format_name, *write_buf, metadata_snapshot->getSampleBlock(), context, {}, format_settings); + + if (do_not_write_prefix) + writer->doNotWritePrefix(); } String getName() const override { return "StorageFileSink"; } - void onStart() override - { - if (!prefix_written) - writer->doWritePrefix(); - prefix_written = true; - } - void consume(Chunk chunk) override { writer->write(getHeader().cloneWithColumns(chunk.detachColumns())); @@ -648,7 +643,7 @@ public: void onFinish() override { - writer->doWriteSuffix(); + writer->finish(); } // void flush() override @@ -662,7 +657,6 @@ private: std::unique_ptr write_buf; OutputFormatPtr writer; - bool prefix_written{false}; int table_fd; bool use_table_fd; diff --git a/src/Storages/StorageS3.cpp b/src/Storages/StorageS3.cpp index 80011cde077..99391963fd1 100644 --- a/src/Storages/StorageS3.cpp +++ b/src/Storages/StorageS3.cpp @@ -321,11 +321,6 @@ public: void consume(Chunk chunk) override { - if (is_first_chunk) - { - writer->doWritePrefix(); - is_first_chunk = false; - } writer->write(getHeader().cloneWithColumns(chunk.detachColumns())); } @@ -333,7 +328,7 @@ public: { try { - writer->doWriteSuffix(); + writer->finish(); writer->flush(); write_buf->finalize(); } @@ -350,7 +345,6 @@ private: std::optional format_settings; std::unique_ptr write_buf; OutputFormatPtr writer; - bool is_first_chunk = true; }; diff --git a/src/Storages/StorageURL.cpp b/src/Storages/StorageURL.cpp index 66033f7a7d6..6882d1acf0e 100644 --- a/src/Storages/StorageURL.cpp +++ b/src/Storages/StorageURL.cpp @@ -226,18 +226,12 @@ StorageURLSink::StorageURLSink( void StorageURLSink::consume(Chunk chunk) { - if (is_first_chunk) - { - writer->doWritePrefix(); - is_first_chunk = false; - } - writer->write(getHeader().cloneWithColumns(chunk.detachColumns())); } void StorageURLSink::onFinish() { - writer->doWriteSuffix(); + writer->finish(); writer->flush(); write_buf->finalize(); } diff --git a/src/Storages/StorageURL.h b/src/Storages/StorageURL.h index a79100c8d70..cf72352a183 100644 --- a/src/Storages/StorageURL.h +++ b/src/Storages/StorageURL.h @@ -110,8 +110,6 @@ public: private: std::unique_ptr write_buf; OutputFormatPtr writer; - - bool is_first_chunk = true; }; class StorageURL : public shared_ptr_helper, public IStorageURLBase diff --git a/tests/queries/0_stateless/01553_settings_early_apply.sql b/tests/queries/0_stateless/01553_settings_early_apply.sql index 48178c38f33..13bb2d30d4d 100644 --- a/tests/queries/0_stateless/01553_settings_early_apply.sql +++ b/tests/queries/0_stateless/01553_settings_early_apply.sql @@ -1,5 +1,9 @@ select * from numbers(100) settings max_result_rows = 1; -- { serverError 396 } select * from numbers(100) FORMAT JSON settings max_result_rows = 1; -- { serverError 396 } +select * from numbers(100) FORMAT TSVWithNamesAndTypes settings max_result_rows = 1; -- { serverError 396 } +select * from numbers(100) FORMAT CSVWithNamesAndTypes settings max_result_rows = 1; -- { serverError 396 } +select * from numbers(100) FORMAT JSONCompactEachRowWithNamesAndTypes settings max_result_rows = 1; -- { serverError 396 } +select * from numbers(100) FORMAT XML settings max_result_rows = 1; -- { serverError 396 } SET max_result_rows = 1; select * from numbers(10); -- { serverError 396 } diff --git a/tests/queries/0_stateless/02113_format_row.reference b/tests/queries/0_stateless/02113_format_row.reference new file mode 100644 index 00000000000..0ac3a15b115 --- /dev/null +++ b/tests/queries/0_stateless/02113_format_row.reference @@ -0,0 +1,20 @@ +0\t1970-01-01\n +1\t1970-01-02\n +2\t1970-01-03\n +3\t1970-01-04\n +4\t1970-01-05\n +0,"1970-01-01"\n +1,"1970-01-02"\n +2,"1970-01-03"\n +3,"1970-01-04"\n +4,"1970-01-05"\n +["0", "1970-01-01"]\n +["1", "1970-01-02"]\n +["2", "1970-01-03"]\n +["3", "1970-01-04"]\n +["4", "1970-01-05"]\n +\t\t\n\t\t\t0\n\t\t\t1970-01-01\n\t\t\n +\t\t\n\t\t\t1\n\t\t\t1970-01-02\n\t\t\n +\t\t\n\t\t\t2\n\t\t\t1970-01-03\n\t\t\n +\t\t\n\t\t\t3\n\t\t\t1970-01-04\n\t\t\n +\t\t\n\t\t\t4\n\t\t\t1970-01-05\n\t\t\n diff --git a/tests/queries/0_stateless/02113_format_row.sql b/tests/queries/0_stateless/02113_format_row.sql new file mode 100644 index 00000000000..93ee6d0f1dd --- /dev/null +++ b/tests/queries/0_stateless/02113_format_row.sql @@ -0,0 +1,5 @@ +select formatRow('TSVWithNamesAndTypes', number, toDate(number)) from numbers(5); +select formatRow('CSVWithNamesAndTypes', number, toDate(number)) from numbers(5); +select formatRow('JSONCompactEachRowWithNamesAndTypes', number, toDate(number)) from numbers(5); +select formatRow('XML', number, toDate(number)) from numbers(5); +