Fix pipeline stuck for parallel final.

This commit is contained in:
Nikolai Kochetov 2020-06-15 14:02:47 +03:00
parent 36d5a3d8a8
commit ccf2ceb876
17 changed files with 198 additions and 124 deletions

View File

@ -280,19 +280,19 @@ AggregatingSortedAlgorithm::AggregatingSortedAlgorithm(
{ {
} }
void AggregatingSortedAlgorithm::initialize(Chunks chunks) void AggregatingSortedAlgorithm::initialize(Inputs inputs)
{ {
for (auto & chunk : chunks) for (auto & input : inputs)
if (chunk) if (input.chunk)
preprocessChunk(chunk, columns_definition); preprocessChunk(input.chunk, columns_definition);
initializeQueue(std::move(chunks)); initializeQueue(std::move(inputs));
} }
void AggregatingSortedAlgorithm::consume(Chunk & chunk, size_t source_num) void AggregatingSortedAlgorithm::consume(Input & input, size_t source_num)
{ {
preprocessChunk(chunk, columns_definition); preprocessChunk(input.chunk, columns_definition);
updateCursor(chunk, source_num); updateCursor(input, source_num);
} }
IMergingAlgorithm::Status AggregatingSortedAlgorithm::merge() IMergingAlgorithm::Status AggregatingSortedAlgorithm::merge()
@ -303,6 +303,13 @@ IMergingAlgorithm::Status AggregatingSortedAlgorithm::merge()
bool key_differs; bool key_differs;
SortCursor current = queue.current(); SortCursor current = queue.current();
if (current->isLast() && skipLastRowFor(current->pos))
{
/// Get the next block from the corresponding source, if there is one.
queue.removeTop();
return Status(current.impl->order);
}
{ {
detail::RowRef current_key; detail::RowRef current_key;
current_key.set(current); current_key.set(current);

View File

@ -19,8 +19,8 @@ public:
const Block & header, size_t num_inputs, const Block & header, size_t num_inputs,
SortDescription description_, size_t max_block_size); SortDescription description_, size_t max_block_size);
void initialize(Chunks chunks) override; void initialize(Inputs inputs) override;
void consume(Chunk & chunk, size_t source_num) override; void consume(Input & input, size_t source_num) override;
Status merge() override; Status merge() override;
struct SimpleAggregateDescription; struct SimpleAggregateDescription;

View File

@ -114,6 +114,14 @@ IMergingAlgorithm::Status CollapsingSortedAlgorithm::merge()
while (queue.isValid()) while (queue.isValid())
{ {
auto current = queue.current(); auto current = queue.current();
if (current->isLast() && skipLastRowFor(current->pos))
{
/// Get the next block from the corresponding source, if there is one.
queue.removeTop();
return Status(current.impl->order);
}
Int8 sign = assert_cast<const ColumnInt8 &>(*current->all_columns[sign_column_number]).getData()[current->pos]; Int8 sign = assert_cast<const ColumnInt8 &>(*current->all_columns[sign_column_number]).getData()[current->pos];
RowRef current_row; RowRef current_row;

View File

@ -157,6 +157,13 @@ IMergingAlgorithm::Status GraphiteRollupSortedAlgorithm::merge()
{ {
SortCursor current = queue.current(); SortCursor current = queue.current();
if (current->isLast() && skipLastRowFor(current->pos))
{
/// Get the next block from the corresponding source, if there is one.
queue.removeTop();
return Status(current.impl->order);
}
StringRef next_path = current->all_columns[columns_definition.path_column_num]->getDataAt(current->pos); StringRef next_path = current->all_columns[columns_definition.path_column_num]->getDataAt(current->pos);
bool new_path = is_first || next_path != current_group_path; bool new_path = is_first || next_path != current_group_path;
@ -224,7 +231,7 @@ IMergingAlgorithm::Status GraphiteRollupSortedAlgorithm::merge()
*(*current_subgroup_newest_row.all_columns)[columns_definition.version_column_num], *(*current_subgroup_newest_row.all_columns)[columns_definition.version_column_num],
/* nan_direction_hint = */ 1) >= 0) /* nan_direction_hint = */ 1) >= 0)
{ {
current_subgroup_newest_row.set(current, source_chunks[current.impl->order]); current_subgroup_newest_row.set(current, sources[current.impl->order].chunk);
/// Small hack: group and subgroups have the same path, so we can set current_group_path here instead of startNextGroup /// Small hack: group and subgroups have the same path, so we can set current_group_path here instead of startNextGroup
/// But since we keep in memory current_subgroup_newest_row's block, we could use StringRef for current_group_path and don't /// But since we keep in memory current_subgroup_newest_row's block, we could use StringRef for current_group_path and don't

View File

@ -20,8 +20,32 @@ public:
explicit Status(size_t source) : required_source(source) {} explicit Status(size_t source) : required_source(source) {}
}; };
virtual void initialize(Chunks chunks) = 0; struct Input
virtual void consume(Chunk & chunk, size_t source_num) = 0; {
Chunk chunk;
/// It is a flag which says that last row from chunk should be ignored in result.
/// This row is not ignored in sorting and is needed to synchronize required source
/// between different algorithm objects in parallel FINAL.
bool skip_last_row = false;
void swap(Input & other)
{
chunk.swap(other.chunk);
std::swap(skip_last_row, other.skip_last_row);
}
void set(Chunk chunk_)
{
chunk = std::move(chunk_);
skip_last_row = false;
}
};
using Inputs = std::vector<Input>;
virtual void initialize(Inputs inputs) = 0;
virtual void consume(Input & input, size_t source_num) = 0;
virtual Status merge() = 0; virtual Status merge() = 0;
IMergingAlgorithm() = default; IMergingAlgorithm() = default;

View File

@ -8,36 +8,43 @@ IMergingAlgorithmWithDelayedChunk::IMergingAlgorithmWithDelayedChunk(
size_t num_inputs, size_t num_inputs,
SortDescription description_) SortDescription description_)
: description(std::move(description_)) : description(std::move(description_))
, source_chunks(num_inputs) , current_inputs(num_inputs)
, cursors(num_inputs) , cursors(num_inputs)
{ {
} }
void IMergingAlgorithmWithDelayedChunk::initializeQueue(Chunks chunks) void IMergingAlgorithmWithDelayedChunk::initializeQueue(Inputs inputs)
{ {
source_chunks = std::move(chunks); current_inputs = std::move(inputs);
for (size_t source_num = 0; source_num < source_chunks.size(); ++source_num) for (size_t source_num = 0; source_num < current_inputs.size(); ++source_num)
{ {
if (!source_chunks[source_num]) if (!current_inputs[source_num].chunk)
continue; continue;
cursors[source_num] = SortCursorImpl(source_chunks[source_num].getColumns(), description, source_num); cursors[source_num] = SortCursorImpl(current_inputs[source_num].chunk.getColumns(), description, source_num);
} }
queue = SortingHeap<SortCursor>(cursors); queue = SortingHeap<SortCursor>(cursors);
} }
void IMergingAlgorithmWithDelayedChunk::updateCursor(Chunk & chunk, size_t source_num) void IMergingAlgorithmWithDelayedChunk::updateCursor(Input & input, size_t source_num)
{ {
auto & source_chunk = source_chunks[source_num]; auto & current_input = current_inputs[source_num];
/// Extend lifetime of last chunk. /// Extend lifetime of last chunk.
last_chunk.swap(source_chunk); if (current_input.skip_last_row && current_input.chunk.getNumRows() <= 1)
last_chunk_sort_columns = std::move(cursors[source_num].sort_columns); {
/// But if chunk has only single skipped row, ignore it.
}
else
{
last_chunk.swap(current_input.chunk);
last_chunk_sort_columns = std::move(cursors[source_num].sort_columns);
}
source_chunk.swap(chunk); current_input.swap(input);
cursors[source_num].reset(source_chunk.getColumns(), {}); cursors[source_num].reset(current_input.chunk.getColumns(), {});
queue.push(cursors[source_num]); queue.push(cursors[source_num]);
} }

View File

@ -23,12 +23,13 @@ protected:
ColumnRawPtrs last_chunk_sort_columns; /// Point to last_chunk if valid. ColumnRawPtrs last_chunk_sort_columns; /// Point to last_chunk if valid.
void initializeQueue(Chunks chunks); void initializeQueue(Inputs inputs);
void updateCursor(Chunk & chunk, size_t source_num); void updateCursor(Input & input, size_t source_num);
bool skipLastRowFor(size_t input_number) const { return current_inputs[input_number].skip_last_row; }
private: private:
/// Chunks currently being merged. /// Inputs currently being merged.
std::vector<Chunk> source_chunks; Inputs current_inputs;
SortCursorImpls cursors; SortCursorImpls cursors;
/// In merging algorithm, we need to compare current sort key with the last one. /// In merging algorithm, we need to compare current sort key with the last one.

View File

@ -11,7 +11,7 @@ IMergingAlgorithmWithSharedChunks::IMergingAlgorithmWithSharedChunks(
: description(std::move(description_)) : description(std::move(description_))
, chunk_allocator(num_inputs + max_row_refs) , chunk_allocator(num_inputs + max_row_refs)
, cursors(num_inputs) , cursors(num_inputs)
, source_chunks(num_inputs) , sources(num_inputs)
, out_row_sources_buf(out_row_sources_buf_) , out_row_sources_buf(out_row_sources_buf_)
{ {
} }
@ -26,39 +26,39 @@ static void prepareChunk(Chunk & chunk)
chunk.setColumns(std::move(columns), num_rows); chunk.setColumns(std::move(columns), num_rows);
} }
void IMergingAlgorithmWithSharedChunks::initialize(Chunks chunks) void IMergingAlgorithmWithSharedChunks::initialize(Inputs inputs)
{ {
source_chunks.resize(chunks.size()); for (size_t source_num = 0; source_num < inputs.size(); ++source_num)
for (size_t source_num = 0; source_num < source_chunks.size(); ++source_num)
{ {
if (!chunks[source_num]) if (!inputs[source_num].chunk)
continue; continue;
prepareChunk(chunks[source_num]); prepareChunk(inputs[source_num].chunk);
auto & source_chunk = source_chunks[source_num]; auto & source = sources[source_num];
source_chunk = chunk_allocator.alloc(chunks[source_num]); source.skip_last_row = inputs[source_num].skip_last_row;
cursors[source_num] = SortCursorImpl(source_chunk->getColumns(), description, source_num); source.chunk = chunk_allocator.alloc(inputs[source_num].chunk);
cursors[source_num] = SortCursorImpl(source.chunk->getColumns(), description, source_num);
source_chunk->all_columns = cursors[source_num].all_columns; source.chunk->all_columns = cursors[source_num].all_columns;
source_chunk->sort_columns = cursors[source_num].sort_columns; source.chunk->sort_columns = cursors[source_num].sort_columns;
} }
queue = SortingHeap<SortCursor>(cursors); queue = SortingHeap<SortCursor>(cursors);
} }
void IMergingAlgorithmWithSharedChunks::consume(Chunk & chunk, size_t source_num) void IMergingAlgorithmWithSharedChunks::consume(Input & input, size_t source_num)
{ {
prepareChunk(chunk); prepareChunk(input.chunk);
auto & source_chunk = source_chunks[source_num]; auto & source = sources[source_num];
source_chunk = chunk_allocator.alloc(chunk); source.skip_last_row = input.skip_last_row;
cursors[source_num].reset(source_chunk->getColumns(), {}); source.chunk = chunk_allocator.alloc(input.chunk);
cursors[source_num].reset(source.chunk->getColumns(), {});
source_chunk->all_columns = cursors[source_num].all_columns; source.chunk->all_columns = cursors[source_num].all_columns;
source_chunk->sort_columns = cursors[source_num].sort_columns; source.chunk->sort_columns = cursors[source_num].sort_columns;
queue.push(cursors[source_num]); queue.push(cursors[source_num]);
} }

View File

@ -15,8 +15,8 @@ public:
WriteBuffer * out_row_sources_buf_, WriteBuffer * out_row_sources_buf_,
size_t max_row_refs); size_t max_row_refs);
void initialize(Chunks chunks) override; void initialize(Inputs inputs) override;
void consume(Chunk & chunk, size_t source_num) override; void consume(Input & input, size_t source_num) override;
private: private:
SortDescription description; SortDescription description;
@ -27,9 +27,16 @@ private:
SortCursorImpls cursors; SortCursorImpls cursors;
protected: protected:
/// Chunks currently being merged.
using SourceChunks = std::vector<detail::SharedChunkPtr>; struct Source
SourceChunks source_chunks; {
detail::SharedChunkPtr chunk;
bool skip_last_row;
};
/// Sources currently being merged.
using Sources = std::vector<Source>;
Sources sources;
SortingHeap<SortCursor> queue; SortingHeap<SortCursor> queue;
@ -38,7 +45,8 @@ protected:
WriteBuffer * out_row_sources_buf = nullptr; WriteBuffer * out_row_sources_buf = nullptr;
using RowRef = detail::RowRefWithOwnedChunk; using RowRef = detail::RowRefWithOwnedChunk;
void setRowRef(RowRef & row, SortCursor & cursor) { row.set(cursor, source_chunks[cursor.impl->order]); } void setRowRef(RowRef & row, SortCursor & cursor) { row.set(cursor, sources[cursor.impl->order].chunk); }
bool skipLastRowFor(size_t input_number) const { return sources[input_number].skip_last_row; }
}; };
} }

View File

@ -22,7 +22,7 @@ MergingSortedAlgorithm::MergingSortedAlgorithm(
, description(std::move(description_)) , description(std::move(description_))
, limit(limit_) , limit(limit_)
, out_row_sources_buf(out_row_sources_buf_) , out_row_sources_buf(out_row_sources_buf_)
, source_chunks(num_inputs) , current_inputs(num_inputs)
, cursors(num_inputs) , cursors(num_inputs)
{ {
/// Replace column names in description to positions. /// Replace column names in description to positions.
@ -39,7 +39,7 @@ MergingSortedAlgorithm::MergingSortedAlgorithm(
void MergingSortedAlgorithm::addInput() void MergingSortedAlgorithm::addInput()
{ {
source_chunks.emplace_back(); current_inputs.emplace_back();
cursors.emplace_back(); cursors.emplace_back();
} }
@ -53,13 +53,13 @@ static void prepareChunk(Chunk & chunk)
chunk.setColumns(std::move(columns), num_rows); chunk.setColumns(std::move(columns), num_rows);
} }
void MergingSortedAlgorithm::initialize(Chunks chunks) void MergingSortedAlgorithm::initialize(Inputs inputs)
{ {
source_chunks = std::move(chunks); current_inputs = std::move(inputs);
for (size_t source_num = 0; source_num < source_chunks.size(); ++source_num) for (size_t source_num = 0; source_num < current_inputs.size(); ++source_num)
{ {
auto & chunk = source_chunks[source_num]; auto & chunk = current_inputs[source_num].chunk;
if (!chunk) if (!chunk)
continue; continue;
@ -74,11 +74,11 @@ void MergingSortedAlgorithm::initialize(Chunks chunks)
queue_without_collation = SortingHeap<SortCursor>(cursors); queue_without_collation = SortingHeap<SortCursor>(cursors);
} }
void MergingSortedAlgorithm::consume(Chunk & chunk, size_t source_num) void MergingSortedAlgorithm::consume(Input & input, size_t source_num)
{ {
prepareChunk(chunk); prepareChunk(input.chunk);
source_chunks[source_num].swap(chunk); current_inputs[source_num].swap(input);
cursors[source_num].reset(source_chunks[source_num].getColumns(), {}); cursors[source_num].reset(current_inputs[source_num].chunk.getColumns(), {});
if (has_collation) if (has_collation)
queue_with_collation.push(cursors[source_num]); queue_with_collation.push(cursors[source_num]);
@ -105,10 +105,18 @@ IMergingAlgorithm::Status MergingSortedAlgorithm::mergeImpl(TSortingHeap & queue
auto current = queue.current(); auto current = queue.current();
if (current->isLast() && current_inputs[current->pos].skip_last_row)
{
/// Get the next block from the corresponding source, if there is one.
queue.removeTop();
return Status(current.impl->order);
}
/** And what if the block is totally less or equal than the rest for the current cursor? /** And what if the block is totally less or equal than the rest for the current cursor?
* Or is there only one data source left in the queue? Then you can take the entire block on current cursor. * Or is there only one data source left in the queue? Then you can take the entire block on current cursor.
*/ */
if (current.impl->isFirst() if (current.impl->isFirst()
&& !current_inputs[current->pos].skip_last_row /// Ignore optimization if last row should be skipped.
&& (queue.size() == 1 && (queue.size() == 1
|| (queue.size() >= 2 && current.totallyLessOrEquals(queue.nextChild())))) || (queue.size() >= 2 && current.totallyLessOrEquals(queue.nextChild()))))
{ {
@ -167,7 +175,7 @@ IMergingAlgorithm::Status MergingSortedAlgorithm::insertFromChunk(size_t source_
//std::cerr << "copied columns\n"; //std::cerr << "copied columns\n";
auto num_rows = source_chunks[source_num].getNumRows(); auto num_rows = current_inputs[source_num].chunk.getNumRows();
UInt64 total_merged_rows_after_insertion = merged_data.mergedRows() + num_rows; UInt64 total_merged_rows_after_insertion = merged_data.mergedRows() + num_rows;
bool is_finished = limit && total_merged_rows_after_insertion >= limit; bool is_finished = limit && total_merged_rows_after_insertion >= limit;
@ -175,12 +183,12 @@ IMergingAlgorithm::Status MergingSortedAlgorithm::insertFromChunk(size_t source_
if (limit && total_merged_rows_after_insertion > limit) if (limit && total_merged_rows_after_insertion > limit)
{ {
num_rows -= total_merged_rows_after_insertion - limit; num_rows -= total_merged_rows_after_insertion - limit;
merged_data.insertFromChunk(std::move(source_chunks[source_num]), num_rows); merged_data.insertFromChunk(std::move(current_inputs[source_num].chunk), num_rows);
} }
else else
merged_data.insertFromChunk(std::move(source_chunks[source_num]), 0); merged_data.insertFromChunk(std::move(current_inputs[source_num].chunk), 0);
source_chunks[source_num] = Chunk(); current_inputs[source_num].chunk = Chunk();
/// Write order of rows for other columns /// Write order of rows for other columns
/// this data will be used in gather stream /// this data will be used in gather stream

View File

@ -22,8 +22,8 @@ public:
void addInput(); void addInput();
void initialize(Chunks chunks) override; void initialize(Inputs inputs) override;
void consume(Chunk & chunk, size_t source_num) override; void consume(Input & input, size_t source_num) override;
Status merge() override; Status merge() override;
const MergedData & getMergedData() const { return merged_data; } const MergedData & getMergedData() const { return merged_data; }
@ -41,7 +41,7 @@ private:
WriteBuffer * out_row_sources_buf = nullptr; WriteBuffer * out_row_sources_buf = nullptr;
/// Chunks currently being merged. /// Chunks currently being merged.
std::vector<Chunk> source_chunks; Inputs current_inputs;
SortCursorImpls cursors; SortCursorImpls cursors;

View File

@ -40,6 +40,13 @@ IMergingAlgorithm::Status ReplacingSortedAlgorithm::merge()
{ {
SortCursor current = queue.current(); SortCursor current = queue.current();
if (current->isLast() && skipLastRowFor(current->pos))
{
/// Get the next block from the corresponding source, if there is one.
queue.removeTop();
return Status(current.impl->order);
}
RowRef current_row; RowRef current_row;
setRowRef(current_row, current); setRowRef(current_row, current);

View File

@ -623,19 +623,19 @@ SummingSortedAlgorithm::SummingSortedAlgorithm(
{ {
} }
void SummingSortedAlgorithm::initialize(Chunks chunks) void SummingSortedAlgorithm::initialize(Inputs inputs)
{ {
for (auto & chunk : chunks) for (auto & input : inputs)
if (chunk) if (input.chunk)
preprocessChunk(chunk); preprocessChunk(input.chunk);
initializeQueue(std::move(chunks)); initializeQueue(std::move(inputs));
} }
void SummingSortedAlgorithm::consume(Chunk & chunk, size_t source_num) void SummingSortedAlgorithm::consume(Input & input, size_t source_num)
{ {
preprocessChunk(chunk); preprocessChunk(input.chunk);
updateCursor(chunk, source_num); updateCursor(input, source_num);
} }
IMergingAlgorithm::Status SummingSortedAlgorithm::merge() IMergingAlgorithm::Status SummingSortedAlgorithm::merge()
@ -647,6 +647,13 @@ IMergingAlgorithm::Status SummingSortedAlgorithm::merge()
SortCursor current = queue.current(); SortCursor current = queue.current();
if (current->isLast() && skipLastRowFor(current->pos))
{
/// Get the next block from the corresponding source, if there is one.
queue.removeTop();
return Status(current.impl->order);
}
{ {
detail::RowRef current_key; detail::RowRef current_key;
current_key.set(current); current_key.set(current);

View File

@ -22,8 +22,8 @@ public:
const Names & column_names_to_sum, const Names & column_names_to_sum,
size_t max_block_size); size_t max_block_size);
void initialize(Chunks chunks) override; void initialize(Inputs inputs) override;
void consume(Chunk & chunk, size_t source_num) override; void consume(Input & input, size_t source_num) override;
Status merge() override; Status merge() override;
struct AggregateDescription; struct AggregateDescription;

View File

@ -64,6 +64,13 @@ IMergingAlgorithm::Status VersionedCollapsingAlgorithm::merge()
{ {
SortCursor current = queue.current(); SortCursor current = queue.current();
if (current->isLast() && skipLastRowFor(current->pos))
{
/// Get the next block from the corresponding source, if there is one.
queue.removeTop();
return Status(current.impl->order);
}
RowRef current_row; RowRef current_row;
Int8 sign = assert_cast<const ColumnInt8 &>(*current->all_columns[sign_column_number]).getData()[current->pos]; Int8 sign = assert_cast<const ColumnInt8 &>(*current->all_columns[sign_column_number]).getData()[current->pos];

View File

@ -87,7 +87,7 @@ IProcessor::Status IMergingTransformBase::prepareInitializeInputs()
continue; continue;
} }
state.init_chunks[i] = std::move(chunk); state.init_chunks[i].set(std::move(chunk));
input_states[i].is_initialized = true; input_states[i].is_initialized = true;
} }
@ -158,8 +158,8 @@ IProcessor::Status IMergingTransformBase::prepare()
if (!input.hasData()) if (!input.hasData())
return Status::NeedData; return Status::NeedData;
state.input_chunk = input.pull(); state.input_chunk.set(input.pull());
if (!state.input_chunk.hasRows() && !input.isFinished()) if (!state.input_chunk.chunk.hasRows() && !input.isFinished())
return Status::NeedData; return Status::NeedData;
state.has_input = true; state.has_input = true;
@ -174,12 +174,12 @@ IProcessor::Status IMergingTransformBase::prepare()
return Status::Ready; return Status::Ready;
} }
static void filterChunk(Chunk & chunk, size_t selector_position) static void filterChunk(IMergingAlgorithm::Input & input, size_t selector_position)
{ {
if (!chunk.getChunkInfo()) if (!input.chunk.getChunkInfo())
throw Exception("IMergingTransformBase expected ChunkInfo for input chunk", ErrorCodes::LOGICAL_ERROR); throw Exception("IMergingTransformBase expected ChunkInfo for input chunk", ErrorCodes::LOGICAL_ERROR);
const auto * chunk_info = typeid_cast<const SelectorInfo *>(chunk.getChunkInfo().get()); const auto * chunk_info = typeid_cast<const SelectorInfo *>(input.chunk.getChunkInfo().get());
if (!chunk_info) if (!chunk_info)
throw Exception("IMergingTransformBase expected SelectorInfo for input chunk", ErrorCodes::LOGICAL_ERROR); throw Exception("IMergingTransformBase expected SelectorInfo for input chunk", ErrorCodes::LOGICAL_ERROR);
@ -188,8 +188,8 @@ static void filterChunk(Chunk & chunk, size_t selector_position)
IColumn::Filter filter; IColumn::Filter filter;
filter.resize_fill(selector.size()); filter.resize_fill(selector.size());
size_t num_rows = chunk.getNumRows(); size_t num_rows = input.chunk.getNumRows();
auto columns = chunk.detachColumns(); auto columns = input.chunk.detachColumns();
size_t num_result_rows = 0; size_t num_result_rows = 0;
@ -202,54 +202,39 @@ static void filterChunk(Chunk & chunk, size_t selector_position)
} }
} }
if (!filter.empty() && filter.back() == 0)
{
filter.back() = 1;
++num_result_rows;
input.skip_last_row = true;
}
for (auto & column : columns) for (auto & column : columns)
column = column->filter(filter, num_result_rows); column = column->filter(filter, num_result_rows);
chunk.clear(); input.chunk.clear();
chunk.setColumns(std::move(columns), num_result_rows); input.chunk.setColumns(std::move(columns), num_result_rows);
} }
bool IMergingTransformBase::filterChunks() void IMergingTransformBase::filterChunks()
{ {
if (state.selector_position < 0) if (state.selector_position < 0)
return true; return;
bool has_empty_chunk = false;
if (!state.init_chunks.empty()) if (!state.init_chunks.empty())
{ {
for (size_t i = 0; i < input_states.size(); ++i) for (size_t i = 0; i < input_states.size(); ++i)
{ {
auto & chunk = state.init_chunks[i]; auto & input = state.init_chunks[i];
if (!chunk || input_states[i].is_filtered) if (!input.chunk)
continue; continue;
filterChunk(chunk, state.selector_position); filterChunk(input, state.selector_position);
if (!chunk.hasRows())
{
chunk.clear();
has_empty_chunk = true;
input_states[i].is_initialized = false;
is_initialized = false;
}
else
input_states[i].is_filtered = true;
} }
} }
if (state.has_input) if (state.has_input)
{
filterChunk(state.input_chunk, state.selector_position); filterChunk(state.input_chunk, state.selector_position);
if (!state.input_chunk.hasRows())
{
state.has_input = false;
state.need_data = true;
has_empty_chunk = true;
}
}
return !has_empty_chunk;
} }

View File

@ -36,19 +36,19 @@ protected:
virtual void onNewInput(); /// Is called when new input is added. Only if have_all_inputs = false. virtual void onNewInput(); /// Is called when new input is added. Only if have_all_inputs = false.
virtual void onFinish() {} /// Is called when all data is processed. virtual void onFinish() {} /// Is called when all data is processed.
bool filterChunks(); /// Filter chunks if selector position was set. For parallel final. void filterChunks(); /// Filter chunks if selector position was set. For parallel final.
/// Processor state. /// Processor state.
struct State struct State
{ {
Chunk output_chunk; Chunk output_chunk;
Chunk input_chunk; IMergingAlgorithm::Input input_chunk;
bool has_input = false; bool has_input = false;
bool is_finished = false; bool is_finished = false;
bool need_data = false; bool need_data = false;
size_t next_input_to_read = 0; size_t next_input_to_read = 0;
Chunks init_chunks; IMergingAlgorithm::Inputs init_chunks;
ssize_t selector_position = -1; ssize_t selector_position = -1;
}; };
@ -61,7 +61,6 @@ private:
InputPort & port; InputPort & port;
bool is_initialized = false; bool is_initialized = false;
bool is_filtered = false;
}; };
std::vector<InputState> input_states; std::vector<InputState> input_states;
@ -90,8 +89,7 @@ public:
void work() override void work() override
{ {
if (!filterChunks()) filterChunks();
return;
if (!state.init_chunks.empty()) if (!state.init_chunks.empty())
algorithm.initialize(std::move(state.init_chunks)); algorithm.initialize(std::move(state.init_chunks));