Merge branch 'ClickHouse:master' into time_buckets_impl

This commit is contained in:
Yarik Briukhovetskyi 2024-09-05 19:57:53 +02:00 committed by GitHub
commit e799adc618
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
63 changed files with 927 additions and 685 deletions

View File

@ -111,15 +111,16 @@ ANN indexes are built during column insertion and merge. As a result, `INSERT` a
tables. ANNIndexes are ideally used only with immutable or rarely changed data, respectively when are far more read requests than write
requests.
ANN indexes support these queries:
ANN indexes support this type of query:
``` sql
SELECT *
FROM table
[WHERE ...]
ORDER BY Distance(vectors, Point)
LIMIT N
```
``` sql
WITH [...] AS reference_vector
SELECT *
FROM table
WHERE ... -- WHERE clause is optional
ORDER BY Distance(vectors, reference_vector)
LIMIT N
```
:::tip
To avoid writing out large vectors, you can use [query

View File

@ -265,8 +265,6 @@ SELECT now() AS current_date_time, current_date_time + INTERVAL '4' day + INTERV
└─────────────────────┴────────────────────────────────────────────────────────────┘
```
You can work with dates without using `INTERVAL`, just by adding or subtracting seconds, minutes, and hours. For example, an interval of one day can be set by adding `60*60*24`.
:::note
The `INTERVAL` syntax or `addDays` function are always preferred. Simple addition or subtraction (syntax like `now() + ...`) doesn't consider time settings. For example, daylight saving time.
:::

View File

@ -351,7 +351,7 @@ ALTER TABLE mt DELETE IN PARTITION ID '2' WHERE p = 2;
You can specify the partition expression in `ALTER ... PARTITION` queries in different ways:
- As a value from the `partition` column of the `system.parts` table. For example, `ALTER TABLE visits DETACH PARTITION 201901`.
- Using the keyword `ALL`. It can be used only with DROP/DETACH/ATTACH. For example, `ALTER TABLE visits ATTACH PARTITION ALL`.
- Using the keyword `ALL`. It can be used only with DROP/DETACH/ATTACH/ATTACH FROM. For example, `ALTER TABLE visits ATTACH PARTITION ALL`.
- As a tuple of expressions or constants that matches (in types) the table partitioning keys tuple. In the case of a single element partitioning key, the expression should be wrapped in the `tuple (...)` function. For example, `ALTER TABLE visits DETACH PARTITION tuple(toYYYYMM(toDate('2019-01-25')))`.
- Using the partition ID. Partition ID is a string identifier of the partition (human-readable, if possible) that is used as the names of partitions in the file system and in ZooKeeper. The partition ID must be specified in the `PARTITION ID` clause, in a single quotes. For example, `ALTER TABLE visits DETACH PARTITION ID '201901'`.
- In the [ALTER ATTACH PART](#attach-partitionpart) and [DROP DETACHED PART](#drop-detached-partitionpart) query, to specify the name of a part, use string literal with a value from the `name` column of the [system.detached_parts](/docs/en/operations/system-tables/detached_parts.md/#system_tables-detached_parts) table. For example, `ALTER TABLE visits ATTACH PART '201901_1_1_0'`.

View File

@ -13,7 +13,7 @@ The lightweight `DELETE` statement removes rows from the table `[db.]table` that
DELETE FROM [db.]table [ON CLUSTER cluster] [IN PARTITION partition_expr] WHERE expr;
```
It is called "lightweight `DELETE`" to contrast it to the [ALTER table DELETE](/en/sql-reference/statements/alter/delete) command, which is a heavyweight process.
It is called "lightweight `DELETE`" to contrast it to the [ALTER TABLE ... DELETE](/en/sql-reference/statements/alter/delete) command, which is a heavyweight process.
## Examples
@ -22,23 +22,25 @@ It is called "lightweight `DELETE`" to contrast it to the [ALTER table DELETE](/
DELETE FROM hits WHERE Title LIKE '%hello%';
```
## Lightweight `DELETE` does not delete data from storage immediately
## Lightweight `DELETE` does not delete data immediately
With lightweight `DELETE`, deleted rows are internally marked as deleted immediately and will be automatically filtered out of all subsequent queries. However, cleanup of data happens during the next merge. As a result, it is possible that for an unspecified period, data is not actually deleted from storage and is only marked as deleted.
Lightweight `DELETE` is implemented as a [mutation](/en/sql-reference/statements/alter#mutations), which is executed asynchronously in the background by default. The statement is going to return almost immediately, but the data can still be visible to queries until the mutation is finished.
If you need to guarantee that your data is deleted from storage in a predictable time, consider using the [ALTER table DELETE](/en/sql-reference/statements/alter/delete) command. Note that deleting data using `ALTER table DELETE` may consume significant resources as it recreates all affected parts.
The mutation marks rows as deleted, and at that point, they will no longer show up in query results. It does not physically delete the data, this will happen during the next merge. As a result, it is possible that for an unspecified period, data is not actually deleted from storage and is only marked as deleted.
If you need to guarantee that your data is deleted from storage in a predictable time, consider using the table setting [`min_age_to_force_merge_seconds`](https://clickhouse.com/docs/en/operations/settings/merge-tree-settings#min_age_to_force_merge_seconds). Or you can use the [ALTER TABLE ... DELETE](/en/sql-reference/statements/alter/delete) command. Note that deleting data using `ALTER TABLE ... DELETE` may consume significant resources as it recreates all affected parts.
## Deleting large amounts of data
Large deletes can negatively affect ClickHouse performance. If you are attempting to delete all rows from a table, consider using the [`TRUNCATE TABLE`](/en/sql-reference/statements/truncate) command.
If you anticipate frequent deletes, consider using a [custom partitioning key](/en/engines/table-engines/mergetree-family/custom-partitioning-key). You can then use the [`ALTER TABLE...DROP PARTITION`](/en/sql-reference/statements/alter/partition#drop-partitionpart) command to quickly drop all rows associated with that partition.
If you anticipate frequent deletes, consider using a [custom partitioning key](/en/engines/table-engines/mergetree-family/custom-partitioning-key). You can then use the [`ALTER TABLE ... DROP PARTITION`](/en/sql-reference/statements/alter/partition#drop-partitionpart) command to quickly drop all rows associated with that partition.
## Limitations of lightweight `DELETE`
### Lightweight `DELETE`s with projections
By default, `DELETE` does not work for tables with projections. This is because rows in a projection may be affected by a `DELETE` operation. But there is a [MergeTree setting](https://clickhouse.com/docs/en/operations/settings/merge-tree-settings) `lightweight_mutation_projection_mode` can change the behavior.
By default, `DELETE` does not work for tables with projections. This is because rows in a projection may be affected by a `DELETE` operation. But there is a [MergeTree setting](https://clickhouse.com/docs/en/operations/settings/merge-tree-settings) `lightweight_mutation_projection_mode` to change the behavior.
## Performance considerations when using lightweight `DELETE`
@ -48,7 +50,7 @@ The following can also negatively impact lightweight `DELETE` performance:
- A heavy `WHERE` condition in a `DELETE` query.
- If the mutations queue is filled with many other mutations, this can possibly lead to performance issues as all mutations on a table are executed sequentially.
- The affected table having a very large number of data parts.
- The affected table has a very large number of data parts.
- Having a lot of data in compact parts. In a Compact part, all columns are stored in one file.
## Delete permissions
@ -61,31 +63,31 @@ GRANT ALTER DELETE ON db.table to username;
## How lightweight DELETEs work internally in ClickHouse
1. A "mask" is applied to affected rows
1. **A "mask" is applied to affected rows**
When a `DELETE FROM table ...` query is executed, ClickHouse saves a mask where each row is marked as either “existing” or as “deleted”. Those “deleted” rows are omitted for subsequent queries. However, rows are actually only removed later by subsequent merges. Writing this mask is much more lightweight than what is done by an `ALTER table DELETE` query.
When a `DELETE FROM table ...` query is executed, ClickHouse saves a mask where each row is marked as either “existing” or as “deleted”. Those “deleted” rows are omitted for subsequent queries. However, rows are actually only removed later by subsequent merges. Writing this mask is much more lightweight than what is done by an `ALTER TABLE ... DELETE` query.
The mask is implemented as a hidden `_row_exists` system column that stores `True` for all visible rows and `False` for deleted ones. This column is only present in a part if some rows in the part were deleted. This column does not exist when a part has all values equal to `True`.
The mask is implemented as a hidden `_row_exists` system column that stores `True` for all visible rows and `False` for deleted ones. This column is only present in a part if some rows in the part were deleted. This column does not exist when a part has all values equal to `True`.
2. `SELECT` queries are transformed to include the mask
2. **`SELECT` queries are transformed to include the mask**
When a masked column is used in a query, the `SELECT ... FROM table WHERE condition` query internally is extended by the predicate on `_row_exists` and is transformed to:
```sql
SELECT ... FROM table PREWHERE _row_exists WHERE condition
```
At execution time, the column `_row_exists` is read to determine which rows should not be returned. If there are many deleted rows, ClickHouse can determine which granules can be fully skipped when reading the rest of the columns.
When a masked column is used in a query, the `SELECT ... FROM table WHERE condition` query internally is extended by the predicate on `_row_exists` and is transformed to:
```sql
SELECT ... FROM table PREWHERE _row_exists WHERE condition
```
At execution time, the column `_row_exists` is read to determine which rows should not be returned. If there are many deleted rows, ClickHouse can determine which granules can be fully skipped when reading the rest of the columns.
3. `DELETE` queries are transformed to `ALTER table UPDATE` queries
3. **`DELETE` queries are transformed to `ALTER TABLE ... UPDATE` queries**
The `DELETE FROM table WHERE condition` is translated into an `ALTER table UPDATE _row_exists = 0 WHERE condition` mutation.
The `DELETE FROM table WHERE condition` is translated into an `ALTER TABLE table UPDATE _row_exists = 0 WHERE condition` mutation.
Internally, this mutation is executed in two steps:
Internally, this mutation is executed in two steps:
1. A `SELECT count() FROM table WHERE condition` command is executed for each individual part to determine if the part is affected.
1. A `SELECT count() FROM table WHERE condition` command is executed for each individual part to determine if the part is affected.
2. Based on the commands above, affected parts are then mutated, and hardlinks are created for unaffected parts. In the case of wide parts, the `_row_exists` column for each row is updated and all other columns' files are hardlinked. For compact parts, all columns are re-written because they are all stored together in one file.
2. Based on the commands above, affected parts are then mutated, and hardlinks are created for unaffected parts. In the case of wide parts, the `_row_exists` column for each row is updated, and all other columns' files are hardlinked. For compact parts, all columns are re-written because they are all stored together in one file.
From the steps above, we can see that lightweight deletes using the masking technique improves performance over traditional `ALTER table DELETE` commands because `ALTER table DELETE` reads and re-writes all the columns' files for affected parts.
From the steps above, we can see that lightweight `DELETE` using the masking technique improves performance over traditional `ALTER TABLE ... DELETE` because it does not re-write all the columns' files for affected parts.
## Related content

View File

@ -147,6 +147,7 @@ ColumnDependencies getAllColumnDependencies(
bool isStorageTouchedByMutations(
MergeTreeData::DataPartPtr source_part,
MergeTreeData::MutationsSnapshotPtr mutations_snapshot,
const StorageMetadataPtr & metadata_snapshot,
const std::vector<MutationCommand> & commands,
ContextPtr context)
@ -154,7 +155,7 @@ bool isStorageTouchedByMutations(
if (commands.empty())
return false;
auto storage_from_part = std::make_shared<StorageFromMergeTreeDataPart>(source_part);
auto storage_from_part = std::make_shared<StorageFromMergeTreeDataPart>(source_part, mutations_snapshot);
bool all_commands_can_be_skipped = true;
for (const auto & command : commands)
@ -285,8 +286,13 @@ MutationsInterpreter::Source::Source(StoragePtr storage_) : storage(std::move(st
{
}
MutationsInterpreter::Source::Source(MergeTreeData & storage_, MergeTreeData::DataPartPtr source_part_)
: data(&storage_), part(std::move(source_part_))
MutationsInterpreter::Source::Source(
MergeTreeData & storage_,
MergeTreeData::DataPartPtr source_part_,
AlterConversionsPtr alter_conversions_)
: data(&storage_)
, part(std::move(source_part_))
, alter_conversions(std::move(alter_conversions_))
{
}
@ -386,13 +392,14 @@ MutationsInterpreter::MutationsInterpreter(
MutationsInterpreter::MutationsInterpreter(
MergeTreeData & storage_,
MergeTreeData::DataPartPtr source_part_,
AlterConversionsPtr alter_conversions_,
StorageMetadataPtr metadata_snapshot_,
MutationCommands commands_,
Names available_columns_,
ContextPtr context_,
Settings settings_)
: MutationsInterpreter(
Source(storage_, std::move(source_part_)),
Source(storage_, std::move(source_part_), std::move(alter_conversions_)),
std::move(metadata_snapshot_), std::move(commands_),
std::move(available_columns_), std::move(context_), std::move(settings_))
{
@ -1218,7 +1225,7 @@ void MutationsInterpreter::Source::read(
createReadFromPartStep(
MergeTreeSequentialSourceType::Mutation,
plan, *data, storage_snapshot,
part, required_columns,
part, alter_conversions, required_columns,
apply_deleted_mask_, std::move(filter), context_,
getLogger("MutationsInterpreter"));
}

View File

@ -20,6 +20,7 @@ using QueryPipelineBuilderPtr = std::unique_ptr<QueryPipelineBuilder>;
/// Return false if the data isn't going to be changed by mutations.
bool isStorageTouchedByMutations(
MergeTreeData::DataPartPtr source_part,
MergeTreeData::MutationsSnapshotPtr mutations_snapshot,
const StorageMetadataPtr & metadata_snapshot,
const std::vector<MutationCommand> & commands,
ContextPtr context
@ -70,6 +71,7 @@ public:
MutationsInterpreter(
MergeTreeData & storage_,
MergeTreeData::DataPartPtr source_part_,
AlterConversionsPtr alter_conversions_,
StorageMetadataPtr metadata_snapshot_,
MutationCommands commands_,
Names available_columns_,
@ -137,7 +139,7 @@ public:
bool can_execute_) const;
explicit Source(StoragePtr storage_);
Source(MergeTreeData & storage_, MergeTreeData::DataPartPtr source_part_);
Source(MergeTreeData & storage_, MergeTreeData::DataPartPtr source_part_, AlterConversionsPtr alter_conversions_);
private:
StoragePtr storage;
@ -145,6 +147,7 @@ public:
/// Special case for *MergeTree.
MergeTreeData * data = nullptr;
MergeTreeData::DataPartPtr part;
AlterConversionsPtr alter_conversions;
};
private:

View File

@ -757,7 +757,7 @@ std::optional<String> optimizeUseAggregateProjections(QueryPlan::Node & node, Qu
projection_reading = reader.readFromParts(
/* parts = */ {},
/* alter_conversions = */ {},
reading->getMutationsSnapshot()->cloneEmpty(),
best_candidate->dag.getRequiredColumnsNames(),
proj_snapshot,
projection_query_info,

View File

@ -199,7 +199,7 @@ std::optional<String> optimizeUseNormalProjections(Stack & stack, QueryPlan::Nod
auto projection_reading = reader.readFromParts(
/*parts=*/ {},
/*alter_conversions=*/ {},
reading->getMutationsSnapshot()->cloneEmpty(),
required_columns,
proj_snapshot,
query_info_copy,

View File

@ -41,12 +41,19 @@ bool canUseProjectionForReadingStep(ReadFromMergeTree * reading)
if (reading->readsInOrder())
return false;
const auto & query_settings = reading->getContext()->getSettingsRef();
// Currently projection don't support deduplication when moving parts between shards.
if (reading->getContext()->getSettingsRef().allow_experimental_query_deduplication)
if (query_settings.allow_experimental_query_deduplication)
return false;
// Currently projection don't support settings which implicitly modify aggregate functions.
if (reading->getContext()->getSettingsRef().aggregate_functions_null_for_empty)
if (query_settings.aggregate_functions_null_for_empty)
return false;
/// Don't use projections if have mutations to apply
/// because we need to apply them on original data.
if (query_settings.apply_mutations_on_fly && reading->getMutationsSnapshot()->hasDataMutations())
return false;
return true;
@ -215,20 +222,15 @@ bool analyzeProjectionCandidate(
{
MergeTreeData::DataPartsVector projection_parts;
MergeTreeData::DataPartsVector normal_parts;
std::vector<AlterConversionsPtr> alter_conversions;
for (const auto & part_with_ranges : parts_with_ranges)
{
const auto & created_projections = part_with_ranges.data_part->getProjectionParts();
auto it = created_projections.find(candidate.projection->name);
if (it != created_projections.end() && !it->second->is_broken)
{
projection_parts.push_back(it->second);
}
else
{
normal_parts.push_back(part_with_ranges.data_part);
alter_conversions.push_back(part_with_ranges.alter_conversions);
}
}
if (projection_parts.empty())
@ -241,6 +243,7 @@ bool analyzeProjectionCandidate(
auto projection_result_ptr = reader.estimateNumMarksToRead(
std::move(projection_parts),
reading.getMutationsSnapshot()->cloneEmpty(),
required_column_names,
candidate.projection->metadata,
projection_query_info,
@ -254,7 +257,7 @@ bool analyzeProjectionCandidate(
if (!normal_parts.empty())
{
/// TODO: We can reuse existing analysis_result by filtering out projection parts
auto normal_result_ptr = reading.selectRangesToRead(std::move(normal_parts), std::move(alter_conversions));
auto normal_result_ptr = reading.selectRangesToRead(std::move(normal_parts));
if (normal_result_ptr->selected_marks != 0)
{

View File

@ -229,7 +229,6 @@ public:
{
ranges_in_data_parts.emplace_back(
initial_ranges_in_data_parts[part_index].data_part,
initial_ranges_in_data_parts[part_index].alter_conversions,
initial_ranges_in_data_parts[part_index].part_index_in_query,
MarkRanges{mark_range});
part_index_to_initial_ranges_in_data_parts_index[it->second] = part_index;

View File

@ -266,7 +266,7 @@ void ReadFromMergeTree::AnalysisResult::checkLimits(const Settings & settings, c
ReadFromMergeTree::ReadFromMergeTree(
MergeTreeData::DataPartsVector parts_,
std::vector<AlterConversionsPtr> alter_conversions_,
MergeTreeData::MutationsSnapshotPtr mutations_,
Names all_column_names_,
const MergeTreeData & data_,
const SelectQueryInfo & query_info_,
@ -283,7 +283,7 @@ ReadFromMergeTree::ReadFromMergeTree(
query_info_.prewhere_info)}, all_column_names_, query_info_, storage_snapshot_, context_)
, reader_settings(getMergeTreeReaderSettings(context_, query_info_))
, prepared_parts(std::move(parts_))
, alter_conversions_for_parts(std::move(alter_conversions_))
, mutations_snapshot(std::move(mutations_))
, all_column_names(std::move(all_column_names_))
, data(data_)
, actions_settings(ExpressionActionsSettings::fromContext(context_))
@ -372,6 +372,7 @@ Pipe ReadFromMergeTree::readFromPoolParallelReplicas(
auto pool = std::make_shared<MergeTreeReadPoolParallelReplicas>(
std::move(extension),
std::move(parts_with_range),
mutations_snapshot,
shared_virtual_fields,
storage_snapshot,
prewhere_info,
@ -453,6 +454,7 @@ Pipe ReadFromMergeTree::readFromPool(
{
pool = std::make_shared<MergeTreePrefetchedReadPool>(
std::move(parts_with_range),
mutations_snapshot,
shared_virtual_fields,
storage_snapshot,
prewhere_info,
@ -466,6 +468,7 @@ Pipe ReadFromMergeTree::readFromPool(
{
pool = std::make_shared<MergeTreeReadPool>(
std::move(parts_with_range),
mutations_snapshot,
shared_virtual_fields,
storage_snapshot,
prewhere_info,
@ -554,6 +557,7 @@ Pipe ReadFromMergeTree::readInOrder(
std::move(extension),
mode,
parts_with_ranges,
mutations_snapshot,
shared_virtual_fields,
storage_snapshot,
prewhere_info,
@ -569,6 +573,7 @@ Pipe ReadFromMergeTree::readInOrder(
has_limit_below_one_block,
read_type,
parts_with_ranges,
mutations_snapshot,
shared_virtual_fields,
storage_snapshot,
prewhere_info,
@ -1038,7 +1043,7 @@ Pipe ReadFromMergeTree::spreadMarkRangesAmongStreamsWithOrder(
}
ranges_to_get_from_part = split_ranges(ranges_to_get_from_part, input_order_info->direction);
new_parts.emplace_back(part.data_part, part.alter_conversions, part.part_index_in_query, std::move(ranges_to_get_from_part));
new_parts.emplace_back(part.data_part, part.part_index_in_query, std::move(ranges_to_get_from_part));
}
splitted_parts_and_ranges.emplace_back(std::move(new_parts));
@ -1265,7 +1270,7 @@ Pipe ReadFromMergeTree::spreadMarkRangesAmongStreamsFinal(
RangesInDataParts new_parts;
for (auto part_it = parts_to_merge_ranges[range_index]; part_it != parts_to_merge_ranges[range_index + 1]; ++part_it)
new_parts.emplace_back(part_it->data_part, part_it->alter_conversions, part_it->part_index_in_query, part_it->ranges);
new_parts.emplace_back(part_it->data_part, part_it->part_index_in_query, part_it->ranges);
if (new_parts.empty())
continue;
@ -1378,15 +1383,14 @@ Pipe ReadFromMergeTree::spreadMarkRangesAmongStreamsFinal(
ReadFromMergeTree::AnalysisResultPtr ReadFromMergeTree::selectRangesToRead(bool find_exact_ranges) const
{
return selectRangesToRead(prepared_parts, alter_conversions_for_parts, find_exact_ranges);
return selectRangesToRead(prepared_parts, find_exact_ranges);
}
ReadFromMergeTree::AnalysisResultPtr ReadFromMergeTree::selectRangesToRead(
MergeTreeData::DataPartsVector parts, std::vector<AlterConversionsPtr> alter_conversions, bool find_exact_ranges) const
ReadFromMergeTree::AnalysisResultPtr ReadFromMergeTree::selectRangesToRead(MergeTreeData::DataPartsVector parts, bool find_exact_ranges) const
{
return selectRangesToRead(
std::move(parts),
std::move(alter_conversions),
mutations_snapshot,
storage_snapshot->metadata,
query_info,
context,
@ -1404,9 +1408,11 @@ static void buildIndexes(
const ActionsDAG * filter_actions_dag,
const MergeTreeData & data,
const MergeTreeData::DataPartsVector & parts,
const MergeTreeData::MutationsSnapshotPtr & mutations_snapshot,
const ContextPtr & context,
const SelectQueryInfo & query_info,
const StorageMetadataPtr & metadata_snapshot)
const StorageMetadataPtr & metadata_snapshot,
const LoggerPtr & log)
{
indexes.reset();
@ -1432,19 +1438,21 @@ static void buildIndexes(
indexes->partition_pruner.emplace(metadata_snapshot, filter_actions_dag, context, false /* strict */);
}
indexes->part_values
= MergeTreeDataSelectExecutor::filterPartsByVirtualColumns(metadata_snapshot, data, parts, filter_actions_dag, context);
indexes->part_values = MergeTreeDataSelectExecutor::filterPartsByVirtualColumns(metadata_snapshot, data, parts, filter_actions_dag, context);
MergeTreeDataSelectExecutor::buildKeyConditionFromPartOffset(indexes->part_offset_condition, filter_actions_dag, context);
indexes->use_skip_indexes = settings.use_skip_indexes;
bool final = query_info.isFinal();
if (final && !settings.use_skip_indexes_if_final)
if (query_info.isFinal() && !settings.use_skip_indexes_if_final)
indexes->use_skip_indexes = false;
if (!indexes->use_skip_indexes)
return;
const auto & all_indexes = metadata_snapshot->getSecondaryIndices();
if (all_indexes.empty())
return;
std::unordered_set<std::string> ignored_index_names;
if (settings.ignore_data_skipping_indices.changed)
@ -1469,47 +1477,68 @@ static void buildIndexes(
throw Exception(ErrorCodes::CANNOT_PARSE_TEXT, "Cannot parse ignore_data_skipping_indices ('{}')", indices);
}
auto all_updated_columns = mutations_snapshot->getAllUpdatedColumns();
UsefulSkipIndexes skip_indexes;
using Key = std::pair<String, size_t>;
std::map<Key, size_t> merged;
for (const auto & index : metadata_snapshot->getSecondaryIndices())
for (const auto & index : all_indexes)
{
if (!ignored_index_names.contains(index.name))
if (ignored_index_names.contains(index.name))
continue;
auto index_helper = MergeTreeIndexFactory::instance().get(index);
if (!all_updated_columns.empty())
{
auto index_helper = MergeTreeIndexFactory::instance().get(index);
if (index_helper->isMergeable())
auto required_columns = index_helper->getColumnsRequiredForIndexCalc();
auto it = std::ranges::find_if(required_columns, [&](const auto & column_name)
{
auto [it, inserted] = merged.emplace(Key{index_helper->index.type, index_helper->getGranularity()}, skip_indexes.merged_indices.size());
if (inserted)
{
skip_indexes.merged_indices.emplace_back();
skip_indexes.merged_indices.back().condition = index_helper->createIndexMergedCondition(query_info, metadata_snapshot);
}
return all_updated_columns.contains(column_name);
});
skip_indexes.merged_indices[it->second].addIndex(index_helper);
}
else
if (it != required_columns.end())
{
MergeTreeIndexConditionPtr condition;
if (index_helper->isVectorSimilarityIndex())
{
#if USE_USEARCH
if (const auto * vector_similarity_index = typeid_cast<const MergeTreeIndexVectorSimilarity *>(index_helper.get()))
condition = vector_similarity_index->createIndexCondition(query_info, context);
#endif
if (const auto * legacy_vector_similarity_index = typeid_cast<const MergeTreeIndexLegacyVectorSimilarity *>(index_helper.get()))
condition = legacy_vector_similarity_index->createIndexCondition(query_info, context);
if (!condition)
throw Exception(ErrorCodes::LOGICAL_ERROR, "Unknown vector search index {}", index_helper->index.name);
}
else
condition = index_helper->createIndexCondition(filter_actions_dag, context);
if (!condition->alwaysUnknownOrTrue())
skip_indexes.useful_indices.emplace_back(index_helper, condition);
LOG_TRACE(log, "Index {} is not used because it depends on column {} which will be updated on fly", index.name, *it);
continue;
}
}
if (index_helper->isMergeable())
{
auto [it, inserted] = merged.emplace(Key{index_helper->index.type, index_helper->getGranularity()}, skip_indexes.merged_indices.size());
if (inserted)
{
skip_indexes.merged_indices.emplace_back();
skip_indexes.merged_indices.back().condition = index_helper->createIndexMergedCondition(query_info, metadata_snapshot);
}
skip_indexes.merged_indices[it->second].addIndex(index_helper);
continue;
}
MergeTreeIndexConditionPtr condition;
if (index_helper->isVectorSimilarityIndex())
{
#if USE_USEARCH
if (const auto * vector_similarity_index = typeid_cast<const MergeTreeIndexVectorSimilarity *>(index_helper.get()))
condition = vector_similarity_index->createIndexCondition(query_info, context);
#endif
if (const auto * legacy_vector_similarity_index = typeid_cast<const MergeTreeIndexLegacyVectorSimilarity *>(index_helper.get()))
condition = legacy_vector_similarity_index->createIndexCondition(query_info, context);
if (!condition)
throw Exception(ErrorCodes::LOGICAL_ERROR, "Unknown vector search index {}", index_helper->index.name);
}
else
{
condition = index_helper->createIndexCondition(filter_actions_dag, context);
}
if (!condition->alwaysUnknownOrTrue())
skip_indexes.useful_indices.emplace_back(index_helper, condition);
}
// move minmax indices to first positions, so they will be applied first as cheapest ones
@ -1547,15 +1576,17 @@ void ReadFromMergeTree::applyFilters(ActionDAGNodes added_filter_nodes)
query_info.filter_actions_dag.get(),
data,
prepared_parts,
mutations_snapshot,
context,
query_info,
storage_snapshot->metadata);
storage_snapshot->metadata,
log);
}
}
ReadFromMergeTree::AnalysisResultPtr ReadFromMergeTree::selectRangesToRead(
MergeTreeData::DataPartsVector parts,
std::vector<AlterConversionsPtr> alter_conversions,
MergeTreeData::MutationsSnapshotPtr mutations_snapshot,
const StorageMetadataPtr & metadata_snapshot,
const SelectQueryInfo & query_info_,
ContextPtr context_,
@ -1586,7 +1617,7 @@ ReadFromMergeTree::AnalysisResultPtr ReadFromMergeTree::selectRangesToRead(
const Names & primary_key_column_names = primary_key.column_names;
if (!indexes)
buildIndexes(indexes, query_info_.filter_actions_dag.get(), data, parts, context_, query_info_, metadata_snapshot);
buildIndexes(indexes, query_info_.filter_actions_dag.get(), data, parts, mutations_snapshot, context_, query_info_, metadata_snapshot, log);
if (indexes->part_values && indexes->part_values->empty())
return std::make_shared<AnalysisResult>(std::move(result));
@ -1617,10 +1648,9 @@ ReadFromMergeTree::AnalysisResultPtr ReadFromMergeTree::selectRangesToRead(
{
MergeTreeDataSelectExecutor::filterPartsByPartition(
parts,
indexes->partition_pruner,
indexes->minmax_idx_condition,
parts,
alter_conversions,
indexes->part_values,
metadata_snapshot,
data,
@ -1649,7 +1679,6 @@ ReadFromMergeTree::AnalysisResultPtr ReadFromMergeTree::selectRangesToRead(
auto reader_settings = getMergeTreeReaderSettings(context_, query_info_);
result.parts_with_ranges = MergeTreeDataSelectExecutor::filterPartsByPrimaryKeyAndSkipIndexes(
std::move(parts),
std::move(alter_conversions),
metadata_snapshot,
context_,
indexes->key_condition,

View File

@ -110,7 +110,7 @@ public:
ReadFromMergeTree(
MergeTreeData::DataPartsVector parts_,
std::vector<AlterConversionsPtr> alter_conversions_,
MergeTreeData::MutationsSnapshotPtr mutations_snapshot_,
Names all_column_names_,
const MergeTreeData & data_,
const SelectQueryInfo & query_info_,
@ -154,7 +154,7 @@ public:
static AnalysisResultPtr selectRangesToRead(
MergeTreeData::DataPartsVector parts,
std::vector<AlterConversionsPtr> alter_conversions,
MergeTreeData::MutationsSnapshotPtr mutations_snapshot,
const StorageMetadataPtr & metadata_snapshot,
const SelectQueryInfo & query_info,
ContextPtr context,
@ -166,8 +166,7 @@ public:
std::optional<Indexes> & indexes,
bool find_exact_ranges);
AnalysisResultPtr selectRangesToRead(
MergeTreeData::DataPartsVector parts, std::vector<AlterConversionsPtr> alter_conversions, bool find_exact_ranges = false) const;
AnalysisResultPtr selectRangesToRead(MergeTreeData::DataPartsVector parts, bool find_exact_ranges = false) const;
AnalysisResultPtr selectRangesToRead(bool find_exact_ranges = false) const;
@ -188,7 +187,7 @@ public:
void setAnalyzedResult(AnalysisResultPtr analyzed_result_ptr_) { analyzed_result_ptr = std::move(analyzed_result_ptr_); }
const MergeTreeData::DataPartsVector & getParts() const { return prepared_parts; }
const std::vector<AlterConversionsPtr> & getAlterConvertionsForParts() const { return alter_conversions_for_parts; }
MergeTreeData::MutationsSnapshotPtr getMutationsSnapshot() const { return mutations_snapshot; }
const MergeTreeData & getMergeTreeData() const { return data; }
size_t getMaxBlockSize() const { return block_size.max_block_size_rows; }
@ -209,7 +208,7 @@ private:
MergeTreeReaderSettings reader_settings;
MergeTreeData::DataPartsVector prepared_parts;
std::vector<AlterConversionsPtr> alter_conversions_for_parts;
MergeTreeData::MutationsSnapshotPtr mutations_snapshot;
Names all_column_names;

View File

@ -271,6 +271,7 @@ public:
/// Return true if the trivial count query could be optimized without reading the data at all
/// in totalRows() or totalRowsByPartitionPredicate() methods or with optimized reading in read() method.
/// 'storage_snapshot' may be nullptr.
virtual bool supportsTrivialCountOptimization(const StorageSnapshotPtr & /*storage_snapshot*/, ContextPtr /*query_context*/) const
{
return false;

View File

@ -9,9 +9,15 @@ namespace ErrorCodes
extern const int LOGICAL_ERROR;
}
bool AlterConversions::supportsMutationCommandType(MutationCommand::Type t)
bool AlterConversions::isSupportedDataMutation(MutationCommand::Type)
{
return t == MutationCommand::Type::RENAME_COLUMN;
/// Currently there is no such mutations. See setting 'apply_mutations_on_fly'.
return false;
}
bool AlterConversions::isSupportedMetadataMutation(MutationCommand::Type type)
{
return type == MutationCommand::Type::RENAME_COLUMN;
}
void AlterConversions::addMutationCommand(const MutationCommand & command)

View File

@ -1,8 +1,8 @@
#pragma once
#include <Storages/MutationCommands.h>
#include <string>
#include <unordered_map>
#include <Interpreters/Context_fwd.h>
#include <Storages/StorageInMemoryMetadata.h>
namespace DB
@ -11,11 +11,17 @@ namespace DB
/// Alter conversions which should be applied on-fly for part.
/// Built from of the most recent mutation commands for part.
/// Now only ALTER RENAME COLUMN is applied.
class AlterConversions : private boost::noncopyable
class AlterConversions : private WithContext, boost::noncopyable
{
public:
AlterConversions() = default;
AlterConversions(StorageMetadataPtr metadata_snapshot_, ContextPtr context_)
: WithContext(context_)
, metadata_snapshot(std::move(metadata_snapshot_))
{
}
struct RenamePair
{
std::string rename_to;
@ -34,11 +40,13 @@ public:
/// Get column old name before rename (lookup by key in rename_map)
std::string getColumnOldName(const std::string & new_name) const;
static bool supportsMutationCommandType(MutationCommand::Type);
static bool isSupportedDataMutation(MutationCommand::Type type);
static bool isSupportedMetadataMutation(MutationCommand::Type type);
private:
/// Rename map new_name -> old_name.
std::vector<RenamePair> rename_map;
StorageMetadataPtr metadata_snapshot;
};
using AlterConversionsPtr = std::shared_ptr<const AlterConversions>;

View File

@ -95,22 +95,18 @@ UInt32 DataPartStorageOnDiskFull::getRefCount(const String & file_name) const
return volume->getDisk()->getRefCount(fs::path(root_path) / part_dir / file_name);
}
std::string DataPartStorageOnDiskFull::getRemotePath(const std::string & file_name, bool if_exists) const
std::vector<std::string> DataPartStorageOnDiskFull::getRemotePaths(const std::string & file_name) const
{
const std::string path = fs::path(root_path) / part_dir / file_name;
auto objects = volume->getDisk()->getStorageObjects(path);
if (objects.empty() && if_exists)
return "";
std::vector<std::string> remote_paths;
remote_paths.reserve(objects.size());
if (objects.size() != 1)
{
throw Exception(ErrorCodes::LOGICAL_ERROR,
"One file must be mapped to one object on blob storage by path {} in MergeTree tables, have {}.",
path, objects.size());
}
for (const auto & object : objects)
remote_paths.push_back(object.remote_path);
return objects[0].remote_path;
return remote_paths;
}
String DataPartStorageOnDiskFull::getUniqueId() const

View File

@ -23,7 +23,7 @@ public:
Poco::Timestamp getFileLastModified(const String & file_name) const override;
size_t getFileSize(const std::string & file_name) const override;
UInt32 getRefCount(const std::string & file_name) const override;
std::string getRemotePath(const std::string & file_name, bool if_exists) const override;
std::vector<std::string> getRemotePaths(const std::string & file_name) const override;
String getUniqueId() const override;
std::unique_ptr<ReadBufferFromFileBase> readFile(

View File

@ -126,7 +126,7 @@ public:
virtual UInt32 getRefCount(const std::string & file_name) const = 0;
/// Get path on remote filesystem from file name on local filesystem.
virtual std::string getRemotePath(const std::string & file_name, bool if_exists) const = 0;
virtual std::vector<std::string> getRemotePaths(const std::string & file_name) const = 0;
virtual UInt64 calculateTotalSizeOnDisk() const = 0;

View File

@ -286,6 +286,14 @@ bool MergeTask::ExecuteAndFinalizeHorizontalPart::prepare()
if (enabledBlockOffsetColumn(global_ctx))
addGatheringColumn(global_ctx, BlockOffsetColumn::name, BlockOffsetColumn::type);
MergeTreeData::IMutationsSnapshot::Params params
{
.metadata_version = global_ctx->metadata_snapshot->getMetadataVersion(),
.min_part_metadata_version = MergeTreeData::getMinMetadataVersion(global_ctx->future_part->parts),
};
auto mutations_snapshot = global_ctx->data->getMutationsSnapshot(params);
SerializationInfo::Settings info_settings =
{
.ratio_of_defaults_for_sparse = global_ctx->data->getSettings()->ratio_of_defaults_for_sparse_serialization,
@ -293,10 +301,12 @@ bool MergeTask::ExecuteAndFinalizeHorizontalPart::prepare()
};
SerializationInfoByName infos(global_ctx->storage_columns, info_settings);
global_ctx->alter_conversions.reserve(global_ctx->future_part->parts.size());
for (const auto & part : global_ctx->future_part->parts)
{
global_ctx->new_data_part->ttl_infos.update(part->ttl_infos);
if (global_ctx->metadata_snapshot->hasAnyTTL() && !part->checkAllTTLCalculated(global_ctx->metadata_snapshot))
{
LOG_INFO(ctx->log, "Some TTL values were not calculated for part {}. Will calculate them forcefully during merge.", part->name);
@ -317,6 +327,8 @@ bool MergeTask::ExecuteAndFinalizeHorizontalPart::prepare()
infos.add(part_infos);
}
global_ctx->alter_conversions.push_back(MergeTreeData::getAlterConversionsForPart(part, mutations_snapshot, global_ctx->metadata_snapshot, global_ctx->context));
}
const auto & local_part_min_ttl = global_ctx->new_data_part->ttl_infos.part_min_ttl;
@ -815,6 +827,7 @@ Pipe MergeTask::VerticalMergeStage::createPipeForReadingOneColumn(const String &
*global_ctx->data,
global_ctx->storage_snapshot,
global_ctx->future_part->parts[part_num],
global_ctx->alter_conversions[part_num],
Names{column_name},
/*mark_ranges=*/ {},
global_ctx->input_rows_filtered,
@ -1238,13 +1251,14 @@ void MergeTask::ExecuteAndFinalizeHorizontalPart::createMergedStream()
global_ctx->horizontal_stage_progress = std::make_unique<MergeStageProgress>(
ctx->column_sizes ? ctx->column_sizes->keyColumnsWeight() : 1.0);
for (const auto & part : global_ctx->future_part->parts)
for (size_t i = 0; i < global_ctx->future_part->parts.size(); ++i)
{
Pipe pipe = createMergeTreeSequentialSource(
MergeTreeSequentialSourceType::Merge,
*global_ctx->data,
global_ctx->storage_snapshot,
part,
global_ctx->future_part->parts[i],
global_ctx->alter_conversions[i],
global_ctx->merging_columns.getNames(),
/*mark_ranges=*/ {},
global_ctx->input_rows_filtered,

View File

@ -166,6 +166,7 @@ private:
StorageSnapshotPtr storage_snapshot{nullptr};
StorageMetadataPtr metadata_snapshot{nullptr};
FutureMergedMutatedPartPtr future_part{nullptr};
std::vector<AlterConversionsPtr> alter_conversions;
/// This will be either nullptr or new_data_part, so raw pointer is ok.
IMergeTreeDataPart * parent_part{nullptr};
ContextPtr context{nullptr};

View File

@ -5009,7 +5009,7 @@ void MergeTreeData::checkAlterPartitionIsPossible(
const auto * partition_ast = command.partition->as<ASTPartition>();
if (partition_ast && partition_ast->all)
{
if (command.type != PartitionCommand::DROP_PARTITION && command.type != PartitionCommand::ATTACH_PARTITION)
if (command.type != PartitionCommand::DROP_PARTITION && command.type != PartitionCommand::ATTACH_PARTITION && !(command.type == PartitionCommand::REPLACE_PARTITION && !command.replace))
throw DB::Exception(ErrorCodes::SUPPORT_IS_DISABLED, "Only support DROP/DETACH/ATTACH PARTITION ALL currently");
}
else
@ -5810,7 +5810,7 @@ String MergeTreeData::getPartitionIDFromQuery(const ASTPtr & ast, ContextPtr loc
const auto & partition_ast = ast->as<ASTPartition &>();
if (partition_ast.all)
throw Exception(ErrorCodes::SUPPORT_IS_DISABLED, "Only Support DETACH PARTITION ALL currently");
throw Exception(ErrorCodes::SUPPORT_IS_DISABLED, "Only Support DROP/DETACH/ATTACH PARTITION ALL currently");
if (!partition_ast.value)
{
@ -7156,11 +7156,16 @@ UInt64 MergeTreeData::estimateNumberOfRowsToRead(
ContextPtr query_context, const StorageSnapshotPtr & storage_snapshot, const SelectQueryInfo & query_info) const
{
const auto & snapshot_data = assert_cast<const MergeTreeData::SnapshotData &>(*storage_snapshot->data);
const auto & parts = snapshot_data.parts;
MergeTreeDataSelectExecutor reader(*this);
auto result_ptr = reader.estimateNumMarksToRead(
parts, {}, storage_snapshot->metadata, query_info, query_context, query_context->getSettingsRef().max_threads);
snapshot_data.parts,
snapshot_data.mutations_snapshot,
storage_snapshot->metadata->getColumns().getAll().getNames(),
storage_snapshot->metadata,
query_info,
query_context,
query_context->getSettingsRef().max_threads);
UInt64 total_rows = result_ptr->selected_rows;
if (query_info.trivial_limit > 0 && query_info.trivial_limit < total_rows)
@ -8174,11 +8179,15 @@ bool MergeTreeData::canUsePolymorphicParts(const MergeTreeSettings & settings, S
return true;
}
AlterConversionsPtr MergeTreeData::getAlterConversionsForPart(MergeTreeDataPartPtr part) const
AlterConversionsPtr MergeTreeData::getAlterConversionsForPart(
const MergeTreeDataPartPtr & part,
const MutationsSnapshotPtr & mutations,
const StorageMetadataPtr & metadata,
const ContextPtr & query_context)
{
auto commands = getAlterMutationCommandsForPart(part);
auto commands = mutations->getAlterMutationCommandsForPart(part);
auto result = std::make_shared<AlterConversions>(metadata, query_context);
auto result = std::make_shared<AlterConversions>();
for (const auto & command : commands | std::views::reverse)
result->addMutationCommand(command);
@ -8470,9 +8479,28 @@ void MergeTreeData::updateObjectColumns(const DataPartPtr & part, const DataPart
DB::updateObjectColumns(object_columns, columns, part->getColumns());
}
bool MergeTreeData::supportsTrivialCountOptimization(const StorageSnapshotPtr &, ContextPtr) const
bool MergeTreeData::supportsTrivialCountOptimization(const StorageSnapshotPtr & storage_snapshot, ContextPtr query_context) const
{
return !hasLightweightDeletedMask();
if (hasLightweightDeletedMask())
return false;
if (!storage_snapshot)
return !query_context->getSettingsRef().apply_mutations_on_fly;
const auto & snapshot_data = assert_cast<const MergeTreeData::SnapshotData &>(*storage_snapshot->data);
return !snapshot_data.mutations_snapshot->hasDataMutations();
}
Int64 MergeTreeData::getMinMetadataVersion(const DataPartsVector & parts)
{
Int64 version = -1;
for (const auto & part : parts)
{
Int64 part_version = part->getMetadataVersion();
if (version == -1 || part_version < version)
version = part_version;
}
return version;
}
StorageSnapshotPtr MergeTreeData::getStorageSnapshot(const StorageMetadataPtr & metadata_snapshot, ContextPtr query_context) const
@ -8486,10 +8514,14 @@ StorageSnapshotPtr MergeTreeData::getStorageSnapshot(const StorageMetadataPtr &
object_columns_copy = object_columns;
}
snapshot_data->alter_conversions.reserve(snapshot_data->parts.size());
for (const auto & part : snapshot_data->parts)
snapshot_data->alter_conversions.push_back(getAlterConversionsForPart(part));
IMutationsSnapshot::Params params
{
.metadata_version = metadata_snapshot->getMetadataVersion(),
.min_part_metadata_version = getMinMetadataVersion(snapshot_data->parts),
.need_data_mutations = query_context->getSettingsRef().apply_mutations_on_fly,
};
snapshot_data->mutations_snapshot = getMutationsSnapshot(params);
return std::make_shared<StorageSnapshot>(*this, metadata_snapshot, std::move(object_columns_copy), std::move(snapshot_data));
}
@ -8707,28 +8739,57 @@ void MergeTreeData::verifySortingKey(const KeyDescription & sorting_key)
}
}
bool updateAlterConversionsMutations(const MutationCommands & commands, std::atomic<ssize_t> & alter_conversions_mutations, bool remove)
static void updateMutationsCounters(
Int64 & num_data_mutations_to_apply,
Int64 & num_metadata_mutations_to_apply,
const MutationCommands & commands,
Int64 increment)
{
if (num_data_mutations_to_apply < 0)
throw Exception(ErrorCodes::LOGICAL_ERROR, "On-fly data mutations counter is negative ({})", num_data_mutations_to_apply);
if (num_metadata_mutations_to_apply < 0)
throw Exception(ErrorCodes::LOGICAL_ERROR, "On-fly metadata mutations counter is negative ({})", num_metadata_mutations_to_apply);
bool has_data_mutation = false;
bool has_metadata_mutation = false;
for (const auto & command : commands)
{
if (AlterConversions::supportsMutationCommandType(command.type))
if (!has_data_mutation && AlterConversions::isSupportedDataMutation(command.type))
{
if (remove)
{
--alter_conversions_mutations;
if (alter_conversions_mutations < 0)
throw Exception(ErrorCodes::LOGICAL_ERROR, "On-fly mutations counter is negative ({})", alter_conversions_mutations);
}
else
{
if (alter_conversions_mutations < 0)
throw Exception(ErrorCodes::LOGICAL_ERROR, "On-fly mutations counter is negative ({})", alter_conversions_mutations);
++alter_conversions_mutations;
}
return true;
num_data_mutations_to_apply += increment;
has_data_mutation = true;
if (num_data_mutations_to_apply < 0)
throw Exception(ErrorCodes::LOGICAL_ERROR, "On-fly data mutations counter is negative ({})", num_data_mutations_to_apply);
}
if (!has_metadata_mutation && AlterConversions::isSupportedMetadataMutation(command.type))
{
num_metadata_mutations_to_apply += increment;
has_metadata_mutation = true;
if (num_metadata_mutations_to_apply < 0)
throw Exception(ErrorCodes::LOGICAL_ERROR, "On-fly metadata mutations counter is negative ({})", num_metadata_mutations_to_apply);
}
}
return false;
}
void incrementMutationsCounters(
Int64 & num_data_mutations_to_apply,
Int64 & num_metadata_mutations_to_apply,
const MutationCommands & commands)
{
updateMutationsCounters(num_data_mutations_to_apply, num_metadata_mutations_to_apply, commands, 1);
}
void decrementMutationsCounters(
Int64 & num_data_mutations_to_apply,
Int64 & num_metadata_mutations_to_apply,
const MutationCommands & commands)
{
updateMutationsCounters(num_data_mutations_to_apply, num_metadata_mutations_to_apply, commands, -1);
}
}

View File

@ -443,14 +443,53 @@ public:
bool areAsynchronousInsertsEnabled() const override;
bool supportsTrivialCountOptimization(const StorageSnapshotPtr &, ContextPtr) const override;
bool supportsTrivialCountOptimization(const StorageSnapshotPtr & storage_snapshot, ContextPtr query_context) const override;
/// A snapshot of pending mutations that weren't applied to some of the parts yet
/// and should be applied on the fly (i.e. when reading from the part).
/// Mutations not supported by AlterConversions (supportsMutationCommandType()) can be omitted.
struct IMutationsSnapshot
{
/// Contains info that doesn't depend on state of mutations.
struct Params
{
Int64 metadata_version = -1;
Int64 min_part_metadata_version = -1;
bool need_data_mutations = false;
};
/// Contains info that depends on state of mutations.
struct Info
{
Int64 num_data_mutations = 0;
Int64 num_metadata_mutations = 0;
};
Params params;
Info info;
IMutationsSnapshot() = default;
IMutationsSnapshot(Params params_, Info info_): params(std::move(params_)), info(std::move(info_)) {}
/// Returns mutation commands that are required to be applied to the `part`.
/// @return list of mutation commands, in *reverse* order (newest to oldest)
virtual MutationCommands getAlterMutationCommandsForPart(const DataPartPtr & part) const = 0;
virtual std::shared_ptr<IMutationsSnapshot> cloneEmpty() const = 0;
virtual NameSet getAllUpdatedColumns() const = 0;
bool hasDataMutations() const { return params.need_data_mutations && info.num_data_mutations > 0; }
virtual ~IMutationsSnapshot() = default;
};
using MutationsSnapshotPtr = std::shared_ptr<const IMutationsSnapshot>;
/// Snapshot for MergeTree contains the current set of data parts
/// at the moment of the start of query.
/// and mutations required to be applied at the moment of the start of query.
struct SnapshotData : public StorageSnapshot::Data
{
DataPartsVector parts;
std::vector<AlterConversionsPtr> alter_conversions;
MutationsSnapshotPtr mutations_snapshot;
};
StorageSnapshotPtr getStorageSnapshot(const StorageMetadataPtr & metadata_snapshot, ContextPtr query_context) const override;
@ -929,8 +968,18 @@ public:
Disks getDisks() const { return getStoragePolicy()->getDisks(); }
/// Returns a snapshot of mutations that probably will be applied on the fly to parts during reading.
virtual MutationsSnapshotPtr getMutationsSnapshot(const IMutationsSnapshot::Params & params) const = 0;
/// Returns the minimum version of metadata among parts.
static Int64 getMinMetadataVersion(const DataPartsVector & parts);
/// Return alter conversions for part which must be applied on fly.
AlterConversionsPtr getAlterConversionsForPart(MergeTreeDataPartPtr part) const;
static AlterConversionsPtr getAlterConversionsForPart(
const MergeTreeDataPartPtr & part,
const MutationsSnapshotPtr & mutations,
const StorageMetadataPtr & metadata,
const ContextPtr & query_context);
/// Returns destination disk or volume for the TTL rule according to current storage policy.
SpacePtr getDestinationForMoveTTL(const TTLDescription & move_ttl) const;
@ -1450,13 +1499,6 @@ protected:
/// mechanisms for parts locking
virtual bool partIsAssignedToBackgroundOperation(const DataPartPtr & part) const = 0;
/// Return pending mutations that weren't applied to `part` yet and should be applied on the fly
/// (i.e. when reading from the part). Mutations not supported by AlterConversions
/// (supportsMutationCommandType()) can be omitted.
///
/// @return list of mutations, in *reverse* order (newest to oldest)
virtual MutationCommands getAlterMutationCommandsForPart(const DataPartPtr & part) const = 0;
struct PartBackupEntries
{
String part_name;
@ -1731,7 +1773,14 @@ struct CurrentlySubmergingEmergingTagger
};
/// Look at MutationCommands if it contains mutations for AlterConversions, update the counter.
/// Return true if the counter had been updated
bool updateAlterConversionsMutations(const MutationCommands & commands, std::atomic<ssize_t> & alter_conversions_mutations, bool remove);
void incrementMutationsCounters(
Int64 & num_data_mutations_to_apply,
Int64 & num_metadata_mutations_to_apply,
const MutationCommands & commands);
void decrementMutationsCounters(
Int64 & num_data_mutations_to_apply,
Int64 & num_metadata_mutations_to_apply,
const MutationCommands & commands);
}

View File

@ -133,12 +133,10 @@ QueryPlanPtr MergeTreeDataSelectExecutor::read(
bool enable_parallel_reading) const
{
const auto & snapshot_data = assert_cast<const MergeTreeData::SnapshotData &>(*storage_snapshot->data);
const auto & parts = snapshot_data.parts;
const auto & alter_conversions = snapshot_data.alter_conversions;
auto step = readFromParts(
parts,
alter_conversions,
snapshot_data.parts,
snapshot_data.mutations_snapshot,
column_names_to_return,
storage_snapshot,
query_info,
@ -500,10 +498,9 @@ std::optional<std::unordered_set<String>> MergeTreeDataSelectExecutor::filterPar
}
void MergeTreeDataSelectExecutor::filterPartsByPartition(
MergeTreeData::DataPartsVector & parts,
const std::optional<PartitionPruner> & partition_pruner,
const std::optional<KeyCondition> & minmax_idx_condition,
MergeTreeData::DataPartsVector & parts,
std::vector<AlterConversionsPtr> & alter_conversions,
const std::optional<std::unordered_set<String>> & part_values,
const StorageMetadataPtr & metadata_snapshot,
const MergeTreeData & data,
@ -512,8 +509,6 @@ void MergeTreeDataSelectExecutor::filterPartsByPartition(
LoggerPtr log,
ReadFromMergeTree::IndexStats & index_stats)
{
chassert(alter_conversions.empty() || parts.size() == alter_conversions.size());
const Settings & settings = context->getSettingsRef();
DataTypes minmax_columns_types;
@ -537,7 +532,6 @@ void MergeTreeDataSelectExecutor::filterPartsByPartition(
if (query_context->getSettingsRef().allow_experimental_query_deduplication)
selectPartsToReadWithUUIDFilter(
parts,
alter_conversions,
part_values,
data.getPinnedPartUUIDs(),
minmax_idx_condition,
@ -550,7 +544,6 @@ void MergeTreeDataSelectExecutor::filterPartsByPartition(
else
selectPartsToRead(
parts,
alter_conversions,
part_values,
minmax_idx_condition,
minmax_columns_types,
@ -589,7 +582,6 @@ void MergeTreeDataSelectExecutor::filterPartsByPartition(
RangesInDataParts MergeTreeDataSelectExecutor::filterPartsByPrimaryKeyAndSkipIndexes(
MergeTreeData::DataPartsVector && parts,
std::vector<AlterConversionsPtr> && alter_conversions,
StorageMetadataPtr metadata_snapshot,
const ContextPtr & context,
const KeyCondition & key_condition,
@ -602,8 +594,6 @@ RangesInDataParts MergeTreeDataSelectExecutor::filterPartsByPrimaryKeyAndSkipInd
bool use_skip_indexes,
bool find_exact_ranges)
{
chassert(alter_conversions.empty() || parts.size() == alter_conversions.size());
RangesInDataParts parts_with_ranges;
parts_with_ranges.resize(parts.size());
const Settings & settings = context->getSettingsRef();
@ -662,11 +652,8 @@ RangesInDataParts MergeTreeDataSelectExecutor::filterPartsByPrimaryKeyAndSkipInd
auto process_part = [&](size_t part_index)
{
auto & part = parts[part_index];
auto alter_conversions_for_part = !alter_conversions.empty()
? alter_conversions[part_index]
: std::make_shared<AlterConversions>();
RangesInDataPart ranges(part, alter_conversions_for_part, part_index);
RangesInDataPart ranges(part, part_index);
size_t total_marks_count = part->index_granularity.getMarksCountWithoutFinal();
if (metadata_snapshot->hasPrimaryKey() || part_offset_condition)
@ -904,6 +891,7 @@ std::shared_ptr<QueryIdHolder> MergeTreeDataSelectExecutor::checkLimits(
ReadFromMergeTree::AnalysisResultPtr MergeTreeDataSelectExecutor::estimateNumMarksToRead(
MergeTreeData::DataPartsVector parts,
MergeTreeData::MutationsSnapshotPtr mutations_snapshot,
const Names & column_names_to_return,
const StorageMetadataPtr & metadata_snapshot,
const SelectQueryInfo & query_info,
@ -916,11 +904,9 @@ ReadFromMergeTree::AnalysisResultPtr MergeTreeDataSelectExecutor::estimateNumMar
return std::make_shared<ReadFromMergeTree::AnalysisResult>();
std::optional<ReadFromMergeTree::Indexes> indexes;
/// NOTE: We don't need alter_conversions because the returned analysis_result is only used for:
/// 1. estimate the number of rows to read; 2. projection reading, which doesn't have alter_conversions.
return ReadFromMergeTree::selectRangesToRead(
std::move(parts),
/*alter_conversions=*/{},
mutations_snapshot,
metadata_snapshot,
query_info,
context,
@ -935,7 +921,7 @@ ReadFromMergeTree::AnalysisResultPtr MergeTreeDataSelectExecutor::estimateNumMar
QueryPlanStepPtr MergeTreeDataSelectExecutor::readFromParts(
MergeTreeData::DataPartsVector parts,
std::vector<AlterConversionsPtr> alter_conversions,
MergeTreeData::MutationsSnapshotPtr mutations_snapshot,
const Names & column_names_to_return,
const StorageSnapshotPtr & storage_snapshot,
const SelectQueryInfo & query_info,
@ -957,7 +943,7 @@ QueryPlanStepPtr MergeTreeDataSelectExecutor::readFromParts(
return std::make_unique<ReadFromMergeTree>(
std::move(parts),
std::move(alter_conversions),
std::move(mutations_snapshot),
column_names_to_return,
data,
query_info,
@ -1554,7 +1540,6 @@ MarkRanges MergeTreeDataSelectExecutor::filterMarksUsingMergedIndex(
void MergeTreeDataSelectExecutor::selectPartsToRead(
MergeTreeData::DataPartsVector & parts,
std::vector<AlterConversionsPtr> & alter_conversions,
const std::optional<std::unordered_set<String>> & part_values,
const std::optional<KeyCondition> & minmax_idx_condition,
const DataTypes & minmax_columns_types,
@ -1563,14 +1548,11 @@ void MergeTreeDataSelectExecutor::selectPartsToRead(
PartFilterCounters & counters)
{
MergeTreeData::DataPartsVector prev_parts;
std::vector<AlterConversionsPtr> prev_conversions;
std::swap(prev_parts, parts);
std::swap(prev_conversions, alter_conversions);
for (size_t i = 0; i < prev_parts.size(); ++i)
for (const auto & part_or_projection : prev_parts)
{
const auto * part = prev_parts[i]->isProjectionPart() ? prev_parts[i]->getParentPart() : prev_parts[i].get();
const auto * part = part_or_projection->isProjectionPart() ? part_or_projection->getParentPart() : part_or_projection.get();
if (part_values && part_values->find(part->name) == part_values->end())
continue;
@ -1607,15 +1589,12 @@ void MergeTreeDataSelectExecutor::selectPartsToRead(
counters.num_parts_after_partition_pruner += 1;
counters.num_granules_after_partition_pruner += num_granules;
parts.push_back(prev_parts[i]);
if (!prev_conversions.empty())
alter_conversions.push_back(prev_conversions[i]);
parts.push_back(part_or_projection);
}
}
void MergeTreeDataSelectExecutor::selectPartsToReadWithUUIDFilter(
MergeTreeData::DataPartsVector & parts,
std::vector<AlterConversionsPtr> & alter_conversions,
const std::optional<std::unordered_set<String>> & part_values,
MergeTreeData::PinnedPartUUIDsPtr pinned_part_uuids,
const std::optional<KeyCondition> & minmax_idx_condition,
@ -1628,22 +1607,17 @@ void MergeTreeDataSelectExecutor::selectPartsToReadWithUUIDFilter(
{
/// process_parts prepare parts that have to be read for the query,
/// returns false if duplicated parts' UUID have been met
auto select_parts = [&] (
MergeTreeData::DataPartsVector & selected_parts,
std::vector<AlterConversionsPtr> & selected_conversions) -> bool
auto select_parts = [&](MergeTreeData::DataPartsVector & selected_parts) -> bool
{
auto ignored_part_uuids = query_context->getIgnoredPartUUIDs();
std::unordered_set<UUID> temp_part_uuids;
MergeTreeData::DataPartsVector prev_parts;
std::vector<AlterConversionsPtr> prev_conversions;
std::swap(prev_parts, selected_parts);
std::swap(prev_conversions, selected_conversions);
for (size_t i = 0; i < prev_parts.size(); ++i)
for (const auto & part_or_projection : prev_parts)
{
const auto * part = prev_parts[i]->isProjectionPart() ? prev_parts[i]->getParentPart() : prev_parts[i].get();
const auto * part = part_or_projection->isProjectionPart() ? part_or_projection->getParentPart() : part_or_projection.get();
if (part_values && part_values->find(part->name) == part_values->end())
continue;
@ -1693,9 +1667,7 @@ void MergeTreeDataSelectExecutor::selectPartsToReadWithUUIDFilter(
throw Exception(ErrorCodes::LOGICAL_ERROR, "Found a part with the same UUID on the same replica.");
}
selected_parts.push_back(prev_parts[i]);
if (!prev_conversions.empty())
selected_conversions.push_back(prev_conversions[i]);
selected_parts.push_back(part_or_projection);
}
if (!temp_part_uuids.empty())
@ -1714,7 +1686,7 @@ void MergeTreeDataSelectExecutor::selectPartsToReadWithUUIDFilter(
};
/// Process parts that have to be read for a query.
auto needs_retry = !select_parts(parts, alter_conversions);
auto needs_retry = !select_parts(parts);
/// If any duplicated part UUIDs met during the first step, try to ignore them in second pass.
/// This may happen when `prefer_localhost_replica` is set and "distributed" stage runs in the same process with "remote" stage.
@ -1725,7 +1697,7 @@ void MergeTreeDataSelectExecutor::selectPartsToReadWithUUIDFilter(
counters = PartFilterCounters();
/// Second attempt didn't help, throw an exception
if (!select_parts(parts, alter_conversions))
if (!select_parts(parts))
throw Exception(ErrorCodes::DUPLICATED_PART_UUIDS, "Found duplicate UUIDs while processing query.");
}
}

View File

@ -40,7 +40,7 @@ public:
/// The same as read, but with specified set of parts.
QueryPlanStepPtr readFromParts(
MergeTreeData::DataPartsVector parts,
std::vector<AlterConversionsPtr> alter_conversions,
MergeTreeData::MutationsSnapshotPtr mutations_snapshot,
const Names & column_names,
const StorageSnapshotPtr & storage_snapshot,
const SelectQueryInfo & query_info,
@ -56,6 +56,7 @@ public:
/// This method is used to select best projection for table.
ReadFromMergeTree::AnalysisResultPtr estimateNumMarksToRead(
MergeTreeData::DataPartsVector parts,
MergeTreeData::MutationsSnapshotPtr mutations_snapshot,
const Names & column_names,
const StorageMetadataPtr & metadata_snapshot,
const SelectQueryInfo & query_info,
@ -120,7 +121,6 @@ private:
/// as well as `max_block_number_to_read`.
static void selectPartsToRead(
MergeTreeData::DataPartsVector & parts,
std::vector<AlterConversionsPtr> & alter_conversions,
const std::optional<std::unordered_set<String>> & part_values,
const std::optional<KeyCondition> & minmax_idx_condition,
const DataTypes & minmax_columns_types,
@ -131,7 +131,6 @@ private:
/// Same as previous but also skip parts uuids if any to the query context, or skip parts which uuids marked as excluded.
static void selectPartsToReadWithUUIDFilter(
MergeTreeData::DataPartsVector & parts,
std::vector<AlterConversionsPtr> & alter_conversions,
const std::optional<std::unordered_set<String>> & part_values,
MergeTreeData::PinnedPartUUIDsPtr pinned_part_uuids,
const std::optional<KeyCondition> & minmax_idx_condition,
@ -175,10 +174,9 @@ public:
/// Filter parts using minmax index and partition key.
static void filterPartsByPartition(
MergeTreeData::DataPartsVector & parts,
const std::optional<PartitionPruner> & partition_pruner,
const std::optional<KeyCondition> & minmax_idx_condition,
MergeTreeData::DataPartsVector & parts,
std::vector<AlterConversionsPtr> & alter_conversions,
const std::optional<std::unordered_set<String>> & part_values,
const StorageMetadataPtr & metadata_snapshot,
const MergeTreeData & data,
@ -192,7 +190,6 @@ public:
/// If 'check_limits = true' it will throw exception if the amount of data exceed the limits from settings.
static RangesInDataParts filterPartsByPrimaryKeyAndSkipIndexes(
MergeTreeData::DataPartsVector && parts,
std::vector<AlterConversionsPtr> && alter_conversions,
StorageMetadataPtr metadata_snapshot,
const ContextPtr & context,
const KeyCondition & key_condition,

View File

@ -50,7 +50,7 @@ UInt64 MergeTreeMutationEntry::parseFileName(const String & file_name_)
MergeTreeMutationEntry::MergeTreeMutationEntry(MutationCommands commands_, DiskPtr disk_, const String & path_prefix_, UInt64 tmp_number,
const TransactionID & tid_, const WriteSettings & settings)
: create_time(time(nullptr))
, commands(std::move(commands_))
, commands(std::make_shared<MutationCommands>(std::move(commands_)))
, disk(std::move(disk_))
, path_prefix(path_prefix_)
, file_name("tmp_mutation_" + toString(tmp_number) + ".txt")
@ -63,7 +63,7 @@ MergeTreeMutationEntry::MergeTreeMutationEntry(MutationCommands commands_, DiskP
*out << "format version: 1\n"
<< "create time: " << LocalDateTime(create_time, DateLUT::serverTimezoneInstance()) << "\n";
*out << "commands: ";
commands.writeText(*out, /* with_pure_metadata_commands = */ false);
commands->writeText(*out, /* with_pure_metadata_commands = */ false);
*out << "\n";
if (tid.isPrehistoric())
{
@ -116,7 +116,8 @@ void MergeTreeMutationEntry::writeCSN(CSN csn_)
}
MergeTreeMutationEntry::MergeTreeMutationEntry(DiskPtr disk_, const String & path_prefix_, const String & file_name_)
: disk(std::move(disk_))
: commands(std::make_shared<MutationCommands>())
, disk(std::move(disk_))
, path_prefix(path_prefix_)
, file_name(file_name_)
, is_temp(false)
@ -133,7 +134,7 @@ MergeTreeMutationEntry::MergeTreeMutationEntry(DiskPtr disk_, const String & pat
create_time_dt.hour(), create_time_dt.minute(), create_time_dt.second());
*buf >> "commands: ";
commands.readText(*buf);
commands->readText(*buf);
*buf >> "\n";
if (buf->eof())
@ -177,7 +178,7 @@ std::shared_ptr<const IBackupEntry> MergeTreeMutationEntry::backup() const
out << "block number: " << block_number << "\n";
out << "commands: ";
commands.writeText(out, /* with_pure_metadata_commands = */ false);
commands->writeText(out, /* with_pure_metadata_commands = */ false);
out << "\n";
return std::make_shared<BackupEntryFromMemory>(out.str());

View File

@ -16,7 +16,7 @@ class IBackupEntry;
struct MergeTreeMutationEntry
{
time_t create_time = 0;
MutationCommands commands;
std::shared_ptr<MutationCommands> commands;
DiskPtr disk;
String path_prefix;

View File

@ -85,6 +85,7 @@ MergeTreeReadTask::Readers MergeTreePrefetchedReadPool::PrefetchedReaders::get()
MergeTreePrefetchedReadPool::MergeTreePrefetchedReadPool(
RangesInDataParts && parts_,
MutationsSnapshotPtr mutations_snapshot_,
VirtualFields shared_virtual_fields_,
const StorageSnapshotPtr & storage_snapshot_,
const PrewhereInfoPtr & prewhere_info_,
@ -95,6 +96,7 @@ MergeTreePrefetchedReadPool::MergeTreePrefetchedReadPool(
const ContextPtr & context_)
: MergeTreeReadPoolBase(
std::move(parts_),
std::move(mutations_snapshot_),
std::move(shared_virtual_fields_),
storage_snapshot_,
prewhere_info_,
@ -103,7 +105,6 @@ MergeTreePrefetchedReadPool::MergeTreePrefetchedReadPool(
column_names_,
settings_,
context_)
, WithContext(context_)
, prefetch_threadpool(getContext()->getPrefetchThreadpool())
, log(getLogger("MergeTreePrefetchedReadPool(" + (parts_ranges.empty() ? "" : parts_ranges.front().data_part->storage.getStorageID().getNameForLogs()) + ")"))
{

View File

@ -14,11 +14,12 @@ using MergeTreeReaderPtr = std::unique_ptr<IMergeTreeReader>;
/// A class which is responsible for creating read tasks
/// which are later taken by readers via getTask method.
/// Does prefetching for the read tasks it creates.
class MergeTreePrefetchedReadPool : public MergeTreeReadPoolBase, private WithContext
class MergeTreePrefetchedReadPool : public MergeTreeReadPoolBase
{
public:
MergeTreePrefetchedReadPool(
RangesInDataParts && parts_,
MutationsSnapshotPtr mutations_snapshot_,
VirtualFields shared_virtual_fields_,
const StorageSnapshotPtr & storage_snapshot_,
const PrewhereInfoPtr & prewhere_info_,

View File

@ -29,6 +29,7 @@ extern const int BAD_ARGUMENTS;
MergeTreeReadPool::MergeTreeReadPool(
RangesInDataParts && parts_,
MutationsSnapshotPtr mutations_snapshot_,
VirtualFields shared_virtual_fields_,
const StorageSnapshotPtr & storage_snapshot_,
const PrewhereInfoPtr & prewhere_info_,
@ -39,6 +40,7 @@ MergeTreeReadPool::MergeTreeReadPool(
const ContextPtr & context_)
: MergeTreeReadPoolBase(
std::move(parts_),
std::move(mutations_snapshot_),
std::move(shared_virtual_fields_),
storage_snapshot_,
prewhere_info_,

View File

@ -26,6 +26,7 @@ public:
MergeTreeReadPool(
RangesInDataParts && parts_,
MutationsSnapshotPtr mutations_snapshot_,
VirtualFields shared_virtual_fields_,
const StorageSnapshotPtr & storage_snapshot_,
const PrewhereInfoPtr & prewhere_info_,

View File

@ -4,9 +4,6 @@
#include <Storages/MergeTree/LoadedMergeTreeDataPartInfoForReader.h>
#include <Storages/MergeTree/MergeTreeBlockReadUtils.h>
#include <cmath>
namespace DB
{
@ -17,6 +14,7 @@ namespace ErrorCodes
MergeTreeReadPoolBase::MergeTreeReadPoolBase(
RangesInDataParts && parts_,
MutationsSnapshotPtr mutations_snapshot_,
VirtualFields shared_virtual_fields_,
const StorageSnapshotPtr & storage_snapshot_,
const PrewhereInfoPtr & prewhere_info_,
@ -25,7 +23,9 @@ MergeTreeReadPoolBase::MergeTreeReadPoolBase(
const Names & column_names_,
const PoolSettings & pool_settings_,
const ContextPtr & context_)
: parts_ranges(std::move(parts_))
: WithContext(context_)
, parts_ranges(std::move(parts_))
, mutations_snapshot(std::move(mutations_snapshot_))
, shared_virtual_fields(std::move(shared_virtual_fields_))
, storage_snapshot(storage_snapshot_)
, prewhere_info(prewhere_info_)
@ -120,9 +120,9 @@ void MergeTreeReadPoolBase::fillPerPartInfos(const Settings & settings)
}
read_task_info.part_index_in_query = part_with_ranges.part_index_in_query;
read_task_info.alter_conversions = part_with_ranges.alter_conversions;
read_task_info.alter_conversions = MergeTreeData::getAlterConversionsForPart(part_with_ranges.data_part, mutations_snapshot, storage_snapshot->metadata, getContext());
LoadedMergeTreeDataPartInfoForReader part_info(part_with_ranges.data_part, part_with_ranges.alter_conversions);
LoadedMergeTreeDataPartInfoForReader part_info(part_with_ranges.data_part, read_task_info.alter_conversions);
read_task_info.task_columns = getReadTaskColumns(
part_info,

View File

@ -6,9 +6,11 @@
namespace DB
{
class MergeTreeReadPoolBase : public IMergeTreeReadPool
class MergeTreeReadPoolBase : public IMergeTreeReadPool, protected WithContext
{
public:
using MutationsSnapshotPtr = MergeTreeData::MutationsSnapshotPtr;
struct PoolSettings
{
size_t threads = 0;
@ -23,6 +25,7 @@ public:
MergeTreeReadPoolBase(
RangesInDataParts && parts_,
MutationsSnapshotPtr mutations_snapshot_,
VirtualFields shared_virtual_fields_,
const StorageSnapshotPtr & storage_snapshot_,
const PrewhereInfoPtr & prewhere_info_,
@ -37,6 +40,7 @@ public:
protected:
/// Initialized in constructor
const RangesInDataParts parts_ranges;
const MutationsSnapshotPtr mutations_snapshot;
const VirtualFields shared_virtual_fields;
const StorageSnapshotPtr storage_snapshot;
const PrewhereInfoPtr prewhere_info;

View File

@ -12,6 +12,7 @@ MergeTreeReadPoolInOrder::MergeTreeReadPoolInOrder(
bool has_limit_below_one_block_,
MergeTreeReadType read_type_,
RangesInDataParts parts_,
MutationsSnapshotPtr mutations_snapshot_,
VirtualFields shared_virtual_fields_,
const StorageSnapshotPtr & storage_snapshot_,
const PrewhereInfoPtr & prewhere_info_,
@ -22,6 +23,7 @@ MergeTreeReadPoolInOrder::MergeTreeReadPoolInOrder(
const ContextPtr & context_)
: MergeTreeReadPoolBase(
std::move(parts_),
std::move(mutations_snapshot_),
std::move(shared_virtual_fields_),
storage_snapshot_,
prewhere_info_,

View File

@ -11,6 +11,7 @@ public:
bool has_limit_below_one_block_,
MergeTreeReadType read_type_,
RangesInDataParts parts_,
MutationsSnapshotPtr mutations_snapshot_,
VirtualFields shared_virtual_fields_,
const StorageSnapshotPtr & storage_snapshot_,
const PrewhereInfoPtr & prewhere_info_,

View File

@ -14,6 +14,7 @@ namespace ErrorCodes
MergeTreeReadPoolParallelReplicas::MergeTreeReadPoolParallelReplicas(
ParallelReadingExtension extension_,
RangesInDataParts && parts_,
MutationsSnapshotPtr mutations_snapshot_,
VirtualFields shared_virtual_fields_,
const StorageSnapshotPtr & storage_snapshot_,
const PrewhereInfoPtr & prewhere_info_,
@ -24,6 +25,7 @@ MergeTreeReadPoolParallelReplicas::MergeTreeReadPoolParallelReplicas(
const ContextPtr & context_)
: MergeTreeReadPoolBase(
std::move(parts_),
std::move(mutations_snapshot_),
std::move(shared_virtual_fields_),
storage_snapshot_,
prewhere_info_,

View File

@ -11,6 +11,7 @@ public:
MergeTreeReadPoolParallelReplicas(
ParallelReadingExtension extension_,
RangesInDataParts && parts_,
MutationsSnapshotPtr mutations_snapshot_,
VirtualFields shared_virtual_fields_,
const StorageSnapshotPtr & storage_snapshot_,
const PrewhereInfoPtr & prewhere_info_,

View File

@ -13,6 +13,7 @@ MergeTreeReadPoolParallelReplicasInOrder::MergeTreeReadPoolParallelReplicasInOrd
ParallelReadingExtension extension_,
CoordinationMode mode_,
RangesInDataParts parts_,
MutationsSnapshotPtr mutations_snapshot_,
VirtualFields shared_virtual_fields_,
const StorageSnapshotPtr & storage_snapshot_,
const PrewhereInfoPtr & prewhere_info_,
@ -23,6 +24,7 @@ MergeTreeReadPoolParallelReplicasInOrder::MergeTreeReadPoolParallelReplicasInOrd
const ContextPtr & context_)
: MergeTreeReadPoolBase(
std::move(parts_),
std::move(mutations_snapshot_),
std::move(shared_virtual_fields_),
storage_snapshot_,
prewhere_info_,

View File

@ -12,6 +12,7 @@ public:
ParallelReadingExtension extension_,
CoordinationMode mode_,
RangesInDataParts parts_,
MutationsSnapshotPtr mutations_snapshot_,
VirtualFields shared_virtual_fields_,
const StorageSnapshotPtr & storage_snapshot_,
const PrewhereInfoPtr & prewhere_info_,

View File

@ -34,6 +34,7 @@ public:
const MergeTreeData & storage_,
const StorageSnapshotPtr & storage_snapshot_,
MergeTreeData::DataPartPtr data_part_,
AlterConversionsPtr alter_conversions_,
Names columns_to_read_,
std::optional<MarkRanges> mark_ranges_,
bool apply_deleted_mask,
@ -58,6 +59,9 @@ private:
/// Data part will not be removed if the pointer owns it
MergeTreeData::DataPartPtr data_part;
/// Alter and mutation commands that are required to be applied to the part on-fly.
AlterConversionsPtr alter_conversions;
/// Columns we have to read (each Block from read will contain them)
Names columns_to_read;
@ -87,6 +91,7 @@ MergeTreeSequentialSource::MergeTreeSequentialSource(
const MergeTreeData & storage_,
const StorageSnapshotPtr & storage_snapshot_,
MergeTreeData::DataPartPtr data_part_,
AlterConversionsPtr alter_conversions_,
Names columns_to_read_,
std::optional<MarkRanges> mark_ranges_,
bool apply_deleted_mask,
@ -96,6 +101,7 @@ MergeTreeSequentialSource::MergeTreeSequentialSource(
, storage(storage_)
, storage_snapshot(storage_snapshot_)
, data_part(std::move(data_part_))
, alter_conversions(std::move(alter_conversions_))
, columns_to_read(std::move(columns_to_read_))
, read_with_direct_io(read_with_direct_io_)
, mark_ranges(std::move(mark_ranges_))
@ -109,8 +115,6 @@ MergeTreeSequentialSource::MergeTreeSequentialSource(
LOG_DEBUG(log, "Reading {} marks from part {}, total {} rows starting from the beginning of the part",
data_part->getMarksCount(), data_part->name, data_part->rows_count);
auto alter_conversions = storage.getAlterConversionsForPart(data_part);
/// Note, that we don't check setting collaborate_with_coordinator presence, because this source
/// is only used in background merges.
addTotalRowsApprox(data_part->rows_count);
@ -299,6 +303,7 @@ Pipe createMergeTreeSequentialSource(
const MergeTreeData & storage,
const StorageSnapshotPtr & storage_snapshot,
MergeTreeData::DataPartPtr data_part,
AlterConversionsPtr alter_conversions,
Names columns_to_read,
std::optional<MarkRanges> mark_ranges,
std::shared_ptr<std::atomic<size_t>> filtered_rows_count,
@ -315,7 +320,8 @@ Pipe createMergeTreeSequentialSource(
columns_to_read.emplace_back(RowExistsColumn::name);
auto column_part_source = std::make_shared<MergeTreeSequentialSource>(type,
storage, storage_snapshot, data_part, columns_to_read, std::move(mark_ranges),
storage, storage_snapshot, data_part, alter_conversions,
columns_to_read, std::move(mark_ranges),
/*apply_deleted_mask=*/ false, read_with_direct_io, prefetch);
Pipe pipe(std::move(column_part_source));
@ -346,6 +352,7 @@ public:
const MergeTreeData & storage_,
const StorageSnapshotPtr & storage_snapshot_,
MergeTreeData::DataPartPtr data_part_,
AlterConversionsPtr alter_conversions_,
Names columns_to_read_,
bool apply_deleted_mask_,
std::optional<ActionsDAG> filter_,
@ -356,6 +363,7 @@ public:
, storage(storage_)
, storage_snapshot(storage_snapshot_)
, data_part(std::move(data_part_))
, alter_conversions(std::move(alter_conversions_))
, columns_to_read(std::move(columns_to_read_))
, apply_deleted_mask(apply_deleted_mask_)
, filter(std::move(filter_))
@ -399,6 +407,7 @@ public:
storage,
storage_snapshot,
data_part,
alter_conversions,
columns_to_read,
std::move(mark_ranges),
/*filtered_rows_count=*/ nullptr,
@ -414,6 +423,7 @@ private:
const MergeTreeData & storage;
StorageSnapshotPtr storage_snapshot;
MergeTreeData::DataPartPtr data_part;
AlterConversionsPtr alter_conversions;
Names columns_to_read;
bool apply_deleted_mask;
std::optional<ActionsDAG> filter;
@ -427,6 +437,7 @@ void createReadFromPartStep(
const MergeTreeData & storage,
const StorageSnapshotPtr & storage_snapshot,
MergeTreeData::DataPartPtr data_part,
AlterConversionsPtr alter_conversions,
Names columns_to_read,
bool apply_deleted_mask,
std::optional<ActionsDAG> filter,
@ -434,7 +445,8 @@ void createReadFromPartStep(
LoggerPtr log)
{
auto reading = std::make_unique<ReadFromPart>(type,
storage, storage_snapshot, std::move(data_part),
storage, storage_snapshot,
std::move(data_part), std::move(alter_conversions),
std::move(columns_to_read), apply_deleted_mask,
std::move(filter), std::move(context), log);

View File

@ -21,6 +21,7 @@ Pipe createMergeTreeSequentialSource(
const MergeTreeData & storage,
const StorageSnapshotPtr & storage_snapshot,
MergeTreeData::DataPartPtr data_part,
AlterConversionsPtr alter_conversions,
Names columns_to_read,
std::optional<MarkRanges> mark_ranges,
std::shared_ptr<std::atomic<size_t>> filtered_rows_count,
@ -36,6 +37,7 @@ void createReadFromPartStep(
const MergeTreeData & storage,
const StorageSnapshotPtr & storage_snapshot,
MergeTreeData::DataPartPtr data_part,
AlterConversionsPtr alter_conversions,
Names columns_to_read,
bool apply_deleted_mask,
std::optional<ActionsDAG> filter,

View File

@ -115,6 +115,7 @@ static UInt64 getExistingRowsCount(const Block & block)
static void splitAndModifyMutationCommands(
MergeTreeData::DataPartPtr part,
StorageMetadataPtr metadata_snapshot,
AlterConversionsPtr alter_conversions,
const MutationCommands & commands,
MutationCommands & for_interpreter,
MutationCommands & for_file_renames,
@ -180,8 +181,6 @@ static void splitAndModifyMutationCommands(
}
auto alter_conversions = part->storage.getAlterConversionsForPart(part);
/// We don't add renames from commands, instead we take them from rename_map.
/// It's important because required renames depend not only on part's data version (i.e. mutation version)
/// but also on part's metadata version. Why we have such logic only for renames? Because all other types of alter
@ -297,7 +296,6 @@ static void splitAndModifyMutationCommands(
}
}
auto alter_conversions = part->storage.getAlterConversionsForPart(part);
/// We don't add renames from commands, instead we take them from rename_map.
/// It's important because required renames depend not only on part's data version (i.e. mutation version)
/// but also on part's metadata version. Why we have such logic only for renames? Because all other types of alter
@ -2042,6 +2040,15 @@ bool MutateTask::prepare()
ctx->num_mutations = std::make_unique<CurrentMetrics::Increment>(CurrentMetrics::PartMutation);
MergeTreeData::IMutationsSnapshot::Params params
{
.metadata_version = ctx->metadata_snapshot->getMetadataVersion(),
.min_part_metadata_version = ctx->source_part->getMetadataVersion(),
};
auto mutations_snapshot = ctx->data->getMutationsSnapshot(params);
auto alter_conversions = MergeTreeData::getAlterConversionsForPart(ctx->source_part, mutations_snapshot, ctx->metadata_snapshot, ctx->context);
auto context_for_reading = Context::createCopy(ctx->context);
/// Allow mutations to work when force_index_by_date or force_primary_key is on.
@ -2056,7 +2063,7 @@ bool MutateTask::prepare()
ctx->commands_for_part.emplace_back(command);
if (ctx->source_part->isStoredOnDisk() && !isStorageTouchedByMutations(
ctx->source_part, ctx->metadata_snapshot, ctx->commands_for_part, context_for_reading))
ctx->source_part, mutations_snapshot, ctx->metadata_snapshot, ctx->commands_for_part, context_for_reading))
{
NameSet files_to_copy_instead_of_hardlinks;
auto settings_ptr = ctx->data->getSettings();
@ -2116,8 +2123,13 @@ bool MutateTask::prepare()
context_for_reading->setSetting("read_from_filesystem_cache_if_exists_otherwise_bypass_cache", 1);
MutationHelpers::splitAndModifyMutationCommands(
ctx->source_part, ctx->metadata_snapshot,
ctx->commands_for_part, ctx->for_interpreter, ctx->for_file_renames, ctx->log);
ctx->source_part,
ctx->metadata_snapshot,
alter_conversions,
ctx->commands_for_part,
ctx->for_interpreter,
ctx->for_file_renames,
ctx->log);
ctx->stage_progress = std::make_unique<MergeStageProgress>(1.0);
@ -2131,7 +2143,8 @@ bool MutateTask::prepare()
settings.apply_deleted_mask = false;
ctx->interpreter = std::make_unique<MutationsInterpreter>(
*ctx->data, ctx->source_part, ctx->metadata_snapshot, ctx->for_interpreter,
*ctx->data, ctx->source_part, alter_conversions,
ctx->metadata_snapshot, ctx->for_interpreter,
ctx->metadata_snapshot->getColumns().getNamesOfPhysical(), context_for_reading, settings);
ctx->materialized_indices = ctx->interpreter->grabMaterializedIndices();

View File

@ -42,7 +42,6 @@ struct RangesInDataPartsDescription: public std::deque<RangesInDataPartDescripti
struct RangesInDataPart
{
DataPartPtr data_part;
AlterConversionsPtr alter_conversions;
size_t part_index_in_query;
MarkRanges ranges;
MarkRanges exact_ranges;
@ -51,14 +50,13 @@ struct RangesInDataPart
RangesInDataPart(
const DataPartPtr & data_part_,
const AlterConversionsPtr & alter_conversions_,
const size_t part_index_in_query_,
const MarkRanges & ranges_ = MarkRanges{})
: data_part{data_part_}
, alter_conversions{alter_conversions_}
, part_index_in_query{part_index_in_query_}
, ranges{ranges_}
{}
{
}
RangesInDataPartDescription getDescription() const;

View File

@ -135,7 +135,6 @@ struct ReplicatedMergeTreeLogEntryData
int alter_version = -1; /// May be equal to -1, if it's normal mutation, not metadata update.
/// only ALTER METADATA command
/// NOTE It's never used
bool have_mutation = false; /// If this alter requires additional mutation step, for data update
String columns_str; /// New columns data corresponding to alter_version

View File

@ -950,7 +950,7 @@ int32_t ReplicatedMergeTreeQueue::updateMutations(zkutil::ZooKeeperPtr zookeeper
{
const auto commands = entry.commands;
it = mutations_by_znode.erase(it);
updateAlterConversionsMutations(commands, alter_conversions_mutations, /* remove= */ true);
decrementMutationsCounters(num_data_mutations_to_apply, num_metadata_mutations_to_apply, commands);
}
else
it = mutations_by_znode.erase(it);
@ -999,10 +999,9 @@ int32_t ReplicatedMergeTreeQueue::updateMutations(zkutil::ZooKeeperPtr zookeeper
for (const ReplicatedMergeTreeMutationEntryPtr & entry : new_mutations)
{
auto & mutation = mutations_by_znode.emplace(entry->znode_name, MutationStatus(entry, format_version))
.first->second;
auto & mutation = mutations_by_znode.emplace(entry->znode_name, MutationStatus(entry, format_version)).first->second;
incrementMutationsCounters(num_data_mutations_to_apply, num_metadata_mutations_to_apply, entry->commands);
updateAlterConversionsMutations(entry->commands, alter_conversions_mutations, /* remove= */ false);
NOEXCEPT_SCOPE({
for (const auto & pair : entry->block_numbers)
{
@ -1076,7 +1075,7 @@ ReplicatedMergeTreeMutationEntryPtr ReplicatedMergeTreeQueue::removeMutation(
}
mutations_by_znode.erase(it);
/// updateAlterConversionsMutations() will be called in updateMutations()
/// decrementMutationsCounters() will be called in updateMutations()
LOG_DEBUG(log, "Removed mutation {} from local state.", entry->znode_name);
}
@ -1901,50 +1900,47 @@ ReplicatedMergeTreeMergePredicate ReplicatedMergeTreeQueue::getMergePredicate(zk
}
MutationCommands ReplicatedMergeTreeQueue::getAlterMutationCommandsForPart(const MergeTreeData::DataPartPtr & part) const
MutationCommands ReplicatedMergeTreeQueue::MutationsSnapshot::getAlterMutationCommandsForPart(const MergeTreeData::DataPartPtr & part) const
{
int32_t part_metadata_version = part->getMetadataVersion();
int32_t metadata_version = storage.getInMemoryMetadataPtr()->getMetadataVersion();
chassert(alter_conversions_mutations >= 0);
/// NOTE: that just checking part_metadata_version is not enough, since we
/// need to check for non-metadata mutations as well.
if (alter_conversions_mutations == 0 && metadata_version == part_metadata_version)
return {};
std::unique_lock lock(state_mutex);
auto in_partition = mutations_by_partition.find(part->info.partition_id);
if (in_partition == mutations_by_partition.end())
return {};
Int64 part_data_version = part->info.getDataVersion();
Int64 part_metadata_version = part->getMetadataVersion();
MutationCommands result;
bool seen_all_data_mutations = false;
bool seen_all_metadata_mutations = false;
bool seen_all_data_mutations = !hasDataMutations();
bool seen_all_metadata_mutations = part_metadata_version >= params.metadata_version;
if (seen_all_data_mutations && seen_all_metadata_mutations)
return {};
auto add_to_result = [&](const ReplicatedMergeTreeMutationEntryPtr & entry)
{
for (const auto & command : entry->commands | std::views::reverse)
if (AlterConversions::supportsMutationCommandType(command.type))
result.emplace_back(command);
{
if (AlterConversions::isSupportedMetadataMutation(command.type))
result.push_back(command);
else if (params.need_data_mutations && AlterConversions::isSupportedDataMutation(command.type))
result.push_back(command);
}
};
/// Here we return mutation commands for part which has bigger alter version than part metadata version.
/// Please note, we don't use getDataVersion(). It's because these alter commands are used for in-fly conversions
/// of part's metadata.
for (const auto & [mutation_version, mutation_status] : in_partition->second | std::views::reverse)
for (const auto & [mutation_version, entry] : in_partition->second | std::views::reverse)
{
if (seen_all_data_mutations && seen_all_metadata_mutations)
break;
auto & entry = mutation_status->entry;
auto alter_version = entry->alter_version;
if (alter_version != -1)
{
if (alter_version > metadata_version)
if (seen_all_metadata_mutations || alter_version > params.metadata_version)
continue;
/// We take commands with bigger metadata version
@ -1953,7 +1949,7 @@ MutationCommands ReplicatedMergeTreeQueue::getAlterMutationCommandsForPart(const
else
seen_all_metadata_mutations = true;
}
else
else if (!seen_all_data_mutations)
{
if (mutation_version > part_data_version)
add_to_result(entry);
@ -1965,6 +1961,104 @@ MutationCommands ReplicatedMergeTreeQueue::getAlterMutationCommandsForPart(const
return result;
}
NameSet ReplicatedMergeTreeQueue::MutationsSnapshot::getAllUpdatedColumns() const
{
if (!hasDataMutations())
return {};
NameSet res;
for (const auto & [partition_id, mutations] : mutations_by_partition)
{
for (const auto & [version, entry] : mutations)
{
auto names = entry->commands.getAllUpdatedColumns();
std::move(names.begin(), names.end(), std::inserter(res, res.end()));
}
}
return res;
}
MergeTreeData::MutationsSnapshotPtr ReplicatedMergeTreeQueue::getMutationsSnapshot(const MutationsSnapshot::Params & params) const
{
std::lock_guard lock(state_mutex);
MutationsSnapshot::Info info
{
.num_data_mutations = num_data_mutations_to_apply,
.num_metadata_mutations = num_metadata_mutations_to_apply,
};
auto res = std::make_shared<MutationsSnapshot>(params, std::move(info));
bool need_data_mutations = res->hasDataMutations();
bool need_metatadata_mutations = params.min_part_metadata_version < params.metadata_version;
if (!need_data_mutations && !need_metatadata_mutations)
return res;
auto is_supported_command = [&](const auto & command)
{
if (need_data_mutations && AlterConversions::isSupportedDataMutation(command.type))
return true;
if (need_metatadata_mutations && AlterConversions::isSupportedMetadataMutation(command.type))
return true;
return false;
};
for (const auto & [partition_id, mutations] : mutations_by_partition)
{
auto & in_partition = res->mutations_by_partition[partition_id];
bool seen_all_data_mutations = !need_data_mutations;
bool seen_all_metadata_mutations = !need_metatadata_mutations;
for (const auto & [mutation_version, status] : mutations | std::views::reverse)
{
if (seen_all_data_mutations && seen_all_metadata_mutations)
break;
auto alter_version = status->entry->alter_version;
if (alter_version != -1)
{
if (seen_all_metadata_mutations || alter_version > params.metadata_version)
continue;
/// We take commands with bigger metadata version
if (alter_version > params.min_part_metadata_version)
{
/// Copy a pointer to the whole entry to avoid extracting and copying commands.
/// Required commands will be copied later only for specific parts.
if (std::ranges::any_of(status->entry->commands, is_supported_command))
in_partition.emplace(mutation_version, status->entry);
}
else
{
seen_all_metadata_mutations = true;
}
}
else if (!seen_all_data_mutations)
{
if (!status->is_done)
{
/// Copy a pointer to the whole entry to avoid extracting and copying commands.
/// Required commands will be copied later only for specific parts.
if (std::ranges::any_of(status->entry->commands, is_supported_command))
in_partition.emplace(mutation_version, status->entry);
}
else
{
seen_all_data_mutations = true;
}
}
}
}
return res;
}
MutationCommands ReplicatedMergeTreeQueue::getMutationCommands(
const MergeTreeData::DataPartPtr & part, Int64 desired_mutation_version, Strings & mutation_ids) const
{
@ -2045,7 +2139,7 @@ bool ReplicatedMergeTreeQueue::tryFinalizeMutations(zkutil::ZooKeeperPtr zookeep
mutation.parts_to_do.clear();
}
updateAlterConversionsMutations(mutation.entry->commands, alter_conversions_mutations, /* remove= */ true);
decrementMutationsCounters(num_data_mutations_to_apply, num_metadata_mutations_to_apply, mutation.entry->commands);
}
else if (mutation.parts_to_do.size() == 0)
{
@ -2102,7 +2196,7 @@ bool ReplicatedMergeTreeQueue::tryFinalizeMutations(zkutil::ZooKeeperPtr zookeep
LOG_TRACE(log, "Finishing data alter with version {} for entry {}", entry->alter_version, entry->znode_name);
alter_sequence.finishDataAlter(entry->alter_version, lock);
}
updateAlterConversionsMutations(entry->commands, alter_conversions_mutations, /* remove= */ true);
decrementMutationsCounters(num_data_mutations_to_apply, num_metadata_mutations_to_apply, entry->commands);
}
}
}

View File

@ -1,5 +1,6 @@
#pragma once
#include <cstdint>
#include <optional>
#include <Common/ActionBlocker.h>
@ -151,8 +152,11 @@ private:
/// Mapping from znode path to Mutations Status
std::map<String, MutationStatus> mutations_by_znode;
/// Unfinished mutations that is required AlterConversions (see getAlterMutationCommandsForPart())
std::atomic<ssize_t> alter_conversions_mutations = 0;
/// Unfinished mutations that are required for AlterConversions.
Int64 num_data_mutations_to_apply = 0;
Int64 num_metadata_mutations_to_apply = 0;
/// Partition -> (block_number -> MutationStatus)
std::unordered_map<String, std::map<Int64, MutationStatus *>> mutations_by_partition;
/// Znode ID of the latest mutation that is done.
@ -409,10 +413,26 @@ public:
MutationCommands getMutationCommands(const MergeTreeData::DataPartPtr & part, Int64 desired_mutation_version,
Strings & mutation_ids) const;
struct MutationsSnapshot : public MergeTreeData::IMutationsSnapshot
{
public:
MutationsSnapshot() = default;
MutationsSnapshot(Params params_, Info info_) : IMutationsSnapshot(std::move(params_), std::move(info_)) {}
using Params = MergeTreeData::IMutationsSnapshot::Params;
using MutationsByPartititon = std::unordered_map<String, std::map<Int64, ReplicatedMergeTreeMutationEntryPtr>>;
MutationsByPartititon mutations_by_partition;
MutationCommands getAlterMutationCommandsForPart(const MergeTreeData::DataPartPtr & part) const override;
std::shared_ptr<MergeTreeData::IMutationsSnapshot> cloneEmpty() const override { return std::make_shared<MutationsSnapshot>(); }
NameSet getAllUpdatedColumns() const override;
};
/// Return mutation commands for part which could be not applied to
/// it according to part mutation version. Used when we apply alter commands on fly,
/// without actual data modification on disk.
MutationCommands getAlterMutationCommandsForPart(const MergeTreeData::DataPartPtr & part) const;
MergeTreeData::MutationsSnapshotPtr getMutationsSnapshot(const MutationsSnapshot::Params & params) const;
/// Mark finished mutations as done. If the function needs to be called again at some later time
/// (because some mutations are probably done but we are not sure yet), returns true.

View File

@ -30,7 +30,7 @@ void StorageFromMergeTreeDataPart::read(
{
query_plan.addStep(MergeTreeDataSelectExecutor(storage).readFromParts(
parts,
alter_conversions,
mutations_snapshot,
column_names,
storage_snapshot,
query_info,

View File

@ -18,10 +18,12 @@ class StorageFromMergeTreeDataPart final : public IStorage
{
public:
/// Used in part mutation.
explicit StorageFromMergeTreeDataPart(const MergeTreeData::DataPartPtr & part_)
explicit StorageFromMergeTreeDataPart(
const MergeTreeData::DataPartPtr & part_,
const MergeTreeData::MutationsSnapshotPtr & mutations_snapshot_)
: IStorage(getIDFromPart(part_))
, parts({part_})
, alter_conversions({part_->storage.getAlterConversionsForPart(part_)})
, mutations_snapshot(mutations_snapshot_)
, storage(part_->storage)
, partition_id(part_->info.partition_id)
{
@ -81,7 +83,7 @@ public:
private:
const MergeTreeData::DataPartsVector parts;
const std::vector<AlterConversionsPtr> alter_conversions;
const MergeTreeData::MutationsSnapshotPtr mutations_snapshot;
const MergeTreeData & storage;
const String partition_id;
const ReadFromMergeTree::AnalysisResultPtr analysis_result_ptr;

View File

@ -391,17 +391,9 @@ IMergeTreeDataPart::Checksums checkDataPart(
auto file_name = it->name();
if (!data_part_storage.isDirectory(file_name))
{
const bool is_projection_part = data_part->isProjectionPart();
auto remote_path = data_part_storage.getRemotePath(file_name, /* if_exists */is_projection_part);
if (remote_path.empty())
{
chassert(is_projection_part);
throw Exception(
ErrorCodes::BROKEN_PROJECTION,
"Remote path for {} does not exist for projection path. Projection {} is broken",
file_name, data_part->name);
}
cache.removePathIfExists(remote_path, FileCache::getCommonUser().user_id);
auto remote_paths = data_part_storage.getRemotePaths(file_name);
for (const auto & remote_path : remote_paths)
cache.removePathIfExists(remote_path, FileCache::getCommonUser().user_id);
}
}

View File

@ -274,4 +274,13 @@ bool MutationCommands::containBarrierCommand() const
return false;
}
NameSet MutationCommands::getAllUpdatedColumns() const
{
NameSet res;
for (const auto & command : *this)
for (const auto & [column_name, _] : command.column_to_update_expression)
res.insert(column_name);
return res;
}
}

View File

@ -92,6 +92,7 @@ public:
/// stick with other commands. Commands from one set have already been validated
/// to be executed without issues on the creation state.
bool containBarrierCommand() const;
NameSet getAllUpdatedColumns() const;
};
using MutationCommandsConstPtr = std::shared_ptr<MutationCommands>;

View File

@ -1625,9 +1625,11 @@ std::tuple<bool /* is_regexp */, ASTPtr> StorageMerge::evaluateDatabaseName(cons
return {false, ast};
}
bool StorageMerge::supportsTrivialCountOptimization(const StorageSnapshotPtr & storage_snapshot, ContextPtr ctx) const
bool StorageMerge::supportsTrivialCountOptimization(const StorageSnapshotPtr &, ContextPtr ctx) const
{
return getFirstTable([&](const auto & table) { return !table->supportsTrivialCountOptimization(storage_snapshot, ctx); }) == nullptr;
/// Here we actually need storage snapshot of all nested tables.
/// But to avoid complexity pass nullptr to make more lightweight check in MergeTreeData.
return getFirstTable([&](const auto & table) { return !table->supportsTrivialCountOptimization(nullptr, ctx); }) == nullptr;
}
std::optional<UInt64> StorageMerge::totalRows(const Settings & settings) const

View File

@ -512,21 +512,14 @@ Int64 StorageMergeTree::startMutation(const MutationCommands & commands, Context
if (txn)
txn->addMutation(shared_from_this(), mutation_id);
bool alter_conversions_mutations_updated = updateAlterConversionsMutations(entry.commands, alter_conversions_mutations, /* remove= */ false);
{
std::lock_guard lock(currently_processing_in_background_mutex);
bool inserted = current_mutations_by_version.try_emplace(version, std::move(entry)).second;
auto [it, inserted] = current_mutations_by_version.try_emplace(version, std::move(entry));
if (!inserted)
{
if (alter_conversions_mutations_updated)
{
--alter_conversions_mutations;
chassert(alter_conversions_mutations >= 0);
}
throw Exception(ErrorCodes::LOGICAL_ERROR, "Mutation {} already exists, it's a bug", version);
}
incrementMutationsCounters(num_data_mutations_to_apply, num_metadata_mutations_to_apply, *it->second.commands);
}
LOG_INFO(log, "Added mutation: {}{}", mutation_id, additional_info);
@ -563,7 +556,7 @@ void StorageMergeTree::updateMutationEntriesErrors(FutureMergedMutatedPartPtr re
if (static_cast<UInt64>(result_part->part_info.mutation) == it->first)
mutation_backoff_policy.removePartFromFailed(failed_part->name);
updateAlterConversionsMutations(it->second.commands, alter_conversions_mutations, /* remove= */ true);
decrementMutationsCounters(num_data_mutations_to_apply, num_metadata_mutations_to_apply, *entry.commands);
}
}
else
@ -763,17 +756,15 @@ std::map<std::string, MutationCommands> StorageMergeTree::getUnfinishedMutationC
std::map<std::string, MutationCommands> result;
for (const auto & kv : current_mutations_by_version)
for (const auto & [mutation_version, entry] : current_mutations_by_version)
{
Int64 mutation_version = kv.first;
const MergeTreeMutationEntry & entry = kv.second;
const PartVersionWithName needle{mutation_version, ""};
const PartVersionWithName needle{static_cast<Int64>(mutation_version), ""};
auto versions_it = std::lower_bound(
part_versions_with_names.begin(), part_versions_with_names.end(), needle, comparator);
size_t parts_to_do = versions_it - part_versions_with_names.begin();
if (parts_to_do > 0)
result.emplace(entry.file_name, entry.commands);
result.emplace(entry.file_name, *entry.commands);
}
return result;
}
@ -806,7 +797,7 @@ std::vector<MergeTreeMutationStatus> StorageMergeTree::getMutationsStatus() cons
std::map<String, Int64> block_numbers_map({{"", entry.block_number}});
for (const MutationCommand & command : entry.commands)
for (const MutationCommand & command : *entry.commands)
{
WriteBufferFromOwnString buf;
formatAST(*command.ast, buf, false, true);
@ -843,20 +834,15 @@ CancellationCode StorageMergeTree::killMutation(const String & mutation_id)
auto it = current_mutations_by_version.find(mutation_version);
if (it != current_mutations_by_version.end())
{
bool mutation_finished = true;
if (std::optional<Int64> min_version = getMinPartDataVersion())
mutation_finished = *min_version > static_cast<Int64>(mutation_version);
{
bool mutation_finished = *min_version > static_cast<Int64>(mutation_version);
if (!mutation_finished)
decrementMutationsCounters(num_data_mutations_to_apply, num_metadata_mutations_to_apply, *it->second.commands);
}
to_kill.emplace(std::move(it->second));
if (!mutation_finished)
{
const auto commands = it->second.commands;
current_mutations_by_version.erase(it);
updateAlterConversionsMutations(commands, alter_conversions_mutations, /* remove= */ true);
}
else
current_mutations_by_version.erase(it);
current_mutations_by_version.erase(it);
}
}
@ -904,6 +890,8 @@ void StorageMergeTree::loadDeduplicationLog()
void StorageMergeTree::loadMutations()
{
std::lock_guard lock(currently_processing_in_background_mutex);
for (const auto & disk : getDisks())
{
for (auto it = disk->iterateDirectory(relative_data_path); it->isValid(); it->next())
@ -912,7 +900,7 @@ void StorageMergeTree::loadMutations()
{
MergeTreeMutationEntry entry(disk, relative_data_path, it->name());
UInt64 block_number = entry.block_number;
LOG_DEBUG(log, "Loading mutation: {} entry, commands size: {}", it->name(), entry.commands.size());
LOG_DEBUG(log, "Loading mutation: {} entry, commands size: {}", it->name(), entry.commands->size());
if (!entry.tid.isPrehistoric() && !entry.csn)
{
@ -931,10 +919,11 @@ void StorageMergeTree::loadMutations()
}
}
auto inserted = current_mutations_by_version.try_emplace(block_number, std::move(entry)).second;
auto [entry_it, inserted] = current_mutations_by_version.try_emplace(block_number, std::move(entry));
if (!inserted)
throw Exception(ErrorCodes::LOGICAL_ERROR, "Mutation {} already exists, it's a bug", block_number);
updateAlterConversionsMutations(entry.commands, alter_conversions_mutations, /* remove= */ false);
incrementMutationsCounters(num_data_mutations_to_apply, num_metadata_mutations_to_apply, *entry_it->second.commands);
}
else if (startsWith(it->name(), "tmp_mutation_"))
{
@ -1282,7 +1271,7 @@ MergeMutateSelectedEntryPtr StorageMergeTree::selectPartsToMutate(
size_t commands_size = 0;
MutationCommands commands_for_size_validation;
for (const auto & command : it->second.commands)
for (const auto & command : *it->second.commands)
{
if (command.type != MutationCommand::Type::DROP_COLUMN
&& command.type != MutationCommand::Type::DROP_INDEX
@ -1326,11 +1315,11 @@ MergeMutateSelectedEntryPtr StorageMergeTree::selectPartsToMutate(
const auto & single_mutation_commands = it->second.commands;
if (single_mutation_commands.containBarrierCommand())
if (single_mutation_commands->containBarrierCommand())
{
if (commands->empty())
{
commands->insert(commands->end(), single_mutation_commands.begin(), single_mutation_commands.end());
commands->insert(commands->end(), single_mutation_commands->begin(), single_mutation_commands->end());
last_mutation_to_apply = it;
}
break;
@ -1338,7 +1327,7 @@ MergeMutateSelectedEntryPtr StorageMergeTree::selectPartsToMutate(
else
{
current_ast_elements += commands_size;
commands->insert(commands->end(), single_mutation_commands.begin(), single_mutation_commands.end());
commands->insert(commands->end(), single_mutation_commands->begin(), single_mutation_commands->end());
last_mutation_to_apply = it;
}
@ -2101,9 +2090,22 @@ void StorageMergeTree::replacePartitionFrom(const StoragePtr & source_table, con
ProfileEventsScope profile_events_scope;
MergeTreeData & src_data = checkStructureAndGetMergeTreeData(source_table, source_metadata_snapshot, my_metadata_snapshot);
String partition_id = getPartitionIDFromQuery(partition, local_context);
DataPartsVector src_parts;
String partition_id;
bool is_all = partition->as<ASTPartition>()->all;
if (is_all)
{
if (replace)
throw DB::Exception(ErrorCodes::SUPPORT_IS_DISABLED, "Only support DROP/DETACH/ATTACH PARTITION ALL currently");
src_parts = src_data.getVisibleDataPartsVector(local_context);
}
else
{
partition_id = getPartitionIDFromQuery(partition, local_context);
src_parts = src_data.getVisibleDataPartsVectorInPartition(local_context, partition_id);
}
DataPartsVector src_parts = src_data.getVisibleDataPartsVectorInPartition(local_context, partition_id);
MutableDataPartsVector dst_parts;
std::vector<scope_guard> dst_parts_locks;
@ -2111,6 +2113,9 @@ void StorageMergeTree::replacePartitionFrom(const StoragePtr & source_table, con
for (const DataPartPtr & src_part : src_parts)
{
if (is_all)
partition_id = src_part->partition.getID(src_data);
if (!canReplacePartition(src_part))
throw Exception(ErrorCodes::BAD_ARGUMENTS,
"Cannot replace partition '{}' because part '{}' has inconsistent granularity with table",
@ -2455,34 +2460,82 @@ void StorageMergeTree::attachRestoredParts(MutableDataPartsVector && parts)
}
}
MutationCommands StorageMergeTree::getAlterMutationCommandsForPart(const DataPartPtr & part) const
MutationCommands StorageMergeTree::MutationsSnapshot::getAlterMutationCommandsForPart(const DataPartPtr & part) const
{
/// NOTE: there is no need to check part metadata_version, since
/// ALTER_METADATA cannot be done asynchronously, like in
/// ReplicatedMergeTree.
chassert(alter_conversions_mutations >= 0);
if (alter_conversions_mutations == 0)
return {};
std::lock_guard lock(currently_processing_in_background_mutex);
UInt64 part_data_version = part->info.getDataVersion();
MutationCommands result;
UInt64 part_data_version = part->info.getDataVersion();
for (const auto & [mutation_version, entry] : current_mutations_by_version | std::views::reverse)
for (const auto & [mutation_version, commands] : mutations_by_version | std::views::reverse)
{
if (mutation_version <= part_data_version)
break;
for (const auto & command : entry.commands | std::views::reverse)
if (AlterConversions::supportsMutationCommandType(command.type))
result.emplace_back(command);
for (const auto & command : *commands | std::views::reverse)
{
if (params.need_data_mutations && AlterConversions::isSupportedDataMutation(command.type))
result.push_back(command);
else if (AlterConversions::isSupportedMetadataMutation(command.type))
result.push_back(command);
}
}
return result;
}
NameSet StorageMergeTree::MutationsSnapshot::getAllUpdatedColumns() const
{
if (!hasDataMutations())
return {};
NameSet res;
for (const auto & [version, commands] : mutations_by_version)
{
auto names = commands->getAllUpdatedColumns();
std::move(names.begin(), names.end(), std::inserter(res, res.end()));
}
return res;
}
MergeTreeData::MutationsSnapshotPtr StorageMergeTree::getMutationsSnapshot(const IMutationsSnapshot::Params & params) const
{
std::lock_guard lock(currently_processing_in_background_mutex);
MutationsSnapshot::Info info
{
.num_data_mutations = num_data_mutations_to_apply,
.num_metadata_mutations = num_metadata_mutations_to_apply,
};
auto res = std::make_shared<MutationsSnapshot>(params, std::move(info));
bool need_data_mutations = res->hasDataMutations();
bool need_metadata_mutations = num_metadata_mutations_to_apply > 0;
if (!need_data_mutations && !need_metadata_mutations)
return res;
for (const auto & [version, entry] : current_mutations_by_version)
{
bool has_required_command = std::ranges::any_of(*entry.commands, [&](const auto & command)
{
if (need_data_mutations && AlterConversions::isSupportedDataMutation(command.type))
return true;
if (need_metadata_mutations && AlterConversions::isSupportedMetadataMutation(command.type))
return true;
return false;
});
/// Copy a pointer to all commands to avoid extracting and copying them.
/// Required commands will be copied later only for specific parts.
if (has_required_command)
res->mutations_by_version.emplace(version, entry.commands);
}
return res;
}
void StorageMergeTree::startBackgroundMovesIfNeeded()
{
if (areBackgroundMovesNeeded())

View File

@ -147,8 +147,10 @@ private:
DataParts currently_merging_mutating_parts;
std::map<UInt64, MergeTreeMutationEntry> current_mutations_by_version;
/// Unfinished mutations that is required AlterConversions (see getAlterMutationCommandsForPart())
std::atomic<ssize_t> alter_conversions_mutations = 0;
/// Unfinished mutations that are required for AlterConversions.
Int64 num_data_mutations_to_apply = 0;
Int64 num_metadata_mutations_to_apply = 0;
std::atomic<bool> shutdown_called {false};
std::atomic<bool> flush_called {false};
@ -308,9 +310,20 @@ private:
ContextPtr context;
};
protected:
/// Collect mutations that have to be applied on the fly: currently they are only RENAME COLUMN.
MutationCommands getAlterMutationCommandsForPart(const DataPartPtr & part) const override;
struct MutationsSnapshot : public IMutationsSnapshot
{
MutationsSnapshot() = default;
MutationsSnapshot(Params params_, Info info_) : IMutationsSnapshot(std::move(params_), std::move(info_)) {}
using MutationsByVersion = std::map<UInt64, std::shared_ptr<const MutationCommands>>;
MutationsByVersion mutations_by_version;
MutationCommands getAlterMutationCommandsForPart(const MergeTreeData::DataPartPtr & part) const override;
std::shared_ptr<MergeTreeData::IMutationsSnapshot> cloneEmpty() const override { return std::make_shared<MutationsSnapshot>(); }
NameSet getAllUpdatedColumns() const override;
};
MutationsSnapshotPtr getMutationsSnapshot(const IMutationsSnapshot::Params & params) const override;
};
}

View File

@ -8033,24 +8033,77 @@ void StorageReplicatedMergeTree::replacePartitionFrom(
/// First argument is true, because we possibly will add new data to current table.
auto lock1 = lockForShare(query_context->getCurrentQueryId(), query_context->getSettingsRef().lock_acquire_timeout);
auto lock2 = source_table->lockForShare(query_context->getCurrentQueryId(), query_context->getSettingsRef().lock_acquire_timeout);
auto storage_settings_ptr = getSettings();
auto source_metadata_snapshot = source_table->getInMemoryMetadataPtr();
auto metadata_snapshot = getInMemoryMetadataPtr();
const auto storage_settings_ptr = getSettings();
const auto source_metadata_snapshot = source_table->getInMemoryMetadataPtr();
const auto metadata_snapshot = getInMemoryMetadataPtr();
const MergeTreeData & src_data = checkStructureAndGetMergeTreeData(source_table, source_metadata_snapshot, metadata_snapshot);
Stopwatch watch;
std::unordered_set<String> partitions;
if (partition->as<ASTPartition>()->all)
{
if (replace)
throw DB::Exception(ErrorCodes::SUPPORT_IS_DISABLED, "Only support DROP/DETACH/ATTACH PARTITION ALL currently");
partitions = src_data.getAllPartitionIds();
}
else
{
partitions = std::unordered_set<String>();
partitions.emplace(getPartitionIDFromQuery(partition, query_context));
}
LOG_INFO(log, "Will try to attach {} partitions", partitions.size());
const Stopwatch watch;
ProfileEventsScope profile_events_scope;
const auto zookeeper = getZooKeeper();
MergeTreeData & src_data = checkStructureAndGetMergeTreeData(source_table, source_metadata_snapshot, metadata_snapshot);
String partition_id = getPartitionIDFromQuery(partition, query_context);
const bool zero_copy_enabled = storage_settings_ptr->allow_remote_fs_zero_copy_replication
|| dynamic_cast<const MergeTreeData *>(source_table.get())->getSettings()->allow_remote_fs_zero_copy_replication;
std::unique_ptr<ReplicatedMergeTreeLogEntryData> entries[partitions.size()];
size_t idx = 0;
for (const auto & partition_id : partitions)
{
entries[idx] = replacePartitionFromImpl(watch,
profile_events_scope,
metadata_snapshot,
src_data,
partition_id,
zookeeper,
replace,
zero_copy_enabled,
storage_settings_ptr->always_use_copy_instead_of_hardlinks,
query_context);
++idx;
}
for (const auto & entry : entries)
waitForLogEntryToBeProcessedIfNecessary(*entry, query_context);
}
std::unique_ptr<ReplicatedMergeTreeLogEntryData> StorageReplicatedMergeTree::replacePartitionFromImpl(
const Stopwatch & watch,
ProfileEventsScope & profile_events_scope,
const StorageMetadataPtr & metadata_snapshot,
const MergeTreeData & src_data,
const String & partition_id,
const ZooKeeperPtr & zookeeper,
bool replace,
const bool & zero_copy_enabled,
const bool & always_use_copy_instead_of_hardlinks,
const ContextPtr & query_context)
{
/// NOTE: Some covered parts may be missing in src_all_parts if corresponding log entries are not executed yet.
DataPartsVector src_all_parts = src_data.getVisibleDataPartsVectorInPartition(query_context, partition_id);
LOG_DEBUG(log, "Cloning {} parts", src_all_parts.size());
static const String TMP_PREFIX = "tmp_replace_from_";
auto zookeeper = getZooKeeper();
std::optional<ZooKeeperMetadataTransaction> txn;
if (auto query_txn = query_context->getZooKeeperMetadataTransaction())
txn.emplace(query_txn->getZooKeeper(),
query_txn->getDatabaseZooKeeperPath(),
query_txn->isInitialQuery(),
query_txn->getTaskZooKeeperPath());
/// Retry if alter_partition_version changes
for (size_t retry = 0; retry < 1000; ++retry)
@ -8136,11 +8189,9 @@ void StorageReplicatedMergeTree::replacePartitionFrom(
UInt64 index = lock->getNumber();
MergeTreePartInfo dst_part_info(partition_id, index, index, src_part->info.level);
bool zero_copy_enabled = storage_settings_ptr->allow_remote_fs_zero_copy_replication
|| dynamic_cast<const MergeTreeData *>(source_table.get())->getSettings()->allow_remote_fs_zero_copy_replication;
IDataPartStorage::ClonePartParams clone_params
{
.copy_instead_of_hardlink = storage_settings_ptr->always_use_copy_instead_of_hardlinks || (zero_copy_enabled && src_part->isStoredOnRemoteDiskWithZeroCopySupport()),
.copy_instead_of_hardlink = always_use_copy_instead_of_hardlinks || (zero_copy_enabled && src_part->isStoredOnRemoteDiskWithZeroCopySupport()),
.metadata_version_to_write = metadata_snapshot->getMetadataVersion()
};
if (replace)
@ -8148,7 +8199,7 @@ void StorageReplicatedMergeTree::replacePartitionFrom(
/// Replace can only work on the same disk
auto [dst_part, part_lock] = cloneAndLoadDataPart(
src_part,
TMP_PREFIX,
TMP_PREFIX_REPLACE_PARTITION_FROM,
dst_part_info,
metadata_snapshot,
clone_params,
@ -8163,7 +8214,7 @@ void StorageReplicatedMergeTree::replacePartitionFrom(
/// Attach can work on another disk
auto [dst_part, part_lock] = cloneAndLoadDataPart(
src_part,
TMP_PREFIX,
TMP_PREFIX_REPLACE_PARTITION_FROM,
dst_part_info,
metadata_snapshot,
clone_params,
@ -8179,15 +8230,15 @@ void StorageReplicatedMergeTree::replacePartitionFrom(
part_checksums.emplace_back(hash_hex);
}
ReplicatedMergeTreeLogEntryData entry;
auto entry = std::make_unique<ReplicatedMergeTreeLogEntryData>();
{
auto src_table_id = src_data.getStorageID();
entry.type = ReplicatedMergeTreeLogEntryData::REPLACE_RANGE;
entry.source_replica = replica_name;
entry.create_time = time(nullptr);
entry.replace_range_entry = std::make_shared<ReplicatedMergeTreeLogEntryData::ReplaceRangeEntry>();
entry->type = ReplicatedMergeTreeLogEntryData::REPLACE_RANGE;
entry->source_replica = replica_name;
entry->create_time = time(nullptr);
entry->replace_range_entry = std::make_shared<ReplicatedMergeTreeLogEntryData::ReplaceRangeEntry>();
auto & entry_replace = *entry.replace_range_entry;
auto & entry_replace = *entry->replace_range_entry;
entry_replace.drop_range_part_name = drop_range_fake_part_name;
entry_replace.from_database = src_table_id.database_name;
entry_replace.from_table = src_table_id.table_name;
@ -8220,7 +8271,7 @@ void StorageReplicatedMergeTree::replacePartitionFrom(
ephemeral_locks[i].getUnlockOp(ops);
}
if (auto txn = query_context->getZooKeeperMetadataTransaction())
if (txn)
txn->moveOpsTo(ops);
delimiting_block_lock->getUnlockOp(ops);
@ -8228,7 +8279,7 @@ void StorageReplicatedMergeTree::replacePartitionFrom(
ops.emplace_back(zkutil::makeSetRequest(alter_partition_version_path, "", alter_partition_version_stat.version));
/// Just update version, because merges assignment relies on it
ops.emplace_back(zkutil::makeSetRequest(fs::path(zookeeper_path) / "log", "", -1));
ops.emplace_back(zkutil::makeCreateRequest(fs::path(zookeeper_path) / "log/log-", entry.toString(), zkutil::CreateMode::PersistentSequential));
ops.emplace_back(zkutil::makeCreateRequest(fs::path(zookeeper_path) / "log/log-", entry->toString(), zkutil::CreateMode::PersistentSequential));
Transaction transaction(*this, NO_TRANSACTION_RAW);
{
@ -8278,14 +8329,11 @@ void StorageReplicatedMergeTree::replacePartitionFrom(
}
String log_znode_path = dynamic_cast<const Coordination::CreateResponse &>(*op_results.back()).path_created;
entry.znode_name = log_znode_path.substr(log_znode_path.find_last_of('/') + 1);
entry->znode_name = log_znode_path.substr(log_znode_path.find_last_of('/') + 1);
for (auto & lock : ephemeral_locks)
lock.assumeUnlocked();
lock2.reset();
lock1.reset();
/// We need to pull the REPLACE_RANGE before cleaning the replaced parts (otherwise CHeckThread may decide that parts are lost)
queue.pullLogsToQueue(getZooKeeperAndAssertNotReadonly(), {}, ReplicatedMergeTreeQueue::SYNC);
// No need to block operations further, especially that in case we have to wait for mutation to finish, the intent would block
@ -8294,10 +8342,7 @@ void StorageReplicatedMergeTree::replacePartitionFrom(
parts_holder.clear();
cleanup_thread.wakeup();
waitForLogEntryToBeProcessedIfNecessary(entry, query_context);
return;
return entry;
}
throw Exception(
@ -9208,13 +9253,11 @@ bool StorageReplicatedMergeTree::canUseAdaptiveGranularity() const
(!has_non_adaptive_index_granularity_parts && !other_replicas_fixed_granularity));
}
MutationCommands StorageReplicatedMergeTree::getAlterMutationCommandsForPart(const DataPartPtr & part) const
MergeTreeData::MutationsSnapshotPtr StorageReplicatedMergeTree::getMutationsSnapshot(const IMutationsSnapshot::Params & params) const
{
return queue.getAlterMutationCommandsForPart(part);
return queue.getMutationsSnapshot(params);
}
void StorageReplicatedMergeTree::startBackgroundMovesIfNeeded()
{
if (areBackgroundMovesNeeded())

View File

@ -37,6 +37,7 @@
#include <base/defines.h>
#include <Core/BackgroundSchedulePool.h>
#include <QueryPipeline/Pipe.h>
#include <Common/ProfileEventsScope.h>
#include <Storages/MergeTree/BackgroundJobsAssignee.h>
#include <Parsers/SyncReplicaMode.h>
@ -932,7 +933,7 @@ private:
void waitMutationToFinishOnReplicas(
const Strings & replicas, const String & mutation_id) const;
MutationCommands getAlterMutationCommandsForPart(const DataPartPtr & part) const override;
MutationsSnapshotPtr getMutationsSnapshot(const IMutationsSnapshot::Params & params) const override;
void startBackgroundMovesIfNeeded() override;
@ -1013,6 +1014,18 @@ private:
DataPartsVector::const_iterator it;
};
const String TMP_PREFIX_REPLACE_PARTITION_FROM = "tmp_replace_from_";
std::unique_ptr<ReplicatedMergeTreeLogEntryData> replacePartitionFromImpl(
const Stopwatch & watch,
ProfileEventsScope & profile_events_scope,
const StorageMetadataPtr & metadata_snapshot,
const MergeTreeData & src_data,
const String & partition_id,
const zkutil::ZooKeeperPtr & zookeeper,
bool replace,
const bool & zero_copy_enabled,
const bool & always_use_copy_instead_of_hardlinks,
const ContextPtr & query_context);
};
String getPartNamePossiblyFake(MergeTreeDataFormatVersion format_version, const MergeTreePartInfo & part_info);

View File

@ -1,33 +0,0 @@
#!/usr/bin/env bash
set -xeuo pipefail
bash /usr/local/share/scripts/init-network.sh
# tune sysctl for network performance
cat > /etc/sysctl.d/10-network-memory.conf << EOF
net.core.netdev_max_backlog=2000
net.core.rmem_max=1048576
net.core.wmem_max=1048576
net.ipv4.tcp_max_syn_backlog=1024
net.ipv4.tcp_rmem=4096 131072 16777216
net.ipv4.tcp_wmem=4096 87380 16777216
net.ipv4.tcp_mem=4096 131072 16777216
EOF
sysctl -p /etc/sysctl.d/10-network-memory.conf
mkdir /home/ubuntu/registrystorage
sed -i 's/preserve_hostname: false/preserve_hostname: true/g' /etc/cloud/cloud.cfg
REGISTRY_PROXY_USERNAME=robotclickhouse
REGISTRY_PROXY_PASSWORD=$(aws ssm get-parameter --name dockerhub_robot_password --with-decryption | jq '.Parameter.Value' -r)
docker run -d --network=host -p 5000:5000 -v /home/ubuntu/registrystorage:/var/lib/registry \
-e REGISTRY_STORAGE_CACHE='' \
-e REGISTRY_HTTP_ADDR=0.0.0.0:5000 \
-e REGISTRY_STORAGE_DELETE_ENABLED=true \
-e REGISTRY_PROXY_REMOTEURL=https://registry-1.docker.io \
-e REGISTRY_PROXY_PASSWORD="$REGISTRY_PROXY_PASSWORD" \
-e REGISTRY_PROXY_USERNAME="$REGISTRY_PROXY_USERNAME" \
--restart=always --name registry registry:2

View File

@ -1,254 +0,0 @@
#!/usr/bin/env bash
# The script is downloaded the AWS image builder Task Orchestrator and Executor (AWSTOE)
# We can't use `user data script` because cloud-init does not check the exit code
# The script is downloaded in the component named ci-infrastructure-prepare in us-east-1
# The link there must be adjusted to a particular RAW link, e.g.
# https://github.com/ClickHouse/ClickHouse/raw/653da5f00219c088af66d97a8f1ea3e35e798268/tests/ci/worker/prepare-ci-ami.sh
set -xeuo pipefail
echo "Running prepare script"
export DEBIAN_FRONTEND=noninteractive
export RUNNER_VERSION=2.317.0
export RUNNER_HOME=/home/ubuntu/actions-runner
deb_arch() {
case $(uname -m) in
x86_64 )
echo amd64;;
aarch64 )
echo arm64;;
esac
}
runner_arch() {
case $(uname -m) in
x86_64 )
echo x64;;
aarch64 )
echo arm64;;
esac
}
# We have test for cgroups, and it's broken with cgroups v2
# Ubuntu 22.04 has it enabled by default
sed -r '/GRUB_CMDLINE_LINUX=/ s/"(.*)"/"\1 systemd.unified_cgroup_hierarchy=0"/' -i /etc/default/grub
update-grub
apt-get update
apt-get install --yes --no-install-recommends \
apt-transport-https \
at \
atop \
binfmt-support \
build-essential \
ca-certificates \
curl \
gnupg \
jq \
lsb-release \
pigz \
ripgrep \
zstd \
python3-dev \
python3-pip \
qemu-user-static \
unzip \
gh
# Install docker
curl -fsSL https://download.docker.com/linux/ubuntu/gpg | gpg --dearmor -o /usr/share/keyrings/docker-archive-keyring.gpg
echo "deb [arch=$(deb_arch) signed-by=/usr/share/keyrings/docker-archive-keyring.gpg] https://download.docker.com/linux/ubuntu $(lsb_release -cs) stable" | tee /etc/apt/sources.list.d/docker.list > /dev/null
apt-get update
apt-get install --yes --no-install-recommends docker-ce docker-buildx-plugin docker-ce-cli containerd.io
usermod -aG docker ubuntu
# enable ipv6 in containers (fixed-cidr-v6 is some random network mask)
cat <<EOT > /etc/docker/daemon.json
{
"ipv6": true,
"fixed-cidr-v6": "2001:db8:1::/64",
"log-driver": "json-file",
"log-opts": {
"max-file": "5",
"max-size": "1000m"
},
"insecure-registries" : ["dockerhub-proxy.dockerhub-proxy-zone:5000"],
"registry-mirrors" : ["http://dockerhub-proxy.dockerhub-proxy-zone:5000"]
}
EOT
# Install azure-cli
curl -sLS https://packages.microsoft.com/keys/microsoft.asc | gpg --dearmor -o /etc/apt/keyrings/microsoft.gpg
AZ_DIST=$(lsb_release -cs)
echo "deb [arch=$(dpkg --print-architecture) signed-by=/etc/apt/keyrings/microsoft.gpg] https://packages.microsoft.com/repos/azure-cli/ $AZ_DIST main" | tee /etc/apt/sources.list.d/azure-cli.list
apt-get update
apt-get install --yes --no-install-recommends azure-cli
# Increase the limit on number of virtual memory mappings to aviod 'Cannot mmap' error
echo "vm.max_map_count = 2097152" > /etc/sysctl.d/01-increase-map-counts.conf
# Workarond for sanitizers uncompatibility with some kernels, see https://github.com/google/sanitizers/issues/856
echo "vm.mmap_rnd_bits=28" > /etc/sysctl.d/02-vm-mmap_rnd_bits.conf
systemctl restart docker
# buildx builder is user-specific
sudo -u ubuntu docker buildx version
sudo -u ubuntu docker buildx rm default-builder || : # if it's the second attempt
sudo -u ubuntu docker buildx create --use --name default-builder
pip install boto3 pygithub requests urllib3 unidiff dohq-artifactory jwt
rm -rf $RUNNER_HOME # if it's the second attempt
mkdir -p $RUNNER_HOME && cd $RUNNER_HOME
RUNNER_ARCHIVE="actions-runner-linux-$(runner_arch)-$RUNNER_VERSION.tar.gz"
curl -O -L "https://github.com/actions/runner/releases/download/v$RUNNER_VERSION/$RUNNER_ARCHIVE"
tar xzf "./$RUNNER_ARCHIVE"
rm -f "./$RUNNER_ARCHIVE"
./bin/installdependencies.sh
chown -R ubuntu:ubuntu $RUNNER_HOME
cd /home/ubuntu
curl "https://awscli.amazonaws.com/awscli-exe-linux-$(uname -m).zip" -o "awscliv2.zip"
unzip -q awscliv2.zip
./aws/install
rm -rf /home/ubuntu/awscliv2.zip /home/ubuntu/aws
# SSH keys of core team
mkdir -p /home/ubuntu/.ssh
# ~/.ssh/authorized_keys is cleaned out, so we use deprecated but working ~/.ssh/authorized_keys2
TEAM_KEYS_URL=$(aws ssm get-parameter --region us-east-1 --name team-keys-url --query 'Parameter.Value' --output=text)
curl "${TEAM_KEYS_URL}" > /home/ubuntu/.ssh/authorized_keys2
chown ubuntu: /home/ubuntu/.ssh -R
chmod 0700 /home/ubuntu/.ssh
# Download cloudwatch agent and install config for it
wget --directory-prefix=/tmp https://s3.amazonaws.com/amazoncloudwatch-agent/ubuntu/"$(deb_arch)"/latest/amazon-cloudwatch-agent.deb{,.sig}
gpg --recv-key --keyserver keyserver.ubuntu.com D58167303B789C72
gpg --verify /tmp/amazon-cloudwatch-agent.deb.sig
dpkg -i /tmp/amazon-cloudwatch-agent.deb
aws ssm get-parameter --region us-east-1 --name AmazonCloudWatch-github-runners --query 'Parameter.Value' --output text > /opt/aws/amazon-cloudwatch-agent/etc/amazon-cloudwatch-agent.json
systemctl enable amazon-cloudwatch-agent.service
echo "Install tailscale"
# Build get-authkey for tailscale
docker run --rm -v /usr/local/bin/:/host-local-bin -i golang:alpine sh -ex <<'EOF'
CGO_ENABLED=0 go install -tags tag:svc-core-ci-github tailscale.com/cmd/get-authkey@main
mv /go/bin/get-authkey /host-local-bin
EOF
# install tailscale
curl -fsSL "https://pkgs.tailscale.com/stable/ubuntu/$(lsb_release -cs).noarmor.gpg" > /usr/share/keyrings/tailscale-archive-keyring.gpg
curl -fsSL "https://pkgs.tailscale.com/stable/ubuntu/$(lsb_release -cs).tailscale-keyring.list" > /etc/apt/sources.list.d/tailscale.list
apt-get update
apt-get install tailscale --yes --no-install-recommends
# Create a common script for the instances
mkdir /usr/local/share/scripts -p
setup_cloudflare_dns() {
# Add cloudflare DNS as a fallback
# Get default gateway interface
local IFACE ETH_DNS CLOUDFLARE_NS new_dns
IFACE=$(ip --json route list | jq '.[]|select(.dst == "default").dev' --raw-output)
# `Link 2 (eth0): 172.31.0.2`
ETH_DNS=$(resolvectl dns "$IFACE") || :
CLOUDFLARE_NS=1.1.1.1
if [[ "$ETH_DNS" ]] && [[ "${ETH_DNS#*: }" != *"$CLOUDFLARE_NS"* ]]; then
# Cut the leading legend
ETH_DNS=${ETH_DNS#*: }
# shellcheck disable=SC2206
new_dns=(${ETH_DNS} "$CLOUDFLARE_NS")
resolvectl dns "$IFACE" "${new_dns[@]}"
fi
}
setup_tailscale() {
# Setup tailscale, the very first action
local TS_API_CLIENT_ID TS_API_CLIENT_SECRET TS_AUTHKEY RUNNER_TYPE
TS_API_CLIENT_ID=$(aws ssm get-parameter --region us-east-1 --name /tailscale/api-client-id --query 'Parameter.Value' --output text --with-decryption)
TS_API_CLIENT_SECRET=$(aws ssm get-parameter --region us-east-1 --name /tailscale/api-client-secret --query 'Parameter.Value' --output text --with-decryption)
RUNNER_TYPE=$(/usr/local/bin/aws ec2 describe-tags --filters "Name=resource-id,Values=$INSTANCE_ID" --query "Tags[?Key=='github:runner-type'].Value" --output text)
RUNNER_TYPE=${RUNNER_TYPE:-unknown}
# Clean possible garbage from the runner type
RUNNER_TYPE=${RUNNER_TYPE//[^0-9a-z]/-}
TS_AUTHKEY=$(TS_API_CLIENT_ID="$TS_API_CLIENT_ID" TS_API_CLIENT_SECRET="$TS_API_CLIENT_SECRET" \
get-authkey -tags tag:svc-core-ci-github -ephemeral)
tailscale up --ssh --auth-key="$TS_AUTHKEY" --hostname="ci-runner-$RUNNER_TYPE-$INSTANCE_ID"
}
cat > /usr/local/share/scripts/init-network.sh << EOF
!/usr/bin/env bash
$(declare -f setup_cloudflare_dns)
$(declare -f setup_tailscale)
# If the script is sourced, it will return now and won't execute functions
return 0 &>/dev/null || :
echo Setup Cloudflare DNS
setup_cloudflare_dns
echo Setup Tailscale VPN
setup_tailscale
EOF
chmod +x /usr/local/share/scripts/init-network.sh
# The following line is used in aws TOE check.
touch /var/tmp/clickhouse-ci-ami.success
# END OF THE SCRIPT
# TOE (Task Orchestrator and Executor) description
# name: CIInfrastructurePrepare
# description: installs the infrastructure for ClickHouse CI runners
# schemaVersion: 1.0
#
# phases:
# - name: build
# steps:
# - name: DownloadRemoteScript
# maxAttempts: 3
# action: WebDownload
# onFailure: Abort
# inputs:
# - source: https://github.com/ClickHouse/ClickHouse/raw/653da5f00219c088af66d97a8f1ea3e35e798268/tests/ci/worker/prepare-ci-ami.sh
# destination: /tmp/prepare-ci-ami.sh
# - name: RunScript
# maxAttempts: 3
# action: ExecuteBash
# onFailure: Abort
# inputs:
# commands:
# - bash -x '{{build.DownloadRemoteScript.inputs[0].destination}}'
#
#
# - name: validate
# steps:
# - name: RunScript
# maxAttempts: 3
# action: ExecuteBash
# onFailure: Abort
# inputs:
# commands:
# - ls /var/tmp/clickhouse-ci-ami.success
# - name: Cleanup
# action: DeleteFile
# onFailure: Abort
# maxAttempts: 3
# inputs:
# - path: /var/tmp/clickhouse-ci-ami.success

View File

@ -10,11 +10,12 @@ REPLACE recursive
4 8
1
ATTACH FROM
5 8
6 8
10 12
OPTIMIZE
5 8 5
5 8 3
10 12 9
10 12 5
After restart
5 8
10 12
DETACH+ATTACH PARTITION
3 4
7 7

View File

@ -53,12 +53,16 @@ DROP TABLE src;
CREATE TABLE src (p UInt64, k String, d UInt64) ENGINE = MergeTree PARTITION BY p ORDER BY k;
INSERT INTO src VALUES (1, '0', 1);
INSERT INTO src VALUES (1, '1', 1);
INSERT INTO src VALUES (2, '2', 1);
INSERT INTO src VALUES (3, '3', 1);
SYSTEM STOP MERGES dst;
INSERT INTO dst VALUES (1, '1', 2);
INSERT INTO dst VALUES (1, '1', 2), (1, '2', 0);
ALTER TABLE dst ATTACH PARTITION 1 FROM src;
SELECT count(), sum(d) FROM dst;
ALTER TABLE dst ATTACH PARTITION ALL FROM src;
SELECT count(), sum(d) FROM dst;
SELECT 'OPTIMIZE';
SELECT count(), sum(d), uniqExact(_part) FROM dst;

View File

@ -16,6 +16,7 @@ REPLACE recursive
ATTACH FROM
5 8
5 8
7 12
REPLACE with fetch
4 6
4 6

View File

@ -1,5 +1,5 @@
#!/usr/bin/env bash
# Tags: zookeeper, no-object-storage
# Tags: zookeeper, no-object-storage, long
# Because REPLACE PARTITION does not forces immediate removal of replaced data parts from local filesystem
# (it tries to do it as quick as possible, but it still performed in separate thread asynchronously)
@ -82,6 +82,8 @@ $CLICKHOUSE_CLIENT --query="DROP TABLE src;"
$CLICKHOUSE_CLIENT --query="CREATE TABLE src (p UInt64, k String, d UInt64) ENGINE = MergeTree PARTITION BY p ORDER BY k;"
$CLICKHOUSE_CLIENT --query="INSERT INTO src VALUES (1, '0', 1);"
$CLICKHOUSE_CLIENT --query="INSERT INTO src VALUES (1, '1', 1);"
$CLICKHOUSE_CLIENT --query="INSERT INTO src VALUES (3, '1', 2);"
$CLICKHOUSE_CLIENT --query="INSERT INTO src VALUES (4, '1', 2);"
$CLICKHOUSE_CLIENT --query="INSERT INTO dst_r2 VALUES (1, '1', 2);"
query_with_retry "ALTER TABLE dst_r2 ATTACH PARTITION 1 FROM src;"
@ -90,6 +92,13 @@ $CLICKHOUSE_CLIENT --query="SYSTEM SYNC REPLICA dst_r1;"
$CLICKHOUSE_CLIENT --query="SELECT count(), sum(d) FROM dst_r1;"
$CLICKHOUSE_CLIENT --query="SELECT count(), sum(d) FROM dst_r2;"
query_with_retry "ALTER TABLE dst_r2 ATTACH PARTITION ALL FROM src;"
$CLICKHOUSE_CLIENT --query="SYSTEM SYNC REPLICA dst_r2;"
$CLICKHOUSE_CLIENT --query="SELECT count(), sum(d) FROM dst_r2;"
query_with_retry "ALTER TABLE dst_r2 DROP PARTITION 3;"
$CLICKHOUSE_CLIENT --query="SYSTEM SYNC REPLICA dst_r2;"
query_with_retry "ALTER TABLE dst_r2 DROP PARTITION 4;"
$CLICKHOUSE_CLIENT --query="SYSTEM SYNC REPLICA dst_r2;"
$CLICKHOUSE_CLIENT --query="SELECT 'REPLACE with fetch';"
$CLICKHOUSE_CLIENT --query="DROP TABLE src;"

View File

@ -4,3 +4,39 @@ It is possible to create parts with different Array vector sizes but there will
Correctness of index with > 1 mark
1 [1,0] 0
9000 [9000,0] 0
Issue #69085: Reference vector computed by a subquery
Expression (Projection)
Limit (preliminary LIMIT (without OFFSET))
Sorting (Sorting for ORDER BY)
Expression (Before ORDER BY)
ReadFromMergeTree (default.tab)
Indexes:
PrimaryKey
Condition: true
Parts: 1/1
Granules: 4/4
Skip
Name: idx
Description: vector_similarity GRANULARITY 2
Parts: 1/1
Granules: 2/4
Expression (Projection)
Limit (preliminary LIMIT (without OFFSET))
Sorting (Sorting for ORDER BY)
Expression (Before ORDER BY)
ReadFromMergeTree (default.tab)
Indexes:
PrimaryKey
Condition: true
Parts: 1/1
Granules: 4/4
Expression (Projection)
Limit (preliminary LIMIT (without OFFSET))
Sorting (Sorting for ORDER BY)
Expression (Before ORDER BY)
ReadFromMergeTree (default.tab)
Indexes:
PrimaryKey
Condition: true
Parts: 1/1
Granules: 4/4

View File

@ -53,3 +53,50 @@ ORDER BY L2Distance(vec, reference_vec)
LIMIT 1;
DROP TABLE tab;
SELECT 'Issue #69085: Reference vector computed by a subquery';
CREATE TABLE tab(id Int32, vec Array(Float32), INDEX idx vec TYPE vector_similarity('hnsw', 'cosineDistance', 'f16', 0, 0, 0) GRANULARITY 2) ENGINE = MergeTree ORDER BY id SETTINGS index_granularity = 3;
INSERT INTO tab VALUES (0, [4.6, 2.3]), (1, [2.0, 3.2]), (2, [4.2, 3.4]), (3, [5.3, 2.9]), (4, [2.4, 5.2]), (5, [5.3, 2.3]), (6, [1.0, 9.3]), (7, [5.5, 4.7]), (8, [6.4, 3.5]), (9, [5.3, 2.5]), (10, [6.4, 3.4]), (11, [6.4, 3.2]);
-- works
EXPLAIN indexes = 1
WITH [0., 2.] AS reference_vec
SELECT
id,
vec,
cosineDistance(vec, reference_vec) AS distance
FROM tab
ORDER BY distance
LIMIT 1
SETTINGS enable_analyzer = 0;
-- does not work
EXPLAIN indexes = 1
WITH (
SELECT vec
FROM tab
LIMIT 1
) AS reference_vec
SELECT
id,
vec,
cosineDistance(vec, reference_vec) AS distance
FROM tab
ORDER BY distance
LIMIT 1
SETTINGS enable_analyzer = 0;
-- does not work as well
EXPLAIN indexes = 1
WITH (
SELECT [0., 2.]
) AS reference_vec
SELECT
id,
vec,
cosineDistance(vec, reference_vec) AS distance
FROM tab
ORDER BY distance
LIMIT 1
SETTINGS enable_analyzer = 0;