Merge pull request #28178 from amosbird/projection-fix14

Projection bug fixes and refactoring.

Commit 34f4bd3e72
@@ -271,6 +271,9 @@ try
     /// Load global settings from default_profile and system_profile.
     global_context->setDefaultProfiles(config());

+    /// We load temporary database first, because projections need it.
+    DatabaseCatalog::instance().initializeAndLoadTemporaryDatabase();
+
     /** Init dummy default DB
       * NOTE: We force using isolated default database to avoid conflicts with default database from server environment
       * Otherwise, metadata of temporary File(format, EXPLICIT_PATH) tables will pollute metadata/ directory;
@@ -1116,15 +1116,15 @@ if (ThreadFuzzer::instance().isEffective())

     try
     {
+        auto & database_catalog = DatabaseCatalog::instance();
+        /// We load temporary database first, because projections need it.
+        database_catalog.initializeAndLoadTemporaryDatabase();
         loadMetadataSystem(global_context);
         /// After attaching system databases we can initialize system log.
         global_context->initializeSystemLogs();
         global_context->setSystemZooKeeperLogAfterInitializationIfNeeded();
-        auto & database_catalog = DatabaseCatalog::instance();
         /// After the system database is created, attach virtual system tables (in addition to query_log and part_log)
         attachSystemTablesServer(*database_catalog.getSystemDatabase(), has_zookeeper);
-        /// We load temporary database first, because projections need it.
-        database_catalog.initializeAndLoadTemporaryDatabase();
         /// Then, load remaining databases
         loadMetadata(global_context, default_database);
         database_catalog.loadDatabases();
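Note: in both entry points above, initializeAndLoadTemporaryDatabase() is moved ahead of metadata loading; per the diff's own comment, loading a table that has projections can touch the temporary database. A toy model of that ordering constraint (hypothetical Catalog type, not the real DatabaseCatalog API):

    #include <cassert>

    // Minimal sketch: the temporary database must be ready before tables with
    // projections are loaded.
    struct Catalog
    {
        bool temporary_db_ready = false;
        void initializeAndLoadTemporaryDatabase() { temporary_db_ready = true; }
        void loadTableWithProjections() { assert(temporary_db_ready); }  // would fail if called first
    };

    int main()
    {
        Catalog catalog;
        catalog.initializeAndLoadTemporaryDatabase();  // now runs before metadata loading
        catalog.loadTableWithProjections();
    }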
@@ -550,7 +550,7 @@ ConstraintsDescription InterpreterCreateQuery::getConstraintsDescription(const A
 }


-InterpreterCreateQuery::TableProperties InterpreterCreateQuery::setProperties(ASTCreateQuery & create) const
+InterpreterCreateQuery::TableProperties InterpreterCreateQuery::getTablePropertiesAndNormalizeCreateQuery(ASTCreateQuery & create) const
 {
     TableProperties properties;
     TableLockHolder as_storage_lock;
@@ -589,10 +589,13 @@ InterpreterCreateQuery::TableProperties InterpreterCreateQuery::setProperties(AS
         auto as_storage_metadata = as_storage->getInMemoryMetadataPtr();
         properties.columns = as_storage_metadata->getColumns();

-        /// Secondary indices make sense only for MergeTree family of storage engines.
+        /// Secondary indices and projections make sense only for MergeTree family of storage engines.
         /// We should not copy them for other storages.
         if (create.storage && endsWith(create.storage->engine->name, "MergeTree"))
+        {
             properties.indices = as_storage_metadata->getSecondaryIndices();
+            properties.projections = as_storage_metadata->getProjections().clone();
+        }

         properties.constraints = as_storage_metadata->getConstraints();
     }
@@ -910,7 +913,7 @@ BlockIO InterpreterCreateQuery::createTable(ASTCreateQuery & create)
     }

     /// Set and retrieve list of columns, indices and constraints. Set table engine if needed. Rewrite query in canonical way.
-    TableProperties properties = setProperties(create);
+    TableProperties properties = getTablePropertiesAndNormalizeCreateQuery(create);

     DatabasePtr database;
     bool need_add_to_database = !create.temporary;
@@ -74,7 +74,7 @@ private:
     BlockIO createTable(ASTCreateQuery & create);

     /// Calculate list of columns, constraints, indices, etc... of table. Rewrite query in canonical way.
-    TableProperties setProperties(ASTCreateQuery & create) const;
+    TableProperties getTablePropertiesAndNormalizeCreateQuery(ASTCreateQuery & create) const;
     void validateTableStructure(const ASTCreateQuery & create, const TableProperties & properties) const;
     void setEngine(ASTCreateQuery & create) const;
     AccessRightsElements getRequiredAccess() const;
@@ -287,7 +287,7 @@ static void injectVirtualColumnsImpl(
            {
                ColumnPtr column;
                if (rows)
-                   column = DataTypeUUID().createColumnConst(rows, task->data_part->uuid)->convertToFullColumnIfConst();
+                   column = DataTypeUUID().createColumnConst(rows, part->uuid)->convertToFullColumnIfConst();
                else
                    column = DataTypeUUID().createColumn();

@@ -306,7 +306,7 @@ static void injectVirtualColumnsImpl(
            else if (virtual_column_name == "_partition_value")
            {
                if (rows)
-                   inserter.insertPartitionValueColumn(rows, task->data_part->partition.value, partition_value_type, virtual_column_name);
+                   inserter.insertPartitionValueColumn(rows, part->partition.value, partition_value_type, virtual_column_name);
                else
                    inserter.insertPartitionValueColumn(rows, {}, partition_value_type, virtual_column_name);
            }
@@ -757,16 +757,20 @@ DataTypePtr MergeTreeData::getPartitionValueType() const
 }


-Block MergeTreeData::getBlockWithVirtualPartColumns(const MergeTreeData::DataPartsVector & parts, bool one_part) const
+Block MergeTreeData::getSampleBlockWithVirtualColumns() const
 {
     DataTypePtr partition_value_type = getPartitionValueType();
-    bool has_partition_value = typeid_cast<const DataTypeTuple *>(partition_value_type.get());
-    Block block{
+    return {
         ColumnWithTypeAndName(ColumnString::create(), std::make_shared<DataTypeString>(), "_part"),
         ColumnWithTypeAndName(ColumnString::create(), std::make_shared<DataTypeString>(), "_partition_id"),
         ColumnWithTypeAndName(ColumnUUID::create(), std::make_shared<DataTypeUUID>(), "_part_uuid"),
         ColumnWithTypeAndName(partition_value_type->createColumn(), partition_value_type, "_partition_value")};
+}
+
+
+Block MergeTreeData::getBlockWithVirtualPartColumns(const MergeTreeData::DataPartsVector & parts, bool one_part) const
+{
+    auto block = getSampleBlockWithVirtualColumns();
     MutableColumns columns = block.mutateColumns();

     auto & part_column = columns[0];
@@ -774,6 +778,7 @@ Block MergeTreeData::getBlockWithVirtualPartColumns(const MergeTreeData::DataPar
     auto & part_uuid_column = columns[2];
     auto & partition_value_column = columns[3];

+    bool has_partition_value = typeid_cast<const ColumnTuple *>(partition_value_column.get());
     for (const auto & part_or_projection : parts)
     {
         const auto * part = part_or_projection->isProjectionPart() ? part_or_projection->getParentPart() : part_or_projection.get();
@@ -3465,7 +3470,7 @@ MergeTreeData::DataPartsVector MergeTreeData::getDataPartsVector(
        {
            for (const auto & part : range)
            {
-               for (const auto & [p_name, projection_part] : part->getProjectionParts())
+               for (const auto & [_, projection_part] : part->getProjectionParts())
                    res.push_back(projection_part);
            }
        }
@@ -4151,6 +4156,10 @@ bool MergeTreeData::getQueryProcessingStageWithAggregateProjection(
     if (auto * select = query_ptr->as<ASTSelectQuery>(); select && select->final())
         return false;

+    // Currently projections don't support sampling yet.
+    if (settings.parallel_replicas_count > 1)
+        return false;
+
     InterpreterSelectQuery select(
         query_ptr, query_context, SelectQueryOptions{QueryProcessingStage::WithMergeableState}.ignoreProjections().ignoreAlias());
     const auto & analysis_result = select.getAnalysisResult();
@@ -4194,13 +4203,13 @@ bool MergeTreeData::getQueryProcessingStageWithAggregateProjection(
            candidate.remove_where_filter = analysis_result.remove_where_filter;
            candidate.before_where = analysis_result.before_where->clone();

-           required_columns = candidate.before_where->foldActionsByProjection(
+           auto new_required_columns = candidate.before_where->foldActionsByProjection(
                required_columns,
                projection.sample_block_for_keys,
                candidate.where_column_name);
-           if (required_columns.empty())
+           if (new_required_columns.empty() && !required_columns.empty())
                return false;
+           required_columns = std::move(new_required_columns);
            candidate.before_where->addAggregatesViaProjection(aggregates);
        }

@@ -4214,33 +4223,35 @@ bool MergeTreeData::getQueryProcessingStageWithAggregateProjection(
                for (const auto & column : prewhere_actions->getResultColumns())
                    required_columns.erase(column.name);

-           // Prewhere_action should not add missing keys.
-           prewhere_required_columns = prewhere_actions->foldActionsByProjection(
-               prewhere_required_columns, projection.sample_block_for_keys, candidate.prewhere_info->prewhere_column_name, false);
-           if (prewhere_required_columns.empty())
-               return false;
-           candidate.prewhere_info->prewhere_actions = prewhere_actions;
+           {
+               // Prewhere_action should not add missing keys.
+               auto new_prewhere_required_columns = prewhere_actions->foldActionsByProjection(
+                   prewhere_required_columns, projection.sample_block_for_keys, candidate.prewhere_info->prewhere_column_name, false);
+               if (new_prewhere_required_columns.empty() && !prewhere_required_columns.empty())
+                   return false;
+               prewhere_required_columns = std::move(new_prewhere_required_columns);
+               candidate.prewhere_info->prewhere_actions = prewhere_actions;
+           }

            if (candidate.prewhere_info->row_level_filter)
            {
                auto row_level_filter_actions = candidate.prewhere_info->row_level_filter->clone();
-               prewhere_required_columns = row_level_filter_actions->foldActionsByProjection(
+               auto new_prewhere_required_columns = row_level_filter_actions->foldActionsByProjection(
                    prewhere_required_columns, projection.sample_block_for_keys, candidate.prewhere_info->row_level_column_name, false);
-               if (prewhere_required_columns.empty())
+               if (new_prewhere_required_columns.empty() && !prewhere_required_columns.empty())
                    return false;
+               prewhere_required_columns = std::move(new_prewhere_required_columns);
                candidate.prewhere_info->row_level_filter = row_level_filter_actions;
            }

            if (candidate.prewhere_info->alias_actions)
            {
                auto alias_actions = candidate.prewhere_info->alias_actions->clone();
-               prewhere_required_columns
+               auto new_prewhere_required_columns
                    = alias_actions->foldActionsByProjection(prewhere_required_columns, projection.sample_block_for_keys, {}, false);
-               if (prewhere_required_columns.empty())
+               if (new_prewhere_required_columns.empty() && !prewhere_required_columns.empty())
                    return false;
+               prewhere_required_columns = std::move(new_prewhere_required_columns);
                candidate.prewhere_info->alias_actions = alias_actions;
            }
            required_columns.insert(prewhere_required_columns.begin(), prewhere_required_columns.end());
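Note: the two hunks above repeat one fix: distinguish "folding matched nothing" from "nothing was required in the first place". A self-contained sketch of that check (std::set stands in for the Names set; foldActionsByProjection itself is not reproduced):

    #include <set>
    #include <string>
    #include <utility>

    using NameSet = std::set<std::string>;

    // Accept the folded column set; fail only when folding produced an empty
    // set even though some columns were required.
    bool acceptFoldedColumns(NameSet & required_columns, NameSet folded)
    {
        // Old check: `if (folded.empty()) return false;` — it wrongly rejected
        // queries that legitimately required no columns at all.
        if (folded.empty() && !required_columns.empty())
            return false;
        required_columns = std::move(folded);
        return true;
    }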
@@ -4259,11 +4270,20 @@ bool MergeTreeData::getQueryProcessingStageWithAggregateProjection(
        return match;
    };

-   for (const auto & projection : metadata_snapshot->projections)
+   auto virtual_block = getSampleBlockWithVirtualColumns();
+   auto add_projection_candidate = [&](const ProjectionDescription & projection)
    {
        ProjectionCandidate candidate{};
        candidate.desc = &projection;

+       auto sample_block = projection.sample_block;
+       auto sample_block_for_keys = projection.sample_block_for_keys;
+       for (const auto & column : virtual_block)
+       {
+           sample_block.insertUnique(column);
+           sample_block_for_keys.insertUnique(column);
+       }
+
        if (projection.type == ProjectionDescription::Type::Aggregate && analysis_result.need_aggregate && can_use_aggregate_projection)
        {
            bool match = true;
@@ -4271,7 +4291,7 @@ bool MergeTreeData::getQueryProcessingStageWithAggregateProjection(
            // Let's first check if all aggregates are provided by current projection
            for (const auto & aggregate : select.getQueryAnalyzer()->aggregates())
            {
-               const auto * column = projection.sample_block.findByName(aggregate.column_name);
+               const auto * column = sample_block.findByName(aggregate.column_name);
                if (column)
                {
                    aggregates.insert(*column);
@@ -4284,25 +4304,25 @@ bool MergeTreeData::getQueryProcessingStageWithAggregateProjection(
            }

            if (!match)
-               continue;
+               return;

            // Check if all aggregation keys can be either provided by some action, or by current
            // projection directly. Reshape the `before_aggregation` action DAG so that it only
-           // needs to provide aggregation keys, and certain children DAG might be substituted by
-           // some keys in projection.
+           // needs to provide aggregation keys, and the DAG of certain child might be substituted
+           // by some keys in projection.
            candidate.before_aggregation = analysis_result.before_aggregation->clone();
-           auto required_columns = candidate.before_aggregation->foldActionsByProjection(keys, projection.sample_block_for_keys);
+           auto required_columns = candidate.before_aggregation->foldActionsByProjection(keys, sample_block_for_keys);

            // TODO Let's find out the exact required_columns for keys.
            if (required_columns.empty() && (!keys.empty() && !candidate.before_aggregation->getRequiredColumns().empty()))
-               continue;
+               return;

            if (analysis_result.optimize_aggregation_in_order)
            {
                for (const auto & key : keys)
                {
                    auto actions_dag = analysis_result.before_aggregation->clone();
-                   actions_dag->foldActionsByProjection({key}, projection.sample_block_for_keys);
+                   actions_dag->foldActionsByProjection({key}, sample_block_for_keys);
                    candidate.group_by_elements_actions.emplace_back(std::make_shared<ExpressionActions>(actions_dag, actions_settings));
                }
            }
@@ -4311,7 +4331,7 @@ bool MergeTreeData::getQueryProcessingStageWithAggregateProjection(
            candidate.before_aggregation->reorderAggregationKeysForProjection(key_name_pos_map);
            candidate.before_aggregation->addAggregatesViaProjection(aggregates);

-           if (rewrite_before_where(candidate, projection, required_columns, projection.sample_block_for_keys, aggregates))
+           if (rewrite_before_where(candidate, projection, required_columns, sample_block_for_keys, aggregates))
            {
                candidate.required_columns = {required_columns.begin(), required_columns.end()};
                for (const auto & aggregate : aggregates)
@@ -4328,13 +4348,16 @@ bool MergeTreeData::getQueryProcessingStageWithAggregateProjection(
            for (const auto & column : actions->getRequiredColumns())
                required_columns.insert(column.name);

-           if (rewrite_before_where(candidate, projection, required_columns, projection.sample_block, {}))
+           if (rewrite_before_where(candidate, projection, required_columns, sample_block, {}))
            {
                candidate.required_columns = {required_columns.begin(), required_columns.end()};
                candidates.push_back(std::move(candidate));
            }
        }
-   }
+   };
+
+   for (const auto & projection : metadata_snapshot->projections)
+       add_projection_candidate(projection);

    // Let's select the best projection to execute the query.
    if (!candidates.empty())
@@ -4409,6 +4432,14 @@ bool MergeTreeData::getQueryProcessingStageWithAggregateProjection(

    if (!selected_candidate)
        return false;
+   else if (min_sum_marks == 0)
+   {
+       /// If selected_projection indicated an empty result set. Remember it in query_info but
+       /// don't use projection to run the query, because projection pipeline with empty result
+       /// set will not work correctly with empty_result_for_aggregation_by_empty_set.
+       query_info.merge_tree_empty_result = true;
+       return false;
+   }

    if (selected_candidate->desc->type == ProjectionDescription::Type::Aggregate)
    {
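Note: min_sum_marks == 0 means the best candidate would read no marks at all. A minimal model (simplified stand-in types, not ClickHouse's) of the contract between this flag and MergeTreeDataSelectExecutor::read further down:

    #include <memory>

    struct QueryInfo { bool merge_tree_empty_result = false; };
    struct QueryPlan { bool initialized = false; };

    // The reader honours the recorded fact by returning an empty, uninitialized
    // plan instead of building a projection pipeline over an empty result set.
    std::unique_ptr<QueryPlan> read(const QueryInfo & query_info)
    {
        if (query_info.merge_tree_empty_result)
            return std::make_unique<QueryPlan>();
        auto plan = std::make_unique<QueryPlan>();
        plan->initialized = true;  // ... normal reading steps would be added here
        return plan;
    }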
@@ -795,6 +795,9 @@ public:
    /// Return the partition expression types as a Tuple type. Return DataTypeUInt8 if partition expression is empty.
    DataTypePtr getPartitionValueType() const;

+   /// Construct a sample block of virtual columns.
+   Block getSampleBlockWithVirtualColumns() const;
+
    /// Construct a block consisting only of possible virtual columns for part pruning.
    /// If one_part is true, fill in at most one part.
    Block getBlockWithVirtualPartColumns(const MergeTreeData::DataPartsVector & parts, bool one_part) const;
@@ -89,6 +89,7 @@ void FutureMergedMutatedPart::assign(MergeTreeData::DataPartsVector parts_)
        future_part_type = std::min(future_part_type, part->getType());
    }

+   /// NOTE: We don't support merging into an in-memory part yet.
    auto chosen_type = parts_.front()->storage.choosePartTypeOnDisk(sum_bytes_uncompressed, sum_rows);
    future_part_type = std::min(future_part_type, chosen_type);
    assign(std::move(parts_), future_part_type);
@@ -2014,10 +2015,19 @@ void MergeTreeDataMergerMutator::writeWithProjections(
    std::map<String, MergeTreeData::MutableDataPartsVector> projection_parts;
    Block block;
    std::vector<SquashingTransform> projection_squashes;
+   const auto & settings = context->getSettingsRef();
    for (size_t i = 0, size = projections_to_build.size(); i < size; ++i)
    {
-       projection_squashes.emplace_back(65536, 65536 * 256);
+       // If the parent part is an in-memory part, squash projection output into one block and
+       // build in-memory projection because we don't support merging into a new in-memory part.
+       // Otherwise we split the materialization into multiple stages similar to the process of
+       // INSERT SELECT query.
+       if (new_data_part->getType() == MergeTreeDataPartType::IN_MEMORY)
+           projection_squashes.emplace_back(0, 0);
+       else
+           projection_squashes.emplace_back(settings.min_insert_block_size_rows, settings.min_insert_block_size_bytes);
    }

    while (checkOperationIsNotCanceled(merge_entry) && (block = mutating_stream->read()))
    {
        if (minmax_idx)
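Note: squash limits of (0, 0) impose no threshold, so all projection output accumulates into a single block. A sketch of the per-projection decision as a plain function (hypothetical SquashLimits type; the real code constructs SquashingTransform directly):

    #include <cstddef>

    struct SquashLimits { size_t min_rows; size_t min_bytes; };

    // In-memory parents get one squashed block (merging into a new in-memory
    // part is unsupported); on-disk parents reuse the INSERT block-size settings.
    SquashLimits chooseProjectionSquashLimits(
        bool parent_in_memory, size_t min_insert_block_size_rows, size_t min_insert_block_size_bytes)
    {
        if (parent_in_memory)
            return {0, 0};  // unlimited: flush once at the end
        return {min_insert_block_size_rows, min_insert_block_size_bytes};
    }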
@@ -2028,26 +2038,10 @@ void MergeTreeDataMergerMutator::writeWithProjections(
        for (size_t i = 0, size = projections_to_build.size(); i < size; ++i)
        {
            const auto & projection = projections_to_build[i]->projection;
-           auto in = InterpreterSelectQuery(
-                         projection.query_ast,
-                         context,
-                         Pipe(std::make_shared<SourceFromSingleChunk>(block, Chunk(block.getColumns(), block.rows()))),
-                         SelectQueryOptions{
-                             projection.type == ProjectionDescription::Type::Normal ? QueryProcessingStage::FetchColumns : QueryProcessingStage::WithMergeableState})
-                         .execute()
-                         .getInputStream();
-           in = std::make_shared<SquashingBlockInputStream>(in, block.rows(), std::numeric_limits<UInt64>::max());
-           in->readPrefix();
-           auto & projection_squash = projection_squashes[i];
-           auto projection_block = projection_squash.add(in->read());
-           if (in->read())
-               throw Exception("Projection cannot increase the number of rows in a block", ErrorCodes::LOGICAL_ERROR);
-           in->readSuffix();
+           auto projection_block = projection_squashes[i].add(projection.calculate(block, context));
            if (projection_block)
-           {
-               projection_parts[projection.name].emplace_back(
-                   MergeTreeDataWriter::writeTempProjectionPart(data, log, projection_block, projection, new_data_part.get(), ++block_num));
-           }
+               projection_parts[projection.name].emplace_back(MergeTreeDataWriter::writeTempProjectionPart(
+                   data, log, projection_block, projection, new_data_part.get(), ++block_num));
        }

        merge_entry->rows_written += block.rows();
@@ -94,6 +94,42 @@ void MergeTreeDataPartInMemory::flushToDisk(const String & base_path, const Stri
    MergedBlockOutputStream out(new_data_part, metadata_snapshot, columns, indices, compression_codec);
    out.writePrefix();
    out.write(block);
+   const auto & projections = metadata_snapshot->getProjections();
+   for (const auto & [projection_name, projection] : projection_parts)
+   {
+       if (projections.has(projection_name))
+       {
+           String projection_destination_path = fs::path(destination_path) / projection_name / ".proj";
+           if (disk->exists(projection_destination_path))
+           {
+               throw Exception(
+                   ErrorCodes::DIRECTORY_ALREADY_EXISTS,
+                   "Could not flush projection part {}. Projection part in {} already exists",
+                   projection_name,
+                   fullPath(disk, projection_destination_path));
+           }
+
+           auto projection_part = asInMemoryPart(projection);
+           auto projection_type = storage.choosePartTypeOnDisk(projection_part->block.bytes(), rows_count);
+           MergeTreePartInfo projection_info("all", 0, 0, 0);
+           auto projection_data_part
+               = storage.createPart(projection_name, projection_type, projection_info, volume, projection_name + ".proj", parent_part);
+           projection_data_part->is_temp = false; // clean up will be done on parent part
+           projection_data_part->setColumns(projection->getColumns());
+
+           disk->createDirectories(projection_destination_path);
+           const auto & desc = projections.get(name);
+           auto projection_compression_codec = storage.getContext()->chooseCompressionCodec(0, 0);
+           auto projection_indices = MergeTreeIndexFactory::instance().getMany(desc.metadata->getSecondaryIndices());
+           MergedBlockOutputStream projection_out(
+               projection_data_part, desc.metadata, projection_part->columns, projection_indices, projection_compression_codec);
+           projection_out.writePrefix();
+           projection_out.write(projection_part->block);
+           projection_out.writeSuffixAndFinalizePart(projection_data_part);
+           new_data_part->addProjectionPart(projection_name, std::move(projection_data_part));
+       }
+   }
+
    out.writeSuffixAndFinalizePart(new_data_part);
 }

@@ -132,6 +132,9 @@ QueryPlanPtr MergeTreeDataSelectExecutor::read(
    QueryProcessingStage::Enum processed_stage,
    std::shared_ptr<PartitionIdToMaxBlock> max_block_numbers_to_read) const
 {
+   if (query_info.merge_tree_empty_result)
+       return std::make_unique<QueryPlan>();
+
    const auto & settings = context->getSettingsRef();
    if (!query_info.projection)
    {
@@ -181,7 +184,7 @@ QueryPlanPtr MergeTreeDataSelectExecutor::read(
            max_block_numbers_to_read,
            query_info.projection->merge_tree_projection_select_result_ptr);

-       if (plan)
+       if (plan->isInitialized())
        {
            // If `before_where` is not empty, transform input blocks by adding needed columns
            // originated from key columns. We already project the block at the end, using
@@ -237,7 +240,8 @@ QueryPlanPtr MergeTreeDataSelectExecutor::read(
            ordinary_query_plan.addStep(std::move(where_step));
        }

-       ordinary_pipe = QueryPipeline::getPipe(interpreter.execute().pipeline);
+       ordinary_pipe = ordinary_query_plan.convertToPipe(
+           QueryPlanOptimizationSettings::fromContext(context), BuildQueryPipelineSettings::fromContext(context));
    }

    if (query_info.projection->desc->type == ProjectionDescription::Type::Aggregate)
@@ -351,12 +355,14 @@ QueryPlanPtr MergeTreeDataSelectExecutor::read(
    pipes.emplace_back(std::move(projection_pipe));
    pipes.emplace_back(std::move(ordinary_pipe));
    auto pipe = Pipe::unitePipes(std::move(pipes));
-   pipe.resize(1);
+   auto plan = std::make_unique<QueryPlan>();
+   if (pipe.empty())
+       return plan;
+
+   pipe.resize(1);
    auto step = std::make_unique<ReadFromStorageStep>(
        std::move(pipe),
        fmt::format("MergeTree(with {} projection {})", query_info.projection->desc->type, query_info.projection->desc->name));
-   auto plan = std::make_unique<QueryPlan>();
    plan->addStep(std::move(step));
    return plan;
 }
@@ -386,31 +386,6 @@ MergeTreeData::MutableDataPartPtr MergeTreeDataWriter::writeTempPart(
        sync_guard = disk->getDirectorySyncGuard(full_path);
    }

-   if (metadata_snapshot->hasProjections())
-   {
-       for (const auto & projection : metadata_snapshot->getProjections())
-       {
-           auto in = InterpreterSelectQuery(
-                         projection.query_ast,
-                         context,
-                         Pipe(std::make_shared<SourceFromSingleChunk>(block, Chunk(block.getColumns(), block.rows()))),
-                         SelectQueryOptions{
-                             projection.type == ProjectionDescription::Type::Normal ? QueryProcessingStage::FetchColumns : QueryProcessingStage::WithMergeableState})
-                         .execute()
-                         .getInputStream();
-           in = std::make_shared<SquashingBlockInputStream>(in, block.rows(), std::numeric_limits<UInt64>::max());
-           in->readPrefix();
-           auto projection_block = in->read();
-           if (in->read())
-               throw Exception("Projection cannot grow block rows", ErrorCodes::LOGICAL_ERROR);
-           in->readSuffix();
-           if (projection_block.rows())
-           {
-               new_data_part->addProjectionPart(projection.name, writeProjectionPart(projection_block, projection, new_data_part.get()));
-           }
-       }
-   }
-
    if (metadata_snapshot->hasRowsTTL())
        updateTTL(metadata_snapshot->getRowsTTL(), new_data_part->ttl_infos, new_data_part->ttl_infos.table_ttl, block, true);

@@ -439,6 +414,14 @@ MergeTreeData::MutableDataPartPtr MergeTreeDataWriter::writeTempPart(

    out.writePrefix();
    out.writeWithPermutation(block, perm_ptr);
+
+   for (const auto & projection : metadata_snapshot->getProjections())
+   {
+       auto projection_block = projection.calculate(block, context);
+       if (projection_block.rows())
+           new_data_part->addProjectionPart(
+               projection.name, writeProjectionPart(data, log, projection_block, projection, new_data_part.get()));
+   }
    out.writeSuffixAndFinalizePart(new_data_part, sync_on_insert);

    ProfileEvents::increment(ProfileEvents::MergeTreeDataWriterRows, block.rows());
@@ -449,18 +432,28 @@ MergeTreeData::MutableDataPartPtr MergeTreeDataWriter::writeTempPart(
 }

 MergeTreeData::MutableDataPartPtr MergeTreeDataWriter::writeProjectionPartImpl(
-   MergeTreeData & data,
+   const String part_name,
+   MergeTreeDataPartType part_type,
+   const String & relative_path,
+   bool is_temp,
+   const IMergeTreeDataPart * parent_part,
+   const MergeTreeData & data,
    Poco::Logger * log,
    Block block,
-   const StorageMetadataPtr & metadata_snapshot,
-   MergeTreeData::MutableDataPartPtr && new_data_part)
+   const StorageMetadataPtr & metadata_snapshot)
 {
+   MergeTreePartInfo new_part_info("all", 0, 0, 0);
+   auto new_data_part = data.createPart(
+       part_name,
+       part_type,
+       new_part_info,
+       parent_part->volume,
+       relative_path,
+       parent_part);
+   new_data_part->is_temp = is_temp;
+
    NamesAndTypesList columns = metadata_snapshot->getColumns().getAllPhysical().filter(block.getNames());
-   MergeTreePartition partition{};
-   IMergeTreeDataPart::MinMaxIndex minmax_idx{};
    new_data_part->setColumns(columns);
-   new_data_part->partition = std::move(partition);
-   new_data_part->minmax_idx = std::move(minmax_idx);

    if (new_data_part->isStoredOnDisk())
    {
@@ -523,27 +516,41 @@ MergeTreeData::MutableDataPartPtr MergeTreeDataWriter::writeProjectionPartImpl(
    ProfileEvents::increment(ProfileEvents::MergeTreeDataProjectionWriterUncompressedBytes, block.bytes());
    ProfileEvents::increment(ProfileEvents::MergeTreeDataProjectionWriterCompressedBytes, new_data_part->getBytesOnDisk());

-   return std::move(new_data_part);
+   return new_data_part;
 }

-MergeTreeData::MutableDataPartPtr
-MergeTreeDataWriter::writeProjectionPart(Block block, const ProjectionDescription & projection, const IMergeTreeDataPart * parent_part)
+MergeTreeData::MutableDataPartPtr MergeTreeDataWriter::writeProjectionPart(
+   MergeTreeData & data, Poco::Logger * log, Block block, const ProjectionDescription & projection, const IMergeTreeDataPart * parent_part)
 {
-   /// Size of part would not be greater than block.bytes() + epsilon
-   size_t expected_size = block.bytes();
-
-   // just check if there is enough space on parent volume
-   data.reserveSpace(expected_size, parent_part->volume);
-
    String part_name = projection.name;
-   MergeTreePartInfo new_part_info("all", 0, 0, 0);
-   auto new_data_part = data.createPart(
-       part_name, data.choosePartType(expected_size, block.rows()), new_part_info, parent_part->volume, part_name + ".proj", parent_part);
-   new_data_part->is_temp = false; // clean up will be done on parent part
+   MergeTreeDataPartType part_type;
+   if (parent_part->getType() == MergeTreeDataPartType::IN_MEMORY)
+   {
+       part_type = MergeTreeDataPartType::IN_MEMORY;
+   }
+   else
+   {
+       /// Size of part would not be greater than block.bytes() + epsilon
+       size_t expected_size = block.bytes();
+       // just check if there is enough space on parent volume
+       data.reserveSpace(expected_size, parent_part->volume);
+       part_type = data.choosePartTypeOnDisk(expected_size, block.rows());
+   }

-   return writeProjectionPartImpl(data, log, block, projection.metadata, std::move(new_data_part));
+   return writeProjectionPartImpl(
+       part_name,
+       part_type,
+       part_name + ".proj" /* relative_path */,
+       false /* is_temp */,
+       parent_part,
+       data,
+       log,
+       block,
+       projection.metadata);
 }

+/// This is used for projection materialization process which may contain multiple stages of
+/// projection part merges.
 MergeTreeData::MutableDataPartPtr MergeTreeDataWriter::writeTempProjectionPart(
    MergeTreeData & data,
    Poco::Logger * log,
@@ -552,24 +559,50 @@ MergeTreeData::MutableDataPartPtr MergeTreeDataWriter::writeTempProjectionPart(
    const IMergeTreeDataPart * parent_part,
    size_t block_num)
 {
-   /// Size of part would not be greater than block.bytes() + epsilon
-   size_t expected_size = block.bytes();
-
-   // just check if there is enough space on parent volume
-   data.reserveSpace(expected_size, parent_part->volume);
-
    String part_name = fmt::format("{}_{}", projection.name, block_num);
-   MergeTreePartInfo new_part_info("all", 0, 0, 0);
-   auto new_data_part = data.createPart(
+   MergeTreeDataPartType part_type;
+   if (parent_part->getType() == MergeTreeDataPartType::IN_MEMORY)
+   {
+       part_type = MergeTreeDataPartType::IN_MEMORY;
+   }
+   else
+   {
+       /// Size of part would not be greater than block.bytes() + epsilon
+       size_t expected_size = block.bytes();
+       // just check if there is enough space on parent volume
+       data.reserveSpace(expected_size, parent_part->volume);
+       part_type = data.choosePartTypeOnDisk(expected_size, block.rows());
+   }
+
+   return writeProjectionPartImpl(
        part_name,
-       data.choosePartType(expected_size, block.rows()),
-       new_part_info,
-       parent_part->volume,
-       "tmp_insert_" + part_name + ".proj",
-       parent_part);
-   new_data_part->is_temp = true; // It's part for merge
+       part_type,
+       "tmp_insert_" + part_name + ".proj" /* relative_path */,
+       true /* is_temp */,
+       parent_part,
+       data,
+       log,
+       block,
+       projection.metadata);
+}

-   return writeProjectionPartImpl(data, log, block, projection.metadata, std::move(new_data_part));
+MergeTreeData::MutableDataPartPtr MergeTreeDataWriter::writeInMemoryProjectionPart(
+   const MergeTreeData & data,
+   Poco::Logger * log,
+   Block block,
+   const ProjectionDescription & projection,
+   const IMergeTreeDataPart * parent_part)
+{
+   return writeProjectionPartImpl(
+       projection.name,
+       MergeTreeDataPartType::IN_MEMORY,
+       projection.name + ".proj" /* relative_path */,
+       false /* is_temp */,
+       parent_part,
+       data,
+       log,
+       block,
+       projection.metadata);
 }

 }
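Note: writeProjectionPart and writeTempProjectionPart now share one part-type rule. A compilable sketch with stand-in types (the real choosePartTypeOnDisk consults MergeTree settings; thresholds below are invented for illustration):

    #include <cstddef>

    enum class PartType { InMemory, Compact, Wide };

    // Stand-in for MergeTreeData::choosePartTypeOnDisk.
    static PartType choosePartTypeOnDisk(size_t bytes, size_t rows)
    {
        return (bytes < 10'000'000 && rows < 100'000) ? PartType::Compact : PartType::Wide;
    }

    // A projection of an in-memory parent stays in memory; otherwise pick an
    // on-disk format from the block size (space reservation omitted here).
    PartType chooseProjectionPartType(PartType parent_type, size_t block_bytes, size_t block_rows)
    {
        if (parent_type == PartType::InMemory)
            return PartType::InMemory;
        return choosePartTypeOnDisk(block_bytes, block_rows);
    }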
@@ -49,9 +49,15 @@ public:
    MergeTreeData::MutableDataPartPtr
    writeTempPart(BlockWithPartition & block, const StorageMetadataPtr & metadata_snapshot, ContextPtr context);

-   MergeTreeData::MutableDataPartPtr writeProjectionPart(
-       Block block, const ProjectionDescription & projection, const IMergeTreeDataPart * parent_part);
+   /// For insertion.
+   static MergeTreeData::MutableDataPartPtr writeProjectionPart(
+       MergeTreeData & data,
+       Poco::Logger * log,
+       Block block,
+       const ProjectionDescription & projection,
+       const IMergeTreeDataPart * parent_part);
+
+   /// For mutation: MATERIALIZE PROJECTION.
    static MergeTreeData::MutableDataPartPtr writeTempProjectionPart(
        MergeTreeData & data,
        Poco::Logger * log,
@@ -60,15 +66,27 @@ public:
        const IMergeTreeDataPart * parent_part,
        size_t block_num);

+   /// For WriteAheadLog AddPart.
+   static MergeTreeData::MutableDataPartPtr writeInMemoryProjectionPart(
+       const MergeTreeData & data,
+       Poco::Logger * log,
+       Block block,
+       const ProjectionDescription & projection,
+       const IMergeTreeDataPart * parent_part);
+
    Block mergeBlock(const Block & block, SortDescription sort_description, Names & partition_key_columns, IColumn::Permutation *& permutation);

 private:
    static MergeTreeData::MutableDataPartPtr writeProjectionPartImpl(
-       MergeTreeData & data,
+       const String part_name,
+       MergeTreeDataPartType part_type,
+       const String & relative_path,
+       bool is_temp,
+       const IMergeTreeDataPart * parent_part,
+       const MergeTreeData & data,
        Poco::Logger * log,
        Block block,
-       const StorageMetadataPtr & metadata_snapshot,
-       MergeTreeData::MutableDataPartPtr && new_data_part);
+       const StorageMetadataPtr & metadata_snapshot);

    MergeTreeData & data;

@@ -1,6 +1,7 @@
 #include <Storages/MergeTree/MergeTreeWriteAheadLog.h>
 #include <Storages/MergeTree/MergeTreeDataPartInMemory.h>
 #include <Storages/MergeTree/MergeTreeData.h>
+#include <Storages/MergeTree/MergeTreeDataWriter.h>
 #include <Storages/MergeTree/MergedBlockOutputStream.h>
 #include <IO/MemoryReadWriteBuffer.h>
 #include <IO/ReadHelpers.h>
@@ -31,6 +32,7 @@ MergeTreeWriteAheadLog::MergeTreeWriteAheadLog(
    , name(name_)
    , path(storage.getRelativeDataPath() + name_)
    , pool(storage.getContext()->getSchedulePool())
+   , log(&Poco::Logger::get(storage.getLogName() + " (WriteAheadLog)"))
 {
    init();
    sync_task = pool.createTask("MergeTreeWriteAheadLog::sync", [this]
@@ -172,8 +174,7 @@ MergeTreeData::MutableDataPartsVector MergeTreeWriteAheadLog::restore(const Stor
                || e.code() == ErrorCodes::BAD_DATA_PART_NAME
                || e.code() == ErrorCodes::CORRUPTED_DATA)
            {
-               LOG_WARNING(&Poco::Logger::get(storage.getLogName() + " (WriteAheadLog)"),
-                   "WAL file '{}' is broken. {}", path, e.displayText());
+               LOG_WARNING(log, "WAL file '{}' is broken. {}", path, e.displayText());

                /// If file is broken, do not write new parts to it.
                /// But if it contains any part rotate and save them.
@@ -203,6 +204,15 @@ MergeTreeData::MutableDataPartsVector MergeTreeWriteAheadLog::restore(const Stor

            part_out.writePrefix();
            part_out.write(block);
+
+           for (const auto & projection : metadata_snapshot->getProjections())
+           {
+               auto projection_block = projection.calculate(block, context);
+               if (projection_block.rows())
+                   part->addProjectionPart(
+                       projection.name,
+                       MergeTreeDataWriter::writeInMemoryProjectionPart(storage, log, projection_block, projection, part.get()));
+           }
            part_out.writeSuffixAndFinalizePart(part);

            min_block_number = std::min(min_block_number, part->info.min_block);
@@ -91,6 +91,8 @@ private:
    bool sync_scheduled = false;

    mutable std::mutex write_mutex;
+
+   Poco::Logger * log;
 };

 }
@@ -34,6 +34,7 @@ public:
    void writeSuffix() override;

    /// Finilize writing part and fill inner structures
+   /// If part is new and contains projections, they should be added before invoking this method.
    void writeSuffixAndFinalizePart(
        MergeTreeData::MutableDataPartPtr & new_part,
        bool sync = false,
@@ -14,6 +14,7 @@
 #include <Processors/Pipe.h>
 #include <Processors/Sources/SourceFromSingleChunk.h>

+#include <DataStreams/SquashingBlockInputStream.h>

 namespace DB
 {
@@ -23,6 +24,7 @@ namespace ErrorCodes
    extern const int NO_SUCH_PROJECTION_IN_TABLE;
    extern const int ILLEGAL_PROJECTION;
    extern const int NOT_IMPLEMENTED;
+   extern const int LOGICAL_ERROR;
 };

 const char * ProjectionDescription::typeToString(Type type)
@@ -192,6 +194,28 @@ void ProjectionDescription::recalculateWithNewColumns(const ColumnsDescription &
    *this = getProjectionFromAST(definition_ast, new_columns, query_context);
 }

+
+Block ProjectionDescription::calculate(const Block & block, ContextPtr context) const
+{
+   auto in = InterpreterSelectQuery(
+                 query_ast,
+                 context,
+                 Pipe(std::make_shared<SourceFromSingleChunk>(block, Chunk(block.getColumns(), block.rows()))),
+                 SelectQueryOptions{
+                     type == ProjectionDescription::Type::Normal ? QueryProcessingStage::FetchColumns
+                                                                 : QueryProcessingStage::WithMergeableState})
+                 .execute()
+                 .getInputStream();
+   in = std::make_shared<SquashingBlockInputStream>(in, block.rows(), 0);
+   in->readPrefix();
+   auto ret = in->read();
+   if (in->read())
+       throw Exception("Projection cannot increase the number of rows in a block", ErrorCodes::LOGICAL_ERROR);
+   in->readSuffix();
+   return ret;
+}
+
+
 String ProjectionsDescription::toString() const
 {
    if (empty())
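Note: calculate() squashes the projection query's output and then asserts a single-block result — one input block must yield at most one output block. A stand-alone model of that read-twice check (toy Block/stream types, not the DataStreams API):

    #include <optional>
    #include <stdexcept>
    #include <vector>

    using Block = std::vector<int>;

    struct BlockStream
    {
        std::vector<Block> blocks;
        size_t pos = 0;
        std::optional<Block> read() { return pos < blocks.size() ? std::optional<Block>(blocks[pos++]) : std::nullopt; }
    };

    // Mirrors the function body above: read one block, then verify the squashed
    // stream is exhausted, since a projection must not multiply blocks.
    Block readSingleBlock(BlockStream & in)
    {
        Block ret = in.read().value_or(Block{});
        if (in.read())
            throw std::logic_error("Projection cannot increase the number of rows in a block");
        return ret;
    }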
@@ -85,6 +85,8 @@ struct ProjectionDescription
    void recalculateWithNewColumns(const ColumnsDescription & new_columns, ContextPtr query_context);

    bool isPrimaryKeyColumnPossiblyWrappedInFunctions(const ASTPtr & node) const;
+
+   Block calculate(const Block & block, ContextPtr context) const;
 };

 /// All projections in storage
@@ -163,6 +163,7 @@ struct SelectQueryInfo
    std::optional<ProjectionCandidate> projection;
    bool ignore_projections = false;
    bool is_projection_query = false;
+   bool merge_tree_empty_result = false;
    MergeTreeDataSelectAnalysisResultPtr merge_tree_select_result_ptr;
 };

@@ -7540,6 +7540,9 @@ bool StorageReplicatedMergeTree::createEmptyPartInsteadOfLost(zkutil::ZooKeeperP

    out.writePrefix();
    out.write(block);
+   /// TODO(ab): What projections should we add to the empty part? How can we make sure that it
+   /// won't block future merges? Perhaps we should also check part emptiness when selecting parts
+   /// to merge.
    out.writeSuffixAndFinalizePart(new_data_part, sync_on_insert);

    try
@@ -75,7 +75,9 @@ StorageSystemParts::StorageSystemParts(const StorageID & table_id_)

        {"rows_where_ttl_info.expression", std::make_shared<DataTypeArray>(std::make_shared<DataTypeString>())},
        {"rows_where_ttl_info.min", std::make_shared<DataTypeArray>(std::make_shared<DataTypeDateTime>())},
-       {"rows_where_ttl_info.max", std::make_shared<DataTypeArray>(std::make_shared<DataTypeDateTime>())}
+       {"rows_where_ttl_info.max", std::make_shared<DataTypeArray>(std::make_shared<DataTypeDateTime>())},
+
+       {"projections", std::make_shared<DataTypeArray>(std::make_shared<DataTypeString>())},
    }
    )
 {
@@ -253,6 +255,13 @@ void StorageSystemParts::processNextStorage(
        add_ttl_info_map(part->ttl_infos.group_by_ttl);
        add_ttl_info_map(part->ttl_infos.rows_where_ttl);
+
+       Array projections;
+       for (const auto & [name, _] : part->getProjectionParts())
+           projections.push_back(name);
+
+       if (columns_mask[src_index++])
+           columns[res_index++]->insert(projections);

        /// _state column should be the latest.
        /// Do not use part->getState*, it can be changed from different thread
        if (has_state_column)