Fix build.

This commit is contained in:
Nikolai Kochetov 2020-04-09 20:25:44 +03:00
parent fc605d411d
commit f7fdfe4ed2
4 changed files with 90 additions and 72 deletions

View File

@ -24,10 +24,12 @@ public:
virtual void consume(Chunk chunk, size_t source_num) = 0;
virtual Status merge() = 0;
IMergingAlgorithm(IMergingAlgorithm &&) = default;
virtual ~IMergingAlgorithm() = default;
};
template <class T>
concept MergingAlgorithm = std::is_base_of<IMergingAlgorithm, T>::value && std::is_move_constructible<T>::value;
// TODO: use when compile with clang which could support it
// template <class T>
// concept MergingAlgorithm = std::is_base_of<IMergingAlgorithm, T>::value && std::is_move_constructible<T>::value;
}

View File

@ -200,28 +200,22 @@ IProcessor::Status IMergingTransform::prepare()
return Status::Ready;
}
template <MergingAlgorithm Algorithm>
IMergingTransform2<Algorithm>::IMergingTransform2(
Algorithm algorithm,
size_t num_inputs,
const Block & input_header,
const Block & output_header,
bool have_all_inputs_)
: IProcessor(InputPorts(num_inputs, input_header), {output_header})
, algorithm(std::move(algorithm))
, have_all_inputs(have_all_inputs_)
IMergingTransformBase::IMergingTransformBase(
size_t num_inputs,
const Block & input_header,
const Block & output_header,
bool have_all_inputs_)
: IProcessor(InputPorts(num_inputs, input_header), {output_header})
, have_all_inputs(have_all_inputs_)
{
}
template <MergingAlgorithm Algorithm>
void IMergingTransform2<Algorithm>::onNewInput()
void IMergingTransformBase::onNewInput()
{
throw Exception("onNewInput is not implemented for " + getName(), ErrorCodes::NOT_IMPLEMENTED);
}
template <MergingAlgorithm Algorithm>
void IMergingTransform2<Algorithm>::addInput()
void IMergingTransformBase::addInput()
{
if (have_all_inputs)
throw Exception("IMergingTransform already have all inputs.", ErrorCodes::LOGICAL_ERROR);
@ -230,8 +224,7 @@ void IMergingTransform2<Algorithm>::addInput()
onNewInput();
}
template <MergingAlgorithm Algorithm>
void IMergingTransform2<Algorithm>::setHaveAllInputs()
void IMergingTransformBase::setHaveAllInputs()
{
if (have_all_inputs)
throw Exception("IMergingTransform already have all inputs.", ErrorCodes::LOGICAL_ERROR);
@ -239,8 +232,7 @@ void IMergingTransform2<Algorithm>::setHaveAllInputs()
have_all_inputs = true;
}
template <MergingAlgorithm Algorithm>
IProcessor::Status IMergingTransform2<Algorithm>::prepareInitializeInputs()
IProcessor::Status IMergingTransformBase::prepareInitializeInputs()
{
/// Add information about inputs.
if (input_states.empty())
@ -296,8 +288,7 @@ IProcessor::Status IMergingTransform2<Algorithm>::prepareInitializeInputs()
return Status::Ready;
}
template <MergingAlgorithm Algorithm>
IProcessor::Status IMergingTransform2<Algorithm>::prepare()
IProcessor::Status IMergingTransformBase::prepare()
{
if (!have_all_inputs)
return Status::NeedData;
@ -327,11 +318,8 @@ IProcessor::Status IMergingTransform2<Algorithm>::prepare()
bool is_port_full = !output.canPush();
/// Push if has data.
if (has_output_chunk && !is_port_full)
{
if (output_chunk && !is_port_full)
output.push(std::move(output_chunk));
has_output_chunk = false;
}
if (!is_initialized)
return prepareInitializeInputs();
@ -365,7 +353,7 @@ IProcessor::Status IMergingTransform2<Algorithm>::prepare()
if (!chunk.hasRows() && !input.isFinished())
return Status::NeedData;
algorithm.consume(std::move(chunk), next_input_to_read);
input_chunk = std::move(chunk);
}
need_data = false;
@ -377,25 +365,4 @@ IProcessor::Status IMergingTransform2<Algorithm>::prepare()
return Status::Ready;
}
template <MergingAlgorithm Algorithm>
void IMergingTransform2<Algorithm>::work()
{
if (!init_chunks.empty())
algorithm.initialize(std::move(init_chunks));
IMergingAlgorithm::Status status = algorithm.merge();
if (status.chunk && status.chunk.hasRows())
{
has_output_chunk = true;
output_chunk = std::move(status.chunk);
}
if (status.required_source >= 0)
next_input_to_read = status.required_source;
if (status.is_finished)
is_finished = true;
}
}

View File

@ -67,18 +67,14 @@ private:
};
/// Base class for merging transforms.
template <MergingAlgorithm Algorithm>
class IMergingTransform2 : public IProcessor
class IMergingTransformBase : public IProcessor
{
public:
IMergingTransform2(
Algorithm algorithm,
size_t num_inputs,
const Block & input_header,
const Block & output_header,
//size_t max_block_size,
//bool use_average_block_size, /// For adaptive granularity. Return chunks with the same avg size as inputs.
bool have_all_inputs_);
IMergingTransformBase(
size_t num_inputs,
const Block & input_header,
const Block & output_header,
bool have_all_inputs_);
/// Methods to add additional input port. It is possible to do only before the first call of `prepare`.
void addInput();
@ -86,27 +82,21 @@ public:
void setHaveAllInputs();
Status prepare() override;
void work() override;
protected:
virtual void onNewInput(); /// Is called when new input is added. Only if have_all_inputs = false.
virtual void onFinish() {} /// Is called when all data is processed.
/// Profile info.
Stopwatch total_stopwatch {CLOCK_MONOTONIC_COARSE};
Algorithm algorithm;
private:
/// Processor state.
Chunk output_chunk;
bool has_output_chunk = false;
Chunk input_chunk;
bool is_finished = false;
bool is_initialized = false;
bool need_data = false;
size_t next_input_to_read = 0;
std::atomic<bool> have_all_inputs;
Chunks init_chunks;
private:
struct InputState
{
explicit InputState(InputPort & port_) : port(port_) {}
@ -116,9 +106,63 @@ private:
};
std::vector<InputState> input_states;
Chunks init_chunks;
std::atomic<bool> have_all_inputs;
bool is_initialized = false;
Status prepareInitializeInputs();
IProcessor::Status prepareInitializeInputs();
};
template <typename Algorithm>
class IMergingTransform2 : public IMergingTransformBase
{
public:
IMergingTransform2(
Algorithm algorithm_,
size_t num_inputs,
const Block & input_header,
const Block & output_header,
bool have_all_inputs_)
: IMergingTransformBase(num_inputs, input_header, output_header, have_all_inputs_)
, algorithm(std::move(algorithm_))
{
}
void work() override
{
if (!init_chunks.empty())
algorithm.initialize(std::move(init_chunks));
if (input_chunk)
algorithm.consume(std::move(input_chunk), next_input_to_read);
IMergingAlgorithm::Status status = algorithm.merge();
if (status.chunk && status.chunk.hasRows())
output_chunk = std::move(status.chunk);
if (status.required_source >= 0)
{
next_input_to_read = status.required_source;
need_data = true;
}
if (status.is_finished)
is_finished = true;
}
protected:
Algorithm algorithm;
/// Profile info.
Stopwatch total_stopwatch {CLOCK_MONOTONIC_COARSE};
private:
using IMergingTransformBase::output_chunk;
using IMergingTransformBase::input_chunk;
using IMergingTransformBase::is_finished;
using IMergingTransformBase::need_data;
using IMergingTransformBase::next_input_to_read;
using IMergingTransformBase::init_chunks;
};
}

View File

@ -80,6 +80,11 @@ void MergingSortedAlgorithm::consume(Chunk chunk, size_t source_num)
prepareChunk(chunk);
source_chunks[source_num] = std::move(chunk);
cursors[source_num].reset(source_chunks[source_num].getColumns(), {});
if (has_collation)
queue_with_collation.push(cursors[source_num]);
else
queue_without_collation.push(cursors[source_num]);
}
IMergingAlgorithm::Status MergingSortedAlgorithm::merge()
@ -166,9 +171,9 @@ IMergingAlgorithm::Status MergingSortedAlgorithm::insertFromChunk(size_t source_
auto num_rows = source_chunks[source_num].getNumRows();
UInt64 total_merged_rows_after_insertion = merged_data.mergedRows() + num_rows;
bool is_finished = limit && total_merged_rows_after_insertion > limit;
bool is_finished = limit && total_merged_rows_after_insertion >= limit;
if (is_finished)
if (limit && total_merged_rows_after_insertion > limit)
{
num_rows = total_merged_rows_after_insertion - limit;
merged_data.insertFromChunk(std::move(source_chunks[source_num]), num_rows);