Merge pull request #22352 from ClickHouse/add-read-from-mt-step

Special query plan step for read from MergeTree
Nikolai Kochetov 2021-04-19 17:38:40 +03:00 committed by GitHub
commit c59628846c
25 changed files with 1049 additions and 362 deletions

View File

@ -129,6 +129,7 @@ struct QueryPlanSettings
{"header", query_plan_options.header},
{"description", query_plan_options.description},
{"actions", query_plan_options.actions},
{"indexes", query_plan_options.indexes},
{"optimize", optimize},
};
};

View File

@ -99,6 +99,9 @@ public:
/// Get detailed description of step actions. This is shown in EXPLAIN query with options `actions = 1`.
virtual void describeActions(FormatSettings & /*settings*/) const {}
/// Get detailed description of read-from-storage step indexes (if any). This is shown in EXPLAIN query with options `indexes = 1`.
virtual void describeIndexes(FormatSettings & /*settings*/) const {}
/// Get description of processors added in current step. Should be called after updatePipeline().
virtual void describePipeline(FormatSettings & /*settings*/) const {}
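As an illustration of how the new hook is meant to be used, here is a minimal, self-contained sketch with simplified stand-ins for FormatSettings and the step interface (the types and the printed stat are hypothetical, not part of this commit):

#include <iostream>
#include <string>

/// Simplified stand-ins for IQueryPlanStep::FormatSettings and the step
/// interface above; illustration only, not the real ClickHouse types.
struct FormatSettingsSketch
{
    std::ostream & out;
    size_t offset = 0;
    size_t indent = 2;
    char indent_char = ' ';
};

struct StepSketch
{
    virtual ~StepSketch() = default;
    virtual void describeActions(FormatSettingsSketch &) const {}
    /// New in this commit: called by EXPLAIN only when `indexes = 1`.
    virtual void describeIndexes(FormatSettingsSketch &) const {}
};

struct ReadStepSketch : StepSketch
{
    void describeIndexes(FormatSettingsSketch & settings) const override
    {
        std::string prefix(settings.offset, settings.indent_char);
        std::string indent(settings.indent, settings.indent_char);
        settings.out << prefix << "Indexes:\n";
        settings.out << prefix << indent << "PrimaryKey\n";
    }
};

int main()
{
    FormatSettingsSketch settings{std::cout};
    ReadStepSketch step;
    step.describeIndexes(settings); /// prints the Indexes block
}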

View File

@ -243,6 +243,9 @@ static void explainStep(
if (options.actions)
step.describeActions(settings);
if (options.indexes)
step.describeIndexes(settings);
}
std::string debugExplainStep(const IQueryPlanStep & step)

View File

@ -66,6 +66,8 @@ public:
bool description = true;
/// Add detailed information about step actions.
bool actions = false;
/// Add information about used indexes.
bool indexes = false;
};
struct ExplainPipelineOptions

View File

@ -0,0 +1,249 @@
#include <Processors/QueryPlan/ReadFromMergeTree.h>
#include <Processors/QueryPipeline.h>
#include <Processors/ConcatProcessor.h>
#include <Processors/Transforms/ReverseTransform.h>
#include <Storages/MergeTree/MergeTreeSelectProcessor.h>
#include <Storages/MergeTree/MergeTreeReverseSelectProcessor.h>
#include <Storages/MergeTree/MergeTreeThreadSelectBlockInputProcessor.h>
#include <common/logger_useful.h>
namespace DB
{
ReadFromMergeTree::ReadFromMergeTree(
const MergeTreeData & storage_,
StorageMetadataPtr metadata_snapshot_,
String query_id_,
Names required_columns_,
RangesInDataParts parts_,
IndexStatPtr index_stats_,
PrewhereInfoPtr prewhere_info_,
Names virt_column_names_,
Settings settings_,
size_t num_streams_,
ReadType read_type_)
: ISourceStep(DataStream{.header = MergeTreeBaseSelectProcessor::transformHeader(
metadata_snapshot_->getSampleBlockForColumns(required_columns_, storage_.getVirtuals(), storage_.getStorageID()),
prewhere_info_,
virt_column_names_)})
, storage(storage_)
, metadata_snapshot(std::move(metadata_snapshot_))
, query_id(std::move(query_id_))
, required_columns(std::move(required_columns_))
, parts(std::move(parts_))
, index_stats(std::move(index_stats_))
, prewhere_info(std::move(prewhere_info_))
, virt_column_names(std::move(virt_column_names_))
, settings(std::move(settings_))
, num_streams(num_streams_)
, read_type(read_type_)
{
}
Pipe ReadFromMergeTree::readFromPool()
{
Pipes pipes;
size_t sum_marks = 0;
size_t total_rows = 0;
for (const auto & part : parts)
{
sum_marks += part.getMarksCount();
total_rows += part.getRowsCount();
}
auto pool = std::make_shared<MergeTreeReadPool>(
num_streams,
sum_marks,
settings.min_marks_for_concurrent_read,
std::move(parts),
storage,
metadata_snapshot,
prewhere_info,
true,
required_columns,
settings.backoff_settings,
settings.preferred_block_size_bytes,
false);
auto * logger = &Poco::Logger::get(storage.getLogName() + " (SelectExecutor)");
LOG_DEBUG(logger, "Reading approx. {} rows with {} streams", total_rows, num_streams);
for (size_t i = 0; i < num_streams; ++i)
{
auto source = std::make_shared<MergeTreeThreadSelectBlockInputProcessor>(
i, pool, settings.min_marks_for_concurrent_read, settings.max_block_size,
settings.preferred_block_size_bytes, settings.preferred_max_column_in_block_size_bytes,
storage, metadata_snapshot, settings.use_uncompressed_cache,
prewhere_info, settings.reader_settings, virt_column_names);
if (i == 0)
{
/// Set the approximate number of rows for the first source only
source->addTotalRowsApprox(total_rows);
}
pipes.emplace_back(std::move(source));
}
return Pipe::unitePipes(std::move(pipes));
}
template<typename TSource>
ProcessorPtr ReadFromMergeTree::createSource(const RangesInDataPart & part)
{
return std::make_shared<TSource>(
storage, metadata_snapshot, part.data_part, settings.max_block_size, settings.preferred_block_size_bytes,
settings.preferred_max_column_in_block_size_bytes, required_columns, part.ranges, settings.use_uncompressed_cache,
prewhere_info, true, settings.reader_settings, virt_column_names, part.part_index_in_query);
}
Pipe ReadFromMergeTree::readInOrder()
{
Pipes pipes;
for (const auto & part : parts)
{
auto source = read_type == ReadType::InReverseOrder
? createSource<MergeTreeReverseSelectProcessor>(part)
: createSource<MergeTreeSelectProcessor>(part);
pipes.emplace_back(std::move(source));
}
auto pipe = Pipe::unitePipes(std::move(pipes));
if (read_type == ReadType::InReverseOrder)
{
pipe.addSimpleTransform([&](const Block & header)
{
return std::make_shared<ReverseTransform>(header);
});
}
return pipe;
}
Pipe ReadFromMergeTree::read()
{
if (read_type == ReadType::Default && num_streams > 1)
return readFromPool();
auto pipe = readInOrder();
/// Use ConcatProcessor to concatenate the sources.
/// It is needed to read in parts order (and therefore in PK order) if a single thread is used.
if (read_type == ReadType::Default && pipe.numOutputPorts() > 1)
pipe.addTransform(std::make_shared<ConcatProcessor>(pipe.getHeader(), pipe.numOutputPorts()));
return pipe;
}
void ReadFromMergeTree::initializePipeline(QueryPipeline & pipeline, const BuildQueryPipelineSettings &)
{
Pipe pipe = read();
for (const auto & processor : pipe.getProcessors())
processors.emplace_back(processor);
// Attach QueryIdHolder if needed
if (!query_id.empty())
pipe.addQueryIdHolder(std::make_shared<QueryIdHolder>(query_id, storage));
pipeline.init(std::move(pipe));
}
static const char * indexTypeToString(ReadFromMergeTree::IndexType type)
{
switch (type)
{
case ReadFromMergeTree::IndexType::None:
return "None";
case ReadFromMergeTree::IndexType::MinMax:
return "MinMax";
case ReadFromMergeTree::IndexType::Partition:
return "Partition";
case ReadFromMergeTree::IndexType::PrimaryKey:
return "PrimaryKey";
case ReadFromMergeTree::IndexType::Skip:
return "Skip";
}
__builtin_unreachable();
}
static const char * readTypeToString(ReadFromMergeTree::ReadType type)
{
switch (type)
{
case ReadFromMergeTree::ReadType::Default:
return "Default";
case ReadFromMergeTree::ReadType::InOrder:
return "InOrder";
case ReadFromMergeTree::ReadType::InReverseOrder:
return "InReverseOrder";
}
__builtin_unreachable();
}
void ReadFromMergeTree::describeActions(FormatSettings & format_settings) const
{
std::string prefix(format_settings.offset, format_settings.indent_char);
format_settings.out << prefix << "ReadType: " << readTypeToString(read_type) << '\n';
if (index_stats && !index_stats->empty())
{
format_settings.out << prefix << "Parts: " << index_stats->back().num_parts_after << '\n';
format_settings.out << prefix << "Granules: " << index_stats->back().num_granules_after << '\n';
}
}
void ReadFromMergeTree::describeIndexes(FormatSettings & format_settings) const
{
std::string prefix(format_settings.offset, format_settings.indent_char);
if (index_stats && !index_stats->empty())
{
std::string indent(format_settings.indent, format_settings.indent_char);
/// Do not print anything if no indexes are applied.
if (index_stats->size() > 1 || index_stats->front().type != IndexType::None)
format_settings.out << prefix << "Indexes:\n";
for (size_t i = 0; i < index_stats->size(); ++i)
{
const auto & stat = (*index_stats)[i];
if (stat.type == IndexType::None)
continue;
format_settings.out << prefix << indent << indexTypeToString(stat.type) << '\n';
if (!stat.name.empty())
format_settings.out << prefix << indent << indent << "Name: " << stat.name << '\n';
if (!stat.description.empty())
format_settings.out << prefix << indent << indent << "Description: " << stat.description << '\n';
if (!stat.used_keys.empty())
{
format_settings.out << prefix << indent << indent << "Keys: " << stat.name << '\n';
for (const auto & used_key : stat.used_keys)
format_settings.out << prefix << indent << indent << indent << used_key << '\n';
}
if (!stat.condition.empty())
format_settings.out << prefix << indent << indent << "Condition: " << stat.condition << '\n';
format_settings.out << prefix << indent << indent << "Parts: " << stat.num_parts_after;
if (i)
format_settings.out << '/' << (*index_stats)[i - 1].num_parts_after;
format_settings.out << '\n';
format_settings.out << prefix << indent << indent << "Granules: " << stat.num_granules_after;
if (i)
format_settings.out << '/' << (*index_stats)[i - 1].num_granules_after;
format_settings.out << '\n';
}
}
}
}
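To make the output format concrete, here is a small self-contained sketch that mirrors the describeIndexes layout above over fabricated stats (in the real server this block appears under `EXPLAIN indexes = 1`; all conditions and numbers here are made up, and the Name/Description/Keys lines follow the same pattern):

#include <iostream>
#include <string>
#include <vector>

/// Minimal mirror of ReadFromMergeTree::IndexStat, used only to show the
/// shape of the `indexes = 1` output; conditions and numbers are fabricated.
struct IndexStatSketch
{
    std::string type; /// "None", "MinMax", "Partition", "PrimaryKey" or "Skip"
    std::string condition;
    size_t num_parts_after = 0;
    size_t num_granules_after = 0;
};

int main()
{
    std::vector<IndexStatSketch> stats = {
        {"None", "", 6, 12},
        {"MinMax", "(d in [1598918400, +inf))", 3, 6},
        {"PrimaryKey", "(a in [10, +inf))", 2, 4},
    };

    std::cout << "Indexes:\n";
    for (size_t i = 0; i < stats.size(); ++i)
    {
        const auto & stat = stats[i];
        if (stat.type == "None") /// the initial row is only a baseline
            continue;
        std::cout << "  " << stat.type << '\n';
        std::cout << "    Condition: " << stat.condition << '\n';
        /// Print "after/before" pairs, exactly as describeIndexes does.
        std::cout << "    Parts: " << stat.num_parts_after << '/' << stats[i - 1].num_parts_after << '\n';
        std::cout << "    Granules: " << stat.num_granules_after << '/' << stats[i - 1].num_granules_after << '\n';
    }
}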

View File

@ -0,0 +1,113 @@
#pragma once
#include <Processors/QueryPlan/ISourceStep.h>
#include <Processors/Pipe.h>
#include <Storages/MergeTree/RangesInDataPart.h>
#include <Storages/MergeTree/MergeTreeReadPool.h>
namespace DB
{
/// This step is created to read from a MergeTree* table.
/// For now, it takes a list of parts and creates sources from it.
class ReadFromMergeTree final : public ISourceStep
{
public:
enum class IndexType
{
None,
MinMax,
Partition,
PrimaryKey,
Skip,
};
/// A struct with information about an applied index.
/// Used for introspection only, in the EXPLAIN query.
struct IndexStat
{
IndexType type;
std::string name;
std::string description;
std::string condition;
std::vector<std::string> used_keys;
size_t num_parts_after;
size_t num_granules_after;
};
using IndexStats = std::vector<IndexStat>;
using IndexStatPtr = std::unique_ptr<IndexStats>;
/// The part of the settings that is needed for reading.
struct Settings
{
UInt64 max_block_size;
size_t preferred_block_size_bytes;
size_t preferred_max_column_in_block_size_bytes;
size_t min_marks_for_concurrent_read;
bool use_uncompressed_cache;
MergeTreeReaderSettings reader_settings;
MergeTreeReadPool::BackoffSettings backoff_settings;
};
enum class ReadType
{
/// By default, reading uses MergeTreeReadPool and returns a pipe with num_streams outputs.
/// If num_streams == 1, reads without a pool, in the order specified in parts.
Default,
/// Read in sorting key order.
/// The returned pipe will have the number of ports equal to parts.size().
/// The num_streams_ parameter is ignored in this case.
/// The caller should add MergingSorted itself if needed.
InOrder,
/// The same as InOrder, but in reverse order.
/// For every part, reads ranges and granules from end to beginning. Also adds a ReverseTransform.
InReverseOrder,
};
ReadFromMergeTree(
const MergeTreeData & storage_,
StorageMetadataPtr metadata_snapshot_,
String query_id_,
Names required_columns_,
RangesInDataParts parts_,
IndexStatPtr index_stats_,
PrewhereInfoPtr prewhere_info_,
Names virt_column_names_,
Settings settings_,
size_t num_streams_,
ReadType read_type_
);
String getName() const override { return "ReadFromMergeTree"; }
void initializePipeline(QueryPipeline & pipeline, const BuildQueryPipelineSettings &) override;
void describeActions(FormatSettings & format_settings) const override;
void describeIndexes(FormatSettings & format_settings) const override;
private:
const MergeTreeData & storage;
StorageMetadataPtr metadata_snapshot;
String query_id;
Names required_columns;
RangesInDataParts parts;
IndexStatPtr index_stats;
PrewhereInfoPtr prewhere_info;
Names virt_column_names;
Settings settings;
size_t num_streams;
ReadType read_type;
Pipe read();
Pipe readFromPool();
Pipe readInOrder();
template<typename TSource>
ProcessorPtr createSource(const RangesInDataPart & part);
};
}
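For context, this is how the step gets wired into a plan by MergeTreeDataSelectExecutor further down in this diff; a condensed, schematic fragment rather than compilable code:

/// Condensed from MergeTreeDataSelectExecutor::spreadMarkRangesAmongStreams
/// below; schematic, not compilable on its own.
ReadFromMergeTree::Settings step_settings
{
    .max_block_size = max_block_size,
    .preferred_block_size_bytes = settings.preferred_block_size_bytes,
    .preferred_max_column_in_block_size_bytes = settings.preferred_max_column_in_block_size_bytes,
    .min_marks_for_concurrent_read = min_marks_for_concurrent_read,
    .use_uncompressed_cache = use_uncompressed_cache,
    .reader_settings = reader_settings,
    .backoff_settings = MergeTreeReadPool::BackoffSettings(settings),
};

auto plan = std::make_unique<QueryPlan>();
auto step = std::make_unique<ReadFromMergeTree>(
    data, metadata_snapshot, query_id,
    column_names, std::move(parts), std::move(index_stats), query_info.prewhere_info, virt_columns,
    step_settings, num_streams, ReadFromMergeTree::ReadType::Default);
plan->addStep(std::move(step));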

View File

@ -1,37 +0,0 @@
#include <Processors/QueryPlan/ReverseRowsStep.h>
#include <Processors/QueryPipeline.h>
#include <Processors/Transforms/ReverseTransform.h>
namespace DB
{
static ITransformingStep::Traits getTraits()
{
return ITransformingStep::Traits
{
{
.preserves_distinct_columns = true,
.returns_single_stream = false,
.preserves_number_of_streams = true,
.preserves_sorting = false,
},
{
.preserves_number_of_rows = true,
}
};
}
ReverseRowsStep::ReverseRowsStep(const DataStream & input_stream_)
: ITransformingStep(input_stream_, input_stream_.header, getTraits())
{
}
void ReverseRowsStep::transformPipeline(QueryPipeline & pipeline, const BuildQueryPipelineSettings &)
{
pipeline.addSimpleTransform([&](const Block & header)
{
return std::make_shared<ReverseTransform>(header);
});
}
}

View File

@ -1,18 +0,0 @@
#pragma once
#include <Processors/QueryPlan/ITransformingStep.h>
namespace DB
{
/// Reverse rows in chunk.
class ReverseRowsStep : public ITransformingStep
{
public:
explicit ReverseRowsStep(const DataStream & input_stream_);
String getName() const override { return "ReverseRows"; }
void transformPipeline(QueryPipeline & pipeline, const BuildQueryPipelineSettings &) override;
};
}

View File

@ -124,9 +124,9 @@ SRCS(
QueryPlan/PartialSortingStep.cpp
QueryPlan/QueryIdHolder.cpp
QueryPlan/QueryPlan.cpp
QueryPlan/ReadFromMergeTree.cpp
QueryPlan/ReadFromPreparedSource.cpp
QueryPlan/ReadNothingStep.cpp
QueryPlan/ReverseRowsStep.cpp
QueryPlan/RollupStep.cpp
QueryPlan/SettingQuotaAndLimitsStep.cpp
QueryPlan/TotalsHavingStep.cpp

View File

@ -938,6 +938,9 @@ public:
return func->getMonotonicityForRange(type, left, right);
}
Kind getKind() const { return kind; }
const ColumnWithTypeAndName & getConstArg() const { return const_arg; }
private:
FunctionBasePtr func;
ColumnWithTypeAndName const_arg;
@ -1308,6 +1311,235 @@ String KeyCondition::toString() const
return res;
}
KeyCondition::Description KeyCondition::getDescription() const
{
/// This code may seem too complicated.
/// Here we want to convert the RPN back to a tree, and also simplify logical expressions like `and(x, true) -> x` along the way.
Description description;
/// This is an explicit binary tree.
/// Build and optimize it simultaneously.
struct Node
{
enum class Type
{
/// Leaf, which is an RPNElement.
Leaf,
/// Leaves, which are logical constants.
True,
False,
/// Binary operators.
And,
Or,
};
Type type;
/// Only for Leaf
const RPNElement * element = nullptr;
/// This means that logical NOT is applied to leaf.
bool negate = false;
std::unique_ptr<Node> left = nullptr;
std::unique_ptr<Node> right = nullptr;
};
/// The algorithm is the same as in KeyCondition::checkInHyperrectangle.
/// We build a pair of trees on a stack: one for checking whether the key condition can be true, and one for whether it can be false.
/// We need only `can_be_true` in the result.
struct Frame
{
std::unique_ptr<Node> can_be_true;
std::unique_ptr<Node> can_be_false;
};
/// Combine two subtrees using a logical operator.
auto combine = [](std::unique_ptr<Node> left, std::unique_ptr<Node> right, Node::Type type)
{
/// Simplify operators when one of the conditions is a constant.
if (type == Node::Type::And)
{
/// false AND right
if (left->type == Node::Type::False)
return left;
/// left AND false
if (right->type == Node::Type::False)
return right;
/// true AND right
if (left->type == Node::Type::True)
return right;
/// left AND true
if (right->type == Node::Type::True)
return left;
}
if (type == Node::Type::Or)
{
/// false OR right
if (left->type == Node::Type::False)
return right;
/// left OR false
if (right->type == Node::Type::False)
return left;
/// true OR right
if (left->type == Node::Type::True)
return left;
/// left OR true
if (right->type == Node::Type::True)
return right;
}
return std::make_unique<Node>(Node{
.type = type,
.left = std::move(left),
.right = std::move(right)
});
};
std::vector<Frame> rpn_stack;
for (const auto & element : rpn)
{
if (element.function == RPNElement::FUNCTION_UNKNOWN)
{
auto can_be_true = std::make_unique<Node>(Node{.type = Node::Type::True});
auto can_be_false = std::make_unique<Node>(Node{.type = Node::Type::True});
rpn_stack.emplace_back(Frame{.can_be_true = std::move(can_be_true), .can_be_false = std::move(can_be_false)});
}
else if (
element.function == RPNElement::FUNCTION_IN_RANGE
|| element.function == RPNElement::FUNCTION_NOT_IN_RANGE
|| element.function == RPNElement::FUNCTION_IN_SET
|| element.function == RPNElement::FUNCTION_NOT_IN_SET)
{
auto can_be_true = std::make_unique<Node>(Node{.type = Node::Type::Leaf, .element = &element, .negate = false});
auto can_be_false = std::make_unique<Node>(Node{.type = Node::Type::Leaf, .element = &element, .negate = true});
rpn_stack.emplace_back(Frame{.can_be_true = std::move(can_be_true), .can_be_false = std::move(can_be_false)});
}
else if (element.function == RPNElement::FUNCTION_NOT)
{
assert(!rpn_stack.empty());
std::swap(rpn_stack.back().can_be_true, rpn_stack.back().can_be_false);
}
else if (element.function == RPNElement::FUNCTION_AND)
{
assert(!rpn_stack.empty());
auto arg1 = std::move(rpn_stack.back());
rpn_stack.pop_back();
assert(!rpn_stack.empty());
auto arg2 = std::move(rpn_stack.back());
Frame frame;
frame.can_be_true = combine(std::move(arg1.can_be_true), std::move(arg2.can_be_true), Node::Type::And);
frame.can_be_false = combine(std::move(arg1.can_be_false), std::move(arg2.can_be_false), Node::Type::Or);
rpn_stack.back() = std::move(frame);
}
else if (element.function == RPNElement::FUNCTION_OR)
{
assert(!rpn_stack.empty());
auto arg1 = std::move(rpn_stack.back());
rpn_stack.pop_back();
assert(!rpn_stack.empty());
auto arg2 = std::move(rpn_stack.back());
Frame frame;
frame.can_be_true = combine(std::move(arg1.can_be_true), std::move(arg2.can_be_true), Node::Type::Or);
frame.can_be_false = combine(std::move(arg1.can_be_false), std::move(arg2.can_be_false), Node::Type::And);
rpn_stack.back() = std::move(frame);
}
else if (element.function == RPNElement::ALWAYS_FALSE)
{
auto can_be_true = std::make_unique<Node>(Node{.type = Node::Type::False});
auto can_be_false = std::make_unique<Node>(Node{.type = Node::Type::True});
rpn_stack.emplace_back(Frame{.can_be_true = std::move(can_be_true), .can_be_false = std::move(can_be_false)});
}
else if (element.function == RPNElement::ALWAYS_TRUE)
{
auto can_be_true = std::make_unique<Node>(Node{.type = Node::Type::True});
auto can_be_false = std::make_unique<Node>(Node{.type = Node::Type::False});
rpn_stack.emplace_back(Frame{.can_be_true = std::move(can_be_true), .can_be_false = std::move(can_be_false)});
}
else
throw Exception("Unexpected function type in KeyCondition::RPNElement", ErrorCodes::LOGICAL_ERROR);
}
if (rpn_stack.size() != 1)
throw Exception("Unexpected stack size in KeyCondition::checkInRange", ErrorCodes::LOGICAL_ERROR);
std::vector<std::string_view> key_names(key_columns.size());
std::vector<bool> is_key_used(key_columns.size(), false);
for (const auto & key : key_columns)
key_names[key.second] = key.first;
WriteBufferFromOwnString buf;
std::function<void(const Node *)> describe;
describe = [&describe, &key_names, &is_key_used, &buf](const Node * node)
{
switch (node->type)
{
case Node::Type::Leaf:
{
is_key_used[node->element->key_column] = true;
/// Note: for a condition with double negation, like `not(x not in set)`,
/// we could replace it with `x in set` here.
/// But we don't, because `cloneASTWithInversionPushDown` already pushes down `not`.
/// So this case seems to be impossible for the `can_be_true` tree.
if (node->negate)
buf << "not(";
buf << node->element->toString(key_names[node->element->key_column], true);
if (node->negate)
buf << ")";
break;
}
case Node::Type::True:
buf << "true";
break;
case Node::Type::False:
buf << "false";
break;
case Node::Type::And:
buf << "and(";
describe(node->left.get());
buf << ", ";
describe(node->right.get());
buf << ")";
break;
case Node::Type::Or:
buf << "or(";
describe(node->left.get());
buf << ", ";
describe(node->right.get());
buf << ")";
break;
}
};
describe(rpn_stack.front().can_be_true.get());
description.condition = std::move(buf.str());
for (size_t i = 0; i < key_names.size(); ++i)
if (is_key_used[i])
description.used_keys.emplace_back(key_names[i]);
return description;
}
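A runnable toy version of the combine step above, showing the simplification in isolation (this is not the real KeyCondition code; the Node here is stripped down):

#include <cassert>
#include <memory>

/// Toy Node/combine pair demonstrating the `and(x, true) -> x` style
/// simplification used when rebuilding the tree from the RPN.
struct Node
{
    enum class Type { Leaf, True, False, And, Or };
    Type type;
    std::unique_ptr<Node> left, right;
};

std::unique_ptr<Node> combine(std::unique_ptr<Node> left, std::unique_ptr<Node> right, Node::Type type)
{
    if (type == Node::Type::And)
    {
        if (left->type == Node::Type::False) return left;
        if (right->type == Node::Type::False) return right;
        if (left->type == Node::Type::True) return right;
        if (right->type == Node::Type::True) return left;
    }
    if (type == Node::Type::Or)
    {
        if (left->type == Node::Type::False) return right;
        if (right->type == Node::Type::False) return left;
        if (left->type == Node::Type::True) return left;
        if (right->type == Node::Type::True) return right;
    }
    return std::make_unique<Node>(Node{type, std::move(left), std::move(right)});
}

int main()
{
    /// An unsupported atom becomes the constant `true` (FUNCTION_UNKNOWN above),
    /// so `and(leaf, unknown)` collapses to just `leaf`.
    auto leaf = std::make_unique<Node>(Node{Node::Type::Leaf, nullptr, nullptr});
    auto unknown = std::make_unique<Node>(Node{Node::Type::True, nullptr, nullptr});
    auto result = combine(std::move(leaf), std::move(unknown), Node::Type::And);
    assert(result->type == Node::Type::Leaf);
}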
/** The index is the value of the key every `index_granularity` rows.
* Such a value is called a "mark". That is, the index consists of marks.
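A quick back-of-the-envelope on marks, assuming the default index_granularity of 8192 and a hypothetical part size:

#include <iostream>

int main()
{
    /// Assumed defaults for illustration: one mark per index_granularity rows.
    const size_t index_granularity = 8192; /// ClickHouse default
    const size_t rows_in_part = 1'000'000; /// hypothetical part size

    /// Number of granules (marks), rounding the last partial granule up.
    size_t marks = (rows_in_part + index_granularity - 1) / index_granularity;
    std::cout << marks << '\n'; /// prints 123
}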
@ -1733,18 +1965,38 @@ bool KeyCondition::mayBeTrueAfter(
return checkInRange(used_key_size, left_key, nullptr, data_types, false, BoolMask::consider_only_can_be_true).can_be_true;
}
String KeyCondition::RPNElement::toString() const
String KeyCondition::RPNElement::toString() const { return toString("column " + std::to_string(key_column), false); }
String KeyCondition::RPNElement::toString(const std::string_view & column_name, bool print_constants) const
{
auto print_wrapped_column = [this](WriteBuffer & buf)
auto print_wrapped_column = [this, &column_name, print_constants](WriteBuffer & buf)
{
for (auto it = monotonic_functions_chain.rbegin(); it != monotonic_functions_chain.rend(); ++it)
{
buf << (*it)->getName() << "(";
if (print_constants)
{
if (const auto * func = typeid_cast<const FunctionWithOptionalConstArg *>(it->get()))
{
if (func->getKind() == FunctionWithOptionalConstArg::Kind::LEFT_CONST)
buf << applyVisitor(FieldVisitorToString(), (*func->getConstArg().column)[0]) << ", ";
}
}
}
buf << "column " << key_column;
buf << column_name;
for (auto it = monotonic_functions_chain.rbegin(); it != monotonic_functions_chain.rend(); ++it)
{
if (print_constants)
{
if (const auto * func = typeid_cast<const FunctionWithOptionalConstArg *>(it->get()))
{
if (func->getKind() == FunctionWithOptionalConstArg::Kind::RIGHT_CONST)
buf << ", " << applyVisitor(FieldVisitorToString(), (*func->getConstArg().column)[0]);
}
}
buf << ")";
}
};
WriteBufferFromOwnString buf;
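A toy rendition of print_wrapped_column above, showing how the monotonic function chain wraps the column name (the names are made up; the real code also interleaves constant arguments when print_constants is set):

#include <iostream>
#include <string>
#include <vector>

/// Toy version of print_wrapped_column: the innermost name is the key column,
/// each monotonic function in the chain wraps it.
int main()
{
    std::vector<std::string> monotonic_functions_chain = {"toDate", "toStartOfDay"};
    std::string column_name = "d";

    std::string result;
    /// Open the calls from the outermost function to the innermost.
    for (auto it = monotonic_functions_chain.rbegin(); it != monotonic_functions_chain.rend(); ++it)
        result += *it + "(";
    result += column_name;
    for (size_t i = 0; i < monotonic_functions_chain.size(); ++i)
        result += ")";
    std::cout << result << '\n'; /// prints toStartOfDay(toDate(d))
}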

View File

@ -293,6 +293,16 @@ public:
String toString() const;
/// Condition description for EXPLAIN query.
struct Description
{
/// Which columns from PK were used, in PK order.
std::vector<std::string> used_keys;
/// Condition which was applied, mostly human-readable.
std::string condition;
};
Description getDescription() const;
/** A chain of possibly monotone functions.
* If the key column is wrapped in functions that can be monotonous in some value ranges
@ -345,6 +355,7 @@ private:
: function(function_), range(range_), key_column(key_column_) {}
String toString() const;
String toString(const std::string_view & column_name, bool print_constants) const;
Function function = FUNCTION_UNKNOWN;

View File

@ -30,7 +30,7 @@ MergeTreeBaseSelectProcessor::MergeTreeBaseSelectProcessor(
const MergeTreeReaderSettings & reader_settings_,
bool use_uncompressed_cache_,
const Names & virt_column_names_)
: SourceWithProgress(getHeader(std::move(header), prewhere_info_, virt_column_names_))
: SourceWithProgress(transformHeader(std::move(header), prewhere_info_, virt_column_names_))
, storage(storage_)
, metadata_snapshot(metadata_snapshot_)
, prewhere_info(prewhere_info_)
@ -370,7 +370,7 @@ void MergeTreeBaseSelectProcessor::executePrewhereActions(Block & block, const P
}
}
Block MergeTreeBaseSelectProcessor::getHeader(
Block MergeTreeBaseSelectProcessor::transformHeader(
Block block, const PrewhereInfoPtr & prewhere_info, const Names & virtual_columns)
{
executePrewhereActions(block, prewhere_info);

View File

@ -33,6 +33,8 @@ public:
~MergeTreeBaseSelectProcessor() override;
static Block transformHeader(Block block, const PrewhereInfoPtr & prewhere_info, const Names & virtual_columns);
static void executePrewhereActions(Block & block, const PrewhereInfoPtr & prewhere_info);
protected:
@ -49,8 +51,6 @@ protected:
static void injectVirtualColumns(Block & block, MergeTreeReadTask * task, const Names & virtual_columns);
static void injectVirtualColumns(Chunk & chunk, MergeTreeReadTask * task, const Names & virtual_columns);
static Block getHeader(Block block, const PrewhereInfoPtr & prewhere_info, const Names & virtual_columns);
void initializeRangeReaders(MergeTreeReadTask & task);
protected:

View File

@ -28,7 +28,7 @@
#include <Processors/QueryPlan/FilterStep.h>
#include <Processors/QueryPlan/ExpressionStep.h>
#include <Processors/QueryPlan/ReadFromPreparedSource.h>
#include <Processors/QueryPlan/ReverseRowsStep.h>
#include <Processors/QueryPlan/ReadFromMergeTree.h>
#include <Processors/QueryPlan/MergingSortedStep.h>
#include <Processors/QueryPlan/UnionStep.h>
#include <Processors/QueryPlan/MergingFinal.h>
@ -282,11 +282,40 @@ QueryPlanPtr MergeTreeDataSelectExecutor::readFromParts(
auto query_context = context->hasQueryContext() ? context->getQueryContext() : context;
if (query_context->getSettingsRef().allow_experimental_query_deduplication)
selectPartsToReadWithUUIDFilter(parts, part_values, minmax_idx_condition, minmax_columns_types, partition_pruner, max_block_numbers_to_read, query_context);
else
selectPartsToRead(parts, part_values, minmax_idx_condition, minmax_columns_types, partition_pruner, max_block_numbers_to_read);
PartFilterCounters part_filter_counters;
auto index_stats = std::make_unique<ReadFromMergeTree::IndexStats>();
if (query_context->getSettingsRef().allow_experimental_query_deduplication)
selectPartsToReadWithUUIDFilter(parts, part_values, minmax_idx_condition, minmax_columns_types, partition_pruner, max_block_numbers_to_read, query_context, part_filter_counters);
else
selectPartsToRead(parts, part_values, minmax_idx_condition, minmax_columns_types, partition_pruner, max_block_numbers_to_read, part_filter_counters);
index_stats->emplace_back(ReadFromMergeTree::IndexStat{
.type = ReadFromMergeTree::IndexType::None,
.num_parts_after = part_filter_counters.num_initial_selected_parts,
.num_granules_after = part_filter_counters.num_initial_selected_granules});
if (minmax_idx_condition)
{
auto description = minmax_idx_condition->getDescription();
index_stats->emplace_back(ReadFromMergeTree::IndexStat{
.type = ReadFromMergeTree::IndexType::MinMax,
.condition = std::move(description.condition),
.used_keys = std::move(description.used_keys),
.num_parts_after = part_filter_counters.num_parts_after_minmax,
.num_granules_after = part_filter_counters.num_granules_after_minmax});
}
if (partition_pruner)
{
auto description = partition_pruner->getKeyCondition().getDescription();
index_stats->emplace_back(ReadFromMergeTree::IndexStat{
.type = ReadFromMergeTree::IndexType::Partition,
.condition = std::move(description.condition),
.used_keys = std::move(description.used_keys),
.num_parts_after = part_filter_counters.num_parts_after_partition_pruner,
.num_granules_after = part_filter_counters.num_granules_after_partition_pruner});
}
/// Sampling.
Names column_names_to_read = real_column_names;
@ -568,6 +597,8 @@ QueryPlanPtr MergeTreeDataSelectExecutor::readFromParts(
MergeTreeIndexConditionPtr condition;
std::atomic<size_t> total_granules{0};
std::atomic<size_t> granules_dropped{0};
std::atomic<size_t> total_parts{0};
std::atomic<size_t> parts_dropped{0};
DataSkippingIndexAndCondition(MergeTreeIndexPtr index_, MergeTreeIndexConditionPtr condition_)
: index(index_)
@ -620,6 +651,7 @@ QueryPlanPtr MergeTreeDataSelectExecutor::readFromParts(
RangesInDataParts parts_with_ranges(parts.size());
size_t sum_marks = 0;
std::atomic<size_t> sum_marks_pk = 0;
std::atomic<size_t> sum_parts_pk = 0;
std::atomic<size_t> total_marks_pk = 0;
size_t sum_ranges = 0;
@ -642,25 +674,29 @@ QueryPlanPtr MergeTreeDataSelectExecutor::readFromParts(
RangesInDataPart ranges(part, part_index);
total_marks_pk.fetch_add(part->index_granularity.getMarksCount(), std::memory_order_relaxed);
size_t total_marks_count = part->getMarksCount();
if (total_marks_count && part->index_granularity.hasFinalMark())
--total_marks_count;
total_marks_pk.fetch_add(total_marks_count, std::memory_order_relaxed);
if (metadata_snapshot->hasPrimaryKey())
ranges.ranges = markRangesFromPKRange(part, metadata_snapshot, key_condition, settings, log);
else
{
size_t total_marks_count = part->getMarksCount();
if (total_marks_count)
{
if (part->index_granularity.hasFinalMark())
--total_marks_count;
ranges.ranges = MarkRanges{MarkRange{0, total_marks_count}};
}
}
else if (total_marks_count)
ranges.ranges = MarkRanges{MarkRange{0, total_marks_count}};
sum_marks_pk.fetch_add(ranges.getMarksCount(), std::memory_order_relaxed);
if (!ranges.ranges.empty())
sum_parts_pk.fetch_add(1, std::memory_order_relaxed);
for (auto & index_and_condition : useful_indices)
{
if (ranges.ranges.empty())
break;
index_and_condition.total_parts.fetch_add(1, std::memory_order_relaxed);
size_t total_granules = 0;
size_t granules_dropped = 0;
ranges.ranges = filterMarksUsingIndex(
@ -672,6 +708,9 @@ QueryPlanPtr MergeTreeDataSelectExecutor::readFromParts(
index_and_condition.total_granules.fetch_add(total_granules, std::memory_order_relaxed);
index_and_condition.granules_dropped.fetch_add(granules_dropped, std::memory_order_relaxed);
if (ranges.ranges.empty())
index_and_condition.parts_dropped.fetch_add(1, std::memory_order_relaxed);
}
if (!ranges.ranges.empty())
@ -737,12 +776,34 @@ QueryPlanPtr MergeTreeDataSelectExecutor::readFromParts(
parts_with_ranges.resize(next_part);
}
if (metadata_snapshot->hasPrimaryKey())
{
auto description = key_condition.getDescription();
index_stats->emplace_back(ReadFromMergeTree::IndexStat{
.type = ReadFromMergeTree::IndexType::PrimaryKey,
.condition = std::move(description.condition),
.used_keys = std::move(description.used_keys),
.num_parts_after = sum_parts_pk.load(std::memory_order_relaxed),
.num_granules_after = sum_marks_pk.load(std::memory_order_relaxed)});
}
for (const auto & index_and_condition : useful_indices)
{
const auto & index_name = index_and_condition.index->index.name;
LOG_DEBUG(log, "Index {} has dropped {}/{} granules.",
backQuote(index_name),
index_and_condition.granules_dropped, index_and_condition.total_granules);
std::string description = index_and_condition.index->index.type
+ " GRANULARITY " + std::to_string(index_and_condition.index->index.granularity);
index_stats->emplace_back(ReadFromMergeTree::IndexStat{
.type = ReadFromMergeTree::IndexType::Skip,
.name = index_name,
.description = std::move(description),
.num_parts_after = index_and_condition.total_parts - index_and_condition.parts_dropped,
.num_granules_after = index_and_condition.total_granules - index_and_condition.granules_dropped});
}
LOG_DEBUG(log, "Selected {}/{} parts by partition key, {} parts by primary key, {}/{} marks by primary key, {} marks to read from {} ranges",
@ -809,6 +870,7 @@ QueryPlanPtr MergeTreeDataSelectExecutor::readFromParts(
plan = spreadMarkRangesAmongStreamsFinal(
std::move(parts_with_ranges),
std::move(index_stats),
num_streams,
column_names_to_read,
metadata_snapshot,
@ -832,6 +894,7 @@ QueryPlanPtr MergeTreeDataSelectExecutor::readFromParts(
plan = spreadMarkRangesAmongStreamsWithOrder(
std::move(parts_with_ranges),
std::move(index_stats),
num_streams,
column_names_to_read,
metadata_snapshot,
@ -849,6 +912,7 @@ QueryPlanPtr MergeTreeDataSelectExecutor::readFromParts(
{
plan = spreadMarkRangesAmongStreams(
std::move(parts_with_ranges),
std::move(index_stats),
num_streams,
column_names_to_read,
metadata_snapshot,
@ -960,25 +1024,9 @@ size_t minMarksForConcurrentRead(
}
static QueryPlanPtr createPlanFromPipe(Pipe pipe, const String & query_id, const MergeTreeData & data, const std::string & description = "")
{
auto plan = std::make_unique<QueryPlan>();
std::string storage_name = "MergeTree";
if (!description.empty())
storage_name += ' ' + description;
// Attach QueryIdHolder if needed
if (!query_id.empty())
pipe.addQueryIdHolder(std::make_shared<QueryIdHolder>(query_id, data));
auto step = std::make_unique<ReadFromStorageStep>(std::move(pipe), storage_name);
plan->addStep(std::move(step));
return plan;
}
QueryPlanPtr MergeTreeDataSelectExecutor::spreadMarkRangesAmongStreams(
RangesInDataParts && parts,
ReadFromMergeTree::IndexStatPtr index_stats,
size_t num_streams,
const Names & column_names,
const StorageMetadataPtr & metadata_snapshot,
@ -1030,75 +1078,32 @@ QueryPlanPtr MergeTreeDataSelectExecutor::spreadMarkRangesAmongStreams(
if (0 == sum_marks)
return {};
ReadFromMergeTree::Settings step_settings
{
.max_block_size = max_block_size,
.preferred_block_size_bytes = settings.preferred_block_size_bytes,
.preferred_max_column_in_block_size_bytes = settings.preferred_max_column_in_block_size_bytes,
.min_marks_for_concurrent_read = min_marks_for_concurrent_read,
.use_uncompressed_cache = use_uncompressed_cache,
.reader_settings = reader_settings,
.backoff_settings = MergeTreeReadPool::BackoffSettings(settings),
};
if (num_streams > 1)
{
/// Parallel query execution.
Pipes res;
/// Reduce the number of num_streams if the data is small.
if (sum_marks < num_streams * min_marks_for_concurrent_read && parts.size() < num_streams)
num_streams = std::max((sum_marks + min_marks_for_concurrent_read - 1) / min_marks_for_concurrent_read, parts.size());
MergeTreeReadPoolPtr pool = std::make_shared<MergeTreeReadPool>(
num_streams,
sum_marks,
min_marks_for_concurrent_read,
std::move(parts),
data,
metadata_snapshot,
query_info.prewhere_info,
true,
column_names,
MergeTreeReadPool::BackoffSettings(settings),
settings.preferred_block_size_bytes,
false);
/// Let's estimate total number of rows for progress bar.
LOG_DEBUG(log, "Reading approx. {} rows with {} streams", total_rows, num_streams);
for (size_t i = 0; i < num_streams; ++i)
{
auto source = std::make_shared<MergeTreeThreadSelectBlockInputProcessor>(
i, pool, min_marks_for_concurrent_read, max_block_size,
settings.preferred_block_size_bytes, settings.preferred_max_column_in_block_size_bytes,
data, metadata_snapshot, use_uncompressed_cache,
query_info.prewhere_info, reader_settings, virt_columns);
if (i == 0)
{
/// Set the approximate number of rows for the first source only
source->addTotalRowsApprox(total_rows);
}
res.emplace_back(std::move(source));
}
return createPlanFromPipe(Pipe::unitePipes(std::move(res)), query_id, data);
}
else
{
/// Sequential query execution.
Pipes res;
for (const auto & part : parts)
{
auto source = std::make_shared<MergeTreeSelectProcessor>(
data, metadata_snapshot, part.data_part, max_block_size, settings.preferred_block_size_bytes,
settings.preferred_max_column_in_block_size_bytes, column_names, part.ranges, use_uncompressed_cache,
query_info.prewhere_info, true, reader_settings, virt_columns, part.part_index_in_query);
auto plan = std::make_unique<QueryPlan>();
auto step = std::make_unique<ReadFromMergeTree>(
data, metadata_snapshot, query_id,
column_names, std::move(parts), std::move(index_stats), query_info.prewhere_info, virt_columns,
step_settings, num_streams, ReadFromMergeTree::ReadType::Default);
res.emplace_back(std::move(source));
}
auto pipe = Pipe::unitePipes(std::move(res));
/// Use ConcatProcessor to concat sources together.
/// It is needed to read in parts order (and so in PK order) if single thread is used.
if (pipe.numOutputPorts() > 1)
pipe.addTransform(std::make_shared<ConcatProcessor>(pipe.getHeader(), pipe.numOutputPorts()));
return createPlanFromPipe(std::move(pipe), query_id, data);
}
plan->addStep(std::move(step));
return plan;
}
static ActionsDAGPtr createProjection(const Block & header)
@ -1111,6 +1116,7 @@ static ActionsDAGPtr createProjection(const Block & header)
QueryPlanPtr MergeTreeDataSelectExecutor::spreadMarkRangesAmongStreamsWithOrder(
RangesInDataParts && parts,
ReadFromMergeTree::IndexStatPtr index_stats,
size_t num_streams,
const Names & column_names,
const StorageMetadataPtr & metadata_snapshot,
@ -1218,8 +1224,7 @@ QueryPlanPtr MergeTreeDataSelectExecutor::spreadMarkRangesAmongStreamsWithOrder(
for (size_t i = 0; i < num_streams && !parts.empty(); ++i)
{
size_t need_marks = min_marks_per_stream;
Pipes pipes;
RangesInDataParts new_parts;
/// Loop over parts.
/// We will iteratively take a part or some subrange of a part from the back
@ -1274,53 +1279,31 @@ QueryPlanPtr MergeTreeDataSelectExecutor::spreadMarkRangesAmongStreamsWithOrder(
parts.emplace_back(part);
}
ranges_to_get_from_part = split_ranges(ranges_to_get_from_part, input_order_info->direction);
if (input_order_info->direction == 1)
{
pipes.emplace_back(std::make_shared<MergeTreeSelectProcessor>(
data,
metadata_snapshot,
part.data_part,
max_block_size,
settings.preferred_block_size_bytes,
settings.preferred_max_column_in_block_size_bytes,
column_names,
ranges_to_get_from_part,
use_uncompressed_cache,
query_info.prewhere_info,
true,
reader_settings,
virt_columns,
part.part_index_in_query));
}
else
{
pipes.emplace_back(std::make_shared<MergeTreeReverseSelectProcessor>(
data,
metadata_snapshot,
part.data_part,
max_block_size,
settings.preferred_block_size_bytes,
settings.preferred_max_column_in_block_size_bytes,
column_names,
ranges_to_get_from_part,
use_uncompressed_cache,
query_info.prewhere_info,
true,
reader_settings,
virt_columns,
part.part_index_in_query));
}
new_parts.emplace_back(part.data_part, part.part_index_in_query, std::move(ranges_to_get_from_part));
}
auto plan = createPlanFromPipe(Pipe::unitePipes(std::move(pipes)), query_id, data, "with order");
if (input_order_info->direction != 1)
ReadFromMergeTree::Settings step_settings
{
auto reverse_step = std::make_unique<ReverseRowsStep>(plan->getCurrentDataStream());
plan->addStep(std::move(reverse_step));
}
.max_block_size = max_block_size,
.preferred_block_size_bytes = settings.preferred_block_size_bytes,
.preferred_max_column_in_block_size_bytes = settings.preferred_max_column_in_block_size_bytes,
.min_marks_for_concurrent_read = min_marks_for_concurrent_read,
.use_uncompressed_cache = use_uncompressed_cache,
.reader_settings = reader_settings,
.backoff_settings = MergeTreeReadPool::BackoffSettings(settings),
};
auto read_type = input_order_info->direction == 1
? ReadFromMergeTree::ReadType::InOrder
: ReadFromMergeTree::ReadType::InReverseOrder;
auto plan = std::make_unique<QueryPlan>();
auto step = std::make_unique<ReadFromMergeTree>(
data, metadata_snapshot, query_id,
column_names, std::move(new_parts), std::move(index_stats), query_info.prewhere_info, virt_columns,
step_settings, num_streams, read_type);
plan->addStep(std::move(step));
plans.emplace_back(std::move(plan));
}
@ -1371,6 +1354,7 @@ QueryPlanPtr MergeTreeDataSelectExecutor::spreadMarkRangesAmongStreamsWithOrder(
QueryPlanPtr MergeTreeDataSelectExecutor::spreadMarkRangesAmongStreamsFinal(
RangesInDataParts && parts,
ReadFromMergeTree::IndexStatPtr index_stats,
size_t num_streams,
const Names & column_names,
const StorageMetadataPtr & metadata_snapshot,
@ -1412,7 +1396,7 @@ QueryPlanPtr MergeTreeDataSelectExecutor::spreadMarkRangesAmongStreamsFinal(
num_streams = settings.max_final_threads;
/// If setting do_not_merge_across_partitions_select_final is true then we won't merge parts from different partitions.
/// We have all parts in parts vector, where parts with same partition are nerby.
/// We have all parts in parts vector, where parts with same partition are nearby.
/// So we will store iterators pointed to the beginning of each partition range (and parts.end()),
/// then we will create a pipe for each partition that will run selecting processor and merging processor
/// for the parts with this partition. In the end we will unite all the pipes.
@ -1451,7 +1435,7 @@ QueryPlanPtr MergeTreeDataSelectExecutor::spreadMarkRangesAmongStreamsFinal(
QueryPlanPtr plan;
{
Pipes pipes;
RangesInDataParts new_parts;
/// If do_not_merge_across_partitions_select_final is true and there is only one part in partition
/// with level > 0 then we won't postprocess this part and if num_streams > 1 we
@ -1470,36 +1454,35 @@ QueryPlanPtr MergeTreeDataSelectExecutor::spreadMarkRangesAmongStreamsFinal(
{
for (auto part_it = parts_to_merge_ranges[range_index]; part_it != parts_to_merge_ranges[range_index + 1]; ++part_it)
{
auto source_processor = std::make_shared<MergeTreeSelectProcessor>(
data,
metadata_snapshot,
part_it->data_part,
max_block_size,
settings.preferred_block_size_bytes,
settings.preferred_max_column_in_block_size_bytes,
column_names,
part_it->ranges,
use_uncompressed_cache,
query_info.prewhere_info,
true,
reader_settings,
virt_columns,
part_it->part_index_in_query);
pipes.emplace_back(std::move(source_processor));
new_parts.emplace_back(part_it->data_part, part_it->part_index_in_query, part_it->ranges);
}
}
if (pipes.empty())
if (new_parts.empty())
continue;
auto pipe = Pipe::unitePipes(std::move(pipes));
ReadFromMergeTree::Settings step_settings
{
.max_block_size = max_block_size,
.preferred_block_size_bytes = settings.preferred_block_size_bytes,
.preferred_max_column_in_block_size_bytes = settings.preferred_max_column_in_block_size_bytes,
.min_marks_for_concurrent_read = 0, /// this setting is not used for reading in order
.use_uncompressed_cache = use_uncompressed_cache,
.reader_settings = reader_settings,
.backoff_settings = MergeTreeReadPool::BackoffSettings(settings),
};
plan = std::make_unique<QueryPlan>();
auto step = std::make_unique<ReadFromMergeTree>(
data, metadata_snapshot, query_id,
column_names, std::move(new_parts), std::move(index_stats), query_info.prewhere_info, virt_columns,
step_settings, num_streams, ReadFromMergeTree::ReadType::InOrder);
plan->addStep(std::move(step));
/// Drop temporary columns, added by 'sorting_key_expr'
if (!out_projection)
out_projection = createProjection(pipe.getHeader());
plan = createPlanFromPipe(std::move(pipe), query_id, data, "with final");
out_projection = createProjection(plan->getCurrentDataStream().header);
}
auto expression_step = std::make_unique<ExpressionStep>(
@ -1546,7 +1529,7 @@ QueryPlanPtr MergeTreeDataSelectExecutor::spreadMarkRangesAmongStreamsFinal(
if (!lonely_parts.empty())
{
Pipes pipes;
RangesInDataParts new_parts;
size_t num_streams_for_lonely_parts = num_streams * lonely_parts.size();
@ -1561,41 +1544,28 @@ QueryPlanPtr MergeTreeDataSelectExecutor::spreadMarkRangesAmongStreamsFinal(
if (sum_marks_in_lonely_parts < num_streams_for_lonely_parts * min_marks_for_concurrent_read && lonely_parts.size() < num_streams_for_lonely_parts)
num_streams_for_lonely_parts = std::max((sum_marks_in_lonely_parts + min_marks_for_concurrent_read - 1) / min_marks_for_concurrent_read, lonely_parts.size());
MergeTreeReadPoolPtr pool = std::make_shared<MergeTreeReadPool>(
num_streams_for_lonely_parts,
sum_marks_in_lonely_parts,
min_marks_for_concurrent_read,
std::move(lonely_parts),
data,
metadata_snapshot,
query_info.prewhere_info,
true,
column_names,
MergeTreeReadPool::BackoffSettings(settings),
settings.preferred_block_size_bytes,
false);
LOG_DEBUG(log, "Reading approx. {} rows with {} streams", total_rows_in_lonely_parts, num_streams_for_lonely_parts);
for (size_t i = 0; i < num_streams_for_lonely_parts; ++i)
ReadFromMergeTree::Settings step_settings
{
auto source = std::make_shared<MergeTreeThreadSelectBlockInputProcessor>(
i, pool, min_marks_for_concurrent_read, max_block_size,
settings.preferred_block_size_bytes, settings.preferred_max_column_in_block_size_bytes,
data, metadata_snapshot, use_uncompressed_cache,
query_info.prewhere_info, reader_settings, virt_columns);
.max_block_size = max_block_size,
.preferred_block_size_bytes = settings.preferred_block_size_bytes,
.preferred_max_column_in_block_size_bytes = settings.preferred_max_column_in_block_size_bytes,
.min_marks_for_concurrent_read = min_marks_for_concurrent_read,
.use_uncompressed_cache = use_uncompressed_cache,
.reader_settings = reader_settings,
.backoff_settings = MergeTreeReadPool::BackoffSettings(settings),
};
pipes.emplace_back(std::move(source));
}
auto plan = std::make_unique<QueryPlan>();
auto step = std::make_unique<ReadFromMergeTree>(
data, metadata_snapshot, query_id,
column_names, std::move(lonely_parts), std::move(index_stats), query_info.prewhere_info, virt_columns,
step_settings, num_streams_for_lonely_parts, ReadFromMergeTree::ReadType::Default);
auto pipe = Pipe::unitePipes(std::move(pipes));
plan->addStep(std::move(step));
/// Drop temporary columns, added by 'sorting_key_expr'
if (!out_projection)
out_projection = createProjection(pipe.getHeader());
QueryPlanPtr plan = createPlanFromPipe(std::move(pipe), query_id, data, "with final");
out_projection = createProjection(plan->getCurrentDataStream().header);
auto expression_step = std::make_unique<ExpressionStep>(
plan->getCurrentDataStream(),
@ -1896,7 +1866,8 @@ void MergeTreeDataSelectExecutor::selectPartsToRead(
const std::optional<KeyCondition> & minmax_idx_condition,
const DataTypes & minmax_columns_types,
std::optional<PartitionPruner> & partition_pruner,
const PartitionIdToMaxBlock * max_block_numbers_to_read)
const PartitionIdToMaxBlock * max_block_numbers_to_read,
PartFilterCounters & counters)
{
auto prev_parts = parts;
parts.clear();
@ -1909,22 +1880,35 @@ void MergeTreeDataSelectExecutor::selectPartsToRead(
if (part->isEmpty())
continue;
if (max_block_numbers_to_read)
{
auto blocks_iterator = max_block_numbers_to_read->find(part->info.partition_id);
if (blocks_iterator == max_block_numbers_to_read->end() || part->info.max_block > blocks_iterator->second)
continue;
}
size_t num_granules = part->getMarksCount();
if (num_granules && part->index_granularity.hasFinalMark())
--num_granules;
counters.num_initial_selected_parts += 1;
counters.num_initial_selected_granules += num_granules;
if (minmax_idx_condition && !minmax_idx_condition->checkInHyperrectangle(
part->minmax_idx.hyperrectangle, minmax_columns_types).can_be_true)
continue;
counters.num_parts_after_minmax += 1;
counters.num_granules_after_minmax += num_granules;
if (partition_pruner)
{
if (partition_pruner->canBePruned(part))
continue;
}
if (max_block_numbers_to_read)
{
auto blocks_iterator = max_block_numbers_to_read->find(part->info.partition_id);
if (blocks_iterator == max_block_numbers_to_read->end() || part->info.max_block > blocks_iterator->second)
continue;
}
counters.num_parts_after_partition_pruner += 1;
counters.num_granules_after_partition_pruner += num_granules;
parts.push_back(part);
}
@ -1937,7 +1921,8 @@ void MergeTreeDataSelectExecutor::selectPartsToReadWithUUIDFilter(
const DataTypes & minmax_columns_types,
std::optional<PartitionPruner> & partition_pruner,
const PartitionIdToMaxBlock * max_block_numbers_to_read,
ContextPtr query_context) const
ContextPtr query_context,
PartFilterCounters & counters) const
{
/// process_parts prepares the parts that have to be read for the query;
/// it returns false if duplicated parts' UUIDs have been met
@ -1957,17 +1942,6 @@ void MergeTreeDataSelectExecutor::selectPartsToReadWithUUIDFilter(
if (part->isEmpty())
continue;
if (minmax_idx_condition
&& !minmax_idx_condition->checkInHyperrectangle(part->minmax_idx.hyperrectangle, minmax_columns_types)
.can_be_true)
continue;
if (partition_pruner)
{
if (partition_pruner->canBePruned(part))
continue;
}
if (max_block_numbers_to_read)
{
auto blocks_iterator = max_block_numbers_to_read->find(part->info.partition_id);
@ -1975,13 +1949,37 @@ void MergeTreeDataSelectExecutor::selectPartsToReadWithUUIDFilter(
continue;
}
/// Skip the part if its uuid is meant to be excluded
if (part->uuid != UUIDHelpers::Nil && ignored_part_uuids->has(part->uuid))
continue;
size_t num_granules = part->getMarksCount();
if (num_granules && part->index_granularity.hasFinalMark())
--num_granules;
counters.num_initial_selected_parts += 1;
counters.num_initial_selected_granules += num_granules;
if (minmax_idx_condition
&& !minmax_idx_condition->checkInHyperrectangle(part->minmax_idx.hyperrectangle, minmax_columns_types)
.can_be_true)
continue;
counters.num_parts_after_minmax += 1;
counters.num_granules_after_minmax += num_granules;
if (partition_pruner)
{
if (partition_pruner->canBePruned(part))
continue;
}
counters.num_parts_after_partition_pruner += 1;
counters.num_granules_after_partition_pruner += num_granules;
/// populate UUIDs and exclude ignored parts if enabled
if (part->uuid != UUIDHelpers::Nil)
{
/// Skip the part if its uuid is meant to be excluded
if (ignored_part_uuids->has(part->uuid))
continue;
auto result = temp_part_uuids.insert(part->uuid);
if (!result.second)
throw Exception("Found a part with the same UUID on the same replica.", ErrorCodes::LOGICAL_ERROR);
@ -2013,6 +2011,8 @@ void MergeTreeDataSelectExecutor::selectPartsToReadWithUUIDFilter(
{
LOG_DEBUG(log, "Found duplicate uuids locally, will retry part selection without them");
counters = PartFilterCounters();
/// If the second attempt doesn't help either, throw an exception
if (!select_parts(parts))
throw Exception("Found duplicate UUIDs while processing query.", ErrorCodes::DUPLICATED_PART_UUIDS);

View File

@ -5,6 +5,7 @@
#include <Storages/MergeTree/MergeTreeData.h>
#include <Storages/MergeTree/RangesInDataPart.h>
#include <Storages/MergeTree/PartitionPruner.h>
#include <Processors/QueryPlan/ReadFromMergeTree.h>
namespace DB
@ -57,6 +58,7 @@ private:
QueryPlanPtr spreadMarkRangesAmongStreams(
RangesInDataParts && parts,
ReadFromMergeTree::IndexStatPtr index_stats,
size_t num_streams,
const Names & column_names,
const StorageMetadataPtr & metadata_snapshot,
@ -71,6 +73,7 @@ private:
/// out_projection - save projection only with columns, requested to read
QueryPlanPtr spreadMarkRangesAmongStreamsWithOrder(
RangesInDataParts && parts,
ReadFromMergeTree::IndexStatPtr index_stats,
size_t num_streams,
const Names & column_names,
const StorageMetadataPtr & metadata_snapshot,
@ -86,6 +89,7 @@ private:
QueryPlanPtr spreadMarkRangesAmongStreamsFinal(
RangesInDataParts && parts,
ReadFromMergeTree::IndexStatPtr index_stats,
size_t num_streams,
const Names & column_names,
const StorageMetadataPtr & metadata_snapshot,
@ -123,6 +127,16 @@ private:
size_t & granules_dropped,
Poco::Logger * log);
struct PartFilterCounters
{
size_t num_initial_selected_parts = 0;
size_t num_initial_selected_granules = 0;
size_t num_parts_after_minmax = 0;
size_t num_granules_after_minmax = 0;
size_t num_parts_after_partition_pruner = 0;
size_t num_granules_after_partition_pruner = 0;
};
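A runnable toy walk-through of how these counters are filled, one pair per surviving filter stage (part sizes and filter outcomes are fabricated):

#include <iostream>
#include <vector>

/// Toy walk-through of PartFilterCounters: every part that survives a stage
/// bumps that stage's pair of counters.
struct PartSketch { size_t granules; bool passes_minmax; bool passes_partition; };

struct CountersSketch
{
    size_t num_initial_selected_parts = 0, num_initial_selected_granules = 0;
    size_t num_parts_after_minmax = 0, num_granules_after_minmax = 0;
    size_t num_parts_after_partition_pruner = 0, num_granules_after_partition_pruner = 0;
};

int main()
{
    std::vector<PartSketch> parts = {{4, true, true}, {2, true, false}, {6, false, false}};
    CountersSketch counters;
    for (const auto & part : parts)
    {
        counters.num_initial_selected_parts += 1;
        counters.num_initial_selected_granules += part.granules;
        if (!part.passes_minmax)
            continue;
        counters.num_parts_after_minmax += 1;
        counters.num_granules_after_minmax += part.granules;
        if (!part.passes_partition)
            continue;
        counters.num_parts_after_partition_pruner += 1;
        counters.num_granules_after_partition_pruner += part.granules;
    }
    /// 3/12 initial -> 2/6 after minmax -> 1/4 after partition pruning.
    std::cout << counters.num_initial_selected_parts << '/' << counters.num_initial_selected_granules << " -> "
              << counters.num_parts_after_minmax << '/' << counters.num_granules_after_minmax << " -> "
              << counters.num_parts_after_partition_pruner << '/' << counters.num_granules_after_partition_pruner << '\n';
}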
/// Select the parts in which there can be data that satisfy `minmax_idx_condition` and that match the condition on `_part`,
/// as well as `max_block_number_to_read`.
static void selectPartsToRead(
@ -131,7 +145,8 @@ private:
const std::optional<KeyCondition> & minmax_idx_condition,
const DataTypes & minmax_columns_types,
std::optional<PartitionPruner> & partition_pruner,
const PartitionIdToMaxBlock * max_block_numbers_to_read);
const PartitionIdToMaxBlock * max_block_numbers_to_read,
PartFilterCounters & counters);
/// Same as the previous one, but also passes part UUIDs to the query context (if any), and skips parts whose UUIDs are marked as excluded.
void selectPartsToReadWithUUIDFilter(
@ -141,7 +156,8 @@ private:
const DataTypes & minmax_columns_types,
std::optional<PartitionPruner> & partition_pruner,
const PartitionIdToMaxBlock * max_block_numbers_to_read,
ContextPtr query_context) const;
ContextPtr query_context,
PartFilterCounters & counters) const;
};
}

View File

@ -100,7 +100,7 @@ private:
const MergeTreeData & data;
StorageMetadataPtr metadata_snapshot;
Names column_names;
const Names column_names;
bool do_not_steal_tasks;
bool predict_block_size_bytes;
std::vector<NameSet> per_part_column_name_set;

View File

@ -32,6 +32,8 @@ public:
bool canBePruned(const DataPartPtr & part);
bool isUseless() const { return useless; }
const KeyCondition & getKeyCondition() const { return partition_condition; }
};
}

View File

@ -18,9 +18,9 @@ SELECT * FROM merge(currentDatabase(), 'test_local_1');
SELECT *, _table FROM merge(currentDatabase(), 'test_local_1') ORDER BY _table;
SELECT sum(value), _table FROM merge(currentDatabase(), 'test_local_1') GROUP BY _table ORDER BY _table;
SELECT * FROM merge(currentDatabase(), 'test_local_1') WHERE _table = 'test_local_1';
SELECT * FROM merge(currentDatabase(), 'test_local_1') PREWHERE _table = 'test_local_1'; -- { serverError 16 }
SELECT * FROM merge(currentDatabase(), 'test_local_1') PREWHERE _table = 'test_local_1'; -- { serverError 10 }
SELECT * FROM merge(currentDatabase(), 'test_local_1') WHERE _table in ('test_local_1', 'test_local_2');
SELECT * FROM merge(currentDatabase(), 'test_local_1') PREWHERE _table in ('test_local_1', 'test_local_2'); -- { serverError 16 }
SELECT * FROM merge(currentDatabase(), 'test_local_1') PREWHERE _table in ('test_local_1', 'test_local_2'); -- { serverError 10 }
SELECT '--------------Single Distributed------------';
SELECT * FROM merge(currentDatabase(), 'test_distributed_1');
@ -36,9 +36,9 @@ SELECT * FROM merge(currentDatabase(), 'test_local_1|test_local_2') ORDER BY _ta
SELECT *, _table FROM merge(currentDatabase(), 'test_local_1|test_local_2') ORDER BY _table;
SELECT sum(value), _table FROM merge(currentDatabase(), 'test_local_1|test_local_2') GROUP BY _table ORDER BY _table;
SELECT * FROM merge(currentDatabase(), 'test_local_1|test_local_2') WHERE _table = 'test_local_1';
SELECT * FROM merge(currentDatabase(), 'test_local_1|test_local_2') PREWHERE _table = 'test_local_1'; -- { serverError 16 }
SELECT * FROM merge(currentDatabase(), 'test_local_1|test_local_2') PREWHERE _table = 'test_local_1'; -- { serverError 10 }
SELECT * FROM merge(currentDatabase(), 'test_local_1|test_local_2') WHERE _table in ('test_local_1', 'test_local_2') ORDER BY value;
SELECT * FROM merge(currentDatabase(), 'test_local_1|test_local_2') PREWHERE _table in ('test_local_1', 'test_local_2') ORDER BY value; -- { serverError 16 }
SELECT * FROM merge(currentDatabase(), 'test_local_1|test_local_2') PREWHERE _table in ('test_local_1', 'test_local_2') ORDER BY value; -- { serverError 10 }
SELECT '--------------Local Merge Distributed------------';
SELECT * FROM merge(currentDatabase(), 'test_local_1|test_distributed_2') ORDER BY _table;

View File

@ -5,11 +5,11 @@ Selected 0/6 parts by partition key, 0 parts by primary key, 0/0 marks by primar
select uniqExact(_part), count() from tMM where toDate(d)=toDate('2020-09-01');
2 2880
Selected 2/6 parts by partition key, 2 parts by primary key, 2/4 marks by primary key, 2 marks to read from 2 ranges
Selected 2/6 parts by partition key, 2 parts by primary key, 2/2 marks by primary key, 2 marks to read from 2 ranges
select uniqExact(_part), count() from tMM where toDate(d)=toDate('2020-10-15');
1 1440
Selected 1/6 parts by partition key, 1 parts by primary key, 1/2 marks by primary key, 1 marks to read from 1 ranges
Selected 1/6 parts by partition key, 1 parts by primary key, 1/1 marks by primary key, 1 marks to read from 1 ranges
select uniqExact(_part), count() from tMM where toDate(d)='2020-09-15';
0 0
@ -17,27 +17,27 @@ Selected 0/6 parts by partition key, 0 parts by primary key, 0/0 marks by primar
select uniqExact(_part), count() from tMM where toYYYYMM(d)=202009;
2 10000
Selected 2/6 parts by partition key, 2 parts by primary key, 2/4 marks by primary key, 2 marks to read from 2 ranges
Selected 2/6 parts by partition key, 2 parts by primary key, 2/2 marks by primary key, 2 marks to read from 2 ranges
select uniqExact(_part), count() from tMM where toYYYYMMDD(d)=20200816;
2 2880
Selected 2/6 parts by partition key, 2 parts by primary key, 2/4 marks by primary key, 2 marks to read from 2 ranges
Selected 2/6 parts by partition key, 2 parts by primary key, 2/2 marks by primary key, 2 marks to read from 2 ranges
select uniqExact(_part), count() from tMM where toYYYYMMDD(d)=20201015;
1 1440
Selected 1/6 parts by partition key, 1 parts by primary key, 1/2 marks by primary key, 1 marks to read from 1 ranges
Selected 1/6 parts by partition key, 1 parts by primary key, 1/1 marks by primary key, 1 marks to read from 1 ranges
select uniqExact(_part), count() from tMM where toDate(d)='2020-10-15';
1 1440
Selected 1/6 parts by partition key, 1 parts by primary key, 1/2 marks by primary key, 1 marks to read from 1 ranges
Selected 1/6 parts by partition key, 1 parts by primary key, 1/1 marks by primary key, 1 marks to read from 1 ranges
select uniqExact(_part), count() from tMM where d >= '2020-09-01 00:00:00' and d<'2020-10-15 00:00:00';
3 15000
Selected 3/6 parts by partition key, 3 parts by primary key, 3/6 marks by primary key, 3 marks to read from 3 ranges
Selected 3/6 parts by partition key, 3 parts by primary key, 3/3 marks by primary key, 3 marks to read from 3 ranges
select uniqExact(_part), count() from tMM where d >= '2020-01-16 00:00:00' and d < toDateTime('2021-08-17 00:00:00');
6 30000
Selected 6/6 parts by partition key, 6 parts by primary key, 6/12 marks by primary key, 6 marks to read from 6 ranges
Selected 6/6 parts by partition key, 6 parts by primary key, 6/6 marks by primary key, 6 marks to read from 6 ranges
select uniqExact(_part), count() from tMM where d >= '2020-09-16 00:00:00' and d < toDateTime('2020-10-01 00:00:00');
0 0
@ -45,117 +45,117 @@ Selected 0/6 parts by partition key, 0 parts by primary key, 0/0 marks by primar
select uniqExact(_part), count() from tMM where d >= '2020-09-12 00:00:00' and d < '2020-10-16 00:00:00';
2 6440
Selected 2/6 parts by partition key, 2 parts by primary key, 2/4 marks by primary key, 2 marks to read from 2 ranges
Selected 2/6 parts by partition key, 2 parts by primary key, 2/2 marks by primary key, 2 marks to read from 2 ranges
select uniqExact(_part), count() from tMM where toStartOfDay(d) >= '2020-09-12 00:00:00';
2 10000
Selected 2/6 parts by partition key, 2 parts by primary key, 2/4 marks by primary key, 2 marks to read from 2 ranges
Selected 2/6 parts by partition key, 2 parts by primary key, 2/2 marks by primary key, 2 marks to read from 2 ranges
select uniqExact(_part), count() from tMM where toStartOfDay(d) = '2020-09-01 00:00:00';
2 2880
Selected 2/6 parts by partition key, 2 parts by primary key, 2/4 marks by primary key, 2 marks to read from 2 ranges
Selected 2/6 parts by partition key, 2 parts by primary key, 2/2 marks by primary key, 2 marks to read from 2 ranges
select uniqExact(_part), count() from tMM where toStartOfDay(d) = '2020-10-01 00:00:00';
1 1440
Selected 1/6 parts by partition key, 1 parts by primary key, 1/2 marks by primary key, 1 marks to read from 1 ranges
Selected 1/6 parts by partition key, 1 parts by primary key, 1/1 marks by primary key, 1 marks to read from 1 ranges
select uniqExact(_part), count() from tMM where toStartOfDay(d) >= '2020-09-15 00:00:00' and d < '2020-10-16 00:00:00';
2 6440
Selected 2/6 parts by partition key, 2 parts by primary key, 2/4 marks by primary key, 2 marks to read from 2 ranges
Selected 2/6 parts by partition key, 2 parts by primary key, 2/2 marks by primary key, 2 marks to read from 2 ranges
select uniqExact(_part), count() from tMM where toYYYYMM(d) between 202009 and 202010;
4 20000
Selected 4/6 parts by partition key, 4 parts by primary key, 4/8 marks by primary key, 4 marks to read from 4 ranges
Selected 4/6 parts by partition key, 4 parts by primary key, 4/4 marks by primary key, 4 marks to read from 4 ranges
select uniqExact(_part), count() from tMM where toYYYYMM(d) between 202009 and 202009;
2 10000
Selected 2/6 parts by partition key, 2 parts by primary key, 2/4 marks by primary key, 2 marks to read from 2 ranges
Selected 2/6 parts by partition key, 2 parts by primary key, 2/2 marks by primary key, 2 marks to read from 2 ranges
select uniqExact(_part), count() from tMM where toYYYYMM(d) between 202009 and 202010 and toStartOfDay(d) = '2020-10-01 00:00:00';
1 1440
Selected 1/6 parts by partition key, 1 parts by primary key, 1/2 marks by primary key, 1 marks to read from 1 ranges
Selected 1/6 parts by partition key, 1 parts by primary key, 1/1 marks by primary key, 1 marks to read from 1 ranges
select uniqExact(_part), count() from tMM where toYYYYMM(d) >= 202009 and toStartOfDay(d) < '2020-10-02 00:00:00';
3 11440
Selected 3/6 parts by partition key, 3 parts by primary key, 3/6 marks by primary key, 3 marks to read from 3 ranges
Selected 3/6 parts by partition key, 3 parts by primary key, 3/3 marks by primary key, 3 marks to read from 3 ranges
select uniqExact(_part), count() from tMM where toYYYYMM(d) > 202009 and toStartOfDay(d) < '2020-10-02 00:00:00';
1 1440
Selected 1/6 parts by partition key, 1 parts by primary key, 1/2 marks by primary key, 1 marks to read from 1 ranges
Selected 1/6 parts by partition key, 1 parts by primary key, 1/1 marks by primary key, 1 marks to read from 1 ranges
select uniqExact(_part), count() from tMM where toYYYYMM(d)+1 > 202009 and toStartOfDay(d) < '2020-10-02 00:00:00';
3 11440
Selected 3/6 parts by partition key, 3 parts by primary key, 3/6 marks by primary key, 3 marks to read from 3 ranges
Selected 3/6 parts by partition key, 3 parts by primary key, 3/3 marks by primary key, 3 marks to read from 3 ranges
select uniqExact(_part), count() from tMM where toYYYYMM(d)+1 > 202010 and toStartOfDay(d) < '2020-10-02 00:00:00';
1 1440
Selected 1/6 parts by partition key, 1 parts by primary key, 1/2 marks by primary key, 1 marks to read from 1 ranges
Selected 1/6 parts by partition key, 1 parts by primary key, 1/1 marks by primary key, 1 marks to read from 1 ranges
select uniqExact(_part), count() from tMM where toYYYYMM(d)+1 > 202010;
2 10000
Selected 2/6 parts by partition key, 2 parts by primary key, 2/4 marks by primary key, 2 marks to read from 2 ranges
Selected 2/6 parts by partition key, 2 parts by primary key, 2/2 marks by primary key, 2 marks to read from 2 ranges
select uniqExact(_part), count() from tMM where toYYYYMM(d-1)+1 = 202010;
3 9999
Selected 3/6 parts by partition key, 3 parts by primary key, 3/6 marks by primary key, 3 marks to read from 3 ranges
Selected 3/6 parts by partition key, 3 parts by primary key, 3/3 marks by primary key, 3 marks to read from 3 ranges
select uniqExact(_part), count() from tMM where toStartOfMonth(d) >= '2020-09-15';
2 10000
Selected 2/6 parts by partition key, 2 parts by primary key, 2/4 marks by primary key, 2 marks to read from 2 ranges
Selected 2/6 parts by partition key, 2 parts by primary key, 2/2 marks by primary key, 2 marks to read from 2 ranges
select uniqExact(_part), count() from tMM where toStartOfMonth(d) >= '2020-09-01';
4 20000
Selected 4/6 parts by partition key, 4 parts by primary key, 4/8 marks by primary key, 4 marks to read from 4 ranges
Selected 4/6 parts by partition key, 4 parts by primary key, 4/4 marks by primary key, 4 marks to read from 4 ranges
select uniqExact(_part), count() from tMM where toStartOfMonth(d) >= '2020-09-01' and toStartOfMonth(d) < '2020-10-01';
2 10000
Selected 2/6 parts by partition key, 2 parts by primary key, 2/4 marks by primary key, 2 marks to read from 2 ranges
Selected 2/6 parts by partition key, 2 parts by primary key, 2/2 marks by primary key, 2 marks to read from 2 ranges
select uniqExact(_part), count() from tMM where toYYYYMM(d-1)+1 = 202010;
2 9999
Selected 2/3 parts by partition key, 2 parts by primary key, 2/4 marks by primary key, 2 marks to read from 2 ranges
Selected 2/3 parts by partition key, 2 parts by primary key, 2/2 marks by primary key, 2 marks to read from 2 ranges
select uniqExact(_part), count() from tMM where toYYYYMM(d)+1 > 202010;
1 10000
Selected 1/3 parts by partition key, 1 parts by primary key, 1/2 marks by primary key, 1 marks to read from 1 ranges
Selected 1/3 parts by partition key, 1 parts by primary key, 1/1 marks by primary key, 1 marks to read from 1 ranges
select uniqExact(_part), count() from tMM where toYYYYMM(d) between 202009 and 202010;
2 20000
Selected 2/3 parts by partition key, 2 parts by primary key, 2/4 marks by primary key, 2 marks to read from 2 ranges
Selected 2/3 parts by partition key, 2 parts by primary key, 2/2 marks by primary key, 2 marks to read from 2 ranges
--------- tDD ----------------------------
select uniqExact(_part), count() from tDD where toDate(d)=toDate('2020-09-24');
1 10000
Selected 1/4 parts by partition key, 1 parts by primary key, 1/2 marks by primary key, 1 marks to read from 1 ranges
Selected 1/4 parts by partition key, 1 parts by primary key, 1/1 marks by primary key, 1 marks to read from 1 ranges
select uniqExact(_part), count() FROM tDD WHERE toDate(d) = toDate('2020-09-24');
1 10000
Selected 1/4 parts by partition key, 1 parts by primary key, 1/2 marks by primary key, 1 marks to read from 1 ranges
Selected 1/4 parts by partition key, 1 parts by primary key, 1/1 marks by primary key, 1 marks to read from 1 ranges
select uniqExact(_part), count() FROM tDD WHERE toDate(d) = '2020-09-24';
1 10000
Selected 1/4 parts by partition key, 1 parts by primary key, 1/2 marks by primary key, 1 marks to read from 1 ranges
Selected 1/4 parts by partition key, 1 parts by primary key, 1/1 marks by primary key, 1 marks to read from 1 ranges
select uniqExact(_part), count() FROM tDD WHERE toDate(d) >= '2020-09-23' and toDate(d) <= '2020-09-26';
3 40000
Selected 3/4 parts by partition key, 3 parts by primary key, 4/7 marks by primary key, 4 marks to read from 3 ranges
Selected 3/4 parts by partition key, 3 parts by primary key, 4/4 marks by primary key, 4 marks to read from 3 ranges
select uniqExact(_part), count() FROM tDD WHERE toYYYYMMDD(d) >= 20200923 and toDate(d) <= '2020-09-26';
3 40000
Selected 3/4 parts by partition key, 3 parts by primary key, 4/7 marks by primary key, 4 marks to read from 3 ranges
Selected 3/4 parts by partition key, 3 parts by primary key, 4/4 marks by primary key, 4 marks to read from 3 ranges
--------- sDD ----------------------------
select uniqExact(_part), count() from sDD;
6 30000
Selected 6/6 parts by partition key, 6 parts by primary key, 6/12 marks by primary key, 6 marks to read from 6 ranges
Selected 6/6 parts by partition key, 6 parts by primary key, 6/6 marks by primary key, 6 marks to read from 6 ranges
select uniqExact(_part), count() from sDD where toYYYYMM(toDateTime(intDiv(d,1000),'UTC')-1)+1 = 202010;
3 9999
Selected 3/6 parts by partition key, 3 parts by primary key, 3/6 marks by primary key, 3 marks to read from 3 ranges
Selected 3/6 parts by partition key, 3 parts by primary key, 3/3 marks by primary key, 3 marks to read from 3 ranges
select uniqExact(_part), count() from sDD where toYYYYMM(toDateTime(intDiv(d,1000),'UTC')-1) = 202010;
2 9999
Selected 2/6 parts by partition key, 2 parts by primary key, 2/4 marks by primary key, 2 marks to read from 2 ranges
Selected 2/6 parts by partition key, 2 parts by primary key, 2/2 marks by primary key, 2 marks to read from 2 ranges
select uniqExact(_part), count() from sDD where toYYYYMM(toDateTime(intDiv(d,1000),'UTC')-1) = 202110;
0 0
@ -163,52 +163,52 @@ Selected 0/6 parts by partition key, 0 parts by primary key, 0/0 marks by primar
select uniqExact(_part), count() from sDD where toYYYYMM(toDateTime(intDiv(d,1000),'UTC'))+1 > 202009 and toStartOfDay(toDateTime(intDiv(d,1000),'UTC')) < toDateTime('2020-10-02 00:00:00','UTC');
3 11440
Selected 3/6 parts by partition key, 3 parts by primary key, 3/6 marks by primary key, 3 marks to read from 3 ranges
Selected 3/6 parts by partition key, 3 parts by primary key, 3/3 marks by primary key, 3 marks to read from 3 ranges
select uniqExact(_part), count() from sDD where toYYYYMM(toDateTime(intDiv(d,1000),'UTC'))+1 > 202009 and toDateTime(intDiv(d,1000),'UTC') < toDateTime('2020-10-01 00:00:00','UTC');
2 10000
Selected 2/6 parts by partition key, 2 parts by primary key, 2/4 marks by primary key, 2 marks to read from 2 ranges
Selected 2/6 parts by partition key, 2 parts by primary key, 2/2 marks by primary key, 2 marks to read from 2 ranges
select uniqExact(_part), count() from sDD where d >= 1598918400000;
4 20000
Selected 4/6 parts by partition key, 4 parts by primary key, 4/8 marks by primary key, 4 marks to read from 4 ranges
Selected 4/6 parts by partition key, 4 parts by primary key, 4/4 marks by primary key, 4 marks to read from 4 ranges
select uniqExact(_part), count() from sDD where d >= 1598918400000 and toYYYYMM(toDateTime(intDiv(d,1000),'UTC')-1) < 202010;
3 10001
Selected 3/6 parts by partition key, 3 parts by primary key, 3/6 marks by primary key, 3 marks to read from 3 ranges
Selected 3/6 parts by partition key, 3 parts by primary key, 3/3 marks by primary key, 3 marks to read from 3 ranges
--------- xMM ----------------------------
select uniqExact(_part), count() from xMM where toStartOfDay(d) >= '2020-10-01 00:00:00';
2 10000
Selected 2/6 parts by partition key, 2 parts by primary key, 2/4 marks by primary key, 2 marks to read from 2 ranges
Selected 2/6 parts by partition key, 2 parts by primary key, 2/2 marks by primary key, 2 marks to read from 2 ranges
select uniqExact(_part), count() from xMM where d >= '2020-09-01 00:00:00' and d <= '2020-10-01 00:00:00';
3 10001
Selected 3/6 parts by partition key, 3 parts by primary key, 3/6 marks by primary key, 3 marks to read from 3 ranges
Selected 3/6 parts by partition key, 3 parts by primary key, 3/3 marks by primary key, 3 marks to read from 3 ranges
select uniqExact(_part), count() from xMM where d >= '2020-09-01 00:00:00' and d < '2020-10-01 00:00:00';
2 10000
Selected 2/6 parts by partition key, 2 parts by primary key, 2/4 marks by primary key, 2 marks to read from 2 ranges
Selected 2/6 parts by partition key, 2 parts by primary key, 2/2 marks by primary key, 2 marks to read from 2 ranges
select uniqExact(_part), count() from xMM where d >= '2020-09-01 00:00:00' and d <= '2020-10-01 00:00:00' and a=1;
1 1
Selected 1/6 parts by partition key, 1 parts by primary key, 1/2 marks by primary key, 1 marks to read from 1 ranges
Selected 1/6 parts by partition key, 1 parts by primary key, 1/1 marks by primary key, 1 marks to read from 1 ranges
select uniqExact(_part), count() from xMM where d >= '2020-09-01 00:00:00' and d <= '2020-10-01 00:00:00' and a<>3;
2 5001
Selected 2/6 parts by partition key, 2 parts by primary key, 2/4 marks by primary key, 2 marks to read from 2 ranges
Selected 2/6 parts by partition key, 2 parts by primary key, 2/2 marks by primary key, 2 marks to read from 2 ranges
select uniqExact(_part), count() from xMM where d >= '2020-09-01 00:00:00' and d < '2020-10-01 00:00:00' and a<>3;
1 5000
Selected 1/6 parts by partition key, 1 parts by primary key, 1/2 marks by primary key, 1 marks to read from 1 ranges
Selected 1/6 parts by partition key, 1 parts by primary key, 1/1 marks by primary key, 1 marks to read from 1 ranges
select uniqExact(_part), count() from xMM where d >= '2020-09-01 00:00:00' and d < '2020-11-01 00:00:00' and a = 1;
2 10000
Selected 2/6 parts by partition key, 2 parts by primary key, 2/4 marks by primary key, 2 marks to read from 2 ranges
Selected 2/6 parts by partition key, 2 parts by primary key, 2/2 marks by primary key, 2 marks to read from 2 ranges
select uniqExact(_part), count() from xMM where a = 1;
3 15000
Selected 3/6 parts by partition key, 3 parts by primary key, 3/6 marks by primary key, 3 marks to read from 3 ranges
Selected 3/6 parts by partition key, 3 parts by primary key, 3/3 marks by primary key, 3 marks to read from 3 ranges
select uniqExact(_part), count() from xMM where a = 66;
0 0
@ -216,29 +216,29 @@ Selected 0/6 parts by partition key, 0 parts by primary key, 0/0 marks by primar
select uniqExact(_part), count() from xMM where a <> 66;
6 30000
Selected 6/6 parts by partition key, 6 parts by primary key, 6/12 marks by primary key, 6 marks to read from 6 ranges
Selected 6/6 parts by partition key, 6 parts by primary key, 6/6 marks by primary key, 6 marks to read from 6 ranges
select uniqExact(_part), count() from xMM where a = 2;
2 10000
Selected 2/6 parts by partition key, 2 parts by primary key, 2/4 marks by primary key, 2 marks to read from 2 ranges
Selected 2/6 parts by partition key, 2 parts by primary key, 2/2 marks by primary key, 2 marks to read from 2 ranges
select uniqExact(_part), count() from xMM where a = 1;
2 15000
Selected 2/5 parts by partition key, 2 parts by primary key, 2/4 marks by primary key, 2 marks to read from 2 ranges
Selected 2/5 parts by partition key, 2 parts by primary key, 2/2 marks by primary key, 2 marks to read from 2 ranges
select uniqExact(_part), count() from xMM where toStartOfDay(d) >= '2020-10-01 00:00:00';
1 10000
Selected 1/5 parts by partition key, 1 parts by primary key, 1/2 marks by primary key, 1 marks to read from 1 ranges
Selected 1/5 parts by partition key, 1 parts by primary key, 1/1 marks by primary key, 1 marks to read from 1 ranges
select uniqExact(_part), count() from xMM where a <> 66;
5 30000
Selected 5/5 parts by partition key, 5 parts by primary key, 5/10 marks by primary key, 5 marks to read from 5 ranges
Selected 5/5 parts by partition key, 5 parts by primary key, 5/5 marks by primary key, 5 marks to read from 5 ranges
select uniqExact(_part), count() from xMM where d >= '2020-09-01 00:00:00' and d <= '2020-10-01 00:00:00' and a<>3;
2 5001
Selected 2/5 parts by partition key, 2 parts by primary key, 2/4 marks by primary key, 2 marks to read from 2 ranges
Selected 2/5 parts by partition key, 2 parts by primary key, 2/2 marks by primary key, 2 marks to read from 2 ranges
select uniqExact(_part), count() from xMM where d >= '2020-09-01 00:00:00' and d < '2020-10-01 00:00:00' and a<>3;
1 5000
Selected 1/5 parts by partition key, 1 parts by primary key, 1/2 marks by primary key, 1 marks to read from 1 ranges
Selected 1/5 parts by partition key, 1 parts by primary key, 1/1 marks by primary key, 1 marks to read from 1 ranges
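Note: the expected denominators in these trace lines change with the new ReadFromMergeTree accounting. As a hedged sketch (assuming the tMM table defined earlier in this test), one such line can be reproduced by raising the client log level:

-- Sketch only: send_logs_level surfaces the SelectExecutor trace on the client.
SET send_logs_level = 'debug';
SELECT uniqExact(_part), count() FROM tMM WHERE toDate(d) = toDate('2020-10-15');
-- Expected trace after this change:
-- Selected 1/6 parts by partition key, 1 parts by primary key, 1/1 marks by primary key, 1 marks to read from 1 ranges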

View File

@ -13,16 +13,16 @@ ExpressionTransform
(MergingSorted)
(Expression)
ExpressionTransform
(ReadFromStorage)
(ReadFromMergeTree)
MergeTree 0 → 1
(MergingSorted)
MergingSortedTransform 2 → 1
(Expression)
ExpressionTransform × 2
(ReadFromStorage)
(ReadFromMergeTree)
MergeTree × 2 0 → 1
(MergingSorted)
(Expression)
ExpressionTransform
(ReadFromStorage)
(ReadFromMergeTree)
MergeTree 0 → 1
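These EXPLAIN PIPELINE fragments now name the source step ReadFromMergeTree instead of ReadFromStorage. A hedged sketch of a query shape that can yield such a pipeline (table and column names are illustrative, not from the test):

-- Illustrative only: reading a MergeTree table in sorting-key order,
-- merging two in-order streams into one sorted output.
EXPLAIN PIPELINE
SELECT k, v FROM t ORDER BY k
SETTINGS optimize_read_in_order = 1, max_threads = 2;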

View File

@ -11,7 +11,7 @@ Expression (Projection)
PartialSorting (Sort each block for ORDER BY)
Expression (Before ORDER BY)
SettingQuotaAndLimits (Set limits and quota after reading from storage)
ReadFromStorage (MergeTree)
ReadFromMergeTree
SELECT
timestamp,
key
@ -23,7 +23,7 @@ Expression (Projection)
FinishSorting
Expression (Before ORDER BY)
SettingQuotaAndLimits (Set limits and quota after reading from storage)
ReadFromStorage (MergeTree with order)
ReadFromMergeTree
SELECT
timestamp,
key
@ -37,7 +37,7 @@ Expression (Projection)
FinishSorting
Expression (Before ORDER BY)
SettingQuotaAndLimits (Set limits and quota after reading from storage)
ReadFromStorage (MergeTree with order)
ReadFromMergeTree
SELECT
timestamp,
key
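The plan-level names change the same way: both the plain read and the former "MergeTree with order" read are now printed as ReadFromMergeTree. A hedged sketch of the two query shapes behind this reference (the actual test table is not shown in this diff):

-- Full sort: ORDER BY does not match the table's sorting key,
-- so PartialSorting appears above the read step.
EXPLAIN SELECT timestamp, key FROM tbl ORDER BY timestamp LIMIT 10;

-- In-order read: ORDER BY matches a sorting-key prefix,
-- so only FinishSorting remains above the read step.
EXPLAIN SELECT timestamp, key FROM tbl ORDER BY key, timestamp LIMIT 10
SETTINGS optimize_read_in_order = 1;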

View File

@ -28,47 +28,47 @@ Expression (Projection)
PartialSorting (Sort each block for ORDER BY)
Expression ((Before ORDER BY + Add table aliases))
SettingQuotaAndLimits (Set limits and quota after reading from storage)
ReadFromStorage (MergeTree)
ReadFromMergeTree
Expression (Projection)
Limit (preliminary LIMIT)
FinishSorting
Expression ((Before ORDER BY + Add table aliases))
SettingQuotaAndLimits (Set limits and quota after reading from storage)
Union
ReadFromStorage (MergeTree with order)
ReadFromStorage (MergeTree with order)
ReadFromStorage (MergeTree with order)
ReadFromMergeTree
ReadFromMergeTree
ReadFromMergeTree
Expression (Projection)
Limit (preliminary LIMIT)
FinishSorting
Expression (Before ORDER BY)
SettingQuotaAndLimits (Set limits and quota after reading from storage)
Union
ReadFromStorage (MergeTree with order)
ReadFromStorage (MergeTree with order)
ReadFromStorage (MergeTree with order)
ReadFromMergeTree
ReadFromMergeTree
ReadFromMergeTree
optimize_aggregation_in_order
Expression ((Projection + Before ORDER BY))
Aggregating
Expression ((Before GROUP BY + Add table aliases))
SettingQuotaAndLimits (Set limits and quota after reading from storage)
ReadFromStorage (MergeTree)
ReadFromMergeTree
Expression ((Projection + Before ORDER BY))
Aggregating
Expression ((Before GROUP BY + Add table aliases))
SettingQuotaAndLimits (Set limits and quota after reading from storage)
Union
ReadFromStorage (MergeTree with order)
ReadFromStorage (MergeTree with order)
ReadFromStorage (MergeTree with order)
ReadFromMergeTree
ReadFromMergeTree
ReadFromMergeTree
Expression ((Projection + Before ORDER BY))
Aggregating
Expression (Before GROUP BY)
SettingQuotaAndLimits (Set limits and quota after reading from storage)
Union
ReadFromStorage (MergeTree with order)
ReadFromStorage (MergeTree with order)
ReadFromStorage (MergeTree with order)
ReadFromMergeTree
ReadFromMergeTree
ReadFromMergeTree
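The aggregation-in-order plans above follow the same renaming. A hedged sketch of the query shape (names illustrative):

-- Illustrative only: GROUP BY over a sorting-key prefix lets the
-- Aggregating step consume the in-order ReadFromMergeTree streams.
EXPLAIN SELECT key, count() FROM tbl GROUP BY key
SETTINGS optimize_aggregation_in_order = 1;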
second-index
1
1

View File

@ -0,0 +1,51 @@
ReadFromMergeTree
Indexes:
MinMax
Keys:
y
Condition: (y in [1, +inf))
Parts: 4/5
Granules: 11/12
Partition
Keys:
y
bitAnd(z, 3)
Condition: and((bitAnd(z, 3) not in [1, 1]), and((y in [1, +inf)), (bitAnd(z, 3) not in [1, 1])))
Parts: 3/4
Granules: 10/11
PrimaryKey
Keys:
x
y
Condition: and((x in [11, +inf)), (y in [1, +inf)))
Parts: 2/3
Granules: 6/10
Skip
Name: t_minmax
Description: minmax GRANULARITY 2
Parts: 1/2
Granules: 2/6
Skip
Name: t_set
Description: set GRANULARITY 2
Parts: 1/1
Granules: 1/2
-----------------
ReadFromMergeTree
ReadType: InOrder
Parts: 1
Granules: 3
-----------------
ReadFromMergeTree
ReadType: InReverseOrder
Parts: 1
Granules: 3
ReadFromMergeTree
Indexes:
PrimaryKey
Keys:
x
plus(x, y)
Condition: or((x in 2-element set), (plus(plus(x, y), 1) in (-inf, 2]))
Parts: 1/1
Granules: 1/1
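Each Condition line restates a WHERE predicate in range form over the corresponding key. For this last block the mapping is direct: by De Morgan, NOT (x + y + 1 > 2 AND x NOT IN (4, 5)) becomes (x IN (4, 5)) OR (x + y + 1 <= 2), which is exactly the printed or((x in 2-element set), (plus(plus(x, y), 1) in (-inf, 2])):

-- From the accompanying test script; idx is ORDER BY (x, x + y).
EXPLAIN indexes = 1
SELECT z FROM idx WHERE NOT (x + y + 1 > 2 AND x NOT IN (4, 5));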

View File

@ -0,0 +1,37 @@
#!/usr/bin/env bash
CURDIR=$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)
# shellcheck source=../shell_config.sh
. "$CURDIR"/../shell_config.sh
$CLICKHOUSE_CLIENT -q "drop table if exists test_index"
$CLICKHOUSE_CLIENT -q "drop table if exists idx"
$CLICKHOUSE_CLIENT -q "create table test_index (x UInt32, y UInt32, z UInt32, t UInt32, index t_minmax t % 20 TYPE minmax GRANULARITY 2, index t_set t % 19 type set(4) granularity 2) engine = MergeTree order by (x, y) partition by (y, bitAnd(z, 3), intDiv(t, 15)) settings index_granularity = 2, min_bytes_for_wide_part = 0"
$CLICKHOUSE_CLIENT -q "insert into test_index select number, number > 3 ? 3 : number, number = 1 ? 1 : 0, number from numbers(20)"
$CLICKHOUSE_CLIENT -q "
explain indexes = 1 select *, _part from test_index where t % 19 = 16 and y > 0 and bitAnd(z, 3) != 1 and x > 10 and t % 20 > 14;
" | grep -A 100 "ReadFromMergeTree" # | grep -v "Description"
echo "-----------------"
$CLICKHOUSE_CLIENT -q "
explain actions = 1 select x from test_index where x > 15 order by x;
" | grep -A 100 "ReadFromMergeTree"
echo "-----------------"
$CLICKHOUSE_CLIENT -q "
explain actions = 1 select x from test_index where x > 15 order by x desc;
" | grep -A 100 "ReadFromMergeTree"
$CLICKHOUSE_CLIENT -q "CREATE TABLE idx (x UInt32, y UInt32, z UInt32) ENGINE = MergeTree ORDER BY (x, x + y) settings min_bytes_for_wide_part = 0"
$CLICKHOUSE_CLIENT -q "insert into idx select number, number, number from numbers(10)"
$CLICKHOUSE_CLIENT -q "
explain indexes = 1 select z from idx where not(x + y + 1 > 2 and x not in (4, 5))
" | grep -A 100 "ReadFromMergeTree"
$CLICKHOUSE_CLIENT -q "drop table if exists test_index"
$CLICKHOUSE_CLIENT -q "drop table if exists idx"

View File

@ -392,6 +392,8 @@
"01475_read_subcolumns_storages",
"01674_clickhouse_client_query_param_cte",
"01666_merge_tree_max_query_limit",
"01786_explain_merge_tree",
"01666_merge_tree_max_query_limit",
"01802_test_postgresql_protocol_with_row_policy" /// It cannot parse DROP ROW POLICY
],
"parallel":