Code changes based on review opinions. Make ordinary single alter delete lightwight by default

This commit is contained in:
jianmei zhang 2022-07-06 18:29:29 +08:00
parent 6e6f77ef8a
commit 780cdfb8f0
22 changed files with 102 additions and 44 deletions

View File

@ -458,6 +458,8 @@ static constexpr UInt64 operator""_GiB(unsigned long long value)
M(Bool, optimize_trivial_count_query, true, "Process trivial 'SELECT count() FROM table' query from metadata.", 0) \
M(Bool, optimize_respect_aliases, true, "If it is set to true, it will respect aliases in WHERE/GROUP BY/ORDER BY, that will help with partition pruning/secondary indexes/optimize_aggregation_in_order/optimize_read_in_order/optimize_trivial_count", 0) \
M(UInt64, mutations_sync, 0, "Wait for synchronous execution of ALTER TABLE UPDATE/DELETE queries (mutations). 0 - execute asynchronously. 1 - wait current server. 2 - wait all replicas if they exist.", 0) \
M(Bool, allow_experimental_lightweight_delete, false, "Enable lightweight DELETE mutations for mergetree tables. Work in progress", 0) \
M(Bool, lightweight_delete_mutation, true, "Enable to make ordinary ALTER DELETE queries lightweight for mergetree tables", 0) \
M(Bool, optimize_move_functions_out_of_any, false, "Move functions out of aggregate functions 'any', 'anyLast'.", 0) \
M(Bool, optimize_normalize_count_variants, true, "Rewrite aggregate functions that semantically equals to count() as count().", 0) \
M(Bool, optimize_injective_functions_inside_uniq, true, "Delete injective functions of one argument inside uniq*() functions.", 0) \

View File

@ -144,7 +144,7 @@ BlockIO InterpreterAlterQuery::executeToTable(const ASTAlterQuery & alter)
if (!mutation_commands.empty())
{
table->checkMutationIsPossible(mutation_commands, getContext()->getSettingsRef());
MutationsInterpreter(table, metadata_snapshot, mutation_commands, getContext(), false).validate();
MutationsInterpreter(table, metadata_snapshot, mutation_commands, getContext(), false, false).validate();
table->mutate(mutation_commands, getContext());
}

View File

@ -20,6 +20,7 @@ namespace ErrorCodes
{
extern const int BAD_ARGUMENTS;
extern const int TABLE_IS_READ_ONLY;
extern const int SUPPORT_IS_DISABLED;
}
@ -30,6 +31,9 @@ InterpreterDeleteQuery::InterpreterDeleteQuery(const ASTPtr & query_ptr_, Contex
BlockIO InterpreterDeleteQuery::execute()
{
if (!getContext()->getSettingsRef().allow_experimental_lightweight_delete)
throw Exception(ErrorCodes::SUPPORT_IS_DISABLED, "Lightweight delete mutate is experimental. Set `allow_experimental_lightweight_delete` setting to enable it");
FunctionNameNormalizer().visit(query_ptr.get());
const ASTDeleteQuery & delete_query = query_ptr->as<ASTDeleteQuery &>();
auto table_id = getContext()->resolveStorageID(delete_query, Context::ResolveOrdinary);
@ -60,8 +64,7 @@ BlockIO InterpreterDeleteQuery::execute()
auto table_lock = table->lockForShare(getContext()->getCurrentQueryId(), getContext()->getSettingsRef().lock_acquire_timeout);
auto metadata_snapshot = table->getInMemoryMetadataPtr();
/// Currently do similar as alter table delete.
/// TODO: Mark this delete as lightweight.
/// Convert to MutationCommand
MutationCommands mutation_commands;
MutationCommand mut_command;
@ -76,12 +79,9 @@ BlockIO InterpreterDeleteQuery::execute()
mutation_commands.emplace_back(mut_command);
if (!mutation_commands.empty())
{
table->checkMutationIsPossible(mutation_commands, getContext()->getSettingsRef());
MutationsInterpreter(table, metadata_snapshot, mutation_commands, getContext(), false).validate();
storage_merge_tree->mutate(mutation_commands, getContext(), MutationType::Lightweight);
}
table->checkMutationIsPossible(mutation_commands, getContext()->getSettingsRef());
MutationsInterpreter(table, metadata_snapshot, mutation_commands, getContext(), false, false).validate();
storage_merge_tree->mutate(mutation_commands, getContext(), MutationType::Lightweight);
return {};
}

View File

@ -1936,7 +1936,7 @@ void InterpreterSelectQuery::executeFetchColumns(QueryProcessingStage::Enum proc
&& !settings.empty_result_for_aggregation_by_empty_set
&& storage
&& storage->getName() != "MaterializedMySQL"
&& !storage->hasLightweightDelete()
&& !storage->hasLightweightDeletedMask()
&& !row_policy_filter
&& processing_stage == QueryProcessingStage::FetchColumns
&& query_analyzer->hasAggregation()

View File

@ -44,7 +44,7 @@ public:
MutationCommands commands_,
ContextPtr context_,
bool can_execute_,
bool is_lightweight_ = false);
bool is_lightweight_);
void validate();
@ -79,7 +79,7 @@ public:
MutationKind::MutationKindEnum getMutationKind() const { return mutation_kind.mutation_kind; }
void SetSkipDeletedMask(bool skip) { skip_deleted_mask = skip; }
void setSkipDeletedMask(bool skip) { skip_deleted_mask = skip; }
private:
ASTPtr prepare(bool dry_run);

View File

@ -235,8 +235,8 @@ public:
/// Returns true if the storage supports backup/restore for specific partitions.
virtual bool supportsBackupPartition() const { return false; }
/// Return true if there are lightweight parts.
virtual bool hasLightweightDelete() const { return false; }
/// Return true if there are at least one part containing lightweight deleted mask.
virtual bool hasLightweightDeletedMask() const { return false; }
private:

View File

@ -665,7 +665,7 @@ void DataPartStorageOnDisk::loadDeletedRowsMask(MergeTreeDataPartDeletedMask & d
}
}
void DataPartStorageOnDisk::writeDeletedRowsMask(MergeTreeDataPartDeletedMask & deleted_mask) const
void DataPartStorageOnDisk::writeDeletedRowsMask(const MergeTreeDataPartDeletedMask & deleted_mask) const
{
const String final_path = fs::path(getRelativePath()) / deleted_mask.name;
const String tmp_path = final_path + ".tmp";

View File

@ -87,7 +87,7 @@ public:
bool shallParticipateInMerges(const IStoragePolicy &) const override;
void loadDeletedRowsMask(MergeTreeDataPartDeletedMask & deleted_mask) const override;
void writeDeletedRowsMask(MergeTreeDataPartDeletedMask & deleted_mask) const override;
void writeDeletedRowsMask(const MergeTreeDataPartDeletedMask & deleted_mask) const override;
void backup(
TemporaryFilesOnDisks & temp_dirs,

View File

@ -171,7 +171,7 @@ public:
virtual bool shallParticipateInMerges(const IStoragePolicy &) const { return true; }
virtual void loadDeletedRowsMask(MergeTreeDataPartDeletedMask & deleted_mask) const = 0;
virtual void writeDeletedRowsMask(MergeTreeDataPartDeletedMask & deleted_mask) const = 0;
virtual void writeDeletedRowsMask(const MergeTreeDataPartDeletedMask & deleted_mask) const = 0;
/// Create a backup of a data part.
/// This method adds a new entry to backup_entries.

View File

@ -791,6 +791,9 @@ NameSet IMergeTreeDataPart::getFileNamesWithoutChecksums() const
if (data_part_storage->exists(TXN_VERSION_METADATA_FILE_NAME))
result.emplace(TXN_VERSION_METADATA_FILE_NAME);
if (data_part_storage->exists(DELETED_ROWS_MARK_FILE_NAME))
result.emplace(DELETED_ROWS_MARK_FILE_NAME);
return result;
}
@ -1216,8 +1219,6 @@ void IMergeTreeDataPart::loadDeletedMask()
if (data_part_storage->exists(deleted_mask.name))
{
has_lightweight_delete = true;
data_part_storage->loadDeletedRowsMask(deleted_mask);
if (deleted_mask.getDeletedRows().size() != rows_count)
@ -1232,7 +1233,6 @@ void IMergeTreeDataPart::loadDeletedMask()
void IMergeTreeDataPart::writeDeletedMask(MergeTreeDataPartDeletedMask::DeletedRows new_mask)
{
deleted_mask.setDeletedRows(new_mask);
has_lightweight_delete = true;
data_part_storage->writeDeletedRowsMask(deleted_mask);
}

View File

@ -328,9 +328,6 @@ public:
mutable VersionMetadata version;
/// True if the part has deleted_rows_mask.bin file used for lightweight delete.
bool has_lightweight_delete = false;
/// For data in RAM ('index')
UInt64 getIndexSizeInBytes() const;
UInt64 getIndexSizeInAllocatedBytes() const;
@ -408,6 +405,8 @@ public:
static inline constexpr auto TXN_VERSION_METADATA_FILE_NAME = "txn_version.txt";
static inline constexpr auto DELETED_ROWS_MARK_FILE_NAME = "deleted_rows_mask.bin";
/// One of part files which is used to check how many references (I'd like
/// to say hardlinks, but it will confuse even more) we have for the part
/// for zero copy replication. Sadly it's very complex.
@ -460,8 +459,8 @@ public:
/// Check metadata in cache is consistent with actual metadata on disk(if use_metadata_cache is true)
std::unordered_map<String, uint128> checkMetadata() const;
/// True if here is light weight bitmap file in part.
bool hasLightweightDelete() const { return has_lightweight_delete; }
/// True if here is lightweight deleted mask file in part.
bool hasLightweightDelete() const { return deleted_mask.getDeletedRows().size() > 0; }
const MergeTreeDataPartDeletedMask& getDeletedMask() const { return deleted_mask; }
void writeDeletedMask(MergeTreeDataPartDeletedMask::DeletedRows new_mask);

View File

@ -17,7 +17,7 @@ struct MergeTreeDataPartDeletedMask
explicit MergeTreeDataPartDeletedMask();
using DeletedRows = ColumnUInt8::Ptr;
std::string name = "deleted_rows_mask.bin";
const std::string name = "deleted_rows_mask.bin";
const ColumnUInt8 & getDeletedRows() const;
void setDeletedRows(DeletedRows new_rows);

View File

@ -47,14 +47,14 @@ UInt64 MergeTreeMutationEntry::parseFileName(const String & file_name_)
MergeTreeMutationEntry::MergeTreeMutationEntry(MutationCommands commands_, DiskPtr disk_, const String & path_prefix_, UInt64 tmp_number,
const TransactionID & tid_, const WriteSettings & settings, MutationType type_)
: type(type_)
, create_time(time(nullptr))
: create_time(time(nullptr))
, commands(std::move(commands_))
, disk(std::move(disk_))
, path_prefix(path_prefix_)
, file_name("tmp_mutation_" + toString(tmp_number) + ".txt")
, is_temp(true)
, tid(tid_)
, type(type_)
{
try
{

View File

@ -11,15 +11,15 @@ namespace DB
{
class IBackupEntry;
/// Type of Mutate. Used to control different mutates during mutates
/// assignment. Also allows to apply special logic during mutate process
/// Stored in FutureMergedMutatedPart and MergeTreeMutationEntry.
enum class MutationType { Ordinary, Lightweight };
/// A mutation entry for non-replicated MergeTree storage engines.
/// Stores information about mutation in file mutation_*.txt.
struct MergeTreeMutationEntry
{
/// Type of mutation, used for lightweight delete.
MutationType type;
time_t create_time = 0;
MutationCommands commands;
@ -41,6 +41,9 @@ struct MergeTreeMutationEntry
/// or UnknownCSN if it's not committed (yet) or RolledBackCSN if it's rolled back or PrehistoricCSN if there is no transaction.
CSN csn = Tx::UnknownCSN;
/// Type of mutation, used for lightweight delete.
MutationType type;
/// Create a new entry and write it to a temporary file.
MergeTreeMutationEntry(MutationCommands commands_, DiskPtr disk, const String & path_prefix_, UInt64 tmp_number,
const TransactionID & tid_, const WriteSettings & settings, MutationType type_);

View File

@ -432,6 +432,10 @@ NameSet collectFilesToSkip(
{
NameSet files_to_skip = source_part->getFileNamesWithoutChecksums();
/// Remove deleted rows mask file name to create hard link for it when mutate some columns.
if (files_to_skip.contains(IMergeTreeDataPart::DELETED_ROWS_MARK_FILE_NAME))
files_to_skip.erase(IMergeTreeDataPart::DELETED_ROWS_MARK_FILE_NAME);
/// Skip updated files
for (const auto & entry : updated_header)
{
@ -1355,6 +1359,9 @@ private:
std::unique_ptr<PartMergerWriter> part_merger_writer_task{nullptr};
};
/// LightweightDeleteTask works for lightweight delete mutate.
/// The MutationsInterpreter returns a simple select like "select _part_offset where predicates".
/// The prepare() and execute() has special logics for LWD mutate.
class LightweightDeleteTask : public IExecutableTask
{
public:
@ -1665,7 +1672,7 @@ bool MutateTask::prepare()
/// Skip to apply deleted mask when reading for MutateSomePartColumns.
need_mutate_all_columns = need_mutate_all_columns || (ctx->mutation_kind == MutationsInterpreter::MutationKind::MUTATE_OTHER && ctx->interpreter->isAffectingAllColumns());
if (!need_mutate_all_columns && ctx->source_part->hasLightweightDelete() && !ctx->is_lightweight_mutation)
ctx->interpreter->SetSkipDeletedMask(true);
ctx->interpreter->setSkipDeletedMask(true);
ctx->mutating_pipeline_builder = ctx->interpreter->execute();
ctx->updated_header = ctx->interpreter->getUpdatedHeader();
ctx->progress_callback = MergeProgressCallback((*ctx->mutate_entry)->ptr(), ctx->watch_prev_elapsed, *ctx->stage_progress);
@ -1733,9 +1740,6 @@ bool MutateTask::prepare()
else if (ctx->is_lightweight_mutation)
{
ctx->files_to_skip = ctx->source_part->getFileNamesWithoutChecksums();
/// Skip to create hardlink for deleted_rows_mask.bin
if (ctx->source_part->hasLightweightDelete())
ctx->files_to_skip.insert("deleted_rows_mask.bin");
/// We will modify or create only deleted_row_mask for lightweight delete. Other columns and key values are copied as-is.
task = std::make_unique<LightweightDeleteTask>(ctx);

View File

@ -1920,7 +1920,7 @@ std::vector<MergeTreeMutationStatus> ReplicatedMergeTreeQueue::getMutationsStatu
formatAST(*command.ast, buf, false, true);
result.push_back(MergeTreeMutationStatus
{
MutationType::Ordinary,
MutationType::Ordinary, /// TODO: ReplicatedMergeTree supports lightweight delete.
entry.znode_name,
buf.str(),
entry.create_time,

View File

@ -123,7 +123,7 @@ void StorageJoin::mutate(const MutationCommands & commands, ContextPtr context)
// New scope controls lifetime of pipeline.
{
auto storage_ptr = DatabaseCatalog::instance().getTable(getStorageID(), context);
auto interpreter = std::make_unique<MutationsInterpreter>(storage_ptr, metadata_snapshot, commands, context, true);
auto interpreter = std::make_unique<MutationsInterpreter>(storage_ptr, metadata_snapshot, commands, context, true, false);
auto pipeline = QueryPipelineBuilder::getPipeline(interpreter->execute());
PullingPipelineExecutor executor(pipeline);

View File

@ -318,7 +318,7 @@ void StorageMemory::mutate(const MutationCommands & commands, ContextPtr context
new_context->setSetting("max_streams_to_max_threads_ratio", 1);
new_context->setSetting("max_threads", 1);
auto interpreter = std::make_unique<MutationsInterpreter>(storage_ptr, metadata_snapshot, commands, new_context, true);
auto interpreter = std::make_unique<MutationsInterpreter>(storage_ptr, metadata_snapshot, commands, new_context, true, false);
auto pipeline = QueryPipelineBuilder::getPipeline(interpreter->execute());
PullingPipelineExecutor executor(pipeline);

View File

@ -332,7 +332,7 @@ void StorageMergeTree::alter(
DatabaseCatalog::instance().getDatabase(table_id.database_name)->alterTable(local_context, table_id, new_metadata);
if (!maybe_mutation_commands.empty())
mutation_version = startMutation(maybe_mutation_commands, local_context);
mutation_version = startMutation(maybe_mutation_commands, local_context, MutationType::Ordinary);
}
/// Always execute required mutations synchronously, because alters
@ -555,7 +555,12 @@ void StorageMergeTree::setMutationCSN(const String & mutation_id, CSN csn)
void StorageMergeTree::mutate(const MutationCommands & commands, ContextPtr query_context)
{
mutate(commands, query_context, MutationType::Ordinary);
/// Make ordinary ALTER DELETE queries lightweight to check all tests.
if (query_context->getSettingsRef().lightweight_delete_mutation
&& commands.size() == 1 && commands.begin()->type == MutationCommand::DELETE)
mutate(commands, query_context, MutationType::Lightweight);
else
mutate(commands, query_context, MutationType::Ordinary);
}
void StorageMergeTree::mutate(const MutationCommands & commands, ContextPtr query_context, MutationType type)
@ -569,7 +574,7 @@ void StorageMergeTree::mutate(const MutationCommands & commands, ContextPtr quer
waitForMutation(version);
}
bool StorageMergeTree::hasLightweightDelete() const
bool StorageMergeTree::hasLightweightDeletedMask() const
{
return has_lightweight_delete_parts.load(std::memory_order_relaxed);
}
@ -1065,7 +1070,7 @@ std::shared_ptr<MergeMutateSelectedEntry> StorageMergeTree::selectPartsToMutate(
fake_query_context->makeQueryContext();
fake_query_context->setCurrentQueryId("");
MutationsInterpreter interpreter(
shared_from_this(), metadata_snapshot, commands_for_size_validation, fake_query_context, false);
shared_from_this(), metadata_snapshot, commands_for_size_validation, fake_query_context, false, false);
commands_size += interpreter.evaluateCommandsSize();
}
catch (...)

View File

@ -89,7 +89,7 @@ public:
/// Support lightweight delete.
void mutate(const MutationCommands & commands, ContextPtr context, MutationType type);
bool hasLightweightDelete() const override;
bool hasLightweightDeletedMask() const override;
/// Return introspection information about currently processing or recently processed mutations.
std::vector<MergeTreeMutationStatus> getMutationsStatus() const override;
@ -184,7 +184,7 @@ private:
/// Allocate block number for new mutation, write mutation to disk
/// and into in-memory structures. Wake up merge-mutation task.
Int64 startMutation(const MutationCommands & commands, ContextPtr query_context, MutationType type = MutationType::Ordinary);
Int64 startMutation(const MutationCommands & commands, ContextPtr query_context, MutationType type);
/// Wait until mutation with version will finish mutation for all parts
void waitForMutation(Int64 version);
void waitForMutation(const String & mutation_id) override;

View File

@ -1,7 +1,13 @@
99
1
95
1
0
1
-----lightweight mutation type-----
1
1
1
1 DELETE WHERE (c % 5) = 1 1
1 DELETE WHERE c = 4 1
0 MATERIALIZE INDEX i_c 1
@ -27,5 +33,7 @@ t_light 2 2_3_3_1_10 2
t_light 3 3_4_4_1_10 2
t_light 4 4_5_5_1_10 1
-----Test lightweight delete in multi blocks-----
1
1
1000 -2
1005 -2

View File

@ -5,19 +5,32 @@ CREATE TABLE merge_table_standard_delete(id Int32, name String) ENGINE = MergeTr
INSERT INTO merge_table_standard_delete select number, toString(number) from numbers(100);
SET mutations_sync = 1;
SET allow_experimental_lightweight_delete = 1;
DELETE FROM merge_table_standard_delete WHERE id = 10;
SELECT COUNT() FROM merge_table_standard_delete;
DETACH TABLE merge_table_standard_delete;
ATTACH TABLE merge_table_standard_delete;
CHECK TABLE merge_table_standard_delete;
DELETE FROM merge_table_standard_delete WHERE name IN ('1','2','3','4');
SELECT COUNT() FROM merge_table_standard_delete;
DETACH TABLE merge_table_standard_delete;
ATTACH TABLE merge_table_standard_delete;
CHECK TABLE merge_table_standard_delete;
DELETE FROM merge_table_standard_delete WHERE 1;
SELECT COUNT() FROM merge_table_standard_delete;
DETACH TABLE merge_table_standard_delete;
ATTACH TABLE merge_table_standard_delete;
CHECK TABLE merge_table_standard_delete;
DROP TABLE merge_table_standard_delete;
drop table if exists t_light;
@ -27,11 +40,25 @@ INSERT INTO t_light SELECT number, number, number FROM numbers(10);
SELECT '-----lightweight mutation type-----';
DELETE FROM t_light WHERE c%5=1;
DETACH TABLE t_light;
ATTACH TABLE t_light;
CHECK TABLE t_light;
DELETE FROM t_light WHERE c=4;
DETACH TABLE t_light;
ATTACH TABLE t_light;
CHECK TABLE t_light;
alter table t_light MATERIALIZE INDEX i_c;
alter table t_light update b=-1 where a<3;
alter table t_light drop index i_c;
DETACH TABLE t_light;
ATTACH TABLE t_light;
CHECK TABLE t_light;
SELECT is_lightweight, command, is_done FROM system.mutations WHERE database = currentDatabase() AND table = 't_light';
SELECT '-----Check that select and merge with lightweight delete.-----';
@ -52,8 +79,18 @@ CREATE TABLE t_large(a UInt32, b int) ENGINE=MergeTree order BY a settings min_b
INSERT INTO t_large SELECT number + 1, number + 1 FROM numbers(100000);
DELETE FROM t_large WHERE a = 50000;
DETACH TABLE t_large;
ATTACH TABLE t_large;
CHECK TABLE t_large;
ALTER TABLE t_large UPDATE b = -2 WHERE a between 1000 and 1005;
ALTER TABLE t_large DELETE WHERE a=1;
DETACH TABLE t_large;
ATTACH TABLE t_large;
CHECK TABLE t_large;
SELECT * FROM t_large WHERE a in (1,1000,1005,50000) order by a;
DROP TABLE t_large;