diff --git a/src/Processors/Formats/IOutputFormat.cpp b/src/Processors/Formats/IOutputFormat.cpp index 88649d9ca25..2f0ef603022 100644 --- a/src/Processors/Formats/IOutputFormat.cpp +++ b/src/Processors/Formats/IOutputFormat.cpp @@ -5,6 +5,11 @@ namespace DB { +namespace ErrorCodes +{ + extern const int NOT_IMPLEMENTED; +} + IOutputFormat::IOutputFormat(const Block & header_, WriteBuffer & out_) : IProcessor({header_, header_, header_}, {}), out(out_) { @@ -30,7 +35,7 @@ IOutputFormat::Status IOutputFormat::prepare() if (!input.hasData()) return Status::NeedData; - current_chunk = input.pull(true); + current_chunk = input.pullData(true); current_block_kind = kind; has_input = true; return Status::Ready; @@ -44,23 +49,31 @@ IOutputFormat::Status IOutputFormat::prepare() return Status::Finished; } -static Chunk prepareTotals(Chunk chunk) +static Port::Data prepareTotals(Port::Data data) { - if (!chunk.hasRows()) + if (data.exception) + return data; + + if (!data.chunk.hasRows()) return {}; - if (chunk.getNumRows() > 1) + if (data.chunk.getNumRows() > 1) { /// This may happen if something like ARRAY JOIN was executed on totals. /// Skip rows except the first one. - auto columns = chunk.detachColumns(); + auto columns = data.chunk.detachColumns(); for (auto & column : columns) column = column->cut(0, 1); - chunk.setColumns(std::move(columns), 1); + data.chunk.setColumns(std::move(columns), 1); } - return chunk; + return data; +} + +void IOutputFormat::consume(Chunk) +{ + throw Exception(ErrorCodes::NOT_IMPLEMENTED, "Method consume s not implemented for {}", getName()); } void IOutputFormat::work() @@ -84,17 +97,24 @@ void IOutputFormat::work() switch (current_block_kind) { case Main: - result_rows += current_chunk.getNumRows(); - result_bytes += current_chunk.allocatedBytes(); + { + result_rows += current_chunk.chunk.getNumRows(); + result_bytes += current_chunk.chunk.allocatedBytes(); consume(std::move(current_chunk)); break; + } case Totals: - if (auto totals = prepareTotals(std::move(current_chunk))) + { + auto totals = prepareTotals(std::move(current_chunk)); + if (totals.exception || totals.chunk) consumeTotals(std::move(totals)); break; + } case Extremes: + { consumeExtremes(std::move(current_chunk)); break; + } } if (auto_flush) diff --git a/src/Processors/Formats/IOutputFormat.h b/src/Processors/Formats/IOutputFormat.h index 67c307df2aa..e3552b734e8 100644 --- a/src/Processors/Formats/IOutputFormat.h +++ b/src/Processors/Formats/IOutputFormat.h @@ -28,7 +28,7 @@ public: protected: WriteBuffer & out; - Chunk current_chunk; + Port::Data current_chunk; PortKind current_block_kind = PortKind::Main; bool has_input = false; bool finished = false; @@ -39,9 +39,14 @@ protected: RowsBeforeLimitCounterPtr rows_before_limit_counter; - virtual void consume(Chunk) = 0; + virtual void consume(Chunk); virtual void consumeTotals(Chunk) {} virtual void consumeExtremes(Chunk) {} + + virtual void consume(Port::Data data) { consume(data.getChunkOrTrow()); } + virtual void consumeTotals(Port::Data data) { consumeTotals(data.getChunkOrTrow()); } + virtual void consumeExtremes(Port::Data data) { consumeExtremes(data.getChunkOrTrow()); } + virtual void finalize() {} public: @@ -77,8 +82,8 @@ public: virtual void doWritePrefix() {} virtual void doWriteSuffix() { finalize(); } - void setTotals(const Block & totals) { consumeTotals(Chunk(totals.getColumns(), totals.rows())); } - void setExtremes(const Block & extremes) { consumeExtremes(Chunk(extremes.getColumns(), extremes.rows())); } + void setTotals(const Block & totals) { consumeTotals(Port::Data{.chunk = Chunk(totals.getColumns(), totals.rows())}); } + void setExtremes(const Block & extremes) { consumeExtremes(Port::Data{.chunk = Chunk(extremes.getColumns(), extremes.rows())}); } size_t getResultRows() const { return result_rows; } size_t getResultBytes() const { return result_bytes; } diff --git a/src/Processors/Formats/LazyOutputFormat.cpp b/src/Processors/Formats/LazyOutputFormat.cpp index 46287d1cce9..72996de9593 100644 --- a/src/Processors/Formats/LazyOutputFormat.cpp +++ b/src/Processors/Formats/LazyOutputFormat.cpp @@ -15,24 +15,24 @@ Chunk LazyOutputFormat::getChunk(UInt64 milliseconds) return {}; } - Chunk chunk; - if (!queue.tryPop(chunk, milliseconds)) + Port::Data data; + if (!queue.tryPop(data, milliseconds)) return {}; - if (chunk) - info.update(chunk.getNumRows(), chunk.allocatedBytes()); + if (!data.exception) + info.update(data.chunk.getNumRows(), data.chunk.allocatedBytes()); - return chunk; + return data.getChunkOrTrow(); } Chunk LazyOutputFormat::getTotals() { - return std::move(totals); + return totals.getChunkOrTrow(); } Chunk LazyOutputFormat::getExtremes() { - return std::move(extremes); + return extremes.getChunkOrTrow(); } void LazyOutputFormat::setRowsBeforeLimit(size_t rows_before_limit) diff --git a/src/Processors/Formats/LazyOutputFormat.h b/src/Processors/Formats/LazyOutputFormat.h index 06ec116f3dd..9f24e54735c 100644 --- a/src/Processors/Formats/LazyOutputFormat.h +++ b/src/Processors/Formats/LazyOutputFormat.h @@ -37,28 +37,28 @@ public: } protected: - void consume(Chunk chunk) override + void consume(Port::Data data) override { if (!finished_processing) - queue.emplace(std::move(chunk)); + queue.emplace(std::move(data)); } - void consumeTotals(Chunk chunk) override { totals = std::move(chunk); } - void consumeExtremes(Chunk chunk) override { extremes = std::move(chunk); } + void consumeTotals(Port::Data data) override { totals = std::move(data); } + void consumeExtremes(Port::Data data) override { extremes = std::move(data); } void finalize() override { finished_processing = true; /// In case we are waiting for result. - queue.emplace(Chunk()); + queue.emplace(Port::Data{}); } private: - ConcurrentBoundedQueue queue; - Chunk totals; - Chunk extremes; + ConcurrentBoundedQueue queue; + Port::Data totals; + Port::Data extremes; /// Is not used. static WriteBuffer out; diff --git a/src/Processors/Port.h b/src/Processors/Port.h index ac71c394518..c7401a18afe 100644 --- a/src/Processors/Port.h +++ b/src/Processors/Port.h @@ -60,6 +60,14 @@ protected: /// Note: std::variant can be used. But move constructor for it can't be inlined. Chunk chunk; std::exception_ptr exception; + + Chunk getChunkOrTrow() + { + if (exception) + std::rethrow_exception(std::move(exception)); + + return std::move(chunk); + } }; private: @@ -303,12 +311,7 @@ public: Chunk ALWAYS_INLINE pull(bool set_not_needed = false) { - auto data_ = pullData(set_not_needed); - - if (data_.exception) - std::rethrow_exception(data_.exception); - - return std::move(data_.chunk); + return pullData(set_not_needed).getChunkOrTrow(); } bool ALWAYS_INLINE isFinished() const