Simplify projection, add minmax_count projection.

This commit is contained in:
Amos Bird 2021-08-28 02:35:13 +08:00
parent 703101fe4d
commit b68857d086
No known key found for this signature in database
GPG Key ID: 80D430DCBECFEDB4
18 changed files with 314 additions and 251 deletions

View File

@ -7,7 +7,6 @@
#include <Storages/MergeTree/MergeTreeIndexGranularity.h>
#include <Storages/MergeTree/MergeTreeIndexGranularityInfo.h>
#include <Storages/MergeTree/MergeTreeIndices.h>
#include <Storages/MergeTree/MergeTreeProjections.h>
#include <Storages/MergeTree/MergeTreePartInfo.h>
#include <Storages/MergeTree/MergeTreePartition.h>
#include <Storages/MergeTree/MergeTreeDataPartChecksum.h>

View File

@ -57,6 +57,7 @@
#include <Common/quoteString.h>
#include <Common/typeid_cast.h>
#include <Processors/QueryPlan/ReadFromMergeTree.h>
#include <AggregateFunctions/AggregateFunctionCount.h>
#include <boost/range/adaptor/filtered.hpp>
#include <boost/algorithm/string/join.hpp>
@ -447,8 +448,6 @@ void MergeTreeData::checkProperties(
for (const auto & projection : new_metadata.projections)
{
MergeTreeProjectionFactory::instance().validate(projection);
if (projections_names.find(projection.name) != projections_names.end())
throw Exception(
"Projection with name " + backQuote(projection.name) + " already exists",
@ -768,7 +767,7 @@ Block MergeTreeData::getSampleBlockWithVirtualColumns() const
}
Block MergeTreeData::getBlockWithVirtualPartColumns(const MergeTreeData::DataPartsVector & parts, bool one_part) const
Block MergeTreeData::getBlockWithVirtualPartColumns(const MergeTreeData::DataPartsVector & parts, bool one_part, bool ignore_empty) const
{
auto block = getSampleBlockWithVirtualColumns();
MutableColumns columns = block.mutateColumns();
@ -781,6 +780,8 @@ Block MergeTreeData::getBlockWithVirtualPartColumns(const MergeTreeData::DataPar
bool has_partition_value = typeid_cast<const ColumnTuple *>(partition_value_column.get());
for (const auto & part_or_projection : parts)
{
if (ignore_empty && part_or_projection->isEmpty())
continue;
const auto * part = part_or_projection->isProjectionPart() ? part_or_projection->getParentPart() : part_or_projection.get();
part_column->insert(part->name);
partition_id_column->insert(part->info.partition_id);
@ -4143,6 +4144,110 @@ static void selectBestProjection(
}
Block MergeTreeData::getMinMaxCountProjectionBlock(
const StorageMetadataPtr & metadata_snapshot,
const Names & required_columns,
const SelectQueryInfo & query_info,
ContextPtr query_context) const
{
if (!metadata_snapshot->minmax_count_projection)
throw Exception(
"Cannot find the definition of minmax_count projection but it's used in current query. It's a bug",
ErrorCodes::LOGICAL_ERROR);
auto block = metadata_snapshot->minmax_count_projection->sample_block;
auto minmax_count_columns = block.mutateColumns();
auto insert = [](ColumnAggregateFunction & column, const Field & value)
{
auto func = column.getAggregateFunction();
Arena & arena = column.createOrGetArena();
size_t size_of_state = func->sizeOfData();
size_t align_of_state = func->alignOfData();
auto * place = arena.alignedAlloc(size_of_state, align_of_state);
func->create(place);
auto value_column = func->getReturnType()->createColumnConst(1, value)->convertToFullColumnIfConst();
const auto * value_column_ptr = value_column.get();
func->add(place, &value_column_ptr, 0, &arena);
column.insertFrom(place);
};
auto parts = getDataPartsVector();
ASTPtr expression_ast;
Block virtual_columns_block = getBlockWithVirtualPartColumns(parts, false /* one_part */, true /* ignore_empty */);
if (virtual_columns_block.rows() == 0)
return {};
// Generate valid expressions for filtering
VirtualColumnUtils::prepareFilterBlockWithQuery(query_info.query, query_context, virtual_columns_block, expression_ast);
if (expression_ast)
VirtualColumnUtils::filterBlockWithQuery(query_info.query, virtual_columns_block, query_context, expression_ast);
size_t rows = virtual_columns_block.rows();
const ColumnString & part_name_column = typeid_cast<const ColumnString &>(*virtual_columns_block.getByName("_part").column);
size_t part_idx = 0;
for (size_t row = 0; row < rows; ++row)
{
while (parts[part_idx]->name != part_name_column.getDataAt(row))
++part_idx;
const auto & part = parts[part_idx];
if (!part->minmax_idx.initialized)
throw Exception("Found a non-empty part with uninitialized minmax_idx. It's a bug", ErrorCodes::LOGICAL_ERROR);
size_t minmax_idx_size = part->minmax_idx.hyperrectangle.size();
if (2 * minmax_idx_size + 1 != minmax_count_columns.size())
throw Exception(
ErrorCodes::LOGICAL_ERROR,
"minmax_count projection should have twice plus one the number of ranges in minmax_idx. 2 * minmax_idx_size + 1 = {}, "
"minmax_count_columns.size() = {}. It's a bug",
2 * minmax_idx_size + 1,
minmax_count_columns.size());
for (size_t i = 0; i < minmax_idx_size; ++i)
{
size_t min_pos = i * 2;
size_t max_pos = i * 2 + 1;
auto & min_column = assert_cast<ColumnAggregateFunction &>(*minmax_count_columns[min_pos]);
auto & max_column = assert_cast<ColumnAggregateFunction &>(*minmax_count_columns[max_pos]);
const auto & range = part->minmax_idx.hyperrectangle[i];
insert(min_column, range.left);
insert(max_column, range.right);
}
{
auto & column = assert_cast<ColumnAggregateFunction &>(*minmax_count_columns.back());
auto func = column.getAggregateFunction();
Arena & arena = column.createOrGetArena();
size_t size_of_state = func->sizeOfData();
size_t align_of_state = func->alignOfData();
auto * place = arena.alignedAlloc(size_of_state, align_of_state);
func->create(place);
const AggregateFunctionCount & agg_count = assert_cast<const AggregateFunctionCount &>(*func);
agg_count.set(place, part->rows_count);
column.insertFrom(place);
}
}
block.setColumns(std::move(minmax_count_columns));
Block res;
for (const auto & name : required_columns)
{
if (virtual_columns_block.has(name))
res.insert(virtual_columns_block.getByName(name));
else if (block.has(name))
res.insert(block.getByName(name));
else
throw Exception(
ErrorCodes::LOGICAL_ERROR,
"Cannot find column {} in minmax_count projection but query analysis still selects this projection. It's a bug",
name);
}
return res;
}
bool MergeTreeData::getQueryProcessingStageWithAggregateProjection(
ContextPtr query_context, const StorageMetadataPtr & metadata_snapshot, SelectQueryInfo & query_info) const
{
@ -4356,11 +4461,27 @@ bool MergeTreeData::getQueryProcessingStageWithAggregateProjection(
}
};
for (const auto & projection : metadata_snapshot->projections)
add_projection_candidate(projection);
ProjectionCandidate * selected_candidate = nullptr;
size_t min_sum_marks = std::numeric_limits<size_t>::max();
if (metadata_snapshot->minmax_count_projection)
add_projection_candidate(*metadata_snapshot->minmax_count_projection);
// Only add more projection candidates if minmax_count_projection cannot match.
if (candidates.empty())
{
for (const auto & projection : metadata_snapshot->projections)
add_projection_candidate(projection);
}
else
{
selected_candidate = &candidates.front();
query_info.minmax_count_projection_block
= getMinMaxCountProjectionBlock(metadata_snapshot, selected_candidate->required_columns, query_info, query_context);
min_sum_marks = query_info.minmax_count_projection_block.rows();
}
// Let's select the best projection to execute the query.
if (!candidates.empty())
if (!candidates.empty() && !selected_candidate)
{
std::shared_ptr<PartitionIdToMaxBlock> max_added_blocks;
if (settings.select_sequential_consistency)
@ -4381,7 +4502,6 @@ bool MergeTreeData::getQueryProcessingStageWithAggregateProjection(
settings.max_threads,
max_added_blocks);
size_t min_sum_marks = std::numeric_limits<size_t>::max();
if (!query_info.merge_tree_select_result_ptr->error())
{
// Add 1 to base sum_marks so that we prefer projections even when they have equal number of marks to read.
@ -4389,7 +4509,6 @@ bool MergeTreeData::getQueryProcessingStageWithAggregateProjection(
min_sum_marks = query_info.merge_tree_select_result_ptr->marks() + 1;
}
ProjectionCandidate * selected_candidate = nullptr;
/// Favor aggregate projections
for (auto & candidate : candidates)
{
@ -4429,28 +4548,27 @@ bool MergeTreeData::getQueryProcessingStageWithAggregateProjection(
min_sum_marks);
}
}
if (!selected_candidate)
return false;
else if (min_sum_marks == 0)
{
/// If selected_projection indicated an empty result set. Remember it in query_info but
/// don't use projection to run the query, because projection pipeline with empty result
/// set will not work correctly with empty_result_for_aggregation_by_empty_set.
query_info.merge_tree_empty_result = true;
return false;
}
if (selected_candidate->desc->type == ProjectionDescription::Type::Aggregate)
{
selected_candidate->aggregation_keys = select.getQueryAnalyzer()->aggregationKeys();
selected_candidate->aggregate_descriptions = select.getQueryAnalyzer()->aggregates();
}
query_info.projection = std::move(*selected_candidate);
return true;
}
return false;
if (!selected_candidate)
return false;
else if (min_sum_marks == 0)
{
/// If selected_projection indicated an empty result set. Remember it in query_info but
/// don't use projection to run the query, because projection pipeline with empty result
/// set will not work correctly with empty_result_for_aggregation_by_empty_set.
query_info.merge_tree_empty_result = true;
return false;
}
if (selected_candidate->desc->type == ProjectionDescription::Type::Aggregate)
{
selected_candidate->aggregation_keys = select.getQueryAnalyzer()->aggregationKeys();
selected_candidate->aggregate_descriptions = select.getQueryAnalyzer()->aggregates();
}
query_info.projection = std::move(*selected_candidate);
return true;
}

View File

@ -362,6 +362,12 @@ public:
bool attach,
BrokenPartCallback broken_part_callback_ = [](const String &){});
Block getMinMaxCountProjectionBlock(
const StorageMetadataPtr & metadata_snapshot,
const Names & required_columns,
const SelectQueryInfo & query_info,
ContextPtr query_context) const;
bool getQueryProcessingStageWithAggregateProjection(
ContextPtr query_context, const StorageMetadataPtr & metadata_snapshot, SelectQueryInfo & query_info) const;
@ -800,7 +806,7 @@ public:
/// Construct a block consisting only of possible virtual columns for part pruning.
/// If one_part is true, fill in at most one part.
Block getBlockWithVirtualPartColumns(const MergeTreeData::DataPartsVector & parts, bool one_part) const;
Block getBlockWithVirtualPartColumns(const MergeTreeData::DataPartsVector & parts, bool one_part, bool ignore_empty = false) const;
/// Limiting parallel sends per one table, used in DataPartsExchange
std::atomic_uint current_table_sends {0};

View File

@ -1362,7 +1362,10 @@ MergeTreeData::MutableDataPartPtr MergeTreeDataMergerMutator::mutatePartToTempor
String destination = new_part_tmp_path;
String file_name = it->name();
auto rename_it = std::find_if(files_to_rename.begin(), files_to_rename.end(), [&file_name](const auto & rename_pair) { return rename_pair.first == file_name; });
auto rename_it = std::find_if(files_to_rename.begin(), files_to_rename.end(), [&file_name](const auto & rename_pair)
{
return rename_pair.first == file_name;
});
if (rename_it != files_to_rename.end())
{
if (rename_it->second.empty())
@ -1723,7 +1726,7 @@ NameSet MergeTreeDataMergerMutator::collectFilesToSkip(
const Block & updated_header,
const std::set<MergeTreeIndexPtr> & indices_to_recalc,
const String & mrk_extension,
const std::set<MergeTreeProjectionPtr> & projections_to_recalc)
const std::set<ProjectionDescriptionRawPtr> & projections_to_recalc)
{
NameSet files_to_skip = source_part->getFileNamesWithoutChecksums();
@ -1740,16 +1743,16 @@ NameSet MergeTreeDataMergerMutator::collectFilesToSkip(
auto serialization = source_part->getSerializationForColumn({entry.name, entry.type});
serialization->enumerateStreams(callback);
}
for (const auto & index : indices_to_recalc)
{
files_to_skip.insert(index->getFileName() + ".idx");
files_to_skip.insert(index->getFileName() + ".idx2");
files_to_skip.insert(index->getFileName() + mrk_extension);
}
for (const auto & projection : projections_to_recalc)
{
files_to_skip.insert(projection->getDirectoryName());
}
return files_to_skip;
}
@ -1853,7 +1856,7 @@ MergeTreeIndices MergeTreeDataMergerMutator::getIndicesForNewDataPart(
return new_indices;
}
MergeTreeProjections MergeTreeDataMergerMutator::getProjectionsForNewDataPart(
std::vector<ProjectionDescriptionRawPtr> MergeTreeDataMergerMutator::getProjectionsForNewDataPart(
const ProjectionsDescription & all_projections,
const MutationCommands & commands_for_removes)
{
@ -1862,10 +1865,10 @@ MergeTreeProjections MergeTreeDataMergerMutator::getProjectionsForNewDataPart(
if (command.type == MutationCommand::DROP_PROJECTION)
removed_projections.insert(command.column_name);
MergeTreeProjections new_projections;
std::vector<ProjectionDescriptionRawPtr> new_projections;
for (const auto & projection : all_projections)
if (!removed_projections.count(projection.name))
new_projections.push_back(MergeTreeProjectionFactory::instance().get(projection));
new_projections.push_back(&projection);
return new_projections;
}
@ -1941,21 +1944,20 @@ std::set<MergeTreeIndexPtr> MergeTreeDataMergerMutator::getIndicesToRecalculate(
return indices_to_recalc;
}
std::set<MergeTreeProjectionPtr> MergeTreeDataMergerMutator::getProjectionsToRecalculate(
std::set<ProjectionDescriptionRawPtr> MergeTreeDataMergerMutator::getProjectionsToRecalculate(
const NameSet & updated_columns,
const StorageMetadataPtr & metadata_snapshot,
const NameSet & materialized_projections,
const MergeTreeData::DataPartPtr & source_part)
{
/// Checks if columns used in projections modified.
const auto & projection_factory = MergeTreeProjectionFactory::instance();
std::set<MergeTreeProjectionPtr> projections_to_recalc;
std::set<ProjectionDescriptionRawPtr> projections_to_recalc;
for (const auto & projection : metadata_snapshot->getProjections())
{
// If we ask to materialize and it doesn't exist
if (!source_part->checksums.has(projection.name + ".proj") && materialized_projections.count(projection.name))
{
projections_to_recalc.insert(projection_factory.get(projection));
projections_to_recalc.insert(&projection);
}
else
{
@ -1971,7 +1973,7 @@ std::set<MergeTreeProjectionPtr> MergeTreeDataMergerMutator::getProjectionsToRec
}
}
if (mutate)
projections_to_recalc.insert(projection_factory.get(projection));
projections_to_recalc.insert(&projection);
}
}
return projections_to_recalc;
@ -2001,7 +2003,7 @@ ExecuteTTLType MergeTreeDataMergerMutator::shouldExecuteTTL(const StorageMetadat
void MergeTreeDataMergerMutator::writeWithProjections(
MergeTreeData::MutableDataPartPtr new_data_part,
const StorageMetadataPtr & metadata_snapshot,
const MergeTreeProjections & projections_to_build,
const std::vector<ProjectionDescriptionRawPtr> & projections_to_build,
BlockInputStreamPtr mutating_stream,
IMergedBlockOutputStream & out,
time_t time_of_mutation,
@ -2037,7 +2039,7 @@ void MergeTreeDataMergerMutator::writeWithProjections(
for (size_t i = 0, size = projections_to_build.size(); i < size; ++i)
{
const auto & projection = projections_to_build[i]->projection;
const auto & projection = *projections_to_build[i];
auto projection_block = projection_squashes[i].add(projection.calculate(block, context));
if (projection_block)
projection_parts[projection.name].emplace_back(MergeTreeDataWriter::writeTempProjectionPart(
@ -2051,7 +2053,7 @@ void MergeTreeDataMergerMutator::writeWithProjections(
// Write the last block
for (size_t i = 0, size = projections_to_build.size(); i < size; ++i)
{
const auto & projection = projections_to_build[i]->projection;
const auto & projection = *projections_to_build[i];
auto & projection_squash = projection_squashes[i];
auto projection_block = projection_squash.add({});
if (projection_block)
@ -2154,7 +2156,7 @@ void MergeTreeDataMergerMutator::mutateAllPartColumns(
MergeTreeData::MutableDataPartPtr new_data_part,
const StorageMetadataPtr & metadata_snapshot,
const MergeTreeIndices & skip_indices,
const MergeTreeProjections & projections_to_build,
const std::vector<ProjectionDescriptionRawPtr> & projections_to_build,
BlockInputStreamPtr mutating_stream,
time_t time_of_mutation,
const CompressionCodecPtr & compression_codec,
@ -2212,7 +2214,7 @@ void MergeTreeDataMergerMutator::mutateSomePartColumns(
const MergeTreeDataPartPtr & source_part,
const StorageMetadataPtr & metadata_snapshot,
const std::set<MergeTreeIndexPtr> & indices_to_recalc,
const std::set<MergeTreeProjectionPtr> & projections_to_recalc,
const std::set<ProjectionDescriptionRawPtr> & projections_to_recalc,
const Block & mutation_header,
MergeTreeData::MutableDataPartPtr new_data_part,
BlockInputStreamPtr mutating_stream,
@ -2249,7 +2251,7 @@ void MergeTreeDataMergerMutator::mutateSomePartColumns(
mutating_stream->readPrefix();
out.writePrefix();
std::vector<MergeTreeProjectionPtr> projections_to_build(projections_to_recalc.begin(), projections_to_recalc.end());
std::vector<ProjectionDescriptionRawPtr> projections_to_build(projections_to_recalc.begin(), projections_to_recalc.end());
writeWithProjections(
new_data_part,
metadata_snapshot,

View File

@ -189,7 +189,7 @@ private:
const Block & updated_header,
const std::set<MergeTreeIndexPtr> & indices_to_recalc,
const String & mrk_extension,
const std::set<MergeTreeProjectionPtr> & projections_to_recalc);
const std::set<ProjectionDescriptionRawPtr> & projections_to_recalc);
/// Get the columns list of the resulting part in the same order as storage_columns.
static NamesAndTypesList getColumnsForNewDataPart(
@ -203,7 +203,7 @@ private:
const IndicesDescription & all_indices,
const MutationCommands & commands_for_removes);
static MergeTreeProjections getProjectionsForNewDataPart(
static std::vector<ProjectionDescriptionRawPtr> getProjectionsForNewDataPart(
const ProjectionsDescription & all_projections,
const MutationCommands & commands_for_removes);
@ -219,7 +219,7 @@ private:
const NameSet & materialized_indices,
const MergeTreeData::DataPartPtr & source_part);
static std::set<MergeTreeProjectionPtr> getProjectionsToRecalculate(
static std::set<ProjectionDescriptionRawPtr> getProjectionsToRecalculate(
const NameSet & updated_columns,
const StorageMetadataPtr & metadata_snapshot,
const NameSet & materialized_projections,
@ -228,7 +228,7 @@ private:
void writeWithProjections(
MergeTreeData::MutableDataPartPtr new_data_part,
const StorageMetadataPtr & metadata_snapshot,
const MergeTreeProjections & projections_to_build,
const std::vector<ProjectionDescriptionRawPtr> & projections_to_build,
BlockInputStreamPtr mutating_stream,
IMergedBlockOutputStream & out,
time_t time_of_mutation,
@ -243,7 +243,7 @@ private:
MergeTreeData::MutableDataPartPtr new_data_part,
const StorageMetadataPtr & metadata_snapshot,
const MergeTreeIndices & skip_indices,
const MergeTreeProjections & projections_to_build,
const std::vector<ProjectionDescriptionRawPtr> & projections_to_build,
BlockInputStreamPtr mutating_stream,
time_t time_of_mutation,
const CompressionCodecPtr & compression_codec,
@ -259,7 +259,7 @@ private:
const MergeTreeDataPartPtr & source_part,
const StorageMetadataPtr & metadata_snapshot,
const std::set<MergeTreeIndexPtr> & indices_to_recalc,
const std::set<MergeTreeProjectionPtr> & projections_to_recalc,
const std::set<ProjectionDescriptionRawPtr> & projections_to_recalc,
const Block & mutation_header,
MergeTreeData::MutableDataPartPtr new_data_part,
BlockInputStreamPtr mutating_stream,

View File

@ -24,6 +24,7 @@
#include <Processors/QueryPlan/ReadFromPreparedSource.h>
#include <Processors/QueryPlan/ReadFromMergeTree.h>
#include <Processors/QueryPlan/UnionStep.h>
#include <Processors/Sources/SourceFromSingleChunk.h>
#include <Core/UUID.h>
#include <DataTypes/DataTypeDate.h>
@ -169,10 +170,19 @@ QueryPlanPtr MergeTreeDataSelectExecutor::read(
Pipe projection_pipe;
Pipe ordinary_pipe;
if (query_info.projection->merge_tree_projection_select_result_ptr)
auto projection_plan = std::make_unique<QueryPlan>();
if (query_info.projection->desc->is_minmax_count_projection)
{
Pipe pipe(std::make_shared<SourceFromSingleChunk>(
query_info.minmax_count_projection_block,
Chunk(query_info.minmax_count_projection_block.getColumns(), query_info.minmax_count_projection_block.rows())));
auto read_from_pipe = std::make_unique<ReadFromPreparedSource>(std::move(pipe));
projection_plan->addStep(std::move(read_from_pipe));
}
else if (query_info.projection->merge_tree_projection_select_result_ptr)
{
LOG_DEBUG(log, "projection required columns: {}", fmt::join(query_info.projection->required_columns, ", "));
auto plan = readFromParts(
projection_plan = readFromParts(
{},
query_info.projection->required_columns,
metadata_snapshot,
@ -183,35 +193,32 @@ QueryPlanPtr MergeTreeDataSelectExecutor::read(
num_streams,
max_block_numbers_to_read,
query_info.projection->merge_tree_projection_select_result_ptr);
}
if (plan->isInitialized())
if (projection_plan->isInitialized())
{
if (query_info.projection->before_where)
{
// If `before_where` is not empty, transform input blocks by adding needed columns
// originated from key columns. We already project the block at the end, using
// projection_block, so we can just add more columns here without worrying
// NOTE: prewhere is executed inside readFromParts
if (query_info.projection->before_where)
{
auto where_step = std::make_unique<FilterStep>(
plan->getCurrentDataStream(),
query_info.projection->before_where,
query_info.projection->where_column_name,
query_info.projection->remove_where_filter);
auto where_step = std::make_unique<FilterStep>(
projection_plan->getCurrentDataStream(),
query_info.projection->before_where,
query_info.projection->where_column_name,
query_info.projection->remove_where_filter);
where_step->setStepDescription("WHERE");
plan->addStep(std::move(where_step));
}
if (query_info.projection->before_aggregation)
{
auto expression_before_aggregation
= std::make_unique<ExpressionStep>(plan->getCurrentDataStream(), query_info.projection->before_aggregation);
expression_before_aggregation->setStepDescription("Before GROUP BY");
plan->addStep(std::move(expression_before_aggregation));
}
projection_pipe = plan->convertToPipe(
QueryPlanOptimizationSettings::fromContext(context), BuildQueryPipelineSettings::fromContext(context));
where_step->setStepDescription("WHERE");
projection_plan->addStep(std::move(where_step));
}
if (query_info.projection->before_aggregation)
{
auto expression_before_aggregation
= std::make_unique<ExpressionStep>(projection_plan->getCurrentDataStream(), query_info.projection->before_aggregation);
expression_before_aggregation->setStepDescription("Before GROUP BY");
projection_plan->addStep(std::move(expression_before_aggregation));
}
projection_pipe = projection_plan->convertToPipe(
QueryPlanOptimizationSettings::fromContext(context), BuildQueryPipelineSettings::fromContext(context));
}
if (query_info.projection->merge_tree_normal_select_result_ptr)

View File

@ -1,78 +0,0 @@
#include <IO/ReadHelpers.h>
#include <IO/WriteHelpers.h>
#include <Parsers/ParserCreateQuery.h>
#include <Parsers/parseQuery.h>
#include <Storages/MergeTree/MergeTreeProjections.h>
#include <numeric>
#include <boost/algorithm/string.hpp>
namespace DB
{
namespace ErrorCodes
{
extern const int LOGICAL_ERROR;
extern const int INCORRECT_QUERY;
}
void MergeTreeProjectionFactory::registerCreator(ProjectionDescription::Type projection_type, Creator creator)
{
if (!creators.emplace(projection_type, std::move(creator)).second)
throw Exception(
ErrorCodes::LOGICAL_ERROR,
"MergeTreeProjectionFactory: the Projection creator name '{}' is not unique",
ProjectionDescription::typeToString(projection_type));
}
MergeTreeProjectionPtr MergeTreeProjectionFactory::get(const ProjectionDescription & projection) const
{
auto it = creators.find(projection.type);
if (it == creators.end())
throw Exception(ErrorCodes::LOGICAL_ERROR, "Projection type {} is not registered",
ProjectionDescription::typeToString(projection.type));
return it->second(projection);
}
MergeTreeProjections MergeTreeProjectionFactory::getMany(const std::vector<ProjectionDescription> & projections) const
{
MergeTreeProjections result;
for (const auto & projection : projections)
result.emplace_back(get(projection));
return result;
}
void MergeTreeProjectionFactory::validate(const ProjectionDescription & projection) const
{
if (startsWith(projection.name, "tmp_"))
throw Exception("Projection's name cannot start with 'tmp_'", ErrorCodes::INCORRECT_QUERY);
get(projection);
}
MergeTreeProjectionPtr normalProjectionCreator(const ProjectionDescription & projection)
{
return std::make_shared<MergeTreeProjectionNormal>(projection);
}
MergeTreeProjectionPtr aggregateProjectionCreator(const ProjectionDescription & projection)
{
return std::make_shared<MergeTreeProjectionAggregate>(projection);
}
MergeTreeProjectionFactory::MergeTreeProjectionFactory()
{
registerCreator(ProjectionDescription::Type::Normal, normalProjectionCreator);
registerCreator(ProjectionDescription::Type::Aggregate, aggregateProjectionCreator);
}
MergeTreeProjectionFactory & MergeTreeProjectionFactory::instance()
{
static MergeTreeProjectionFactory instance;
return instance;
}
}

View File

@ -1,88 +0,0 @@
#pragma once
#include <memory>
#include <string>
#include <unordered_map>
#include <vector>
#include <Core/Block.h>
#include <DataTypes/DataTypeLowCardinality.h>
#include <Interpreters/ExpressionActions.h>
#include <Parsers/ASTProjectionDeclaration.h>
#include <Storages/MergeTree/MarkRange.h>
#include <Storages/MergeTree/MergeTreeDataPartChecksum.h>
#include <Storages/ProjectionsDescription.h>
#include <Storages/SelectQueryInfo.h>
#include <Storages/StorageInMemoryMetadata.h>
namespace DB
{
/// Condition on the projection.
class IMergeTreeProjectionCondition
{
public:
virtual ~IMergeTreeProjectionCondition() = default;
/// Checks if this projection is useful for query.
virtual bool canHandleQuery() const = 0;
};
using MergeTreeProjectionConditionPtr = std::shared_ptr<IMergeTreeProjectionCondition>;
struct IMergeTreeProjection
{
IMergeTreeProjection(const ProjectionDescription & projection_) : projection(projection_) { }
virtual ~IMergeTreeProjection() = default;
/// gets directory name
String getDirectoryName() const { return projection.name + ".proj"; }
const String & getName() const { return projection.name; }
Names getColumnsRequiredForProjectionCalc() const { return projection.required_columns; }
const ProjectionDescription & projection;
};
using MergeTreeProjectionPtr = std::shared_ptr<const IMergeTreeProjection>;
using MergeTreeProjections = std::vector<MergeTreeProjectionPtr>;
class MergeTreeProjectionNormal : public IMergeTreeProjection
{
public:
explicit MergeTreeProjectionNormal(const ProjectionDescription & projection_) : IMergeTreeProjection(projection_) { }
~MergeTreeProjectionNormal() override = default;
};
class MergeTreeProjectionAggregate : public IMergeTreeProjection
{
public:
explicit MergeTreeProjectionAggregate(const ProjectionDescription & projection_) : IMergeTreeProjection(projection_) { }
~MergeTreeProjectionAggregate() override = default;
};
class MergeTreeProjectionFactory : private boost::noncopyable
{
public:
static MergeTreeProjectionFactory & instance();
using Creator = std::function<MergeTreeProjectionPtr(const ProjectionDescription & projection)>;
void validate(const ProjectionDescription & projection) const;
MergeTreeProjectionPtr get(const ProjectionDescription & projection) const;
MergeTreeProjections getMany(const std::vector<ProjectionDescription> & projections) const;
void registerCreator(ProjectionDescription::Type projection_type, Creator creator);
protected:
MergeTreeProjectionFactory();
private:
using Creators = std::unordered_map<ProjectionDescription::Type, Creator>;
Creators creators;
};
}

View File

@ -33,7 +33,7 @@ public:
void writeSuffix() override;
/// Finilize writing part and fill inner structures
/// Finalize writing part and fill inner structures
/// If part is new and contains projections, they should be added before invoking this method.
void writeSuffixAndFinalizePart(
MergeTreeData::MutableDataPartPtr & new_part,

View File

@ -651,6 +651,10 @@ static StoragePtr create(const StorageFactory::Arguments & args)
/// single default partition with name "all".
metadata.partition_key = KeyDescription::getKeyFromAST(partition_by_key, metadata.columns, args.getContext());
auto minmax_columns = metadata.getColumnsRequiredForPartitionKey();
metadata.minmax_count_projection.emplace(
ProjectionDescription::getMinMaxCountProjection(args.columns, minmax_columns, args.getContext()));
/// PRIMARY KEY without ORDER BY is allowed and considered as ORDER BY.
if (!args.storage_def->order_by && args.storage_def->primary_key)
args.storage_def->set(args.storage_def->order_by, args.storage_def->primary_key->clone());
@ -732,6 +736,9 @@ static StoragePtr create(const StorageFactory::Arguments & args)
metadata.partition_key = KeyDescription::getKeyFromAST(partition_by_ast, metadata.columns, args.getContext());
auto minmax_columns = metadata.getColumnsRequiredForPartitionKey();
metadata.minmax_count_projection.emplace(
ProjectionDescription::getMinMaxCountProjection(args.columns, minmax_columns, args.getContext()));
++arg_num;

View File

@ -74,6 +74,7 @@ ProjectionDescription ProjectionDescription::clone() const
other.sample_block_for_keys = sample_block_for_keys;
other.metadata = metadata;
other.key_size = key_size;
other.is_minmax_count_projection = is_minmax_count_projection;
return other;
}
@ -103,6 +104,9 @@ ProjectionDescription::getProjectionFromAST(const ASTPtr & definition_ast, const
if (projection_definition->name.empty())
throw Exception("Projection must have name in definition.", ErrorCodes::INCORRECT_QUERY);
if (startsWith(projection_definition->name, "tmp_"))
throw Exception("Projection's name cannot start with 'tmp_'", ErrorCodes::INCORRECT_QUERY);
if (!projection_definition->query)
throw Exception("QUERY is required for projection", ErrorCodes::INCORRECT_QUERY);
@ -189,6 +193,54 @@ ProjectionDescription::getProjectionFromAST(const ASTPtr & definition_ast, const
return result;
}
ProjectionDescription
ProjectionDescription::getMinMaxCountProjection(const ColumnsDescription & columns, const Names & minmax_columns, ContextPtr query_context)
{
auto select_query = std::make_shared<ASTProjectionSelectQuery>();
ASTPtr select_expression_list = std::make_shared<ASTExpressionList>();
for (const auto & column : minmax_columns)
{
select_expression_list->children.push_back(makeASTFunction("min", std::make_shared<ASTIdentifier>(column)));
select_expression_list->children.push_back(makeASTFunction("max", std::make_shared<ASTIdentifier>(column)));
}
select_expression_list->children.push_back(makeASTFunction("count"));
select_query->setExpression(ASTProjectionSelectQuery::Expression::SELECT, std::move(select_expression_list));
ProjectionDescription result;
result.definition_ast = select_query;
result.name = MINMAX_COUNT_PROJECTION_NAME;
result.query_ast = select_query->cloneToASTSelect();
auto external_storage_holder = std::make_shared<TemporaryTableHolder>(query_context, columns, ConstraintsDescription{});
StoragePtr storage = external_storage_holder->getTable();
InterpreterSelectQuery select(
result.query_ast, query_context, storage, {}, SelectQueryOptions{QueryProcessingStage::WithMergeableState}.modify().ignoreAlias());
result.required_columns = select.getRequiredColumns();
result.sample_block = select.getSampleBlock();
for (size_t i = 0; i < result.sample_block.columns(); ++i)
{
const auto & column_with_type_name = result.sample_block.getByPosition(i);
if (column_with_type_name.column && isColumnConst(*column_with_type_name.column))
throw Exception(ErrorCodes::NOT_IMPLEMENTED, "Projections cannot contain constant columns: {}", column_with_type_name.name);
result.column_names.emplace_back(column_with_type_name.name);
result.data_types.emplace_back(column_with_type_name.type);
}
result.type = ProjectionDescription::Type::Aggregate;
StorageInMemoryMetadata metadata;
metadata.setColumns(ColumnsDescription(result.sample_block.getNamesAndTypesList()));
metadata.partition_key = KeyDescription::getSortingKeyFromAST({}, metadata.columns, query_context, {});
metadata.sorting_key = KeyDescription::getSortingKeyFromAST({}, metadata.columns, query_context, {});
metadata.primary_key = KeyDescription::getKeyFromAST({}, metadata.columns, query_context);
metadata.primary_key.definition_ast = nullptr;
result.metadata = std::make_shared<StorageInMemoryMetadata>(metadata);
result.is_minmax_count_projection = true;
return result;
}
void ProjectionDescription::recalculateWithNewColumns(const ColumnsDescription & new_columns, ContextPtr query_context)
{
*this = getProjectionFromAST(definition_ast, new_columns, query_context);

View File

@ -28,6 +28,8 @@ struct ProjectionDescription
Aggregate,
};
static constexpr const char * MINMAX_COUNT_PROJECTION_NAME = "_minmax_count_projection";
static const char * typeToString(Type type);
/// Definition AST of projection
@ -62,10 +64,15 @@ struct ProjectionDescription
size_t key_size = 0;
bool is_minmax_count_projection = false;
/// Parse projection from definition AST
static ProjectionDescription
getProjectionFromAST(const ASTPtr & definition_ast, const ColumnsDescription & columns, ContextPtr query_context);
static ProjectionDescription
getMinMaxCountProjection(const ColumnsDescription & columns, const Names & minmax_columns, ContextPtr query_context);
ProjectionDescription() = default;
/// We need custom copy constructors because we don't want
@ -87,8 +94,12 @@ struct ProjectionDescription
bool isPrimaryKeyColumnPossiblyWrappedInFunctions(const ASTPtr & node) const;
Block calculate(const Block & block, ContextPtr context) const;
String getDirectoryName() const { return name + ".proj"; }
};
using ProjectionDescriptionRawPtr = const ProjectionDescription *;
/// All projections in storage
struct ProjectionsDescription
{

View File

@ -106,7 +106,7 @@ using ManyExpressionActions = std::vector<ExpressionActionsPtr>;
// The projection selected to execute current query
struct ProjectionCandidate
{
const ProjectionDescription * desc{};
ProjectionDescriptionRawPtr desc{};
PrewhereInfoPtr prewhere_info;
ActionsDAGPtr before_where;
String where_column_name;
@ -164,6 +164,7 @@ struct SelectQueryInfo
bool ignore_projections = false;
bool is_projection_query = false;
bool merge_tree_empty_result = false;
Block minmax_count_projection_block;
MergeTreeDataSelectAnalysisResultPtr merge_tree_select_result_ptr;
};

View File

@ -29,6 +29,8 @@ StorageInMemoryMetadata::StorageInMemoryMetadata(const StorageInMemoryMetadata &
, secondary_indices(other.secondary_indices)
, constraints(other.constraints)
, projections(other.projections.clone())
, minmax_count_projection(
other.minmax_count_projection ? std::optional<ProjectionDescription>(other.minmax_count_projection->clone()) : std::nullopt)
, partition_key(other.partition_key)
, primary_key(other.primary_key)
, sorting_key(other.sorting_key)
@ -50,6 +52,10 @@ StorageInMemoryMetadata & StorageInMemoryMetadata::operator=(const StorageInMemo
secondary_indices = other.secondary_indices;
constraints = other.constraints;
projections = other.projections.clone();
if (other.minmax_count_projection)
minmax_count_projection = other.minmax_count_projection->clone();
else
minmax_count_projection = std::nullopt;
partition_key = other.partition_key;
primary_key = other.primary_key;
sorting_key = other.sorting_key;

View File

@ -28,6 +28,8 @@ struct StorageInMemoryMetadata
ConstraintsDescription constraints;
/// Table projections. Currently supported for MergeTree only.
ProjectionsDescription projections;
/// Table minmax_count projection. Currently supported for MergeTree only.
std::optional<ProjectionDescription> minmax_count_projection;
/// PARTITION BY expression. Currently supported for MergeTree only.
KeyDescription partition_key;
/// PRIMARY KEY expression. If absent, than equal to order_by_ast.
@ -107,6 +109,7 @@ struct StorageInMemoryMetadata
const ConstraintsDescription & getConstraints() const;
const ProjectionsDescription & getProjections() const;
/// Has at least one projection
bool hasProjections() const;

View File

@ -76,7 +76,6 @@ SRCS(
MergeTree/MergeTreePartInfo.cpp
MergeTree/MergeTreePartition.cpp
MergeTree/MergeTreePartsMover.cpp
MergeTree/MergeTreeProjections.cpp
MergeTree/MergeTreeRangeReader.cpp
MergeTree/MergeTreeReadPool.cpp
MergeTree/MergeTreeReaderCompact.cpp

View File

@ -0,0 +1,4 @@
0 9999 10000
0 9998 5000
1 9999 5000
0 9998 5000

View File

@ -0,0 +1,14 @@
drop table if exists d;
create table d (i int, j int) engine MergeTree partition by i % 2 order by tuple() settings index_granularity = 1;
insert into d select number, number from numbers(10000);
set max_rows_to_read = 2, allow_experimental_projection_optimization = 1;
select min(i), max(i), count() from d;
select min(i), max(i), count() from d group by _partition_id order by _partition_id;
select min(i), max(i), count() from d where _partition_value.1 = 0 group by _partition_id order by _partition_id;
select min(i), max(i), count() from d where _partition_value.1 = 10 group by _partition_id order by _partition_id;
drop table d;