Replaced SortingHeap with SortingQueue

This commit is contained in:
Maksim Kita 2022-06-14 16:36:28 +02:00
parent 9670504781
commit 3664f02690
9 changed files with 22 additions and 25 deletions

View File

@ -552,16 +552,13 @@ private:
};
template <typename Cursor>
using SortingHeap = SortingQueueImpl<Cursor, SortingQueueStrategy::Default>;
template <typename Cursor>
using SortingQueueDefault = SortingQueueImpl<Cursor, SortingQueueStrategy::Default>;
using SortingQueue = SortingQueueImpl<Cursor, SortingQueueStrategy::Default>;
template <typename Cursor>
using SortingQueueBatch = SortingQueueImpl<Cursor, SortingQueueStrategy::Batch>;
/** SortQueueVariants allow to specialize sorting queue for concrete types and sort description.
* To access default queue variant callOnDefaultVariant method must be used.
* To access queue variant callOnVariant method must be used.
* To access batch queue variant callOnBatchVariant method must be used.
*/
class SortQueueVariants
@ -617,7 +614,7 @@ public:
}
template <typename Func>
decltype(auto) callOnDefaultVariant(Func && func)
decltype(auto) callOnVariant(Func && func)
{
return std::visit(func, default_queue_variants);
}
@ -630,16 +627,16 @@ public:
bool variantSupportJITCompilation() const
{
return std::holds_alternative<SortingHeap<SimpleSortCursor>>(default_queue_variants)
|| std::holds_alternative<SortingHeap<SortCursor>>(default_queue_variants)
|| std::holds_alternative<SortingHeap<SortCursorWithCollation>>(default_queue_variants);
return std::holds_alternative<SortingQueue<SimpleSortCursor>>(default_queue_variants)
|| std::holds_alternative<SortingQueue<SortCursor>>(default_queue_variants)
|| std::holds_alternative<SortingQueue<SortCursorWithCollation>>(default_queue_variants);
}
private:
template <typename Cursor>
void initializeQueues()
{
default_queue_variants = SortingQueueDefault<Cursor>();
default_queue_variants = SortingQueue<Cursor>();
batch_queue_variants = SortingQueueBatch<Cursor>();
}

View File

@ -22,7 +22,7 @@ void IMergingAlgorithmWithDelayedChunk::initializeQueue(Inputs inputs)
header, current_inputs[source_num].chunk.getColumns(), description, source_num, current_inputs[source_num].permutation);
}
queue = SortingHeap<SortCursor>(cursors);
queue = SortingQueue<SortCursor>(cursors);
}
void IMergingAlgorithmWithDelayedChunk::updateCursor(Input & input, size_t source_num)

View File

@ -13,7 +13,7 @@ public:
IMergingAlgorithmWithDelayedChunk(Block header_, size_t num_inputs, SortDescription description_);
protected:
SortingHeap<SortCursor> queue;
SortingQueue<SortCursor> queue;
SortDescription description;
/// Previous row. May refer to last_chunk_sort_columns or row from source_chunks.

View File

@ -43,7 +43,7 @@ void IMergingAlgorithmWithSharedChunks::initialize(Inputs inputs)
source.chunk->sort_columns = cursors[source_num].sort_columns;
}
queue = SortingHeap<SortCursor>(cursors);
queue = SortingQueue<SortCursor>(cursors);
}
void IMergingAlgorithmWithSharedChunks::consume(Input & input, size_t source_num)

View File

@ -36,7 +36,7 @@ protected:
using Sources = std::vector<Source>;
Sources sources;
SortingHeap<SortCursor> queue;
SortingQueue<SortCursor> queue;
/// Used in Vertical merge algorithm to gather non-PK/non-index columns (on next step)
/// If it is not nullptr then it should be populated during execution

View File

@ -74,7 +74,7 @@ void MergingSortedAlgorithm::initialize(Inputs inputs)
cursors[source_num] = SortCursorImpl(header, chunk.getColumns(), description, source_num);
}
queue_variants.callOnDefaultVariant([&](auto & queue)
queue_variants.callOnVariant([&](auto & queue)
{
using QueueType = std::decay_t<decltype(queue)>;
queue = QueueType(cursors);
@ -87,7 +87,7 @@ void MergingSortedAlgorithm::consume(Input & input, size_t source_num)
current_inputs[source_num].swap(input);
cursors[source_num].reset(current_inputs[source_num].chunk.getColumns(), header);
queue_variants.callOnDefaultVariant([&](auto & queue)
queue_variants.callOnVariant([&](auto & queue)
{
queue.push(cursors[source_num]);
});
@ -95,7 +95,7 @@ void MergingSortedAlgorithm::consume(Input & input, size_t source_num)
IMergingAlgorithm::Status MergingSortedAlgorithm::merge()
{
IMergingAlgorithm::Status result = queue_variants.callOnDefaultVariant([&](auto & queue)
IMergingAlgorithm::Status result = queue_variants.callOnVariant([&](auto & queue)
{
return mergeImpl(queue);
});
@ -103,8 +103,8 @@ IMergingAlgorithm::Status MergingSortedAlgorithm::merge()
return result;
}
template <typename TSortingHeap>
IMergingAlgorithm::Status MergingSortedAlgorithm::mergeImpl(TSortingHeap & queue)
template <typename TSortingQueue>
IMergingAlgorithm::Status MergingSortedAlgorithm::mergeImpl(TSortingQueue & queue)
{
/// Take rows in required order and put them into `merged_data`, while the rows are no more than `max_block_size`
while (queue.isValid())

View File

@ -53,8 +53,8 @@ private:
Status insertFromChunk(size_t source_num);
template <typename TSortingHeap>
Status mergeImpl(TSortingHeap & queue);
template <typename TSortingQueue>
Status mergeImpl(TSortingQueue & queue);
};
}

View File

@ -73,8 +73,8 @@ Chunk MergeSorter::read()
}
template <typename TSortingHeap>
Chunk MergeSorter::mergeBatchImpl(TSortingHeap & queue)
template <typename TSortingQueue>
Chunk MergeSorter::mergeBatchImpl(TSortingQueue & queue)
{
size_t num_columns = chunks[0].getNumColumns();
MutableColumns merged_columns = chunks[0].cloneEmptyColumns();

View File

@ -34,8 +34,8 @@ private:
/** Two different cursors are supported - with and without Collation.
* Templates are used (instead of virtual functions in SortCursor) for zero-overhead.
*/
template <typename TSortingHeap>
Chunk mergeBatchImpl(TSortingHeap & queue);
template <typename TSortingQueue>
Chunk mergeBatchImpl(TSortingQueue & queue);
};