diff --git a/src/Interpreters/InterpreterInsertQuery.cpp b/src/Interpreters/InterpreterInsertQuery.cpp index e27a8bd414b..0041a0f0846 100644 --- a/src/Interpreters/InterpreterInsertQuery.cpp +++ b/src/Interpreters/InterpreterInsertQuery.cpp @@ -26,6 +26,7 @@ #include #include #include +#include #include #include #include @@ -604,9 +605,15 @@ BlockIO InterpreterInsertQuery::execute() { bool table_prefers_large_blocks = table->prefersLargeBlocks(); + pipeline.addTransform(std::make_shared( + header, + table_prefers_large_blocks ? settings.min_insert_block_size_rows : settings.max_block_size, + table_prefers_large_blocks ? settings.min_insert_block_size_bytes : 0ULL, + settings.max_memory_usage, presink_chains.size())); + pipeline.addSimpleTransform([&](const Block & in_header) -> ProcessorPtr { - return std::make_shared( + return std::make_shared( in_header, table_prefers_large_blocks ? settings.min_insert_block_size_rows : settings.max_block_size, table_prefers_large_blocks ? settings.min_insert_block_size_bytes : 0ULL); @@ -668,6 +675,14 @@ BlockIO InterpreterInsertQuery::execute() table_prefers_large_blocks ? settings.min_insert_block_size_bytes : 0ULL); chain.addSource(std::move(squashing)); + + // auto balancing = std::make_shared( + // chain.getInputHeader(), + // table_prefers_large_blocks ? settings.min_insert_block_size_rows : settings.max_block_size, + // table_prefers_large_blocks ? 
settings.min_insert_block_size_bytes : 0ULL, + // settings.max_memory_usage, true); + + // chain.addSource(std::move(balancing)); } auto context_ptr = getContext(); diff --git a/src/Interpreters/SquashingTransform.cpp b/src/Interpreters/SquashingTransform.cpp index 4ed0dddc191..0d976bd967a 100644 --- a/src/Interpreters/SquashingTransform.cpp +++ b/src/Interpreters/SquashingTransform.cpp @@ -1,4 +1,12 @@ +#include +#include #include +#include "DataTypes/Serializations/ISerialization.h" +#include "Processors/Chunk.h" +#include "base/sleep.h" +#include "base/types.h" +#include +#include namespace DB @@ -126,4 +134,190 @@ bool SquashingTransform::isEnoughSize(size_t rows, size_t bytes) const || (min_block_size_bytes && bytes >= min_block_size_bytes); } + +NewSquashingTransform::NewSquashingTransform(size_t min_block_size_rows_, size_t min_block_size_bytes_) + : min_block_size_rows(min_block_size_rows_) + , min_block_size_bytes(min_block_size_bytes_) +{ +} + +Block NewSquashingTransform::add(Chunk && input_chunk) +{ + return addImpl(std::move(input_chunk)); +} + +const ChunksToSquash * getInfoFromChunk(const Chunk & chunk) +{ + auto info = chunk.getChunkInfo(); + const auto * agg_info = typeid_cast(info.get()); + + return agg_info; +} + +template +Block NewSquashingTransform::addImpl(ReferenceType input_chunk) +{ + if (!input_chunk.hasChunkInfo()) + { + Block to_return; + std::swap(to_return, accumulated_block); + return to_return; + } + + const auto *info = getInfoFromChunk(input_chunk); + for (auto & one : info->chunks) + { + append(std::move(one), info->data_type); + } + + // if (isEnoughSize(accumulated_block)) + { + Block to_return; + std::swap(to_return, accumulated_block); + return to_return; + } +} + +template +void NewSquashingTransform::append(ReferenceType input_chunk, DataTypePtr data_type) +{ + if (input_chunk.getNumColumns() == 0) + return; + if (!accumulated_block) + { + for (const ColumnPtr& column : input_chunk.getColumns()) + { + ColumnWithTypeAndName 
col = ColumnWithTypeAndName(column, data_type, " "); + accumulated_block.insert(accumulated_block.columns(), col); + } + return; + } + + for (size_t i = 0, size = accumulated_block.columns(); i < size; ++i) + { + const auto source_column = input_chunk.getColumns()[i]; + + auto mutable_column = IColumn::mutate(std::move(accumulated_block.getByPosition(i).column)); + mutable_column->insertRangeFrom(*source_column, 0, source_column->size()); + accumulated_block.getByPosition(i).column = std::move(mutable_column); + } +} + + + +BalanceTransform::BalanceTransform(Block header_, size_t min_block_size_rows_, size_t min_block_size_bytes_) + : min_block_size_rows(min_block_size_rows_) + , min_block_size_bytes(min_block_size_bytes_) + , header(std::move(header_)) +{ + // Use query-level memory tracker + if (auto * memory_tracker_child = CurrentThread::getMemoryTracker()) + memory_tracker = memory_tracker_child->getParent(); +} + +Chunk BalanceTransform::add(Block && input_block) +{ + return addImpl(std::move(input_block)); +} + +Chunk BalanceTransform::convertToChunk(std::vector &chunks) +{ + if (chunks.empty()) + return {}; + + auto info = std::make_shared(); + for (auto &chunk : chunks) + info->chunks.push_back(chunk.clone()); + info->data_type = data_type; + + if (!info->chunks.empty()) /// Note: This if is only for debugging, structure of chunk copies the structure of info + { /// it's possible to use only 'Chunk(header.cloneEmptyColumns(), 0, info)' + return Chunk({info->chunks[0].getColumns(), info->chunks[0].getNumRows(), info}); + } + + return Chunk(header.cloneEmptyColumns(), 0, info); +} + + +template +Chunk BalanceTransform::addImpl(ReferenceType input_block) +{ + Chunk input_chunk(input_block.getColumns(), input_block.rows()); + if (!data_type && !input_block.getDataTypes().empty()) + data_type = input_block.getDataTypes()[0]; + // /// End of input stream. 
+ if (!input_chunk) + { + Chunk res_chunk = convertToChunk(chunks_to_merge_vec); + // // std::cerr << "end of stream. Adding info to chunk " << std::endl; + return res_chunk; + } + + if (isEnoughSize(chunks_to_merge_vec)) + chunks_to_merge_vec.clear(); + + if (input_chunk) + chunks_to_merge_vec.push_back(input_chunk.clone()); + // std::cerr << "pushing back data. size: " << chunks_to_merge_vec.size() << std::endl; + + if (isEnoughSize(chunks_to_merge_vec)) + { + // // // std::cerr << "enough size" << std::endl; + Chunk res_chunk = convertToChunk(chunks_to_merge_vec); + return res_chunk; + } + return input_chunk; +} + +/// Sums rows and bytes over `chunks` and reports whether they satisfy the row/byte thresholds. +/// Before answering, waits until the summed bytes fit under the memory tracker's hard limit. +bool BalanceTransform::isEnoughSize(const std::vector & chunks) +{ + size_t rows = 0; + size_t bytes = 0; + + for (const Chunk & chunk : chunks) + { + rows += chunk.getNumRows(); + bytes += chunk.bytes(); + } + + checkAndWaitMemoryAvailability(bytes); + + return isEnoughSize(rows, bytes); +} + +/// Waits until `bytes` more would fit under the hard limit of the captured memory tracker. +/// Returns at once when no tracker was captured or when its hard limit is 0. +/// NOTE(review): the wait has no timeout - confirm that is acceptable for the calling thread. +void BalanceTransform::checkAndWaitMemoryAvailability(size_t bytes) +{ + /// Nothing to measure against without a tracker. + if (!memory_tracker) + return; + + /// Store the limit itself. `const auto x = f() != 0` stores the comparison result, + /// which turned the free-memory estimate into `1 - get()`. + const auto hard_limit = memory_tracker->getHardLimit(); + if (hard_limit == 0) + return; + + /// Re-read usage on every pass and sleep between polls instead of spinning. + while (Int64(bytes) >= hard_limit - memory_tracker->get()) + sleepForMilliseconds(10); +}
+ +bool BalanceTransform::isEnoughSize(const Chunk & chunk) +{ + return isEnoughSize(chunk.getNumRows(), chunk.bytes()); +} + + +bool BalanceTransform::isEnoughSize(size_t rows, size_t bytes) const +{ + return (!min_block_size_rows && !min_block_size_bytes) + || (min_block_size_rows && rows >= min_block_size_rows) + || (min_block_size_bytes && bytes >= min_block_size_bytes); +} + } diff --git a/src/Interpreters/SquashingTransform.h b/src/Interpreters/SquashingTransform.h index b04d012bcd1..0c2fe1ef12b 100644 --- a/src/Interpreters/SquashingTransform.h +++ b/src/Interpreters/SquashingTransform.h @@ -1,11 +1,22 @@ #pragma once +#include +#include +#include #include +#include +#include "Common/MemoryTracker.h" +#include "DataTypes/Serializations/ISerialization.h" namespace DB { +struct ChunksToSquash : public ChunkInfo +{ + mutable std::vector chunks = {}; + DataTypePtr data_type = nullptr; +}; /** Merging consecutive passed blocks to specified minimum size.
* @@ -47,4 +58,56 @@ private: bool isEnoughSize(size_t rows, size_t bytes) const; }; +class NewSquashingTransform +{ +public: + NewSquashingTransform(size_t min_block_size_rows_, size_t min_block_size_bytes_); + + Block add(Chunk && input_chunk); + +private: + size_t min_block_size_rows; + size_t min_block_size_bytes; + + Block accumulated_block; + + template + Block addImpl(ReferenceType chunk); + + template + void append(ReferenceType input_chunk, DataTypePtr data_type); + + bool isEnoughSize(const Block & block); + bool isEnoughSize(size_t rows, size_t bytes) const; +}; + +class BalanceTransform +{ +public: + BalanceTransform(Block header_, size_t min_block_size_rows_, size_t min_block_size_bytes_); + + Chunk add(Block && input_block); + +private: + std::vector chunks_to_merge_vec = {}; + size_t min_block_size_rows; + size_t min_block_size_bytes; + + Chunk accumulated_block; + const Block header; + + template + Chunk addImpl(ReferenceType input_block); + + bool isEnoughSize(const Chunk & chunk); + bool isEnoughSize(const std::vector & chunks); + bool isEnoughSize(size_t rows, size_t bytes) const; + void checkAndWaitMemoryAvailability(size_t bytes); + DataTypePtr data_type = nullptr; + + MemoryTracker * memory_tracker = nullptr; /// Stays null when the constructor finds no thread-level tracker. + + Chunk convertToChunk(std::vector &chunks); +}; + } diff --git a/src/Processors/Transforms/BalancingTransform.cpp b/src/Processors/Transforms/BalancingTransform.cpp new file mode 100644 index 00000000000..b899702561e --- /dev/null +++ b/src/Processors/Transforms/BalancingTransform.cpp @@ -0,0 +1,223 @@ +#include +#include +#include "Common/Logger.h" +#include "Common/logger_useful.h" +#include "Interpreters/SquashingTransform.h" +#include "Processors/Chunk.h" + +namespace DB +{ + +namespace ErrorCodes +{ + extern const int LOGICAL_ERROR; +} + +LBalancingChunksTransform::LBalancingChunksTransform( + const Block & header, size_t min_block_size_rows, size_t min_block_size_bytes, size_t max_memory_usage_, [[maybe_unused]] bool 
skip_empty_chunks_) + : ISimpleTransform(header, header, false), max_memory_usage(max_memory_usage_), squashing(min_block_size_rows, min_block_size_bytes), balance(header, min_block_size_rows, min_block_size_bytes) +{ +} + +void LBalancingChunksTransform::transform(Chunk & chunk) +{ + if (!finished) + { + Chunk res_chunk = balance.add(getInputPort().getHeader().cloneWithColumns(chunk.detachColumns())); + if (res_chunk.hasChunkInfo()) + { + // std::cerr << "BalancingTransform: adding chunk " << std::endl; + + // { + // [[maybe_unused]]const auto * agg_info = typeid_cast(res_chunk.getChunkInfo().get()); + // std::cerr << "End of BalancingTransform: size of one group: " << agg_info->chunks.size() << std::endl; + // if (!agg_info->chunks.empty()) + // std::cerr << "!group is not empty, first column: " << agg_info->chunks[0].dumpStructure() << std::endl << std::endl; + // } + + } + else + LOG_TRACE(getLogger("balancing"), "{}, BalancingTransform: not adding chunk, not finished.", reinterpret_cast(this));/// ISSUE: it's not clear why finished label is not set + std::swap(res_chunk, chunk); + } + else + { + Chunk res_chunk = balance.add({}); + if (res_chunk.hasChunkInfo()) + { + // std::cerr << "BalancingTransform: finished adding, NumRows:" << res_chunk.getNumRows() << ", HasInfo: " << res_chunk.hasChunkInfo() << std::endl; + // { + // [[maybe_unused]]const auto * agg_info = typeid_cast(res_chunk.getChunkInfo().get()); + // std::cerr << "End of BalancingTransform: size of one group: " << agg_info->chunks.size() << std::endl; + // if (!agg_info->chunks.empty()) + // std::cerr << "!group is not empty, first column: " << agg_info->chunks[0].dumpStructure() << std::endl << std::endl; + // } + + } + else + LOG_TRACE(getLogger("balancing"), "{}, BalancingTransform: not adding chunk on finished", reinterpret_cast(this)); + std::swap(res_chunk, chunk); + } + LOG_TRACE(getLogger("balancing"), "{}, BalancingTransform: struct of output chunk: {}", reinterpret_cast(this), 
chunk.dumpStructure()); +} + +IProcessor::Status LBalancingChunksTransform::prepare() +{ + if (!finished && input.isFinished()) + { + finished = true; + return Status::Ready; + } + return ISimpleTransform::prepare(); +} + + +BalancingChunksTransform::BalancingChunksTransform(const Block & header, size_t min_block_size_rows, size_t min_block_size_bytes, size_t max_memory_usage_, size_t num_ports) + : IProcessor(InputPorts(num_ports, header), OutputPorts(num_ports, header)), max_memory_usage(max_memory_usage_), squashing(min_block_size_rows, min_block_size_bytes), balance(header, min_block_size_rows, min_block_size_bytes) +{ +} + +IProcessor::Status BalancingChunksTransform::prepare() +{ + Status status = Status::Ready; + + while (status == Status::Ready) + { + status = !has_data ? prepareConsume() + : prepareSend(); + } + + return status; +} + +IProcessor::Status BalancingChunksTransform::prepareConsume() +{ + LOG_TRACE(getLogger("balancingProcessor"), "prepareConsume"); + for (auto & input : inputs) + { + bool all_finished = true; + for (auto & output : outputs) + { + if (output.isFinished()) + continue; + + all_finished = false; + } + + if (all_finished) + { + input.close(); + return Status::Finished; + } + + if (input.isFinished()) + { + for (auto & output : outputs) + output.finish(); + + return Status::Finished; + } + + input.setNeeded(); + if (!input.hasData()) + return Status::NeedData; + + chunk = input.pull(); + was_output_processed.assign(outputs.size(), false); + transform(chunk); + if (chunk.hasChunkInfo()) + { + LOG_TRACE(getLogger("balancingProcessor"), "hasData"); + has_data = true; + } + else + { + finished = true; + LOG_TRACE(getLogger("balancingProcessor"), "hasData, finished"); + transform(chunk); + has_data = true; + } + } + return Status::Ready; +} + +void BalancingChunksTransform::transform(Chunk & chunk_) +{ + if (!finished) + { + Chunk res_chunk = balance.add(getInputPorts().front().getHeader().cloneWithColumns(chunk_.detachColumns())); + if 
(res_chunk.hasChunkInfo()) + { + // std::cerr << "BalancingTransform: adding chunk " << std::endl; + + // { + // [[maybe_unused]]const auto * agg_info = typeid_cast(res_chunk.getChunkInfo().get()); + // std::cerr << "End of BalancingTransform: size of one group: " << agg_info->chunks.size() << std::endl; + // if (!agg_info->chunks.empty()) + // std::cerr << "!group is not empty, first column: " << agg_info->chunks[0].dumpStructure() << std::endl << std::endl; + // } + } + else + LOG_TRACE(getLogger("balancing"), "{}, BalancingTransform: not adding chunk, not finished.", reinterpret_cast(this));/// ISSUE: it's not clear why finished label is not set + std::swap(res_chunk, chunk_); + } + else + { + Chunk res_chunk = balance.add({}); + if (res_chunk.hasChunkInfo()) + { + // std::cerr << "BalancingTransform: finished adding, NumRows:" << res_chunk.getNumRows() << ", HasInfo: " << res_chunk.hasChunkInfo() << std::endl; + // { + // [[maybe_unused]]const auto * agg_info = typeid_cast(res_chunk.getChunkInfo().get()); + // std::cerr << "End of BalancingTransform: size of one group: " << agg_info->chunks.size() << std::endl; + // if (!agg_info->chunks.empty()) + // std::cerr << "!group is not empty, first column: " << agg_info->chunks[0].dumpStructure() << std::endl << std::endl; + // } + } + else + LOG_TRACE(getLogger("balancing"), "{}, BalancingTransform: not adding chunk on finished", reinterpret_cast(this)); + std::swap(res_chunk, chunk_); + } + LOG_TRACE(getLogger("balancing"), "{}, BalancingTransform: struct of output chunk: {}, hasInfo: {}", reinterpret_cast(this), chunk_.dumpStructure(), chunk.hasChunkInfo()); +} + +IProcessor::Status BalancingChunksTransform::prepareSend() +{ + LOG_TRACE(getLogger("balancingProcessor"), "prepareGenerate {}", chunk.dumpStructure()); + bool all_outputs_processed = true; + + size_t chunk_number = 0; + for (auto &output : outputs) + { + auto & was_processed = was_output_processed[chunk_number]; + ++chunk_number; + + if 
(!chunk.hasChunkInfo()) + throw Exception(ErrorCodes::LOGICAL_ERROR, "Chunk info must be not empty in prepareGenerate()"); + + if (was_processed) + continue; + + if (output.isFinished()) + continue; + + if (!output.canPush()) + { + all_outputs_processed = false; + continue; + } + + LOG_TRACE(getLogger("balancingProcessor"), "chunk struct: {}", chunk.dumpStructure()); + output.push(chunk.clone()); + was_processed = true; + } + + if (all_outputs_processed) + { + has_data = false; + return Status::Ready; + } + + return Status::PortFull; +} +} diff --git a/src/Processors/Transforms/BalancingTransform.h b/src/Processors/Transforms/BalancingTransform.h new file mode 100644 index 00000000000..d992a14cdd4 --- /dev/null +++ b/src/Processors/Transforms/BalancingTransform.h @@ -0,0 +1,128 @@ +#pragma once + +#include +#include +#include "Processors/Chunk.h" +#include "Processors/IProcessor.h" +#include "Processors/Transforms/ExceptionKeepingTransform.h" +#include + +namespace DB +{ + +class BalancingTransform : public ExceptionKeepingTransform +{ +public: + explicit BalancingTransform( + const Block & header, size_t max_memory_usage_); + + String getName() const override { return "BalancingTransform"; } + + void work() override; + + const Chunks & getChunks() const + { + return chunks; + } + +protected: + void onConsume(Chunk chunk) override; + GenerateResult onGenerate() override; + void onFinish() override; + +private: + size_t CalculateBlockSize(const Block & block); + Chunks chunks; + Blocks blocks; + size_t blocks_size; + Chunk cur_chunk; + Chunk finish_chunk; + size_t max_memory_usage; +}; + +/// Doesn't care about propagating exceptions and thus doesn't throw LOGICAL_ERROR if the following transform closes its input port. + + +/// Doesn't care about propagating exceptions and thus doesn't throw LOGICAL_ERROR if the following transform closes its input port. 
+class LBalancingChunksTransform : public ISimpleTransform +{ +public: + explicit LBalancingChunksTransform(const Block & header, size_t min_block_size_rows, size_t min_block_size_bytes, size_t max_memory_usage, bool skip_empty_chunks_); + + String getName() const override { return "LBalancingChunksTransform"; } + + const Chunks & getChunks() const + { + return chunks; + } + +protected: + void transform(Chunk &) override; + + IProcessor::Status prepare() override; + +private: + size_t CalculateBlockSize(const Block & block); + [[maybe_unused]] ChunksToSquash chunks_to_merge; + Chunks chunks; + Blocks blocks; + [[maybe_unused]] size_t blocks_size; + Chunk cur_chunk; + Chunk finish_chunk; + [[maybe_unused]] size_t max_memory_usage; + SquashingTransform squashing; + BalanceTransform balance; + [[maybe_unused]]size_t acc_size = 0; + + /// When consumption is finished we need to release the final chunk regardless of its size. + bool finished = false; +}; + +class BalancingChunksTransform : public IProcessor +{ +public: + BalancingChunksTransform( + const Block & header, size_t min_block_size_rows, size_t min_block_size_bytes, size_t max_memory_usage_, size_t num_ports); + // explicit BalancingChunksTransform(const Block & header, size_t min_block_size_rows, size_t min_block_size_bytes, size_t max_memory_usage, bool skip_empty_chunks_); + + String getName() const override { return "BalancingChunksTransform"; } + + const Chunks & getChunks() const + { + return chunks; + } + + InputPorts & getInputPorts() { return inputs; } + OutputPorts & getOutputPorts() { return outputs; } + + Status prepare() override; + Status prepareConsume(); + Status prepareSend(); + + // void work() override; + void transform(Chunk & chunk); + +protected: + // void transform(Chunk &) ; + +private: + size_t CalculateBlockSize(const Block & block); + [[maybe_unused]] ChunksToSquash chunks_to_merge; + Chunks chunks; + Chunk chunk; + Blocks blocks; + [[maybe_unused]] size_t blocks_size; + Chunk 
cur_chunk; + Chunk finish_chunk; + [[maybe_unused]] size_t max_memory_usage; + SquashingTransform squashing; + BalanceTransform balance; + [[maybe_unused]]size_t acc_size = 0; + bool has_data = false; + std::vector was_output_processed; + + /// When consumption is finished we need to release the final chunk regardless of its size. + bool finished = false; +}; +} + diff --git a/src/Processors/Transforms/SquashingChunksTransform.cpp b/src/Processors/Transforms/SquashingChunksTransform.cpp index 7de9538e435..22ce3ba9359 100644 --- a/src/Processors/Transforms/SquashingChunksTransform.cpp +++ b/src/Processors/Transforms/SquashingChunksTransform.cpp @@ -1,4 +1,6 @@ #include +#include +#include "Common/logger_useful.h" namespace DB { @@ -12,7 +14,8 @@ SquashingChunksTransform::SquashingChunksTransform( void SquashingChunksTransform::onConsume(Chunk chunk) { - if (auto block = squashing.add(getInputPort().getHeader().cloneWithColumns(chunk.detachColumns()))) + LOG_TRACE(getLogger("squashing"), "{}, SquashingTransform: !finished, hasInfo: {}", reinterpret_cast(this), chunk.hasChunkInfo()); + if (auto block = squashing.add(std::move(chunk))) { cur_chunk.setColumns(block.getColumns(), block.rows()); } @@ -29,7 +32,9 @@ SquashingChunksTransform::GenerateResult SquashingChunksTransform::onGenerate() void SquashingChunksTransform::onFinish() { auto block = squashing.add({}); + LOG_TRACE(getLogger("squashing"), "{}, SquashingTransform: finished, structure of block: {}", reinterpret_cast(this), block.dumpStructure()); finish_chunk.setColumns(block.getColumns(), block.rows()); + LOG_TRACE(getLogger("squashing"), "{}, SquashingTransform: finished, hasInfo: {}", reinterpret_cast(this), finish_chunk.hasChunkInfo()); } void SquashingChunksTransform::work() @@ -50,8 +55,8 @@ void SquashingChunksTransform::work() } SimpleSquashingChunksTransform::SimpleSquashingChunksTransform( - const Block & header, size_t min_block_size_rows, size_t min_block_size_bytes) - : ISimpleTransform(header, 
header, true), squashing(min_block_size_rows, min_block_size_bytes) + const Block & header, size_t min_block_size_rows, size_t min_block_size_bytes, [[maybe_unused]] bool skip_empty_chunks_) + : ISimpleTransform(header, header, false), squashing(min_block_size_rows, min_block_size_bytes) { } @@ -59,11 +64,13 @@ void SimpleSquashingChunksTransform::transform(Chunk & chunk) { if (!finished) { - if (auto block = squashing.add(getInputPort().getHeader().cloneWithColumns(chunk.detachColumns()))) + LOG_TRACE(getLogger("squashing"), "{}, SquashingTransform: !finished, hasInfo: {}", reinterpret_cast(this), chunk.hasChunkInfo()); + if (auto block = squashing.add(std::move(chunk))) chunk.setColumns(block.getColumns(), block.rows()); } else { + LOG_TRACE(getLogger("squashing"), "{}, SquashingTransform: finished, hasInfo: {}", reinterpret_cast(this), chunk.hasChunkInfo()); auto block = squashing.add({}); chunk.setColumns(block.getColumns(), block.rows()); } @@ -79,4 +86,125 @@ IProcessor::Status SimpleSquashingChunksTransform::prepare() return ISimpleTransform::prepare(); } +//maybe it makes sense to pass not the IProcessor entity, but the SimpleTransform? anyway we have one input and one output +ProcessorSquashingTransform::ProcessorSquashingTransform( + const Block & header, size_t min_block_size_rows, size_t min_block_size_bytes, [[maybe_unused]]size_t num_ports) + : IProcessor(InputPorts(1, header), OutputPorts(1, header)), squashing(min_block_size_rows, min_block_size_bytes) +{ +} + +IProcessor::Status ProcessorSquashingTransform::prepare() +{ + Status status = Status::Ready; + + while (status == Status::Ready) + { + status = !has_data ? 
prepareConsume() + : prepareGenerate(); + } + + return status; +} + +IProcessor::Status ProcessorSquashingTransform::prepareConsume() +{ + LOG_TRACE(getLogger("balancing"), "prepareConsume"); + for (auto & input : getInputPorts()) + { + bool all_finished = true; + for (auto & output : outputs) + { + if (output.isFinished()) + continue; + + all_finished = false; + } + + if (all_finished) + { + input.close(); + return Status::Finished; + } + + if (input.isFinished()) + { + for (auto & output : outputs) + output.finish(); + + return Status::Finished; + } + + input.setNeeded(); + if (!input.hasData()) + return Status::NeedData; + + chunk = input.pull(); + has_data = true; + was_output_processed.assign(outputs.size(), false); + transform(chunk); + // if (chunk) + // chunks.push_back(std::move(chunk)); + } + return Status::Ready; +} + +void ProcessorSquashingTransform::transform(Chunk & chunk_) +{ + // [[maybe_unused]]const auto * agg_info = typeid_cast(chunk.getChunkInfo().get()); + // if (agg_info) + // { + // std::cerr << "Beginning of SquashingTransform: size of one group: " << agg_info->chunks.size() << std::endl; + // if (!agg_info->chunks.empty()) + // std::cerr << "!group is not empty, first column: " << agg_info->chunks[0].dumpStructure() << std::endl; + // } + LOG_TRACE(getLogger("squashing"), "{}, SquashingTransform: Struct of input chunk: {}", reinterpret_cast(this), chunk_.dumpStructure()); + if (!finished) + { + LOG_TRACE(getLogger("squashing"), "{}, SquashingTransform: !finished, hasInfo: {}", reinterpret_cast(this), chunk_.hasChunkInfo()); + if (auto block = squashing.add(std::move(chunk_))) + chunk_.setColumns(block.getColumns(), block.rows()); + } + else + { + LOG_TRACE(getLogger("squashing"), "{}, SquashingTransform: finished, hasInfo: {}", reinterpret_cast(this), chunk_.hasChunkInfo()); + auto block = squashing.add({}); + chunk_.setColumns(block.getColumns(), block.rows()); + } +} + +IProcessor::Status ProcessorSquashingTransform::prepareGenerate() +{ 
+ LOG_TRACE(getLogger("squashingProcessor"), "prepareGenerate"); + bool all_outputs_processed = true; + + size_t chunk_number = 0; + for (auto &output : getOutputPorts()) + { + auto & was_processed = was_output_processed[chunk_number]; + ++chunk_number; + + if (was_processed) + continue; + + if (output.isFinished()) + continue; + + if (!output.canPush()) + { + all_outputs_processed = false; + continue; + } + + LOG_TRACE(getLogger("squashingProcessor"), "chunk struct: {}", chunk.dumpStructure()); + output.push(chunk.clone()); + was_processed = true; + } + + if (all_outputs_processed) + { + has_data = false; + return Status::Ready; + } + return Status::PortFull; +} } diff --git a/src/Processors/Transforms/SquashingChunksTransform.h b/src/Processors/Transforms/SquashingChunksTransform.h index f82e9e46a61..f140f5274d7 100644 --- a/src/Processors/Transforms/SquashingChunksTransform.h +++ b/src/Processors/Transforms/SquashingChunksTransform.h @@ -23,7 +23,7 @@ protected: void onFinish() override; private: - SquashingTransform squashing; + NewSquashingTransform squashing; Chunk cur_chunk; Chunk finish_chunk; }; @@ -32,7 +32,7 @@ private: class SimpleSquashingChunksTransform : public ISimpleTransform { public: - explicit SimpleSquashingChunksTransform(const Block & header, size_t min_block_size_rows, size_t min_block_size_bytes); + explicit SimpleSquashingChunksTransform(const Block & header, size_t min_block_size_rows, size_t min_block_size_bytes, bool skip_empty_chunks_ = true); String getName() const override { return "SimpleSquashingTransform"; } @@ -42,7 +42,35 @@ protected: IProcessor::Status prepare() override; private: - SquashingTransform squashing; + NewSquashingTransform squashing; + + bool finished = false; +}; + + +class ProcessorSquashingTransform : public IProcessor +{ +public: + explicit ProcessorSquashingTransform(const Block & header, size_t min_block_size_rows, size_t min_block_size_bytes, size_t num_ports); + + String getName() const override { return 
"ProcessorSquashingTransform"; } + +protected: + InputPorts & getInputPorts() { return inputs; } + OutputPorts & getOutputPorts() { return outputs; } + + Status prepare() override; + Status prepareConsume(); + Status prepareGenerate(); + + // void work() override; + void transform(Chunk & chunk); + +private: + NewSquashingTransform squashing; + Chunk chunk; + bool has_data = false; + std::vector was_output_processed; /// When consumption is finished we need to release the final chunk regardless of its size. bool finished = false;