Remove some code.

Nikolai Kochetov 2020-04-14 12:05:29 +03:00
parent 848026ac4c
commit 71feedea6f
10 changed files with 48 additions and 296 deletions

Processors/Merges/AggregatingSortedTransform.h

@@ -14,13 +14,13 @@ class ColumnAggregateFunction;
* corresponding to one value of the primary key. For columns that are not part of the primary key and which do not have the AggregateFunction type,
* when merged, the first value is selected.
*/
-class AggregatingSortedTransform : public IMergingTransform2<AggregatingSortedAlgorithm>
+class AggregatingSortedTransform : public IMergingTransform<AggregatingSortedAlgorithm>
{
public:
AggregatingSortedTransform(
const Block & header, size_t num_inputs,
SortDescription description_, size_t max_block_size)
-: IMergingTransform2(
+: IMergingTransform(
num_inputs, header, header, true,
header,
num_inputs,
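
To illustrate the merge rule described in the header comment above (consecutive rows with the same primary key collapse into one; ordinary columns keep the first value, AggregateFunction columns are combined), here is a minimal standalone sketch. The row layout and the plain integer sum standing in for an aggregate-function state are illustrative assumptions, not ClickHouse code:

    #include <iostream>
    #include <string>
    #include <vector>

    // Simplified row: `key` models the primary key, `first_col` an ordinary column
    // (first value wins), `agg` an aggregate-function state (modelled as a sum).
    struct Row { int key; std::string first_col; long agg; };

    std::vector<Row> aggregatingMerge(const std::vector<Row> & sorted_rows)
    {
        std::vector<Row> result;
        for (const auto & row : sorted_rows)
        {
            if (!result.empty() && result.back().key == row.key)
                result.back().agg += row.agg;   // combine the aggregate state, keep the first value elsewhere
            else
                result.push_back(row);          // new key: start a new output row
        }
        return result;
    }

    int main()
    {
        for (const auto & row : aggregatingMerge({{1, "a", 10}, {1, "b", 5}, {2, "c", 7}}))
            std::cout << row.key << ' ' << row.first_col << ' ' << row.agg << '\n';   // "1 a 15", "2 c 7"
    }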

Processors/Merges/CollapsingSortedTransform.h

@@ -18,7 +18,7 @@ namespace DB
* If the number of negative rows is greater than the number of positive rows by one, then only the first negative row is written.
* Otherwise, a logical error.
*/
-class CollapsingSortedTransform final : public IMergingTransform2<CollapsingSortedAlgorithm>
+class CollapsingSortedTransform final : public IMergingTransform<CollapsingSortedAlgorithm>
{
public:
CollapsingSortedTransform(
@@ -29,7 +29,7 @@ public:
size_t max_block_size,
WriteBuffer * out_row_sources_buf_ = nullptr,
bool use_average_block_sizes = false)
-: IMergingTransform2(
+: IMergingTransform(
num_inputs, header, header, true,
header,
num_inputs,
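
The collapsing rule is easiest to see on data. The sketch below is reconstructed from the documented CollapsingMergeTree semantics (equal sign counts, or counts differing by one), with toy row types rather than the real algorithm's chunk and cursor machinery:

    #include <iostream>
    #include <optional>
    #include <vector>

    // Simplified row for one key group: sign is +1 (a "state" row) or -1 (a "cancel" row).
    struct Row { int key; int sign; int value; };

    // Collapse one group of rows sharing the same sorting key.
    std::vector<Row> collapseGroup(const std::vector<Row> & group)
    {
        std::optional<Row> first_negative, last_positive;
        size_t positives = 0, negatives = 0;
        for (const auto & row : group)
        {
            if (row.sign > 0) { last_positive = row; ++positives; }
            else              { if (!first_negative) first_negative = row; ++negatives; }
        }

        std::vector<Row> out;
        if (positives == negatives)
        {
            // Equal counts: keep the first negative and last positive row only if
            // the last row of the group is positive; otherwise everything cancels.
            if (!group.empty() && group.back().sign > 0)
            {
                out.push_back(*first_negative);
                out.push_back(*last_positive);
            }
        }
        else if (positives == negatives + 1)
            out.push_back(*last_positive);      // one extra positive survives
        else if (negatives == positives + 1)
            out.push_back(*first_negative);     // one extra negative survives
        else
            std::cerr << "incorrect data: |positives - negatives| > 1\n";   // the logical-error case from the comment
        return out;
    }

    int main()
    {
        for (const auto & row : collapseGroup({{1, +1, 10}, {1, -1, 10}, {1, +1, 20}}))
            std::cout << row.key << ' ' << row.sign << ' ' << row.value << '\n';   // prints "1 1 20"
    }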

Processors/Merges/GraphiteRollupSortedTransform.h

@@ -16,14 +16,14 @@ namespace DB
* merge `value` values using the specified aggregate functions,
* as well as keeping the maximum value of the `version` column.
*/
-class GraphiteRollupSortedTransform : public IMergingTransform2<GraphiteRollupSortedAlgorithm>
+class GraphiteRollupSortedTransform : public IMergingTransform<GraphiteRollupSortedAlgorithm>
{
public:
GraphiteRollupSortedTransform(
const Block & header, size_t num_inputs,
SortDescription description_, size_t max_block_size,
Graphite::Params params_, time_t time_of_merge_)
-: IMergingTransform2(
+: IMergingTransform(
num_inputs, header, header, true,
header,
num_inputs,
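
A rough standalone model of a Graphite rollup step: timestamps are rounded down to an interval, rows falling into the same bucket for the same path are aggregated, and the maximum `version` wins. The choice of `max` as the value aggregate and the fixed precision are placeholders for what Graphite::Params actually configures:

    #include <algorithm>
    #include <iostream>
    #include <string>
    #include <vector>

    // Simplified Graphite-style row; rule selection and data aging are omitted.
    struct Row { std::string path; long time; double value; unsigned version; };

    std::vector<Row> rollup(const std::vector<Row> & sorted_rows, long precision)
    {
        std::vector<Row> result;
        for (const auto & row : sorted_rows)
        {
            long bucket = row.time / precision * precision;   // round time down to the interval
            if (!result.empty() && result.back().path == row.path && result.back().time == bucket)
            {
                result.back().value = std::max(result.back().value, row.value);   // "max" stands in for the configured aggregate
                result.back().version = std::max(result.back().version, row.version);
            }
            else
                result.push_back({row.path, bucket, row.value, row.version});
        }
        return result;
    }

    int main()
    {
        for (const auto & row : rollup({{"cpu", 61, 1.0, 1}, {"cpu", 95, 3.0, 2}, {"cpu", 130, 2.0, 1}}, 60))
            std::cout << row.path << ' ' << row.time << ' ' << row.value << " v" << row.version << '\n';
        // With 60-second buckets the rows at t=61 and t=95 collapse into the t=60 bucket.
    }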

Processors/Merges/IMergingTransform.cpp

@@ -1,5 +1,4 @@
#include <Processors/Merges/IMergingTransform.h>
-#include <Processors/Merges/MergedData.h>
namespace DB
{
@@ -10,196 +9,6 @@ namespace ErrorCodes
extern const int NOT_IMPLEMENTED;
}
-IMergingTransform::IMergingTransform(
-size_t num_inputs,
-const Block & input_header,
-const Block & output_header,
-bool have_all_inputs_)
-: IProcessor(InputPorts(num_inputs, input_header), {output_header})
-, have_all_inputs(have_all_inputs_)
-{
-}
-void IMergingTransform::onNewInput()
-{
-throw Exception("onNewInput is not implemented for " + getName(), ErrorCodes::NOT_IMPLEMENTED);
-}
-void IMergingTransform::addInput()
-{
-if (have_all_inputs)
-throw Exception("IMergingTransform already have all inputs.", ErrorCodes::LOGICAL_ERROR);
-inputs.emplace_back(outputs.front().getHeader(), this);
-onNewInput();
-}
-void IMergingTransform::setHaveAllInputs()
-{
-if (have_all_inputs)
-throw Exception("IMergingTransform already have all inputs.", ErrorCodes::LOGICAL_ERROR);
-have_all_inputs = true;
-}
-void IMergingTransform::requestDataForInput(size_t input_number)
-{
-if (need_data)
-throw Exception("Data was requested for several inputs in IMergingTransform:"
-" " + std::to_string(next_input_to_read) + " and " + std::to_string(input_number),
-ErrorCodes::LOGICAL_ERROR);
-need_data = true;
-next_input_to_read = input_number;
-}
-void IMergingTransform::prepareOutputChunk(MergedData & merged_data)
-{
-if (need_data)
-return;
-has_output_chunk = (is_finished && merged_data.mergedRows()) || merged_data.hasEnoughRows();
-if (has_output_chunk)
-output_chunk = merged_data.pull();
-}
-IProcessor::Status IMergingTransform::prepareInitializeInputs()
-{
-/// Add information about inputs.
-if (input_states.empty())
-{
-input_states.reserve(inputs.size());
-for (auto & input : inputs)
-input_states.emplace_back(input);
-}
-/// Check for inputs we need.
-bool all_inputs_has_data = true;
-auto it = inputs.begin();
-for (size_t i = 0; it != inputs.end(); ++i, ++it)
-{
-auto & input = *it;
-if (input.isFinished())
-continue;
-if (input_states[i].is_initialized)
-{
-// input.setNotNeeded();
-continue;
-}
-input.setNeeded();
-if (!input.hasData())
-{
-all_inputs_has_data = false;
-continue;
-}
-auto chunk = input.pull();
-if (!chunk.hasRows())
-{
-if (!input.isFinished())
-all_inputs_has_data = false;
-continue;
-}
-consume(std::move(chunk), i);
-input_states[i].is_initialized = true;
-}
-if (!all_inputs_has_data)
-return Status::NeedData;
-initializeInputs();
-is_initialized = true;
-return Status::Ready;
-}
-IProcessor::Status IMergingTransform::prepare()
-{
-if (!have_all_inputs)
-return Status::NeedData;
-auto & output = outputs.front();
-/// Special case for no inputs.
-if (inputs.empty())
-{
-output.finish();
-onFinish();
-return Status::Finished;
-}
-/// Check can output.
-if (output.isFinished())
-{
-for (auto & in : inputs)
-in.close();
-onFinish();
-return Status::Finished;
-}
-/// Do not disable inputs, so it will work in the same way as with AsynchronousBlockInputStream, like before.
-bool is_port_full = !output.canPush();
-/// Push if has data.
-if (has_output_chunk && !is_port_full)
-{
-output.push(std::move(output_chunk));
-has_output_chunk = false;
-}
-if (!is_initialized)
-return prepareInitializeInputs();
-if (is_finished)
-{
-if (is_port_full)
-return Status::PortFull;
-for (auto & input : inputs)
-input.close();
-outputs.front().finish();
-onFinish();
-return Status::Finished;
-}
-if (need_data)
-{
-auto & input = input_states[next_input_to_read].port;
-if (!input.isFinished())
-{
-input.setNeeded();
-if (!input.hasData())
-return Status::NeedData;
-auto chunk = input.pull();
-if (!chunk.hasRows() && !input.isFinished())
-return Status::NeedData;
-consume(std::move(chunk), next_input_to_read);
-}
-need_data = false;
-}
-if (is_port_full)
-return Status::PortFull;
-return Status::Ready;
-}
IMergingTransformBase::IMergingTransformBase(
size_t num_inputs,
const Block & input_header,
@@ -241,7 +50,7 @@ IProcessor::Status IMergingTransformBase::prepareInitializeInputs()
for (auto & input : inputs)
input_states.emplace_back(input);
-init_chunks.resize(inputs.size());
+state.init_chunks.resize(inputs.size());
}
/// Check for inputs we need.
@@ -277,7 +86,7 @@ IProcessor::Status IMergingTransformBase::prepareInitializeInputs()
continue;
}
-init_chunks[i] = std::move(chunk);
+state.init_chunks[i] = std::move(chunk);
input_states[i].is_initialized = true;
}
@@ -318,13 +127,13 @@ IProcessor::Status IMergingTransformBase::prepare()
bool is_port_full = !output.canPush();
/// Push if has data.
-if (output_chunk && !is_port_full)
-output.push(std::move(output_chunk));
+if (state.output_chunk && !is_port_full)
+output.push(std::move(state.output_chunk));
if (!is_initialized)
return prepareInitializeInputs();
-if (is_finished)
+if (state.is_finished)
{
if (is_port_full)
@@ -339,9 +148,9 @@ IProcessor::Status IMergingTransformBase::prepare()
return Status::Finished;
}
-if (need_data)
+if (state.need_data)
{
-auto & input = input_states[next_input_to_read].port;
+auto & input = input_states[state.next_input_to_read].port;
if (!input.isFinished())
{
input.setNeeded();
@@ -353,10 +162,10 @@ IProcessor::Status IMergingTransformBase::prepare()
if (!chunk.hasRows() && !input.isFinished())
return Status::NeedData;
-input_chunk = std::move(chunk);
+state.input_chunk = std::move(chunk);
}
-need_data = false;
+state.need_data = false;
}
if (is_port_full)

Processors/Merges/IMergingTransform.h

@@ -7,66 +7,8 @@
namespace DB
{
-class MergedData;
-/// Base class for merging transforms.
-class IMergingTransform : public IProcessor
-{
-public:
-IMergingTransform(
-size_t num_inputs,
-const Block & input_header,
-const Block & output_header,
-//size_t max_block_size,
-//bool use_average_block_size, /// For adaptive granularity. Return chunks with the same avg size as inputs.
-bool have_all_inputs_);
-/// Methods to add additional input port. It is possible to do only before the first call of `prepare`.
-void addInput();
-/// Need to be called after all inputs are added. (only if have_all_inputs was not specified).
-void setHaveAllInputs();
-Status prepare() override;
-protected:
-virtual void onNewInput(); /// Is called when new input is added. To initialize input's data.
-virtual void initializeInputs() = 0; /// Is called after first chunk was read for every input.
-virtual void consume(Chunk chunk, size_t input_number) = 0; /// Is called after chunk was consumed from input.
-virtual void onFinish() {} /// Is called when all data is processed.
-void requestDataForInput(size_t input_number); /// Call it to say that next chunk of data is required for input.
-void prepareOutputChunk(MergedData & merged_data); /// Moves chunk from merged_data to output_chunk if needed.
-/// Profile info.
-Stopwatch total_stopwatch {CLOCK_MONOTONIC_COARSE};
-Chunk output_chunk;
-bool has_output_chunk = false;
-bool is_finished = false;
-private:
-/// Processor state.
-bool is_initialized = false;
-bool need_data = false;
-size_t next_input_to_read = 0;
-std::atomic<bool> have_all_inputs;
-struct InputState
-{
-explicit InputState(InputPort & port_) : port(port_) {}
-InputPort & port;
-bool is_initialized = false;
-};
-std::vector<InputState> input_states;
-Status prepareInitializeInputs();
-};
-/// Base class for merging transforms.
+/// Base class for IMergingTransform.
/// It is needed to extract all non-template methods in single translation unit.
class IMergingTransformBase : public IProcessor
{
public:
@@ -88,13 +30,18 @@ protected:
virtual void onFinish() {} /// Is called when all data is processed.
/// Processor state.
-Chunk output_chunk;
-Chunk input_chunk;
-bool is_finished = false;
-bool need_data = false;
-size_t next_input_to_read = 0;
+struct State
+{
+Chunk output_chunk;
+Chunk input_chunk;
+bool is_finished = false;
+bool need_data = false;
+size_t next_input_to_read = 0;
-Chunks init_chunks;
+Chunks init_chunks;
+};
+State state;
private:
struct InputState
@@ -112,12 +59,13 @@ private:
IProcessor::Status prepareInitializeInputs();
};
/// Implementation of MergingTransform using IMergingAlgorithm.
template <typename Algorithm>
-class IMergingTransform2 : public IMergingTransformBase
+class IMergingTransform : public IMergingTransformBase
{
public:
template <typename ... Args>
-IMergingTransform2(
+IMergingTransform(
size_t num_inputs,
const Block & input_header,
const Block & output_header,
@@ -130,25 +78,25 @@ public:
void work() override
{
-if (!init_chunks.empty())
-algorithm.initialize(std::move(init_chunks));
+if (!state.init_chunks.empty())
+algorithm.initialize(std::move(state.init_chunks));
-if (input_chunk)
-algorithm.consume(std::move(input_chunk), next_input_to_read);
+if (state.input_chunk)
+algorithm.consume(std::move(state.input_chunk), state.next_input_to_read);
IMergingAlgorithm::Status status = algorithm.merge();
if (status.chunk && status.chunk.hasRows())
-output_chunk = std::move(status.chunk);
+state.output_chunk = std::move(status.chunk);
if (status.required_source >= 0)
{
-next_input_to_read = status.required_source;
-need_data = true;
+state.next_input_to_read = status.required_source;
+state.need_data = true;
}
if (status.is_finished)
-is_finished = true;
+state.is_finished = true;
}
protected:
@@ -158,12 +106,7 @@ protected:
Stopwatch total_stopwatch {CLOCK_MONOTONIC_COARSE};
private:
-using IMergingTransformBase::output_chunk;
-using IMergingTransformBase::input_chunk;
-using IMergingTransformBase::is_finished;
-using IMergingTransformBase::need_data;
-using IMergingTransformBase::next_input_to_read;
-using IMergingTransformBase::init_chunks;
+using IMergingTransformBase::state;
};
}
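
The shape of the refactoring in this header can be modelled outside ClickHouse: a non-template base owns a State struct, and a small template subclass drives a pluggable algorithm from that state in work(). All names and signatures below are simplified stand-ins, not the real API:

    #include <iostream>
    #include <vector>

    struct Chunk { std::vector<int> rows; explicit operator bool() const { return !rows.empty(); } };

    struct MergeStatus { Chunk chunk; int required_source = -1; bool is_finished = false; };

    class TransformBase
    {
    protected:
        // Everything prepare()-like and work()-like code exchange lives here,
        // mirroring IMergingTransformBase::State.
        struct State
        {
            Chunk output_chunk;
            Chunk input_chunk;
            bool is_finished = false;
            bool need_data = false;
            size_t next_input_to_read = 0;
            std::vector<Chunk> init_chunks;
        };
        State state;
    };

    template <typename Algorithm>
    class MergingTransform : public TransformBase
    {
    public:
        explicit MergingTransform(std::vector<Chunk> inputs) { state.init_chunks = std::move(inputs); }

        // Mirrors the work() method shown above: hand pending chunks to the
        // algorithm, run one merge step, and stash the results back into `state`.
        void work()
        {
            if (!state.init_chunks.empty())
                algorithm.initialize(std::move(state.init_chunks));
            if (state.input_chunk)
                algorithm.consume(std::move(state.input_chunk), state.next_input_to_read);

            MergeStatus status = algorithm.merge();
            if (status.chunk)
                state.output_chunk = std::move(status.chunk);
            if (status.required_source >= 0)
            {
                state.next_input_to_read = static_cast<size_t>(status.required_source);
                state.need_data = true;
            }
            if (status.is_finished)
                state.is_finished = true;
        }

        const Chunk & output() const { return state.output_chunk; }

    private:
        Algorithm algorithm;
    };

    // A toy algorithm: concatenates whatever it was given, then reports completion.
    struct ConcatAlgorithm
    {
        Chunk merged;
        void initialize(std::vector<Chunk> chunks) { for (auto & c : chunks) consume(std::move(c), 0); }
        void consume(Chunk chunk, size_t) { merged.rows.insert(merged.rows.end(), chunk.rows.begin(), chunk.rows.end()); }
        MergeStatus merge() { return {std::move(merged), -1, true}; }
    };

    int main()
    {
        MergingTransform<ConcatAlgorithm> transform({{{1, 3}}, {{2, 4}}});
        transform.work();
        for (int value : transform.output().rows)
            std::cout << value << ' ';   // prints "1 3 2 4"
        std::cout << '\n';
    }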

Processors/Merges/MergingSortedTransform.cpp

@@ -18,7 +18,7 @@ MergingSortedTransform::MergingSortedTransform(
bool quiet_,
bool use_average_block_sizes,
bool have_all_inputs_)
-: IMergingTransform2(
+: IMergingTransform(
num_inputs, header, header, have_all_inputs_,
header,
num_inputs,

Processors/Merges/MergingSortedTransform.h

@@ -8,7 +8,7 @@ namespace DB
{
/// Merges several sorted inputs into one sorted output.
-class MergingSortedTransform final : public IMergingTransform2<MergingSortedAlgorithm>
+class MergingSortedTransform final : public IMergingTransform<MergingSortedAlgorithm>
{
public:
MergingSortedTransform(
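
The underlying operation here is a classic k-way merge of sorted inputs. A self-contained sketch with plain integers and a min-heap, with none of the sort-description, batching or row-source details of MergingSortedAlgorithm:

    #include <functional>
    #include <iostream>
    #include <queue>
    #include <utility>
    #include <vector>

    // Merge several sorted inputs into one sorted output.
    std::vector<int> mergeSorted(const std::vector<std::vector<int>> & inputs)
    {
        using Cursor = std::pair<int, size_t>;   // current value, index of the input it came from
        std::priority_queue<Cursor, std::vector<Cursor>, std::greater<>> heap;
        std::vector<size_t> positions(inputs.size(), 0);

        for (size_t i = 0; i < inputs.size(); ++i)
            if (!inputs[i].empty())
                heap.push({inputs[i][0], i});

        std::vector<int> result;
        while (!heap.empty())
        {
            auto [value, source] = heap.top();
            heap.pop();
            result.push_back(value);
            if (++positions[source] < inputs[source].size())
                heap.push({inputs[source][positions[source]], source});   // advance the cursor of that input
        }
        return result;
    }

    int main()
    {
        for (int value : mergeSorted({{1, 4, 7}, {2, 5}, {3, 6, 8}}))
            std::cout << value << ' ';   // prints "1 2 3 4 5 6 7 8"
        std::cout << '\n';
    }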

Processors/Merges/ReplacingSortedTransform.h

@@ -11,7 +11,7 @@ namespace DB
* For each group of consecutive identical values of the primary key (the columns by which the data is sorted),
* keeps row with max `version` value.
*/
-class ReplacingSortedTransform final : public IMergingTransform2<ReplacingSortedAlgorithm>
+class ReplacingSortedTransform final : public IMergingTransform<ReplacingSortedAlgorithm>
{
public:
ReplacingSortedTransform(
@@ -20,7 +20,7 @@ public:
size_t max_block_size,
WriteBuffer * out_row_sources_buf_ = nullptr,
bool use_average_block_sizes = false)
-: IMergingTransform2(
+: IMergingTransform(
num_inputs, header, header, true,
header,
num_inputs,
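
A toy version of the replacing rule quoted above, keeping only the row with the maximum `version` among consecutive rows with the same key (simplified types, not the ClickHouse row model):

    #include <iostream>
    #include <string>
    #include <vector>

    struct Row { int key; unsigned version; std::string payload; };

    std::vector<Row> replacingMerge(const std::vector<Row> & sorted_rows)
    {
        std::vector<Row> result;
        for (const auto & row : sorted_rows)
        {
            if (!result.empty() && result.back().key == row.key)
            {
                if (row.version >= result.back().version)
                    result.back() = row;   // a newer (or equal) version replaces the kept row
            }
            else
                result.push_back(row);
        }
        return result;
    }

    int main()
    {
        for (const auto & row : replacingMerge({{1, 2, "old"}, {1, 5, "new"}, {2, 1, "only"}}))
            std::cout << row.key << " v" << row.version << ' ' << row.payload << '\n';   // keeps "1 v5 new" and "2 v1 only"
    }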

Processors/Merges/SummingSortedTransform.h

@@ -11,7 +11,7 @@ namespace DB
* collapses them into one row, summing all the numeric columns except the primary key.
* If in all numeric columns, except for the primary key, the result is zero, it deletes the row.
*/
-class SummingSortedTransform final : public IMergingTransform2<SummingSortedAlgorithm>
+class SummingSortedTransform final : public IMergingTransform<SummingSortedAlgorithm>
{
public:
@@ -21,7 +21,7 @@ public:
/// List of columns to be summed. If empty, all numeric columns that are not in the description are taken.
const Names & column_names_to_sum,
size_t max_block_size)
-: IMergingTransform2(
+: IMergingTransform(
num_inputs, header, header, true,
header,
num_inputs,
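
The summing rule, sketched on a single numeric column: consecutive rows with the same key are summed, and a group whose sums are all zero is dropped. The one-column layout is an illustrative simplification:

    #include <iostream>
    #include <optional>
    #include <vector>

    struct Row { int key; long long sum; };

    std::vector<Row> summingMerge(const std::vector<Row> & sorted_rows)
    {
        std::vector<Row> result;
        std::optional<Row> current;
        auto flush = [&]
        {
            if (current && current->sum != 0)   // all-zero rows are deleted
                result.push_back(*current);
            current.reset();
        };

        for (const auto & row : sorted_rows)
        {
            if (current && current->key == row.key)
                current->sum += row.sum;
            else
            {
                flush();
                current = row;
            }
        }
        flush();
        return result;
    }

    int main()
    {
        for (const auto & row : summingMerge({{1, 5}, {1, -5}, {2, 3}, {2, 4}}))
            std::cout << row.key << ' ' << row.sum << '\n';   // key 1 sums to zero and disappears; prints "2 7"
    }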

Processors/Merges/VersionedCollapsingTransform.h

@@ -12,7 +12,7 @@ namespace DB
* (the columns by which the data is sorted, including specially specified version column),
* merges any pair of consecutive rows with opposite sign.
*/
-class VersionedCollapsingTransform final : public IMergingTransform2<VersionedCollapsingAlgorithm>
+class VersionedCollapsingTransform final : public IMergingTransform<VersionedCollapsingAlgorithm>
{
public:
/// Don't need version column. It's in primary key.
@@ -22,7 +22,7 @@ public:
size_t max_block_size,
WriteBuffer * out_row_sources_buf_ = nullptr,
bool use_average_block_sizes = false)
-: IMergingTransform2(
+: IMergingTransform(
num_inputs, header, header, true,
header,
num_inputs,
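
Finally, a toy rendering of the versioned-collapsing rule quoted above: within rows sorted by key and version, a consecutive pair with the same key and version but opposite signs cancels out (simplified types, not the real algorithm):

    #include <iostream>
    #include <vector>

    // `sign` is +1 or -1, as in CollapsingMergeTree-style data.
    struct Row { int key; unsigned version; int sign; };

    std::vector<Row> versionedCollapse(const std::vector<Row> & sorted_rows)
    {
        std::vector<Row> result;
        for (const auto & row : sorted_rows)
        {
            if (!result.empty()
                && result.back().key == row.key
                && result.back().version == row.version
                && result.back().sign == -row.sign)
                result.pop_back();   // the opposite-sign pair annihilates
            else
                result.push_back(row);
        }
        return result;
    }

    int main()
    {
        for (const auto & row : versionedCollapse({{1, 1, +1}, {1, 1, -1}, {1, 2, +1}, {2, 1, -1}}))
            std::cout << row.key << " v" << row.version << ' ' << row.sign << '\n';
        // The (key=1, version=1) pair cancels; rows (1, v2, +1) and (2, v1, -1) remain.
    }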