From df2d1ed6386e757514a1cf54f610e4cf072e0532 Mon Sep 17 00:00:00 2001 From: Anton Popov Date: Wed, 13 Sep 2023 21:09:49 +0000 Subject: [PATCH] fix reading of virtual columns in reverse order --- .../MergeTree/MergeTreeSelectAlgorithms.cpp | 54 ++++--------------- .../MergeTree/MergeTreeSelectAlgorithms.h | 33 ++++++++---- .../MergeTree/MergeTreeSelectProcessor.cpp | 11 ++-- ..._in_reverse_order_virtual_column.reference | 1 + ...3_read_in_reverse_order_virtual_column.sql | 10 ++++ 5 files changed, 49 insertions(+), 60 deletions(-) create mode 100644 tests/queries/0_stateless/02883_read_in_reverse_order_virtual_column.reference create mode 100644 tests/queries/0_stateless/02883_read_in_reverse_order_virtual_column.sql diff --git a/src/Storages/MergeTree/MergeTreeSelectAlgorithms.cpp b/src/Storages/MergeTree/MergeTreeSelectAlgorithms.cpp index 8bc4377cffb..bf97d269dc6 100644 --- a/src/Storages/MergeTree/MergeTreeSelectAlgorithms.cpp +++ b/src/Storages/MergeTree/MergeTreeSelectAlgorithms.cpp @@ -1,5 +1,4 @@ #include -#include namespace DB { @@ -9,57 +8,29 @@ namespace ErrorCodes extern const int LOGICAL_ERROR; } -MergeTreeThreadSelectAlgorithm::TaskResult MergeTreeThreadSelectAlgorithm::getNewTask(IMergeTreeReadPool & pool, MergeTreeReadTask * previous_task) -{ - TaskResult res; - res.first = pool.getTask(thread_idx, previous_task); - res.second = !!res.first; - return res; -} - -MergeTreeReadTask::BlockAndProgress MergeTreeThreadSelectAlgorithm::readFromTask(MergeTreeReadTask * task, const MergeTreeReadTask::BlockSizeParams & params) -{ - if (!task) - return {}; - - return task->read(params); -} - -IMergeTreeSelectAlgorithm::TaskResult MergeTreeInOrderSelectAlgorithm::getNewTask(IMergeTreeReadPool & pool, MergeTreeReadTask * previous_task) +MergeTreeReadTaskPtr MergeTreeInOrderSelectAlgorithm::getNewTask(IMergeTreeReadPool & pool, MergeTreeReadTask * previous_task) { if (!pool.preservesOrderOfRanges()) throw Exception(ErrorCodes::LOGICAL_ERROR, "MergeTreeInOrderSelectAlgorithm requires read pool that preserves order of ranges, got: {}", pool.getName()); - TaskResult res; - res.first = pool.getTask(part_idx, previous_task); - res.second = !!res.first; - return res; + return pool.getTask(part_idx, previous_task); } -MergeTreeReadTask::BlockAndProgress MergeTreeInOrderSelectAlgorithm::readFromTask(MergeTreeReadTask * task, const BlockSizeParams & params) -{ - if (!task) - return {}; - - return task->read(params); -} - -IMergeTreeSelectAlgorithm::TaskResult MergeTreeInReverseOrderSelectAlgorithm::getNewTask(IMergeTreeReadPool & pool, MergeTreeReadTask * previous_task) +MergeTreeReadTaskPtr MergeTreeInReverseOrderSelectAlgorithm::getNewTask(IMergeTreeReadPool & pool, MergeTreeReadTask * previous_task) { if (!pool.preservesOrderOfRanges()) throw Exception(ErrorCodes::LOGICAL_ERROR, "MergeTreeInReverseOrderSelectAlgorithm requires read pool that preserves order of ranges, got: {}", pool.getName()); - TaskResult res; - res.first = pool.getTask(part_idx, previous_task); - /// We may have some chunks to return in buffer. - /// Set continue_reading to true but actually don't create a new task. - res.second = !!res.first || !chunks.empty(); - return res; + if (!chunks.empty()) + throw Exception(ErrorCodes::LOGICAL_ERROR, + "Cannot get new task for reading in reverse order because there are {} buffered chunks", chunks.size()); + + return pool.getTask(part_idx, previous_task); } -MergeTreeReadTask::BlockAndProgress MergeTreeInReverseOrderSelectAlgorithm::readFromTask(MergeTreeReadTask * task, const BlockSizeParams & params) +MergeTreeReadTask::BlockAndProgress MergeTreeInReverseOrderSelectAlgorithm::readFromTask(MergeTreeReadTask & task, const BlockSizeParams & params) { MergeTreeReadTask::BlockAndProgress res; @@ -70,11 +41,8 @@ MergeTreeReadTask::BlockAndProgress MergeTreeInReverseOrderSelectAlgorithm::read return res; } - if (!task) - return {}; - - while (!task->isFinished()) - chunks.push_back(task->read(params)); + while (!task.isFinished()) + chunks.push_back(task.read(params)); if (chunks.empty()) return {}; diff --git a/src/Storages/MergeTree/MergeTreeSelectAlgorithms.h b/src/Storages/MergeTree/MergeTreeSelectAlgorithms.h index a6254a90687..afc8032bb99 100644 --- a/src/Storages/MergeTree/MergeTreeSelectAlgorithms.h +++ b/src/Storages/MergeTree/MergeTreeSelectAlgorithms.h @@ -1,6 +1,7 @@ #pragma once #include +#include #include namespace DB @@ -11,15 +12,16 @@ class IMergeTreeReadPool; class IMergeTreeSelectAlgorithm : private boost::noncopyable { public: - /// The pair of {task, continue_reading}. - using TaskResult = std::pair; using BlockSizeParams = MergeTreeReadTask::BlockSizeParams; + using BlockAndProgress = MergeTreeReadTask::BlockAndProgress; virtual ~IMergeTreeSelectAlgorithm() = default; virtual String getName() const = 0; - virtual TaskResult getNewTask(IMergeTreeReadPool & pool, MergeTreeReadTask * previous_task) = 0; - virtual MergeTreeReadTask::BlockAndProgress readFromTask(MergeTreeReadTask * task, const BlockSizeParams & params) = 0; + virtual bool needNewTask(const MergeTreeReadTask & task) const = 0; + + virtual MergeTreeReadTaskPtr getNewTask(IMergeTreeReadPool & pool, MergeTreeReadTask * previous_task) = 0; + virtual BlockAndProgress readFromTask(MergeTreeReadTask & task, const BlockSizeParams & params) = 0; }; using MergeTreeSelectAlgorithmPtr = std::unique_ptr; @@ -28,9 +30,12 @@ class MergeTreeThreadSelectAlgorithm : public IMergeTreeSelectAlgorithm { public: explicit MergeTreeThreadSelectAlgorithm(size_t thread_idx_) : thread_idx(thread_idx_) {} + String getName() const override { return "Thread"; } - TaskResult getNewTask(IMergeTreeReadPool & pool, MergeTreeReadTask * previous_task) override; - MergeTreeReadTask::BlockAndProgress readFromTask(MergeTreeReadTask * task, const BlockSizeParams & params) override; + bool needNewTask(const MergeTreeReadTask & task) const override { return task.isFinished(); } + + MergeTreeReadTaskPtr getNewTask(IMergeTreeReadPool & pool, MergeTreeReadTask * previous_task) override { return pool.getTask(thread_idx, previous_task); } + BlockAndProgress readFromTask(MergeTreeReadTask & task, const BlockSizeParams & params) override { return task.read(params); } private: const size_t thread_idx; @@ -40,9 +45,12 @@ class MergeTreeInOrderSelectAlgorithm : public IMergeTreeSelectAlgorithm { public: explicit MergeTreeInOrderSelectAlgorithm(size_t part_idx_) : part_idx(part_idx_) {} + String getName() const override { return "InOrder"; } - TaskResult getNewTask(IMergeTreeReadPool & pool, MergeTreeReadTask * previous_task) override; - MergeTreeReadTask::BlockAndProgress readFromTask(MergeTreeReadTask * task, const BlockSizeParams & params) override; + bool needNewTask(const MergeTreeReadTask & task) const override { return task.isFinished(); } + + MergeTreeReadTaskPtr getNewTask(IMergeTreeReadPool & pool, MergeTreeReadTask * previous_task) override; + MergeTreeReadTask::BlockAndProgress readFromTask(MergeTreeReadTask & task, const BlockSizeParams & params) override { return task.read(params); } private: const size_t part_idx; @@ -52,13 +60,16 @@ class MergeTreeInReverseOrderSelectAlgorithm : public IMergeTreeSelectAlgorithm { public: explicit MergeTreeInReverseOrderSelectAlgorithm(size_t part_idx_) : part_idx(part_idx_) {} + String getName() const override { return "InReverseOrder"; } - TaskResult getNewTask(IMergeTreeReadPool & pool, MergeTreeReadTask * previous_task) override; - MergeTreeReadTask::BlockAndProgress readFromTask(MergeTreeReadTask * task, const BlockSizeParams & params) override; + bool needNewTask(const MergeTreeReadTask & task) const override { return chunks.empty() && task.isFinished(); } + + MergeTreeReadTaskPtr getNewTask(IMergeTreeReadPool & pool, MergeTreeReadTask * previous_task) override; + BlockAndProgress readFromTask(MergeTreeReadTask & task, const BlockSizeParams & params) override; private: const size_t part_idx; - std::vector chunks; + std::vector chunks; }; } diff --git a/src/Storages/MergeTree/MergeTreeSelectProcessor.cpp b/src/Storages/MergeTree/MergeTreeSelectProcessor.cpp index 975fad1ab6b..95fcde23f8e 100644 --- a/src/Storages/MergeTree/MergeTreeSelectProcessor.cpp +++ b/src/Storages/MergeTree/MergeTreeSelectProcessor.cpp @@ -139,11 +139,10 @@ ChunkAndProgress MergeTreeSelectProcessor::read() { try { - bool continue_reading = true; - if (!task || task->isFinished()) - std::tie(task, continue_reading) = algorithm->getNewTask(*pool, task.get()); + if (!task || algorithm->needNewTask(*task)) + task = algorithm->getNewTask(*pool, task.get()); - if (!continue_reading) + if (!task) break; } catch (const Exception & e) @@ -153,10 +152,10 @@ ChunkAndProgress MergeTreeSelectProcessor::read() throw; } - if (task && !task->getMainRangeReader().isInitialized()) + if (!task->getMainRangeReader().isInitialized()) initializeRangeReaders(); - auto res = algorithm->readFromTask(task.get(), block_size_params); + auto res = algorithm->readFromTask(*task, block_size_params); if (res.row_count) { diff --git a/tests/queries/0_stateless/02883_read_in_reverse_order_virtual_column.reference b/tests/queries/0_stateless/02883_read_in_reverse_order_virtual_column.reference new file mode 100644 index 00000000000..f77195f1f31 --- /dev/null +++ b/tests/queries/0_stateless/02883_read_in_reverse_order_virtual_column.reference @@ -0,0 +1 @@ +198401_1_1_0 diff --git a/tests/queries/0_stateless/02883_read_in_reverse_order_virtual_column.sql b/tests/queries/0_stateless/02883_read_in_reverse_order_virtual_column.sql new file mode 100644 index 00000000000..76821c8797d --- /dev/null +++ b/tests/queries/0_stateless/02883_read_in_reverse_order_virtual_column.sql @@ -0,0 +1,10 @@ +DROP TABLE IF EXISTS t_reverse_order_virt_col; + +CREATE TABLE t_reverse_order_virt_col (`order_0` Decimal(76, 53), `p_time` Date) +ENGINE = MergeTree PARTITION BY toYYYYMM(p_time) +ORDER BY order_0; + +INSERT INTO t_reverse_order_virt_col SELECT number, '1984-01-01' FROM numbers(1000000); +SELECT DISTINCT _part FROM (SELECT _part FROM t_reverse_order_virt_col ORDER BY order_0 DESC); + +DROP TABLE IF EXISTS t_reverse_order_virt_col;