This commit is contained in:
yariks5s 2024-03-07 13:11:42 +00:00
parent a28385966c
commit 3c1e6b5d2c
7 changed files with 787 additions and 8 deletions

View File

@ -26,6 +26,7 @@
#include <Processors/Transforms/ExpressionTransform.h>
#include <Processors/Transforms/MaterializingTransform.h>
#include <Processors/Transforms/SquashingChunksTransform.h>
#include <Processors/Transforms/BalancingTransform.h>
#include <Processors/Transforms/getSourceFromASTInsertQuery.h>
#include <Processors/QueryPlan/QueryPlan.h>
#include <QueryPipeline/QueryPipelineBuilder.h>
@ -604,9 +605,15 @@ BlockIO InterpreterInsertQuery::execute()
{
bool table_prefers_large_blocks = table->prefersLargeBlocks();
pipeline.addTransform(std::make_shared<BalancingChunksTransform>(
header,
table_prefers_large_blocks ? settings.min_insert_block_size_rows : settings.max_block_size,
table_prefers_large_blocks ? settings.min_insert_block_size_bytes : 0ULL,
settings.max_memory_usage, presink_chains.size()));
pipeline.addSimpleTransform([&](const Block & in_header) -> ProcessorPtr
{
return std::make_shared<SimpleSquashingChunksTransform>(
return std::make_shared<SquashingChunksTransform>(
in_header,
table_prefers_large_blocks ? settings.min_insert_block_size_rows : settings.max_block_size,
table_prefers_large_blocks ? settings.min_insert_block_size_bytes : 0ULL);
@ -668,6 +675,14 @@ BlockIO InterpreterInsertQuery::execute()
table_prefers_large_blocks ? settings.min_insert_block_size_bytes : 0ULL);
chain.addSource(std::move(squashing));
// auto balancing = std::make_shared<LBalancingChunksTransform>(
// chain.getInputHeader(),
// table_prefers_large_blocks ? settings.min_insert_block_size_rows : settings.max_block_size,
// table_prefers_large_blocks ? settings.min_insert_block_size_bytes : 0ULL,
// settings.max_memory_usage, true);
// chain.addSource(std::move(balancing));
}
auto context_ptr = getContext();

View File

@ -1,4 +1,12 @@
#include <memory>
#include <vector>
#include <Interpreters/SquashingTransform.h>
#include "DataTypes/Serializations/ISerialization.h"
#include "Processors/Chunk.h"
#include "base/sleep.h"
#include "base/types.h"
#include <Columns/ColumnsNumber.h>
#include <Common/CurrentThread.h>
namespace DB
@ -126,4 +134,190 @@ bool SquashingTransform::isEnoughSize(size_t rows, size_t bytes) const
|| (min_block_size_bytes && bytes >= min_block_size_bytes);
}
NewSquashingTransform::NewSquashingTransform(size_t min_block_size_rows_, size_t min_block_size_bytes_)
: min_block_size_rows(min_block_size_rows_)
, min_block_size_bytes(min_block_size_bytes_)
{
}
Block NewSquashingTransform::add(Chunk && input_chunk)
{
return addImpl<Chunk &&>(std::move(input_chunk));
}
const ChunksToSquash * getInfoFromChunk(const Chunk & chunk)
{
auto info = chunk.getChunkInfo();
const auto * agg_info = typeid_cast<const ChunksToSquash *>(info.get());
return agg_info;
}
template <typename ReferenceType>
Block NewSquashingTransform::addImpl(ReferenceType input_chunk)
{
if (!input_chunk.hasChunkInfo())
{
Block to_return;
std::swap(to_return, accumulated_block);
return to_return;
}
const auto *info = getInfoFromChunk(input_chunk);
for (auto & one : info->chunks)
{
append(std::move(one), info->data_type);
}
// if (isEnoughSize(accumulated_block))
{
Block to_return;
std::swap(to_return, accumulated_block);
return to_return;
}
}
template <typename ReferenceType>
void NewSquashingTransform::append(ReferenceType input_chunk, DataTypePtr data_type)
{
if (input_chunk.getNumColumns() == 0)
return;
if (!accumulated_block)
{
for (const ColumnPtr& column : input_chunk.getColumns())
{
ColumnWithTypeAndName col = ColumnWithTypeAndName(column, data_type, " ");
accumulated_block.insert(accumulated_block.columns(), col);
}
return;
}
for (size_t i = 0, size = accumulated_block.columns(); i < size; ++i)
{
const auto source_column = input_chunk.getColumns()[i];
auto mutable_column = IColumn::mutate(std::move(accumulated_block.getByPosition(i).column));
mutable_column->insertRangeFrom(*source_column, 0, source_column->size());
accumulated_block.getByPosition(i).column = std::move(mutable_column);
}
}
BalanceTransform::BalanceTransform(Block header_, size_t min_block_size_rows_, size_t min_block_size_bytes_)
: min_block_size_rows(min_block_size_rows_)
, min_block_size_bytes(min_block_size_bytes_)
, header(std::move(header_))
{
// Use query-level memory tracker
if (auto * memory_tracker_child = CurrentThread::getMemoryTracker())
memory_tracker = memory_tracker_child->getParent();
}
Chunk BalanceTransform::add(Block && input_block)
{
return addImpl<Block &&>(std::move(input_block));
}
Chunk BalanceTransform::convertToChunk(std::vector<Chunk> &chunks)
{
if (chunks.empty())
return {};
auto info = std::make_shared<ChunksToSquash>();
for (auto &chunk : chunks)
info->chunks.push_back(chunk.clone());
info->data_type = data_type;
if (!info->chunks.empty()) /// Note: This if is only for debugging, structure of chunk copies the structure of info
{ /// it's possible to use only 'Chunk(header.cloneEmptyColumns(), 0, info)'
return Chunk({info->chunks[0].getColumns(), info->chunks[0].getNumRows(), info});
}
return Chunk(header.cloneEmptyColumns(), 0, info);
}
template <typename ReferenceType>
Chunk BalanceTransform::addImpl(ReferenceType input_block)
{
Chunk input_chunk(input_block.getColumns(), input_block.rows());
if (!data_type && !input_block.getDataTypes().empty())
data_type = input_block.getDataTypes()[0];
// /// End of input stream.
if (!input_chunk)
{
Chunk res_chunk = convertToChunk(chunks_to_merge_vec);
// // std::cerr << "end of stream. Adding info to chunk " << std::endl;
return res_chunk;
}
if (isEnoughSize(chunks_to_merge_vec))
chunks_to_merge_vec.clear();
if (input_chunk)
chunks_to_merge_vec.push_back(input_chunk.clone());
// std::cerr << "pushing back data. size: " << chunks_to_merge_vec.size() << std::endl;
if (isEnoughSize(chunks_to_merge_vec))
{
// // // std::cerr << "enough size" << std::endl;
Chunk res_chunk = convertToChunk(chunks_to_merge_vec);
return res_chunk;
}
return input_chunk;
}
bool BalanceTransform::isEnoughSize(const std::vector<Chunk> & chunks)
{
size_t rows = 0;
size_t bytes = 0;
for (const Chunk & chunk : chunks)
{
rows += chunk.getNumRows();
bytes += chunk.bytes();
}
auto free_memory = memory_tracker->getHardLimit() - memory_tracker->get();
std::cerr << "========Just memory representation, free memory: " << free_memory << ", chunk size: " << bytes << std::endl
<< " hardLimit: " << memory_tracker->getHardLimit() << " get(): " << memory_tracker->get() << std::endl;
checkAndWaitMemoryAvailability(bytes);
free_memory = memory_tracker->getHardLimit() - memory_tracker->get();
std::cerr << "========Just memory representation after, free memory: " << free_memory << ", chunk size: " << bytes << std::endl
<< ", hardLimit: " << memory_tracker->getHardLimit() << ", get(): " << memory_tracker->get() << std::endl;
return isEnoughSize(rows, bytes);
}
void BalanceTransform::checkAndWaitMemoryAvailability(size_t bytes)
{
// bytes_used += bytes;
if (const auto hard_limit = memory_tracker->getHardLimit() != 0)
{
auto free_memory = hard_limit - memory_tracker->get();
while (Int64(bytes) >= free_memory)
{
// std::cerr << "========Waiting a while from memory, free memory: " << free_memory << ", chunk size: " << bytes << std::endl;
// sleepForMilliseconds(10);
// checkAndWaitMemoryAvailability(bytes);
free_memory = hard_limit - memory_tracker->get();
}
}
}
bool BalanceTransform::isEnoughSize(const Chunk & chunk)
{
return isEnoughSize(chunk.getNumRows(), chunk.bytes());
}
bool BalanceTransform::isEnoughSize(size_t rows, size_t bytes) const
{
return (!min_block_size_rows && !min_block_size_bytes)
|| (min_block_size_rows && rows >= min_block_size_rows)
|| (min_block_size_bytes && bytes >= min_block_size_bytes);
}
}

View File

@ -1,11 +1,22 @@
#pragma once
#include <list>
#include <memory>
#include <vector>
#include <Core/Block.h>
#include <Processors/Chunk.h>
#include "Common/MemoryTracker.h"
#include "DataTypes/Serializations/ISerialization.h"
namespace DB
{
struct ChunksToSquash : public ChunkInfo
{
mutable std::vector<Chunk> chunks = {};
DataTypePtr data_type = nullptr;
};
/** Merging consecutive passed blocks to specified minimum size.
*
@ -47,4 +58,56 @@ private:
bool isEnoughSize(size_t rows, size_t bytes) const;
};
class NewSquashingTransform
{
public:
NewSquashingTransform(size_t min_block_size_rows_, size_t min_block_size_bytes_);
Block add(Chunk && input_chunk);
private:
size_t min_block_size_rows;
size_t min_block_size_bytes;
Block accumulated_block;
template <typename ReferenceType>
Block addImpl(ReferenceType chunk);
template <typename ReferenceType>
void append(ReferenceType input_chunk, DataTypePtr data_type);
bool isEnoughSize(const Block & block);
bool isEnoughSize(size_t rows, size_t bytes) const;
};
class BalanceTransform
{
public:
BalanceTransform(Block header_, size_t min_block_size_rows_, size_t min_block_size_bytes_);
Chunk add(Block && input_block);
private:
std::vector<Chunk> chunks_to_merge_vec = {};
size_t min_block_size_rows;
size_t min_block_size_bytes;
Chunk accumulated_block;
const Block header;
template <typename ReferenceType>
Chunk addImpl(ReferenceType input_block);
bool isEnoughSize(const Chunk & chunk);
bool isEnoughSize(const std::vector<Chunk> & chunks);
bool isEnoughSize(size_t rows, size_t bytes) const;
void checkAndWaitMemoryAvailability(size_t bytes);
DataTypePtr data_type = nullptr;
MemoryTracker * memory_tracker;
Chunk convertToChunk(std::vector<Chunk> &chunks);
};
}

View File

@ -0,0 +1,223 @@
#include <memory>
#include <Processors/Transforms/BalancingTransform.h>
#include "Common/Logger.h"
#include "Common/logger_useful.h"
#include "Interpreters/SquashingTransform.h"
#include "Processors/Chunk.h"
namespace DB
{
namespace ErrorCodes
{
extern const int MEMORY_LIMIT_EXCEEDED;
}
LBalancingChunksTransform::LBalancingChunksTransform(
const Block & header, size_t min_block_size_rows, size_t min_block_size_bytes, size_t max_memory_usage_, [[maybe_unused]] bool skip_empty_chunks_)
: ISimpleTransform(header, header, false), max_memory_usage(max_memory_usage_), squashing(min_block_size_rows, min_block_size_bytes), balance(header, min_block_size_rows, min_block_size_bytes)
{
}
void LBalancingChunksTransform::transform(Chunk & chunk)
{
if (!finished)
{
Chunk res_chunk = balance.add(getInputPort().getHeader().cloneWithColumns(chunk.detachColumns()));
if (res_chunk.hasChunkInfo())
{
// std::cerr << "BalancingTransform: adding chunk " << std::endl;
// {
// [[maybe_unused]]const auto * agg_info = typeid_cast<const ChunksToSquash *>(res_chunk.getChunkInfo().get());
// std::cerr << "End of BalancingTransform: size of one group: " << agg_info->chunks.size() << std::endl;
// if (!agg_info->chunks.empty())
// std::cerr << "!group is not empty, first column: " << agg_info->chunks[0].dumpStructure() << std::endl << std::endl;
// }
}
else
LOG_TRACE(getLogger("balancing"), "{}, BalancingTransform: not adding chunk, not finished.", reinterpret_cast<void*>(this));/// ISSUE: it's not clear why finished label is not set
std::swap(res_chunk, chunk);
}
else
{
Chunk res_chunk = balance.add({});
if (res_chunk.hasChunkInfo())
{
// std::cerr << "BalancingTransform: finished adding, NumRows:" << res_chunk.getNumRows() << ", HasInfo: " << res_chunk.hasChunkInfo() << std::endl;
// {
// [[maybe_unused]]const auto * agg_info = typeid_cast<const ChunksToSquash *>(res_chunk.getChunkInfo().get());
// std::cerr << "End of BalancingTransform: size of one group: " << agg_info->chunks.size() << std::endl;
// if (!agg_info->chunks.empty())
// std::cerr << "!group is not empty, first column: " << agg_info->chunks[0].dumpStructure() << std::endl << std::endl;
// }
}
else
LOG_TRACE(getLogger("balancing"), "{}, BalancingTransform: not adding chunk on finished", reinterpret_cast<void*>(this));
std::swap(res_chunk, chunk);
}
LOG_TRACE(getLogger("balancing"), "{}, BalancingTransform: struct of output chunk: {}", reinterpret_cast<void*>(this), chunk.dumpStructure());
}
IProcessor::Status LBalancingChunksTransform::prepare()
{
if (!finished && input.isFinished())
{
finished = true;
return Status::Ready;
}
return ISimpleTransform::prepare();
}
BalancingChunksTransform::BalancingChunksTransform(const Block & header, size_t min_block_size_rows, size_t min_block_size_bytes, size_t max_memory_usage_, size_t num_ports)
: IProcessor(InputPorts(num_ports, header), OutputPorts(num_ports, header)), max_memory_usage(max_memory_usage_), squashing(min_block_size_rows, min_block_size_bytes), balance(header, min_block_size_rows, min_block_size_bytes)
{
}
IProcessor::Status BalancingChunksTransform::prepare()
{
Status status = Status::Ready;
while (status == Status::Ready)
{
status = !has_data ? prepareConsume()
: prepareSend();
}
return status;
}
IProcessor::Status BalancingChunksTransform::prepareConsume()
{
LOG_TRACE(getLogger("balancingProcessor"), "prepareConsume");
for (auto & input : inputs)
{
bool all_finished = true;
for (auto & output : outputs)
{
if (output.isFinished())
continue;
all_finished = false;
}
if (all_finished)
{
input.close();
return Status::Finished;
}
if (input.isFinished())
{
for (auto & output : outputs)
output.finish();
return Status::Finished;
}
input.setNeeded();
if (!input.hasData())
return Status::NeedData;
chunk = input.pull();
was_output_processed.assign(outputs.size(), false);
transform(chunk);
if (chunk.hasChunkInfo())
{
LOG_TRACE(getLogger("balancingProcessor"), "hasData");
has_data = true;
}
else
{
finished = true;
LOG_TRACE(getLogger("balancingProcessor"), "hasData, finished");
transform(chunk);
has_data = true;
}
}
return Status::Ready;
}
void BalancingChunksTransform::transform(Chunk & chunk_)
{
if (!finished)
{
Chunk res_chunk = balance.add(getInputPorts().front().getHeader().cloneWithColumns(chunk_.detachColumns()));
if (res_chunk.hasChunkInfo())
{
// std::cerr << "BalancingTransform: adding chunk " << std::endl;
// {
// [[maybe_unused]]const auto * agg_info = typeid_cast<const ChunksToSquash *>(res_chunk.getChunkInfo().get());
// std::cerr << "End of BalancingTransform: size of one group: " << agg_info->chunks.size() << std::endl;
// if (!agg_info->chunks.empty())
// std::cerr << "!group is not empty, first column: " << agg_info->chunks[0].dumpStructure() << std::endl << std::endl;
// }
}
else
LOG_TRACE(getLogger("balancing"), "{}, BalancingTransform: not adding chunk, not finished.", reinterpret_cast<void*>(this));/// ISSUE: it's not clear why finished label is not set
std::swap(res_chunk, chunk_);
}
else
{
Chunk res_chunk = balance.add({});
if (res_chunk.hasChunkInfo())
{
// std::cerr << "BalancingTransform: finished adding, NumRows:" << res_chunk.getNumRows() << ", HasInfo: " << res_chunk.hasChunkInfo() << std::endl;
// {
// [[maybe_unused]]const auto * agg_info = typeid_cast<const ChunksToSquash *>(res_chunk.getChunkInfo().get());
// std::cerr << "End of BalancingTransform: size of one group: " << agg_info->chunks.size() << std::endl;
// if (!agg_info->chunks.empty())
// std::cerr << "!group is not empty, first column: " << agg_info->chunks[0].dumpStructure() << std::endl << std::endl;
// }
}
else
LOG_TRACE(getLogger("balancing"), "{}, BalancingTransform: not adding chunk on finished", reinterpret_cast<void*>(this));
std::swap(res_chunk, chunk_);
}
LOG_TRACE(getLogger("balancing"), "{}, BalancingTransform: struct of output chunk: {}, hasInfo: {}", reinterpret_cast<void*>(this), chunk_.dumpStructure(), chunk.hasChunkInfo());
}
IProcessor::Status BalancingChunksTransform::prepareSend()
{
LOG_TRACE(getLogger("balancingProcessor"), "prepareGenerate {}", chunk.dumpStructure());
bool all_outputs_processed = true;
size_t chunk_number = 0;
for (auto &output : outputs)
{
auto & was_processed = was_output_processed[chunk_number];
++chunk_number;
if (!chunk.hasChunkInfo())
throw Exception(ErrorCodes::LOGICAL_ERROR, "Chunk info must be not empty in prepareGenerate()");
if (was_processed)
continue;
if (output.isFinished())
continue;
if (!output.canPush())
{
all_outputs_processed = false;
continue;
}
LOG_TRACE(getLogger("balancingProcessor"), "chunk struct: {}", chunk.dumpStructure());
output.push(chunk.clone());
was_processed = true;
}
if (all_outputs_processed)
{
has_data = false;
return Status::Ready;
}
return Status::PortFull;
}
}

View File

@ -0,0 +1,128 @@
#pragma once
#include <Processors/ISimpleTransform.h>
#include <Processors/Sinks/SinkToStorage.h>
#include "Processors/Chunk.h"
#include "Processors/IProcessor.h"
#include "Processors/Transforms/ExceptionKeepingTransform.h"
#include <Interpreters/SquashingTransform.h>
namespace DB
{
class BalancingTransform : public ExceptionKeepingTransform
{
public:
explicit BalancingTransform(
const Block & header, size_t max_memory_usage_);
String getName() const override { return "BalancingTransform"; }
void work() override;
const Chunks & getChunks() const
{
return chunks;
}
protected:
void onConsume(Chunk chunk) override;
GenerateResult onGenerate() override;
void onFinish() override;
private:
size_t CalculateBlockSize(const Block & block);
Chunks chunks;
Blocks blocks;
size_t blocks_size;
Chunk cur_chunk;
Chunk finish_chunk;
size_t max_memory_usage;
};
/// Doesn't care about propagating exceptions and thus doesn't throw LOGICAL_ERROR if the following transform closes its input port.
/// Doesn't care about propagating exceptions and thus doesn't throw LOGICAL_ERROR if the following transform closes its input port.
class LBalancingChunksTransform : public ISimpleTransform
{
public:
explicit LBalancingChunksTransform(const Block & header, size_t min_block_size_rows, size_t min_block_size_bytes, size_t max_memory_usage, bool skip_empty_chunks_);
String getName() const override { return "LBalancingChunksTransform"; }
const Chunks & getChunks() const
{
return chunks;
}
protected:
void transform(Chunk &) override;
IProcessor::Status prepare() override;
private:
size_t CalculateBlockSize(const Block & block);
[[maybe_unused]] ChunksToSquash chunks_to_merge;
Chunks chunks;
Blocks blocks;
[[maybe_unused]] size_t blocks_size;
Chunk cur_chunk;
Chunk finish_chunk;
[[maybe_unused]] size_t max_memory_usage;
SquashingTransform squashing;
BalanceTransform balance;
[[maybe_unused]]size_t acc_size = 0;
/// When consumption is finished we need to release the final chunk regardless of its size.
bool finished = false;
};
class BalancingChunksTransform : public IProcessor
{
public:
BalancingChunksTransform(
const Block & header, size_t min_block_size_rows, size_t min_block_size_bytes, size_t max_memory_usage_, size_t num_ports);
// explicit BalancingChunksTransform(const Block & header, size_t min_block_size_rows, size_t min_block_size_bytes, size_t max_memory_usage, bool skip_empty_chunks_);
String getName() const override { return "BalancingChunksTransform"; }
const Chunks & getChunks() const
{
return chunks;
}
InputPorts & getInputPorts() { return inputs; }
OutputPorts & getOutputPorts() { return outputs; }
Status prepare() override;
Status prepareConsume();
Status prepareSend();
// void work() override;
void transform(Chunk & chunk);
protected:
// void transform(Chunk &) ;
private:
size_t CalculateBlockSize(const Block & block);
[[maybe_unused]] ChunksToSquash chunks_to_merge;
Chunks chunks;
Chunk chunk;
Blocks blocks;
[[maybe_unused]] size_t blocks_size;
Chunk cur_chunk;
Chunk finish_chunk;
[[maybe_unused]] size_t max_memory_usage;
SquashingTransform squashing;
BalanceTransform balance;
[[maybe_unused]]size_t acc_size = 0;
bool has_data = false;
std::vector<char> was_output_processed;
/// When consumption is finished we need to release the final chunk regardless of its size.
bool finished = false;
};
}

View File

@ -1,4 +1,6 @@
#include <Processors/Transforms/SquashingChunksTransform.h>
#include <Processors/IProcessor.h>
#include "Common/logger_useful.h"
namespace DB
{
@ -12,7 +14,8 @@ SquashingChunksTransform::SquashingChunksTransform(
void SquashingChunksTransform::onConsume(Chunk chunk)
{
if (auto block = squashing.add(getInputPort().getHeader().cloneWithColumns(chunk.detachColumns())))
LOG_TRACE(getLogger("squashing"), "{}, SquashingTransform: !finished, hasInfo: {}", reinterpret_cast<void*>(this), chunk.hasChunkInfo());
if (auto block = squashing.add(std::move(chunk)))
{
cur_chunk.setColumns(block.getColumns(), block.rows());
}
@ -29,7 +32,9 @@ SquashingChunksTransform::GenerateResult SquashingChunksTransform::onGenerate()
void SquashingChunksTransform::onFinish()
{
auto block = squashing.add({});
LOG_TRACE(getLogger("squashing"), "{}, SquashingTransform: finished, structure of block: {}", reinterpret_cast<void*>(this), block.dumpStructure());
finish_chunk.setColumns(block.getColumns(), block.rows());
LOG_TRACE(getLogger("squashing"), "{}, SquashingTransform: finished, hasInfo: {}", reinterpret_cast<void*>(this), finish_chunk.hasChunkInfo());
}
void SquashingChunksTransform::work()
@ -50,8 +55,8 @@ void SquashingChunksTransform::work()
}
SimpleSquashingChunksTransform::SimpleSquashingChunksTransform(
const Block & header, size_t min_block_size_rows, size_t min_block_size_bytes)
: ISimpleTransform(header, header, true), squashing(min_block_size_rows, min_block_size_bytes)
const Block & header, size_t min_block_size_rows, size_t min_block_size_bytes, [[maybe_unused]] bool skip_empty_chunks_)
: ISimpleTransform(header, header, false), squashing(min_block_size_rows, min_block_size_bytes)
{
}
@ -59,11 +64,13 @@ void SimpleSquashingChunksTransform::transform(Chunk & chunk)
{
if (!finished)
{
if (auto block = squashing.add(getInputPort().getHeader().cloneWithColumns(chunk.detachColumns())))
LOG_TRACE(getLogger("squashing"), "{}, SquashingTransform: !finished, hasInfo: {}", reinterpret_cast<void*>(this), chunk.hasChunkInfo());
if (auto block = squashing.add(std::move(chunk)))
chunk.setColumns(block.getColumns(), block.rows());
}
else
{
LOG_TRACE(getLogger("squashing"), "{}, SquashingTransform: finished, hasInfo: {}", reinterpret_cast<void*>(this), chunk.hasChunkInfo());
auto block = squashing.add({});
chunk.setColumns(block.getColumns(), block.rows());
}
@ -79,4 +86,125 @@ IProcessor::Status SimpleSquashingChunksTransform::prepare()
return ISimpleTransform::prepare();
}
//maybe it makes sense to pass not the IProcessor entity, but the SimpleTransform? anyway we have one input and one output
ProcessorSquashingTransform::ProcessorSquashingTransform(
const Block & header, size_t min_block_size_rows, size_t min_block_size_bytes, [[maybe_unused]]size_t num_ports)
: IProcessor(InputPorts(1, header), OutputPorts(1, header)), squashing(min_block_size_rows, min_block_size_bytes)
{
}
IProcessor::Status ProcessorSquashingTransform::prepare()
{
Status status = Status::Ready;
while (status == Status::Ready)
{
status = !has_data ? prepareConsume()
: prepareGenerate();
}
return status;
}
IProcessor::Status ProcessorSquashingTransform::prepareConsume()
{
LOG_TRACE(getLogger("balancing"), "prepareConsume");
for (auto & input : getInputPorts())
{
bool all_finished = true;
for (auto & output : outputs)
{
if (output.isFinished())
continue;
all_finished = false;
}
if (all_finished)
{
input.close();
return Status::Finished;
}
if (input.isFinished())
{
for (auto & output : outputs)
output.finish();
return Status::Finished;
}
input.setNeeded();
if (!input.hasData())
return Status::NeedData;
chunk = input.pull();
has_data = true;
was_output_processed.assign(outputs.size(), false);
transform(chunk);
// if (chunk)
// chunks.push_back(std::move(chunk));
}
return Status::Ready;
}
void ProcessorSquashingTransform::transform(Chunk & chunk_)
{
// [[maybe_unused]]const auto * agg_info = typeid_cast<const ChunksToSquash *>(chunk.getChunkInfo().get());
// if (agg_info)
// {
// std::cerr << "Beginning of SquashingTransform: size of one group: " << agg_info->chunks.size() << std::endl;
// if (!agg_info->chunks.empty())
// std::cerr << "!group is not empty, first column: " << agg_info->chunks[0].dumpStructure() << std::endl;
// }
LOG_TRACE(getLogger("squashing"), "{}, SquashingTransform: Struct of input chunk: {}", reinterpret_cast<void*>(this), chunk_.dumpStructure());
if (!finished)
{
LOG_TRACE(getLogger("squashing"), "{}, SquashingTransform: !finished, hasInfo: {}", reinterpret_cast<void*>(this), chunk_.hasChunkInfo());
if (auto block = squashing.add(std::move(chunk_)))
chunk_.setColumns(block.getColumns(), block.rows());
}
else
{
LOG_TRACE(getLogger("squashing"), "{}, SquashingTransform: finished, hasInfo: {}", reinterpret_cast<void*>(this), chunk_.hasChunkInfo());
auto block = squashing.add({});
chunk_.setColumns(block.getColumns(), block.rows());
}
}
IProcessor::Status ProcessorSquashingTransform::prepareGenerate()
{
LOG_TRACE(getLogger("squashingProcessor"), "prepareGenerate");
bool all_outputs_processed = true;
size_t chunk_number = 0;
for (auto &output : getOutputPorts())
{
auto & was_processed = was_output_processed[chunk_number];
++chunk_number;
if (was_processed)
continue;
if (output.isFinished())
continue;
if (!output.canPush())
{
all_outputs_processed = false;
continue;
}
LOG_TRACE(getLogger("squashingProcessor"), "chunk struct: {}", chunk.dumpStructure());
output.push(chunk.clone());
was_processed = true;
}
if (all_outputs_processed)
{
has_data = false;
return Status::Ready;
}
return Status::PortFull;
}
}

View File

@ -23,7 +23,7 @@ protected:
void onFinish() override;
private:
SquashingTransform squashing;
NewSquashingTransform squashing;
Chunk cur_chunk;
Chunk finish_chunk;
};
@ -32,7 +32,7 @@ private:
class SimpleSquashingChunksTransform : public ISimpleTransform
{
public:
explicit SimpleSquashingChunksTransform(const Block & header, size_t min_block_size_rows, size_t min_block_size_bytes);
explicit SimpleSquashingChunksTransform(const Block & header, size_t min_block_size_rows, size_t min_block_size_bytes, bool skip_empty_chunks_ = true);
String getName() const override { return "SimpleSquashingTransform"; }
@ -42,7 +42,35 @@ protected:
IProcessor::Status prepare() override;
private:
SquashingTransform squashing;
NewSquashingTransform squashing;
bool finished = false;
};
class ProcessorSquashingTransform : public IProcessor
{
public:
explicit ProcessorSquashingTransform(const Block & header, size_t min_block_size_rows, size_t min_block_size_bytes, size_t num_ports);
String getName() const override { return "ProcessorSquashingTransform"; }
protected:
InputPorts & getInputPorts() { return inputs; }
OutputPorts & getOutputPorts() { return outputs; }
Status prepare() override;
Status prepareConsume();
Status prepareGenerate();
// void work() override;
void transform(Chunk & chunk);
private:
NewSquashingTransform squashing;
Chunk chunk;
bool has_data = false;
std::vector<char> was_output_processed;
/// When consumption is finished we need to release the final chunk regardless of its size.
bool finished = false;