removed memory from automata, refactored the code

This commit is contained in:
yariks5s 2024-05-15 15:56:24 +00:00
parent a8a2aa21b2
commit 0619b0921f
4 changed files with 49 additions and 147 deletions

View File

@ -198,6 +198,11 @@ PlanSquashing::PlanSquashing(Block header_, size_t min_block_size_rows_, size_t
{
}
Chunk PlanSquashing::flush()
{
return convertToChunk(chunks_to_merge_vec);
}
Chunk PlanSquashing::add(Chunk && input_chunk)
{
return addImpl(std::move(input_chunk));
@ -206,10 +211,7 @@ Chunk PlanSquashing::add(Chunk && input_chunk)
Chunk PlanSquashing::addImpl(Chunk && input_chunk)
{
if (!input_chunk)
{
Chunk res_chunk = convertToChunk(chunks_to_merge_vec);
return res_chunk;
}
return {};
if (isEnoughSize(chunks_to_merge_vec))
chunks_to_merge_vec.clear();

View File

@ -85,6 +85,7 @@ public:
PlanSquashing(Block header_, size_t min_block_size_rows_, size_t min_block_size_bytes_);
Chunk add(Chunk && input_chunk);
Chunk flush();
bool isDataLeft()
{
return !chunks_to_merge_vec.empty();

View File

@ -1,5 +1,6 @@
#include <Processors/Transforms/PlanSquashingTransform.h>
#include <Processors/IProcessor.h>
#include "Common/logger_useful.h"
#include <Common/Exception.h>
namespace DB
@ -24,26 +25,18 @@ IProcessor::Status PlanSquashingTransform::prepare()
switch (planning_status)
{
case INIT:
{
status = init();
init();
break;
}
case READ_IF_CAN:
{
status = prepareConsume();
break;
}
case PUSH:
{
status = push();
break;
}
case WAIT_IN:
return waitForDataIn();
case WAIT_OUT:
return prepareSend();
case WAIT_OUT_FLUSH:
return prepareSendFlush();
planning_status = PlanningStatus::READ_IF_CAN;
return Status::NeedData;
case PUSH:
return sendOrFlush();
case FLUSH:
return sendOrFlush();
case FINISH:
break; /// never reached
}
@ -58,104 +51,58 @@ void PlanSquashingTransform::work()
prepare();
}
IProcessor::Status PlanSquashingTransform::init()
void PlanSquashingTransform::init()
{
for (auto input : inputs)
{
input.setNeeded();
if (input.hasData())
available_inputs++;
}
for (auto input: inputs)
if (!input.isFinished())
input.setNeeded();
planning_status = PlanningStatus::READ_IF_CAN;
return Status::Ready;
}
IProcessor::Status PlanSquashingTransform::prepareConsume()
{
if (available_inputs == 0)
{
planning_status = PlanningStatus::WAIT_IN;
return Status::NeedData;
}
finished = false;
bool inputs_have_no_data = true;
bool inputs_have_no_data = true, all_finished = true;
for (auto & input : inputs)
{
if (!input.isFinished())
all_finished = false;
if (input.hasData())
{
inputs_have_no_data = false;
chunk = input.pull();
transform(chunk);
available_inputs--;
if (chunk.hasChunkInfo())
{
planning_status = PlanningStatus::WAIT_OUT;
planning_status = PlanningStatus::PUSH;
return Status::Ready;
}
}
if (available_inputs == 0)
{
planning_status = PlanningStatus::WAIT_IN;
return Status::NeedData;
}
}
if (inputs_have_no_data)
{
if (checkInputs())
return Status::Ready;
planning_status = PlanningStatus::WAIT_IN;
return Status::NeedData;
}
return Status::Ready;
}
bool PlanSquashingTransform::checkInputs()
{
bool all_finished = true;
for (auto & input : inputs)
if (!input.isFinished())
all_finished = false;
if (all_finished) /// If all inputs are closed, we check if we have data in balancing
{
if (balance.isDataLeft()) /// If we have data in balancing, we process this data
{
planning_status = PlanningStatus::WAIT_OUT_FLUSH;
finished = true;
transform(chunk);
planning_status = PlanningStatus::FLUSH;
flushChunk();
}
// else /// If we don't have data, We send FINISHED
// planning_status = PlanningStatus::FINISH;
return true;
planning_status = PlanningStatus::PUSH;
return Status::Ready;
}
return false;
}
bool PlanSquashingTransform::checkOutputs()
{
bool all_finished = true;
if (inputs_have_no_data)
planning_status = PlanningStatus::WAIT_IN;
for (auto & output : outputs)
if (!output.isFinished())
all_finished = false;
if (all_finished) /// If all outputs are closed, we close inputs (just in case)
{
planning_status = PlanningStatus::FINISH;
return true;
}
return false;
return Status::Ready;
}
IProcessor::Status PlanSquashingTransform::waitForDataIn()
{
bool all_finished = true;
bool inputs_have_no_data = true;
for (auto & input : inputs)
{
if (input.isFinished())
@ -163,18 +110,17 @@ IProcessor::Status PlanSquashingTransform::waitForDataIn()
all_finished = false;
if (!input.hasData())
continue;
if (input.hasData())
inputs_have_no_data = false;
available_inputs++;
}
if (all_finished)
{
checkInputs();
planning_status = PlanningStatus::READ_IF_CAN;
return Status::Ready;
}
if (available_inputs > 0)
if (!inputs_have_no_data)
{
planning_status = PlanningStatus::READ_IF_CAN;
return Status::Ready;
@ -185,34 +131,17 @@ IProcessor::Status PlanSquashingTransform::waitForDataIn()
void PlanSquashingTransform::transform(Chunk & chunk_)
{
if (!finished)
{
Chunk res_chunk = balance.add(std::move(chunk_));
std::swap(res_chunk, chunk_);
}
else
{
Chunk res_chunk = balance.add({});
std::swap(res_chunk, chunk_);
}
Chunk res_chunk = balance.add(std::move(chunk_));
std::swap(res_chunk, chunk_);
}
IProcessor::Status PlanSquashingTransform::push()
void PlanSquashingTransform::flushChunk()
{
if (!free_output)
throw Exception(ErrorCodes::LOGICAL_ERROR, "There should be a free output in push()");
if (finished)
planning_status = PlanningStatus::FINISH;
else
planning_status = PlanningStatus::READ_IF_CAN;
free_output->push(std::move(chunk));
free_output = nullptr;
return Status::Ready;
Chunk res_chunk = balance.flush();
std::swap(res_chunk, chunk);
}
IProcessor::Status PlanSquashingTransform::prepareSend()
IProcessor::Status PlanSquashingTransform::sendOrFlush()
{
if (!chunk)
{
@ -224,29 +153,10 @@ IProcessor::Status PlanSquashingTransform::prepareSend()
{
if (output.canPush())
{
planning_status = PlanningStatus::PUSH;
free_output = &output;
return Status::Ready;
}
}
return Status::PortFull;
}
if (planning_status == PlanningStatus::PUSH)
planning_status = PlanningStatus::READ_IF_CAN;
IProcessor::Status PlanSquashingTransform::prepareSendFlush()
{
if (!chunk)
{
planning_status = PlanningStatus::FINISH;
return Status::Ready;
}
for (auto &output : outputs)
{
if (output.canPush())
{
planning_status = PlanningStatus::PUSH;
free_output = &output;
output.push(std::move(chunk));
return Status::Ready;
}
}

View File

@ -10,9 +10,8 @@ enum PlanningStatus
INIT,
READ_IF_CAN,
WAIT_IN,
WAIT_OUT,
PUSH,
WAIT_OUT_FLUSH,
FLUSH,
FINISH
};
@ -32,29 +31,19 @@ public:
Status prepare() override;
void work() override;
Status init();
void init();
Status prepareConsume();
Status prepareSend();
Status push();
Status prepareSendFlush();
Status sendOrFlush();
Status waitForDataIn();
Status finish();
bool checkInputs();
bool checkOutputs();
void transform(Chunk & chunk);
protected:
void flushChunk();
private:
Chunk chunk;
PlanSquashing balance;
PlanningStatus planning_status = PlanningStatus::INIT;
size_t available_inputs = 0;
OutputPort* free_output = nullptr;
/// When consumption is finished we need to release the final chunk regardless of its size.
bool finished = false;
};
}