Merge branch 'master' into ci-libfuzzer-adjustments

This commit is contained in:
Yakov Olkhovskiy 2024-11-25 02:18:42 +00:00
commit cd89353c09
5 changed files with 89 additions and 26 deletions

View File

@ -343,12 +343,6 @@ bool MutationsInterpreter::Source::supportsLightweightDelete() const
return storage->supportsLightweightDelete(); return storage->supportsLightweightDelete();
} }
bool MutationsInterpreter::Source::hasLightweightDeleteMask() const
{
return part && part->hasLightweightDelete();
}
bool MutationsInterpreter::Source::materializeTTLRecalculateOnly() const bool MutationsInterpreter::Source::materializeTTLRecalculateOnly() const
{ {
return data && (*data->getSettings())[MergeTreeSetting::materialize_ttl_recalculate_only]; return data && (*data->getSettings())[MergeTreeSetting::materialize_ttl_recalculate_only];
@ -402,6 +396,8 @@ MutationsInterpreter::MutationsInterpreter(
"Cannot execute mutation for {}. Mutation should be applied to every part separately.", "Cannot execute mutation for {}. Mutation should be applied to every part separately.",
source.getStorage()->getName()); source.getStorage()->getName());
} }
prepare(!settings.can_execute);
} }
MutationsInterpreter::MutationsInterpreter( MutationsInterpreter::MutationsInterpreter(
@ -414,10 +410,21 @@ MutationsInterpreter::MutationsInterpreter(
ContextPtr context_, ContextPtr context_,
Settings settings_) Settings settings_)
: MutationsInterpreter( : MutationsInterpreter(
Source(storage_, std::move(source_part_), std::move(alter_conversions_)), Source(storage_, source_part_, std::move(alter_conversions_)),
std::move(metadata_snapshot_), std::move(commands_), std::move(metadata_snapshot_), std::move(commands_),
std::move(available_columns_), std::move(context_), std::move(settings_)) std::move(available_columns_), std::move(context_), std::move(settings_))
{ {
const auto & part_columns = source_part_->getColumnsDescription();
auto persistent_virtuals = storage_.getVirtualsPtr()->getNamesAndTypesList(VirtualsKind::Persistent);
NameSet available_columns_set(available_columns.begin(), available_columns.end());
for (const auto & column : persistent_virtuals)
{
if (part_columns.has(column.name) && !available_columns_set.contains(column.name))
available_columns.push_back(column.name);
}
prepare(!settings.can_execute);
} }
MutationsInterpreter::MutationsInterpreter( MutationsInterpreter::MutationsInterpreter(
@ -442,8 +449,6 @@ MutationsInterpreter::MutationsInterpreter(
LOG_TEST(logger, "Will use old analyzer to prepare mutation"); LOG_TEST(logger, "Will use old analyzer to prepare mutation");
} }
context = std::move(new_context); context = std::move(new_context);
prepare(!settings.can_execute);
} }
static NameSet getKeyColumns(const MutationsInterpreter::Source & source, const StorageMetadataPtr & metadata_snapshot) static NameSet getKeyColumns(const MutationsInterpreter::Source & source, const StorageMetadataPtr & metadata_snapshot)
@ -579,13 +584,6 @@ void MutationsInterpreter::prepare(bool dry_run)
auto all_columns = storage_snapshot->getColumnsByNames(options, available_columns); auto all_columns = storage_snapshot->getColumnsByNames(options, available_columns);
NameSet available_columns_set(available_columns.begin(), available_columns.end()); NameSet available_columns_set(available_columns.begin(), available_columns.end());
/// Add _row_exists column if it is physically present in the part
if (source.hasLightweightDeleteMask())
{
all_columns.emplace_back(RowExistsColumn::name, RowExistsColumn::type);
available_columns_set.insert(RowExistsColumn::name);
}
NameSet updated_columns; NameSet updated_columns;
bool materialize_ttl_recalculate_only = source.materializeTTLRecalculateOnly(); bool materialize_ttl_recalculate_only = source.materializeTTLRecalculateOnly();
@ -599,9 +597,15 @@ void MutationsInterpreter::prepare(bool dry_run)
for (const auto & [name, _] : command.column_to_update_expression) for (const auto & [name, _] : command.column_to_update_expression)
{ {
if (!available_columns_set.contains(name) && name != RowExistsColumn::name) if (name == RowExistsColumn::name)
throw Exception(ErrorCodes::THERE_IS_NO_COLUMN, {
"Column {} is updated but not requested to read", name); if (available_columns_set.emplace(name).second)
available_columns.push_back(name);
}
else if (!available_columns_set.contains(name))
{
throw Exception(ErrorCodes::THERE_IS_NO_COLUMN, "Column {} is updated but not requested to read", name);
}
updated_columns.insert(name); updated_columns.insert(name);
} }
@ -1070,10 +1074,6 @@ void MutationsInterpreter::prepareMutationStages(std::vector<Stage> & prepared_s
auto all_columns = storage_snapshot->getColumnsByNames(options, available_columns); auto all_columns = storage_snapshot->getColumnsByNames(options, available_columns);
/// Add _row_exists column if it is present in the part
if (source.hasLightweightDeleteMask() || deleted_mask_updated)
all_columns.emplace_back(RowExistsColumn::name, RowExistsColumn::type);
bool has_filters = false; bool has_filters = false;
/// Next, for each stage calculate columns changed by this and previous stages. /// Next, for each stage calculate columns changed by this and previous stages.
for (size_t i = 0; i < prepared_stages.size(); ++i) for (size_t i = 0; i < prepared_stages.size(); ++i)

View File

@ -122,7 +122,6 @@ public:
const MergeTreeData * getMergeTreeData() const; const MergeTreeData * getMergeTreeData() const;
bool supportsLightweightDelete() const; bool supportsLightweightDelete() const;
bool hasLightweightDeleteMask() const;
bool materializeTTLRecalculateOnly() const; bool materializeTTLRecalculateOnly() const;
bool hasSecondaryIndex(const String & name) const; bool hasSecondaryIndex(const String & name) const;
bool hasProjection(const String & name) const; bool hasProjection(const String & name) const;

View File

@ -18,7 +18,7 @@ class StorageFromMergeTreeDataPart final : public IStorage
{ {
public: public:
/// Used in part mutation. /// Used in part mutation.
explicit StorageFromMergeTreeDataPart( StorageFromMergeTreeDataPart(
const MergeTreeData::DataPartPtr & part_, const MergeTreeData::DataPartPtr & part_,
const MergeTreeData::MutationsSnapshotPtr & mutations_snapshot_) const MergeTreeData::MutationsSnapshotPtr & mutations_snapshot_)
: IStorage(getIDFromPart(part_)) : IStorage(getIDFromPart(part_))
@ -32,10 +32,13 @@ public:
} }
/// Used in queries with projection. /// Used in queries with projection.
StorageFromMergeTreeDataPart(const MergeTreeData & storage_, ReadFromMergeTree::AnalysisResultPtr analysis_result_ptr_) StorageFromMergeTreeDataPart(
const MergeTreeData & storage_,
ReadFromMergeTree::AnalysisResultPtr analysis_result_ptr_)
: IStorage(storage_.getStorageID()), storage(storage_), analysis_result_ptr(analysis_result_ptr_) : IStorage(storage_.getStorageID()), storage(storage_), analysis_result_ptr(analysis_result_ptr_)
{ {
setInMemoryMetadata(storage.getInMemoryMetadata()); setInMemoryMetadata(storage.getInMemoryMetadata());
setVirtuals(*storage.getVirtualsPtr());
} }
String getName() const override { return "FromMergeTreeDataPart"; } String getName() const override { return "FromMergeTreeDataPart"; }

View File

@ -0,0 +1,20 @@
8 44
DELETE WHERE x < 2 1
_block_number 1
ts 1
x 1
8 44
DELETE WHERE x < 2 1
_block_number 1
ts 1
x 1
8 44
DELETE WHERE x < 2 1
_block_number 1
ts 1
x 1
8 44
DELETE WHERE x < 2 1
_block_number 1
ts 1
x 1

View File

@ -0,0 +1,41 @@
-- Regression test: ALTER ... DELETE on a MergeTree table created with
-- enable_block_number_column = 1. The same scenario is run twice, differing only
-- in min_bytes_for_wide_part (1 vs '10G'), and within each run the checks are
-- repeated after DETACH/ATTACH.
-- Each group of checks is expected to report 8 remaining rows with sum(x) = 44,
-- the mutation as done with an empty latest_fail_reason, and the columns
-- _block_number, ts and x each present once among the active parts.
DROP TABLE IF EXISTS t_block_number_delete sync;
-- Make ALTER ... DELETE wait for the mutation to complete, so the SELECTs below
-- observe its result (see the ClickHouse `mutations_sync` setting).
SET mutations_sync = 2;
-- Scenario 1: min_bytes_for_wide_part = 1
-- (presumably forces the Wide part format even for this tiny data set; confirm against MergeTree settings docs).
CREATE TABLE t_block_number_delete (x UInt32, ts DateTime) ENGINE = MergeTree ORDER BY x SETTINGS enable_block_number_column = 1, min_bytes_for_wide_part = 1;
-- Ten rows with x = 0..9; ts values depend on now() and are never selected.
INSERT INTO t_block_number_delete SELECT number, now() - INTERVAL number minute from numbers(10);
-- Run a final merge before mutating.
OPTIMIZE TABLE t_block_number_delete final;
-- Deletes the rows with x = 0 and x = 1.
ALTER TABLE t_block_number_delete DELETE WHERE x < 2;
-- Row count and checksum of the surviving rows.
SELECT count(), sum(x) FROM t_block_number_delete;
-- The mutation must be finished and must not have recorded a failure.
SELECT command, is_done, latest_fail_reason FROM system.mutations WHERE database = currentDatabase() AND table = 't_block_number_delete';
-- Columns stored in active parts, with the number of active parts containing each.
SELECT column, count() FROM system.parts_columns WHERE database = currentDatabase() AND table = 't_block_number_delete' AND active GROUP BY column ORDER BY column;
-- Detach and re-attach the table, then repeat the same three checks.
DETACH TABLE t_block_number_delete;
ATTACH TABLE t_block_number_delete;
SELECT count(), sum(x) FROM t_block_number_delete;
SELECT command, is_done, latest_fail_reason FROM system.mutations WHERE database = currentDatabase() AND table = 't_block_number_delete';
SELECT column, count() FROM system.parts_columns WHERE database = currentDatabase() AND table = 't_block_number_delete' AND active GROUP BY column ORDER BY column;
DROP TABLE t_block_number_delete;
-- Scenario 2: identical steps, but with min_bytes_for_wide_part = '10G'
-- (presumably keeps the part in Compact format; confirm against MergeTree settings docs).
CREATE TABLE t_block_number_delete (x UInt32, ts DateTime) ENGINE = MergeTree ORDER BY x SETTINGS enable_block_number_column = 1, min_bytes_for_wide_part = '10G';
INSERT INTO t_block_number_delete SELECT number, now() - INTERVAL number minute from numbers(10);
OPTIMIZE TABLE t_block_number_delete final;
ALTER TABLE t_block_number_delete DELETE WHERE x < 2;
-- Same three checks as in scenario 1.
SELECT count(), sum(x) FROM t_block_number_delete;
SELECT command, is_done, latest_fail_reason FROM system.mutations WHERE database = currentDatabase() AND table = 't_block_number_delete';
SELECT column, count() FROM system.parts_columns WHERE database = currentDatabase() AND table = 't_block_number_delete' AND active GROUP BY column ORDER BY column;
-- Detach and re-attach the table, then repeat the checks once more.
DETACH TABLE t_block_number_delete;
ATTACH TABLE t_block_number_delete;
SELECT count(), sum(x) FROM t_block_number_delete;
SELECT command, is_done, latest_fail_reason FROM system.mutations WHERE database = currentDatabase() AND table = 't_block_number_delete';
SELECT column, count() FROM system.parts_columns WHERE database = currentDatabase() AND table = 't_block_number_delete' AND active GROUP BY column ORDER BY column;
DROP TABLE t_block_number_delete;