Merge pull request #54610 from CurtizJ/fix-after-refactoring

Fix reading of virtual columns in reverse order
This commit is contained in:
robot-clickhouse-ci-2 2023-09-14 03:38:27 +02:00 committed by GitHub
commit e111d4abd3
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
5 changed files with 49 additions and 60 deletions

View File

@ -1,5 +1,4 @@
#include <Storages/MergeTree/MergeTreeSelectAlgorithms.h> #include <Storages/MergeTree/MergeTreeSelectAlgorithms.h>
#include <Storages/MergeTree/IMergeTreeReadPool.h>
namespace DB namespace DB
{ {
@ -9,57 +8,29 @@ namespace ErrorCodes
extern const int LOGICAL_ERROR; extern const int LOGICAL_ERROR;
} }
MergeTreeThreadSelectAlgorithm::TaskResult MergeTreeThreadSelectAlgorithm::getNewTask(IMergeTreeReadPool & pool, MergeTreeReadTask * previous_task) MergeTreeReadTaskPtr MergeTreeInOrderSelectAlgorithm::getNewTask(IMergeTreeReadPool & pool, MergeTreeReadTask * previous_task)
{
TaskResult res;
res.first = pool.getTask(thread_idx, previous_task);
res.second = !!res.first;
return res;
}
MergeTreeReadTask::BlockAndProgress MergeTreeThreadSelectAlgorithm::readFromTask(MergeTreeReadTask * task, const MergeTreeReadTask::BlockSizeParams & params)
{
if (!task)
return {};
return task->read(params);
}
IMergeTreeSelectAlgorithm::TaskResult MergeTreeInOrderSelectAlgorithm::getNewTask(IMergeTreeReadPool & pool, MergeTreeReadTask * previous_task)
{ {
if (!pool.preservesOrderOfRanges()) if (!pool.preservesOrderOfRanges())
throw Exception(ErrorCodes::LOGICAL_ERROR, throw Exception(ErrorCodes::LOGICAL_ERROR,
"MergeTreeInOrderSelectAlgorithm requires read pool that preserves order of ranges, got: {}", pool.getName()); "MergeTreeInOrderSelectAlgorithm requires read pool that preserves order of ranges, got: {}", pool.getName());
TaskResult res; return pool.getTask(part_idx, previous_task);
res.first = pool.getTask(part_idx, previous_task);
res.second = !!res.first;
return res;
} }
MergeTreeReadTask::BlockAndProgress MergeTreeInOrderSelectAlgorithm::readFromTask(MergeTreeReadTask * task, const BlockSizeParams & params) MergeTreeReadTaskPtr MergeTreeInReverseOrderSelectAlgorithm::getNewTask(IMergeTreeReadPool & pool, MergeTreeReadTask * previous_task)
{
if (!task)
return {};
return task->read(params);
}
IMergeTreeSelectAlgorithm::TaskResult MergeTreeInReverseOrderSelectAlgorithm::getNewTask(IMergeTreeReadPool & pool, MergeTreeReadTask * previous_task)
{ {
if (!pool.preservesOrderOfRanges()) if (!pool.preservesOrderOfRanges())
throw Exception(ErrorCodes::LOGICAL_ERROR, throw Exception(ErrorCodes::LOGICAL_ERROR,
"MergeTreeInReverseOrderSelectAlgorithm requires read pool that preserves order of ranges, got: {}", pool.getName()); "MergeTreeInReverseOrderSelectAlgorithm requires read pool that preserves order of ranges, got: {}", pool.getName());
TaskResult res; if (!chunks.empty())
res.first = pool.getTask(part_idx, previous_task); throw Exception(ErrorCodes::LOGICAL_ERROR,
/// We may have some chunks to return in buffer. "Cannot get new task for reading in reverse order because there are {} buffered chunks", chunks.size());
/// Set continue_reading to true but actually don't create a new task.
res.second = !!res.first || !chunks.empty(); return pool.getTask(part_idx, previous_task);
return res;
} }
MergeTreeReadTask::BlockAndProgress MergeTreeInReverseOrderSelectAlgorithm::readFromTask(MergeTreeReadTask * task, const BlockSizeParams & params) MergeTreeReadTask::BlockAndProgress MergeTreeInReverseOrderSelectAlgorithm::readFromTask(MergeTreeReadTask & task, const BlockSizeParams & params)
{ {
MergeTreeReadTask::BlockAndProgress res; MergeTreeReadTask::BlockAndProgress res;
@ -70,11 +41,8 @@ MergeTreeReadTask::BlockAndProgress MergeTreeInReverseOrderSelectAlgorithm::read
return res; return res;
} }
if (!task) while (!task.isFinished())
return {}; chunks.push_back(task.read(params));
while (!task->isFinished())
chunks.push_back(task->read(params));
if (chunks.empty()) if (chunks.empty())
return {}; return {};

View File

@ -1,6 +1,7 @@
#pragma once #pragma once
#include <Storages/MergeTree/MergeTreeReadTask.h> #include <Storages/MergeTree/MergeTreeReadTask.h>
#include <Storages/MergeTree/IMergeTreeReadPool.h>
#include <boost/core/noncopyable.hpp> #include <boost/core/noncopyable.hpp>
namespace DB namespace DB
@ -11,15 +12,16 @@ class IMergeTreeReadPool;
class IMergeTreeSelectAlgorithm : private boost::noncopyable class IMergeTreeSelectAlgorithm : private boost::noncopyable
{ {
public: public:
/// The pair of {task, continue_reading}.
using TaskResult = std::pair<MergeTreeReadTaskPtr, bool>;
using BlockSizeParams = MergeTreeReadTask::BlockSizeParams; using BlockSizeParams = MergeTreeReadTask::BlockSizeParams;
using BlockAndProgress = MergeTreeReadTask::BlockAndProgress;
virtual ~IMergeTreeSelectAlgorithm() = default; virtual ~IMergeTreeSelectAlgorithm() = default;
virtual String getName() const = 0; virtual String getName() const = 0;
virtual TaskResult getNewTask(IMergeTreeReadPool & pool, MergeTreeReadTask * previous_task) = 0; virtual bool needNewTask(const MergeTreeReadTask & task) const = 0;
virtual MergeTreeReadTask::BlockAndProgress readFromTask(MergeTreeReadTask * task, const BlockSizeParams & params) = 0;
virtual MergeTreeReadTaskPtr getNewTask(IMergeTreeReadPool & pool, MergeTreeReadTask * previous_task) = 0;
virtual BlockAndProgress readFromTask(MergeTreeReadTask & task, const BlockSizeParams & params) = 0;
}; };
using MergeTreeSelectAlgorithmPtr = std::unique_ptr<IMergeTreeSelectAlgorithm>; using MergeTreeSelectAlgorithmPtr = std::unique_ptr<IMergeTreeSelectAlgorithm>;
@ -28,9 +30,12 @@ class MergeTreeThreadSelectAlgorithm : public IMergeTreeSelectAlgorithm
{ {
public: public:
explicit MergeTreeThreadSelectAlgorithm(size_t thread_idx_) : thread_idx(thread_idx_) {} explicit MergeTreeThreadSelectAlgorithm(size_t thread_idx_) : thread_idx(thread_idx_) {}
String getName() const override { return "Thread"; } String getName() const override { return "Thread"; }
TaskResult getNewTask(IMergeTreeReadPool & pool, MergeTreeReadTask * previous_task) override; bool needNewTask(const MergeTreeReadTask & task) const override { return task.isFinished(); }
MergeTreeReadTask::BlockAndProgress readFromTask(MergeTreeReadTask * task, const BlockSizeParams & params) override;
MergeTreeReadTaskPtr getNewTask(IMergeTreeReadPool & pool, MergeTreeReadTask * previous_task) override { return pool.getTask(thread_idx, previous_task); }
BlockAndProgress readFromTask(MergeTreeReadTask & task, const BlockSizeParams & params) override { return task.read(params); }
private: private:
const size_t thread_idx; const size_t thread_idx;
@ -40,9 +45,12 @@ class MergeTreeInOrderSelectAlgorithm : public IMergeTreeSelectAlgorithm
{ {
public: public:
explicit MergeTreeInOrderSelectAlgorithm(size_t part_idx_) : part_idx(part_idx_) {} explicit MergeTreeInOrderSelectAlgorithm(size_t part_idx_) : part_idx(part_idx_) {}
String getName() const override { return "InOrder"; } String getName() const override { return "InOrder"; }
TaskResult getNewTask(IMergeTreeReadPool & pool, MergeTreeReadTask * previous_task) override; bool needNewTask(const MergeTreeReadTask & task) const override { return task.isFinished(); }
MergeTreeReadTask::BlockAndProgress readFromTask(MergeTreeReadTask * task, const BlockSizeParams & params) override;
MergeTreeReadTaskPtr getNewTask(IMergeTreeReadPool & pool, MergeTreeReadTask * previous_task) override;
MergeTreeReadTask::BlockAndProgress readFromTask(MergeTreeReadTask & task, const BlockSizeParams & params) override { return task.read(params); }
private: private:
const size_t part_idx; const size_t part_idx;
@ -52,13 +60,16 @@ class MergeTreeInReverseOrderSelectAlgorithm : public IMergeTreeSelectAlgorithm
{ {
public: public:
explicit MergeTreeInReverseOrderSelectAlgorithm(size_t part_idx_) : part_idx(part_idx_) {} explicit MergeTreeInReverseOrderSelectAlgorithm(size_t part_idx_) : part_idx(part_idx_) {}
String getName() const override { return "InReverseOrder"; } String getName() const override { return "InReverseOrder"; }
TaskResult getNewTask(IMergeTreeReadPool & pool, MergeTreeReadTask * previous_task) override; bool needNewTask(const MergeTreeReadTask & task) const override { return chunks.empty() && task.isFinished(); }
MergeTreeReadTask::BlockAndProgress readFromTask(MergeTreeReadTask * task, const BlockSizeParams & params) override;
MergeTreeReadTaskPtr getNewTask(IMergeTreeReadPool & pool, MergeTreeReadTask * previous_task) override;
BlockAndProgress readFromTask(MergeTreeReadTask & task, const BlockSizeParams & params) override;
private: private:
const size_t part_idx; const size_t part_idx;
std::vector<MergeTreeReadTask::BlockAndProgress> chunks; std::vector<BlockAndProgress> chunks;
}; };
} }

View File

@ -139,11 +139,10 @@ ChunkAndProgress MergeTreeSelectProcessor::read()
{ {
try try
{ {
bool continue_reading = true; if (!task || algorithm->needNewTask(*task))
if (!task || task->isFinished()) task = algorithm->getNewTask(*pool, task.get());
std::tie(task, continue_reading) = algorithm->getNewTask(*pool, task.get());
if (!continue_reading) if (!task)
break; break;
} }
catch (const Exception & e) catch (const Exception & e)
@ -153,10 +152,10 @@ ChunkAndProgress MergeTreeSelectProcessor::read()
throw; throw;
} }
if (task && !task->getMainRangeReader().isInitialized()) if (!task->getMainRangeReader().isInitialized())
initializeRangeReaders(); initializeRangeReaders();
auto res = algorithm->readFromTask(task.get(), block_size_params); auto res = algorithm->readFromTask(*task, block_size_params);
if (res.row_count) if (res.row_count)
{ {

View File

@ -0,0 +1 @@
198401_1_1_0

View File

@ -0,0 +1,10 @@
DROP TABLE IF EXISTS t_reverse_order_virt_col;
CREATE TABLE t_reverse_order_virt_col (`order_0` Decimal(76, 53), `p_time` Date)
ENGINE = MergeTree PARTITION BY toYYYYMM(p_time)
ORDER BY order_0;
INSERT INTO t_reverse_order_virt_col SELECT number, '1984-01-01' FROM numbers(1000000);
SELECT DISTINCT _part FROM (SELECT _part FROM t_reverse_order_virt_col ORDER BY order_0 DESC);
DROP TABLE IF EXISTS t_reverse_order_virt_col;