force_wait flag is no longer needed

This commit is contained in:
Alexander Gololobov 2023-01-30 18:38:28 +01:00
parent 46424a4fa8
commit 863548114d
16 changed files with 19 additions and 20 deletions

View File

@ -147,7 +147,7 @@ BlockIO InterpreterAlterQuery::executeToTable(const ASTAlterQuery & alter)
{
table->checkMutationIsPossible(mutation_commands, getContext()->getSettingsRef());
MutationsInterpreter(table, metadata_snapshot, mutation_commands, getContext(), false).validate();
table->mutate(mutation_commands, getContext(), false);
table->mutate(mutation_commands, getContext());
}
if (!partition_commands.empty())

View File

@ -73,7 +73,7 @@ BlockIO InterpreterDeleteQuery::execute()
table->checkMutationIsPossible(mutation_commands, getContext()->getSettingsRef());
MutationsInterpreter(table, metadata_snapshot, mutation_commands, getContext(), false).validate();
table->mutate(mutation_commands, getContext(), false);
table->mutate(mutation_commands, getContext());
return {};
}
else if (table->supportsLightweightDelete())

View File

@ -487,7 +487,7 @@ public:
}
/// Mutate the table contents
virtual void mutate(const MutationCommands &, ContextPtr, bool /*force_wait*/)
virtual void mutate(const MutationCommands &, ContextPtr)
{
throw Exception(ErrorCodes::NOT_IMPLEMENTED, "Mutations are not supported by storage {}", getName());
}

View File

@ -217,7 +217,7 @@ void StorageEmbeddedRocksDB::checkMutationIsPossible(const MutationCommands & co
throw Exception(ErrorCodes::BAD_ARGUMENTS, "Only DELETE and UPDATE mutation supported for EmbeddedRocksDB");
}
void StorageEmbeddedRocksDB::mutate(const MutationCommands & commands, ContextPtr context_, bool /*force_wait*/)
void StorageEmbeddedRocksDB::mutate(const MutationCommands & commands, ContextPtr context_)
{
if (commands.empty())
return;

View File

@ -52,7 +52,7 @@ public:
void truncate(const ASTPtr &, const StorageMetadataPtr & metadata_snapshot, ContextPtr, TableExclusiveLockHolder &) override;
void checkMutationIsPossible(const MutationCommands & commands, const Settings & settings) const override;
void mutate(const MutationCommands &, ContextPtr, bool) override;
void mutate(const MutationCommands &, ContextPtr) override;
bool supportsParallelInsert() const override { return true; }
bool supportsIndexForIn() const override { return true; }

View File

@ -104,7 +104,7 @@ void StorageJoin::checkMutationIsPossible(const MutationCommands & commands, con
throw Exception(ErrorCodes::NOT_IMPLEMENTED, "Table engine Join supports only DELETE mutations");
}
void StorageJoin::mutate(const MutationCommands & commands, ContextPtr context, bool /*force_wait*/)
void StorageJoin::mutate(const MutationCommands & commands, ContextPtr context)
{
/// Firstly acquire lock for mutation, that locks changes of data.
/// We cannot acquire rwlock here, because read lock is needed

View File

@ -45,7 +45,7 @@ public:
/// Only delete is supported.
void checkMutationIsPossible(const MutationCommands & commands, const Settings & settings) const override;
void mutate(const MutationCommands & commands, ContextPtr context, bool force_wait) override;
void mutate(const MutationCommands & commands, ContextPtr context) override;
/// Return instance of HashJoin holding lock that protects from insertions to StorageJoin.
/// HashJoin relies on structure of hash table that's why we need to return it with locked mutex.

View File

@ -319,10 +319,10 @@ void StorageMaterializedView::checkAlterPartitionIsPossible(
getTargetTable()->checkAlterPartitionIsPossible(commands, metadata_snapshot, settings);
}
void StorageMaterializedView::mutate(const MutationCommands & commands, ContextPtr local_context, bool force_wait)
void StorageMaterializedView::mutate(const MutationCommands & commands, ContextPtr local_context)
{
checkStatementCanBeForwarded();
getTargetTable()->mutate(commands, local_context, force_wait);
getTargetTable()->mutate(commands, local_context);
}
void StorageMaterializedView::renameInMemory(const StorageID & new_table_id)

View File

@ -65,7 +65,7 @@ public:
void checkAlterPartitionIsPossible(const PartitionCommands & commands, const StorageMetadataPtr & metadata_snapshot, const Settings & settings) const override;
void mutate(const MutationCommands & commands, ContextPtr context, bool force_wait) override;
void mutate(const MutationCommands & commands, ContextPtr context) override;
void renameInMemory(const StorageID & new_table_id) override;

View File

@ -305,7 +305,7 @@ void StorageMemory::checkMutationIsPossible(const MutationCommands & /*commands*
/// Some validation will be added
}
void StorageMemory::mutate(const MutationCommands & commands, ContextPtr context, bool /*force_wait*/)
void StorageMemory::mutate(const MutationCommands & commands, ContextPtr context)
{
std::lock_guard lock(mutex);
auto metadata_snapshot = getInMemoryMetadataPtr();

View File

@ -67,7 +67,7 @@ public:
void drop() override;
void checkMutationIsPossible(const MutationCommands & commands, const Settings & settings) const override;
void mutate(const MutationCommands & commands, ContextPtr context, bool force_wait) override;
void mutate(const MutationCommands & commands, ContextPtr context) override;
void truncate(const ASTPtr &, const StorageMetadataPtr &, ContextPtr, TableExclusiveLockHolder &) override;

View File

@ -532,14 +532,14 @@ void StorageMergeTree::setMutationCSN(const String & mutation_id, CSN csn)
it->second.writeCSN(csn);
}
void StorageMergeTree::mutate(const MutationCommands & commands, ContextPtr query_context, bool force_wait)
void StorageMergeTree::mutate(const MutationCommands & commands, ContextPtr query_context)
{
/// Validate partition IDs (if any) before starting mutation
getPartitionIdsAffectedByCommands(commands, query_context);
Int64 version = startMutation(commands, query_context);
if (force_wait || query_context->getSettingsRef().mutations_sync > 0 || query_context->getCurrentTransaction())
if (query_context->getSettingsRef().mutations_sync > 0 || query_context->getCurrentTransaction())
waitForMutation(version);
}

View File

@ -85,7 +85,7 @@ public:
const Names & deduplicate_by_columns,
ContextPtr context) override;
void mutate(const MutationCommands & commands, ContextPtr context, bool force_wait) override;
void mutate(const MutationCommands & commands, ContextPtr context) override;
bool hasLightweightDeletedMask() const override;

View File

@ -132,7 +132,7 @@ public:
return getNested()->optimize(query, metadata_snapshot, partition, final, deduplicate, deduplicate_by_columns, context);
}
void mutate(const MutationCommands & commands, ContextPtr context, bool force_wait) override { getNested()->mutate(commands, context, force_wait); }
void mutate(const MutationCommands & commands, ContextPtr context) override { getNested()->mutate(commands, context); }
CancellationCode killMutation(const String & mutation_id) override { return getNested()->killMutation(mutation_id); }

View File

@ -6272,7 +6272,7 @@ void StorageReplicatedMergeTree::fetchPartition(
}
void StorageReplicatedMergeTree::mutate(const MutationCommands & commands, ContextPtr query_context, bool force_wait)
void StorageReplicatedMergeTree::mutate(const MutationCommands & commands, ContextPtr query_context)
{
/// Overview of the mutation algorithm.
///
@ -6386,8 +6386,7 @@ void StorageReplicatedMergeTree::mutate(const MutationCommands & commands, Conte
throw Coordination::Exception("Unable to create a mutation znode", rc);
}
const size_t mutations_sync = force_wait ? 2 : query_context->getSettingsRef().mutations_sync;
waitMutation(mutation_entry.znode_name, mutations_sync);
waitMutation(mutation_entry.znode_name, query_context->getSettingsRef().mutations_sync);
}
void StorageReplicatedMergeTree::waitMutation(const String & znode_name, size_t mutations_sync) const

View File

@ -153,7 +153,7 @@ public:
void alter(const AlterCommands & commands, ContextPtr query_context, AlterLockHolder & table_lock_holder) override;
void mutate(const MutationCommands & commands, ContextPtr context, bool force_wait) override;
void mutate(const MutationCommands & commands, ContextPtr context) override;
void waitMutation(const String & znode_name, size_t mutations_sync) const;
std::vector<MergeTreeMutationStatus> getMutationsStatus() const override;
CancellationCode killMutation(const String & mutation_id) override;