Enable per query setting

This commit is contained in:
MikhailBurdukov 2024-01-30 10:37:08 +00:00
parent ee5d8c0a27
commit 5e845172da
19 changed files with 167 additions and 58 deletions

View File

@ -636,6 +636,7 @@ class IColumn;
M(Bool, mutations_execute_nondeterministic_on_initiator, false, "If true nondeterministic function are executed on initiator and replaced to literals in UPDATE and DELETE queries", 0) \ M(Bool, mutations_execute_nondeterministic_on_initiator, false, "If true nondeterministic function are executed on initiator and replaced to literals in UPDATE and DELETE queries", 0) \
M(Bool, mutations_execute_subqueries_on_initiator, false, "If true scalar subqueries are executed on initiator and replaced to literals in UPDATE and DELETE queries", 0) \ M(Bool, mutations_execute_subqueries_on_initiator, false, "If true scalar subqueries are executed on initiator and replaced to literals in UPDATE and DELETE queries", 0) \
M(UInt64, mutations_max_literal_size_to_replace, 16384, "The maximum size of serialized literal in bytes to replace in UPDATE and DELETE queries", 0) \ M(UInt64, mutations_max_literal_size_to_replace, 16384, "The maximum size of serialized literal in bytes to replace in UPDATE and DELETE queries", 0) \
M(UInt64, max_postpone_time_for_failed_mutations, 0ul, "The maximum postpone time for failed mutations in ms.", 0) \
\ \
M(Float, create_replicated_merge_tree_fault_injection_probability, 0.0f, "The probability of a fault injection during table creation after creating metadata in ZooKeeper", 0) \ M(Float, create_replicated_merge_tree_fault_injection_probability, 0.0f, "The probability of a fault injection during table creation after creating metadata in ZooKeeper", 0) \
\ \

View File

@ -104,6 +104,22 @@ bool checkString(const char * s, ReadBuffer & buf)
return true; return true;
} }
bool checkStringWithPositionSaving(const char * s, ReadBuffer & buf)
{
auto initial_position = buf.position();
for (; *s; ++s)
{
if (buf.eof() || *buf.position() != *s)
{
buf.position() = initial_position;
return false;
}
++buf.position();
}
buf.position() = initial_position;
return true;
}
bool checkStringCaseInsensitive(const char * s, ReadBuffer & buf) bool checkStringCaseInsensitive(const char * s, ReadBuffer & buf)
{ {

View File

@ -216,6 +216,9 @@ inline void assertString(const String & s, ReadBuffer & buf)
} }
bool checkString(const char * s, ReadBuffer & buf); bool checkString(const char * s, ReadBuffer & buf);
bool checkStringWithPositionSaving(const char * s, ReadBuffer & buf);
inline bool checkString(const String & s, ReadBuffer & buf) inline bool checkString(const String & s, ReadBuffer & buf)
{ {
return checkString(s.c_str(), buf); return checkString(s.c_str(), buf);

View File

@ -1352,38 +1352,59 @@ protected:
{ {
struct PartMutationInfo struct PartMutationInfo
{ {
size_t retry_count = 0ul; size_t retry_count;
Poco::Timestamp latest_fail_time{}; Poco::Timestamp latest_fail_time;
UInt64 mutation_failure_version = 0ul; UInt64 mutation_failure_version;
size_t max_postpone_time_ms;
size_t max_postpone_power;
PartMutationInfo(UInt64 mutation_failure_version_, size_t max_postpone_time_ms_)
: retry_count(0ull)
, latest_fail_time(std::move(Poco::Timestamp()))
, mutation_failure_version(mutation_failure_version_)
, max_postpone_time_ms(max_postpone_time_ms_)
, max_postpone_power((max_postpone_time_ms_) ? (static_cast<size_t>(std::log2(max_postpone_time_ms_))) : (0ull))
{}
Poco::Timestamp getNextMinExecutionTime() const Poco::Timestamp getNextMinExecutionTime() const
{ {
if (max_postpone_time_ms == 0)
return Poco::Timestamp();
return latest_fail_time + (1 << retry_count) * 1000ul; return latest_fail_time + (1 << retry_count) * 1000ul;
} }
void addPartFailure()
{
if (max_postpone_time_ms == 0)
return;
retry_count = std::min(max_postpone_power, retry_count + 1);
latest_fail_time = Poco::Timestamp();
}
bool partCanBeMutated()
{
if (max_postpone_time_ms == 0)
return true;
auto current_time = Poco::Timestamp();
return current_time >= getNextMinExecutionTime();
}
}; };
using DataPartsWithRetryInfo = std::unordered_map<String, PartMutationInfo>; using DataPartsWithRetryInfo = std::unordered_map<String, PartMutationInfo>;
DataPartsWithRetryInfo failed_mutation_parts; DataPartsWithRetryInfo failed_mutation_parts;
size_t max_pospone_power;
mutable std::mutex parts_info_lock; mutable std::mutex parts_info_lock;
public: public:
explicit PartMutationBackoffPolicy(ContextPtr global_context_) explicit PartMutationBackoffPolicy(ContextPtr global_context_)
: WithContext(global_context_) : WithContext(global_context_)
{ {
size_t max_pospone_time_ms = global_context_->getMaxPostponeTimeForFailedMutations();
if (max_pospone_time_ms == 0)
max_pospone_power = 0;
else
max_pospone_power = static_cast<size_t>(std::log2(max_pospone_time_ms));
} }
void removeFromFailedByVersion(UInt64 mutation_version) void removeFromFailedByVersion(UInt64 mutation_version)
{ {
if (max_pospone_power == 0)
return;
std::unique_lock _lock(parts_info_lock); std::unique_lock _lock(parts_info_lock);
for (auto failed_part_it = failed_mutation_parts.begin(); failed_part_it != failed_mutation_parts.end();) for (auto failed_part_it = failed_mutation_parts.begin(); failed_part_it != failed_mutation_parts.end();)
{ {
if (failed_part_it->second.mutation_failure_version == mutation_version) if (failed_part_it->second.mutation_failure_version == mutation_version)
@ -1395,40 +1416,31 @@ protected:
void removePartFromFailed(const String& part_name) void removePartFromFailed(const String& part_name)
{ {
if (max_pospone_power == 0)
return;
std::unique_lock _lock(parts_info_lock); std::unique_lock _lock(parts_info_lock);
failed_mutation_parts.erase(part_name); failed_mutation_parts.erase(part_name);
} }
void addPartMutationFailure (const String& part_name, UInt64 _mutation_failure_version) void addPartMutationFailure (const String& part_name, UInt64 mutation_failure_version_, size_t max_postpone_time_ms_)
{ {
if (max_pospone_power == 0)
return;
std::unique_lock _lock(parts_info_lock); std::unique_lock _lock(parts_info_lock);
auto part_info_it = failed_mutation_parts.find(part_name); auto part_info_it = failed_mutation_parts.find(part_name);
if (part_info_it == failed_mutation_parts.end()) if (part_info_it == failed_mutation_parts.end())
{ {
auto [it, success] = failed_mutation_parts.emplace(part_name, PartMutationInfo()); auto [it, success] = failed_mutation_parts.emplace(part_name, PartMutationInfo(mutation_failure_version_, max_postpone_time_ms_));
std::swap(it, part_info_it); std::swap(it, part_info_it);
} }
auto& part_info = part_info_it->second; auto& part_info = part_info_it->second;
part_info.retry_count = std::min(max_pospone_power, part_info.retry_count + 1); part_info.addPartFailure();
part_info.latest_fail_time = Poco::Timestamp();
part_info.mutation_failure_version = _mutation_failure_version;
} }
bool partCanBeMutated(const String& part_name) bool partCanBeMutated(const String& part_name)
{ {
if (max_pospone_power == 0)
return true;
std::unique_lock _lock(parts_info_lock); std::unique_lock _lock(parts_info_lock);
auto iter = failed_mutation_parts.find(part_name); auto iter = failed_mutation_parts.find(part_name);
if (iter == failed_mutation_parts.end()) if (iter == failed_mutation_parts.end())
return true; return true;
return iter->second.partCanBeMutated();
auto current_time = Poco::Timestamp();
return current_time >= iter->second.getNextMinExecutionTime();
} }
}; };
/// Controls postponing logic for failed mutations. /// Controls postponing logic for failed mutations.

View File

@ -48,13 +48,14 @@ UInt64 MergeTreeMutationEntry::parseFileName(const String & file_name_)
} }
MergeTreeMutationEntry::MergeTreeMutationEntry(MutationCommands commands_, DiskPtr disk_, const String & path_prefix_, UInt64 tmp_number, MergeTreeMutationEntry::MergeTreeMutationEntry(MutationCommands commands_, DiskPtr disk_, const String & path_prefix_, UInt64 tmp_number,
const TransactionID & tid_, const WriteSettings & settings) const TransactionID & tid_, const WriteSettings & settings, size_t max_postpone_time_)
: create_time(time(nullptr)) : create_time(time(nullptr))
, commands(std::move(commands_)) , commands(std::move(commands_))
, disk(std::move(disk_)) , disk(std::move(disk_))
, path_prefix(path_prefix_) , path_prefix(path_prefix_)
, file_name("tmp_mutation_" + toString(tmp_number) + ".txt") , file_name("tmp_mutation_" + toString(tmp_number) + ".txt")
, is_temp(true) , is_temp(true)
, max_postpone_time(max_postpone_time_)
, tid(tid_) , tid(tid_)
{ {
try try
@ -65,6 +66,10 @@ MergeTreeMutationEntry::MergeTreeMutationEntry(MutationCommands commands_, DiskP
*out << "commands: "; *out << "commands: ";
commands.writeText(*out, /* with_pure_metadata_commands = */ false); commands.writeText(*out, /* with_pure_metadata_commands = */ false);
*out << "\n"; *out << "\n";
*out << "max postpone time: ";
*out << max_postpone_time;
*out << "\n";
if (tid.isPrehistoric()) if (tid.isPrehistoric())
{ {
csn = Tx::PrehistoricCSN; csn = Tx::PrehistoricCSN;
@ -136,6 +141,11 @@ MergeTreeMutationEntry::MergeTreeMutationEntry(DiskPtr disk_, const String & pat
commands.readText(*buf); commands.readText(*buf);
*buf >> "\n"; *buf >> "\n";
if (!buf->eof() && checkStringWithPositionSaving("max postpone time: ", *buf))
{
*buf >> "max postpone time: " >> max_postpone_time >> "\n";
}
if (buf->eof()) if (buf->eof())
{ {
tid = Tx::PrehistoricTID; tid = Tx::PrehistoricTID;

View File

@ -23,6 +23,7 @@ struct MergeTreeMutationEntry
String file_name; String file_name;
bool is_temp = false; bool is_temp = false;
size_t max_postpone_time;
UInt64 block_number = 0; UInt64 block_number = 0;
String latest_failed_part; String latest_failed_part;
@ -38,7 +39,7 @@ struct MergeTreeMutationEntry
/// Create a new entry and write it to a temporary file. /// Create a new entry and write it to a temporary file.
MergeTreeMutationEntry(MutationCommands commands_, DiskPtr disk, const String & path_prefix_, UInt64 tmp_number, MergeTreeMutationEntry(MutationCommands commands_, DiskPtr disk, const String & path_prefix_, UInt64 tmp_number,
const TransactionID & tid_, const WriteSettings & settings); const TransactionID & tid_, const WriteSettings & settings, size_t max_postpone_time);
MergeTreeMutationEntry(const MergeTreeMutationEntry &) = delete; MergeTreeMutationEntry(const MergeTreeMutationEntry &) = delete;
MergeTreeMutationEntry(MergeTreeMutationEntry &&) = default; MergeTreeMutationEntry(MergeTreeMutationEntry &&) = default;

View File

@ -117,7 +117,8 @@ bool ReplicatedMergeMutateTaskBase::executeStep()
status.latest_failed_part_info = source_part_info; status.latest_failed_part_info = source_part_info;
status.latest_fail_time = time(nullptr); status.latest_fail_time = time(nullptr);
status.latest_fail_reason = getExceptionMessage(saved_exception, false); status.latest_fail_reason = getExceptionMessage(saved_exception, false);
storage.mutation_backoff_policy.addPartMutationFailure(src_part, source_part_info.mutation + 1); if (result_data_version == it->first)
storage.mutation_backoff_policy.addPartMutationFailure(src_part, source_part_info.mutation + 1, log_entry->max_postpone_time);
} }
} }
} }

View File

@ -143,6 +143,7 @@ void ReplicatedMergeTreeLogEntryData::writeText(WriteBuffer & out) const
if (isAlterMutation()) if (isAlterMutation())
out << "\nalter_version\n" << alter_version; out << "\nalter_version\n" << alter_version;
out << "\nmax_postpone_time\n" << max_postpone_time;
break; break;
case ALTER_METADATA: /// Just make local /metadata and /columns consistent with global case ALTER_METADATA: /// Just make local /metadata and /columns consistent with global
@ -318,7 +319,9 @@ void ReplicatedMergeTreeLogEntryData::readText(ReadBuffer & in, MergeTreeDataFor
{ {
in >> "\n"; in >> "\n";
if (checkString("alter_version\n", in)) if (checkString("max_postpone_time\n", in))
in >> max_postpone_time;
else if (checkString("alter_version\n", in))
in >> alter_version; in >> alter_version;
else if (checkString("to_uuid\n", in)) else if (checkString("to_uuid\n", in))
in >> new_part_uuid; in >> new_part_uuid;

View File

@ -165,6 +165,7 @@ struct ReplicatedMergeTreeLogEntryData
size_t num_postponed = 0; /// The number of times the action was postponed. size_t num_postponed = 0; /// The number of times the action was postponed.
String postpone_reason; /// The reason why the action was postponed, if it was postponed. String postpone_reason; /// The reason why the action was postponed, if it was postponed.
time_t last_postpone_time = 0; /// The time of the last time the action was postponed. time_t last_postpone_time = 0; /// The time of the last time the action was postponed.
size_t max_postpone_time = 0;
/// Creation time or the time to copy from the general log to the queue of a particular replica. /// Creation time or the time to copy from the general log to the queue of a particular replica.
time_t create_time = 0; time_t create_time = 0;

View File

@ -29,7 +29,10 @@ void ReplicatedMergeTreeMutationEntry::writeText(WriteBuffer & out) const
out << "alter version: "; out << "alter version: ";
out << alter_version; out << alter_version;
out << "\n";
out << "max postpone time: ";
out << max_postpone_time;
} }
void ReplicatedMergeTreeMutationEntry::readText(ReadBuffer & in) void ReplicatedMergeTreeMutationEntry::readText(ReadBuffer & in)
@ -58,6 +61,9 @@ void ReplicatedMergeTreeMutationEntry::readText(ReadBuffer & in)
commands.readText(in); commands.readText(in);
if (checkString("\nalter version: ", in)) if (checkString("\nalter version: ", in))
in >> alter_version; in >> alter_version;
if (checkString("\nmax postpone time: ", in))
in >> max_postpone_time;
} }
String ReplicatedMergeTreeMutationEntry::toString() const String ReplicatedMergeTreeMutationEntry::toString() const

View File

@ -56,6 +56,8 @@ struct ReplicatedMergeTreeMutationEntry
std::shared_ptr<const IBackupEntry> backup() const; std::shared_ptr<const IBackupEntry> backup() const;
String getBlockNumbersForLogs() const; String getBlockNumbersForLogs() const;
size_t max_postpone_time = 0;
}; };
using ReplicatedMergeTreeMutationEntryPtr = std::shared_ptr<const ReplicatedMergeTreeMutationEntry>; using ReplicatedMergeTreeMutationEntryPtr = std::shared_ptr<const ReplicatedMergeTreeMutationEntry>;

View File

@ -2481,7 +2481,7 @@ bool ReplicatedMergeTreeMergePredicate::partParticipatesInReplaceRange(const Mer
} }
std::optional<std::pair<Int64, int>> ReplicatedMergeTreeMergePredicate::getDesiredMutationVersion(const MergeTreeData::DataPartPtr & part) const std::optional<ReplicatedMergeTreeMergePredicate::DesiredMutationDescription> ReplicatedMergeTreeMergePredicate::getDesiredMutationDescription(const MergeTreeData::DataPartPtr & part) const
{ {
/// Assigning mutations is easier than assigning merges because mutations appear in the same order as /// Assigning mutations is easier than assigning merges because mutations appear in the same order as
/// the order of their version numbers (see StorageReplicatedMergeTree::mutate). /// the order of their version numbers (see StorageReplicatedMergeTree::mutate).
@ -2509,6 +2509,7 @@ std::optional<std::pair<Int64, int>> ReplicatedMergeTreeMergePredicate::getDesir
Int64 current_version = queue.getCurrentMutationVersion(part->info.partition_id, part->info.getDataVersion()); Int64 current_version = queue.getCurrentMutationVersion(part->info.partition_id, part->info.getDataVersion());
Int64 max_version = in_partition->second.begin()->first; Int64 max_version = in_partition->second.begin()->first;
size_t mutation_postpone_time = 0ul;
int alter_version = -1; int alter_version = -1;
bool barrier_found = false; bool barrier_found = false;
@ -2527,6 +2528,7 @@ std::optional<std::pair<Int64, int>> ReplicatedMergeTreeMergePredicate::getDesir
} }
max_version = mutation_version; max_version = mutation_version;
mutation_postpone_time = mutation_status->entry->max_postpone_time;
if (current_version < max_version) if (current_version < max_version)
++mutations_count; ++mutations_count;
@ -2560,7 +2562,7 @@ std::optional<std::pair<Int64, int>> ReplicatedMergeTreeMergePredicate::getDesir
LOG_TRACE(queue.log, "Will apply {} mutations and mutate part {} to version {} (the last version is {})", LOG_TRACE(queue.log, "Will apply {} mutations and mutate part {} to version {} (the last version is {})",
mutations_count, part->name, max_version, in_partition->second.rbegin()->first); mutations_count, part->name, max_version, in_partition->second.rbegin()->first);
return std::make_pair(max_version, alter_version); return DesiredMutationDescription({max_version, alter_version, mutation_postpone_time});
} }

View File

@ -563,12 +563,19 @@ public:
/// We should not drop part in this case, because replication queue may stuck without that part. /// We should not drop part in this case, because replication queue may stuck without that part.
bool partParticipatesInReplaceRange(const MergeTreeData::DataPartPtr & part, String & out_reason) const; bool partParticipatesInReplaceRange(const MergeTreeData::DataPartPtr & part, String & out_reason) const;
struct DesiredMutationDescription
{
Int64 mutation_version;
int32_t alter_version;
size_t max_postpone_time;
};
/// Return nonempty optional of desired mutation version and alter version. /// Return nonempty optional of desired mutation version and alter version.
/// If we have no alter (modify/drop) mutations in mutations queue, than we return biggest possible /// If we have no alter (modify/drop) mutations in mutations queue, than we return biggest possible
/// mutation version (and -1 as alter version). In other case, we return biggest mutation version with /// mutation version (and -1 as alter version). In other case, we return biggest mutation version with
/// smallest alter version. This required, because we have to execute alter mutations sequentially and /// smallest alter version. This required, because we have to execute alter mutations sequentially and
/// don't glue them together. Alter is rare operation, so it shouldn't affect performance. /// don't glue them together. Alter is rare operation, so it shouldn't affect performance.
std::optional<std::pair<Int64, int>> getDesiredMutationVersion(const MergeTreeData::DataPartPtr & part) const; std::optional<DesiredMutationDescription> getDesiredMutationDescription(const MergeTreeData::DataPartPtr & part) const;
bool isMutationFinished(const std::string & znode_name, const std::map<String, int64_t> & block_numbers, bool isMutationFinished(const std::string & znode_name, const std::map<String, int64_t> & block_numbers,
std::unordered_set<String> & checked_partitions_cache) const; std::unordered_set<String> & checked_partitions_cache) const;

View File

@ -502,7 +502,8 @@ Int64 StorageMergeTree::startMutation(const MutationCommands & commands, Context
{ {
std::lock_guard lock(currently_processing_in_background_mutex); std::lock_guard lock(currently_processing_in_background_mutex);
MergeTreeMutationEntry entry(commands, disk, relative_data_path, insert_increment.get(), current_tid, getContext()->getWriteSettings()); MergeTreeMutationEntry entry(commands, disk, relative_data_path, insert_increment.get(), current_tid, getContext()->getWriteSettings(),
query_context->getSettings().max_postpone_time_for_failed_mutations);
version = increment.get(); version = increment.get();
entry.commit(version); entry.commit(version);
String mutation_id = entry.file_name; String mutation_id = entry.file_name;
@ -525,6 +526,8 @@ void StorageMergeTree::updateMutationEntriesErrors(FutureMergedMutatedPartPtr re
Int64 sources_data_version = result_part->parts.at(0)->info.getDataVersion(); Int64 sources_data_version = result_part->parts.at(0)->info.getDataVersion();
Int64 result_data_version = result_part->part_info.getDataVersion(); Int64 result_data_version = result_part->part_info.getDataVersion();
auto & failed_part = result_part->parts.at(0);
if (sources_data_version != result_data_version) if (sources_data_version != result_data_version)
{ {
std::lock_guard lock(currently_processing_in_background_mutex); std::lock_guard lock(currently_processing_in_background_mutex);
@ -534,7 +537,6 @@ void StorageMergeTree::updateMutationEntriesErrors(FutureMergedMutatedPartPtr re
for (auto it = mutations_begin_it; it != mutations_end_it; ++it) for (auto it = mutations_begin_it; it != mutations_end_it; ++it)
{ {
MergeTreeMutationEntry & entry = it->second; MergeTreeMutationEntry & entry = it->second;
auto & failed_part = result_part->parts.at(0);
if (is_successful) if (is_successful)
{ {
if (!entry.latest_failed_part.empty() && result_part->part_info.contains(entry.latest_failed_part_info)) if (!entry.latest_failed_part.empty() && result_part->part_info.contains(entry.latest_failed_part_info))
@ -543,7 +545,8 @@ void StorageMergeTree::updateMutationEntriesErrors(FutureMergedMutatedPartPtr re
entry.latest_failed_part_info = MergeTreePartInfo(); entry.latest_failed_part_info = MergeTreePartInfo();
entry.latest_fail_time = 0; entry.latest_fail_time = 0;
entry.latest_fail_reason.clear(); entry.latest_fail_reason.clear();
mutation_backoff_policy.removePartFromFailed(failed_part->name); if (static_cast<UInt64>(result_part->part_info.mutation) == it->first)
mutation_backoff_policy.removePartFromFailed(failed_part->name);
} }
} }
else else
@ -552,7 +555,11 @@ void StorageMergeTree::updateMutationEntriesErrors(FutureMergedMutatedPartPtr re
entry.latest_failed_part_info = failed_part->info; entry.latest_failed_part_info = failed_part->info;
entry.latest_fail_time = time(nullptr); entry.latest_fail_time = time(nullptr);
entry.latest_fail_reason = exception_message; entry.latest_fail_reason = exception_message;
mutation_backoff_policy.addPartMutationFailure(failed_part->name, sources_data_version + 1);
if (static_cast<UInt64>(result_part->part_info.mutation) == it->first)
{
mutation_backoff_policy.addPartMutationFailure(failed_part->name, it->first, entry.max_postpone_time);
}
} }
} }
} }
@ -1181,7 +1188,7 @@ MergeMutateSelectedEntryPtr StorageMergeTree::selectPartsToMutate(
CurrentlyMergingPartsTaggerPtr tagger; CurrentlyMergingPartsTaggerPtr tagger;
bool exist_posponed_failed_part = false; bool exist_postponed_failed_part = false;
auto mutations_end_it = current_mutations_by_version.end(); auto mutations_end_it = current_mutations_by_version.end();
for (const auto & part : getDataPartsVectorForInternalUsage()) for (const auto & part : getDataPartsVectorForInternalUsage())
{ {
@ -1208,7 +1215,7 @@ MergeMutateSelectedEntryPtr StorageMergeTree::selectPartsToMutate(
if (!mutation_backoff_policy.partCanBeMutated(part->name)) if (!mutation_backoff_policy.partCanBeMutated(part->name))
{ {
exist_posponed_failed_part = true; exist_postponed_failed_part = true;
LOG_DEBUG(log, "According to exponential backoff policy, do not perform mutations for the part {} yet. Put it aside.", part->name); LOG_DEBUG(log, "According to exponential backoff policy, do not perform mutations for the part {} yet. Put it aside.", part->name);
continue; continue;
} }
@ -1319,7 +1326,7 @@ MergeMutateSelectedEntryPtr StorageMergeTree::selectPartsToMutate(
return std::make_shared<MergeMutateSelectedEntry>(future_part, std::move(tagger), commands, txn); return std::make_shared<MergeMutateSelectedEntry>(future_part, std::move(tagger), commands, txn);
} }
} }
if (exist_posponed_failed_part) if (exist_postponed_failed_part)
{ {
std::lock_guard lock(mutation_wait_mutex); std::lock_guard lock(mutation_wait_mutex);
mutation_wait_event.notify_all(); mutation_wait_event.notify_all();

View File

@ -3776,16 +3776,17 @@ void StorageReplicatedMergeTree::mergeSelectingTask()
if (part->getBytesOnDisk() > max_source_part_size_for_mutation) if (part->getBytesOnDisk() > max_source_part_size_for_mutation)
continue; continue;
std::optional<std::pair<Int64, int>> desired_mutation_version = merge_pred->getDesiredMutationVersion(part); std::optional<ReplicatedMergeTreeMergePredicate::DesiredMutationDescription> desired_mutation_description = merge_pred->getDesiredMutationDescription(part);
if (!desired_mutation_version) if (!desired_mutation_description)
continue; continue;
create_result = createLogEntryToMutatePart( create_result = createLogEntryToMutatePart(
*part, *part,
future_merged_part->uuid, future_merged_part->uuid,
desired_mutation_version->first, desired_mutation_description->mutation_version,
desired_mutation_version->second, desired_mutation_description->alter_version,
merge_pred->getVersion()); merge_pred->getVersion(),
desired_mutation_description->max_postpone_time);
if (create_result == CreateMergeEntryResult::Ok) if (create_result == CreateMergeEntryResult::Ok)
return AttemptStatus::EntryCreated; return AttemptStatus::EntryCreated;
@ -3954,7 +3955,7 @@ StorageReplicatedMergeTree::CreateMergeEntryResult StorageReplicatedMergeTree::c
StorageReplicatedMergeTree::CreateMergeEntryResult StorageReplicatedMergeTree::createLogEntryToMutatePart( StorageReplicatedMergeTree::CreateMergeEntryResult StorageReplicatedMergeTree::createLogEntryToMutatePart(
const IMergeTreeDataPart & part, const UUID & new_part_uuid, Int64 mutation_version, int32_t alter_version, int32_t log_version) const IMergeTreeDataPart & part, const UUID & new_part_uuid, Int64 mutation_version, int32_t alter_version, int32_t log_version, size_t max_postpone_time)
{ {
auto zookeeper = getZooKeeper(); auto zookeeper = getZooKeeper();
@ -3984,7 +3985,7 @@ StorageReplicatedMergeTree::CreateMergeEntryResult StorageReplicatedMergeTree::c
entry.new_part_uuid = new_part_uuid; entry.new_part_uuid = new_part_uuid;
entry.create_time = time(nullptr); entry.create_time = time(nullptr);
entry.alter_version = alter_version; entry.alter_version = alter_version;
entry.max_postpone_time = max_postpone_time;
Coordination::Requests ops; Coordination::Requests ops;
Coordination::Responses responses; Coordination::Responses responses;
@ -7329,7 +7330,7 @@ void StorageReplicatedMergeTree::mutate(const MutationCommands & commands, Conte
ReplicatedMergeTreeMutationEntry mutation_entry; ReplicatedMergeTreeMutationEntry mutation_entry;
mutation_entry.source_replica = replica_name; mutation_entry.source_replica = replica_name;
mutation_entry.commands = commands; mutation_entry.commands = commands;
mutation_entry.max_postpone_time = query_context->getSettings().max_postpone_time_for_failed_mutations;
const String mutations_path = fs::path(zookeeper_path) / "mutations"; const String mutations_path = fs::path(zookeeper_path) / "mutations";
const auto zookeeper = getZooKeeper(); const auto zookeeper = getZooKeeper();

View File

@ -754,7 +754,9 @@ private:
const UUID & new_part_uuid, const UUID & new_part_uuid,
Int64 mutation_version, Int64 mutation_version,
int32_t alter_version, int32_t alter_version,
int32_t log_version); int32_t log_version,
size_t max_postpone_time = 0
);
/** Returns an empty string if no one has a part. /** Returns an empty string if no one has a part.
*/ */

View File

@ -1,3 +0,0 @@
<clickhouse>
<max_postpone_time_for_failed_mutations>60000</max_postpone_time_for_failed_mutations>
</clickhouse>

View File

@ -0,0 +1,7 @@
<clickhouse>
<profiles>
<default>
<max_postpone_time_for_failed_mutations>60000</max_postpone_time_for_failed_mutations>
</default>
</profiles>
</clickhouse>

View File

@ -12,7 +12,7 @@ cluster = ClickHouseCluster(__file__)
node_with_backoff = cluster.add_instance( node_with_backoff = cluster.add_instance(
"node_with_backoff", "node_with_backoff",
macros={"cluster": "test_cluster"}, macros={"cluster": "test_cluster"},
main_configs=["configs/config.d/backoff_mutation_policy.xml"], user_configs=["configs/users.d/backoff_mutation_policy.xml"],
with_zookeeper=True, with_zookeeper=True,
) )
@ -20,6 +20,7 @@ node_no_backoff = cluster.add_instance(
"node_no_backoff", "node_no_backoff",
macros={"cluster": "test_cluster"}, macros={"cluster": "test_cluster"},
with_zookeeper=True, with_zookeeper=True,
stay_alive=True,
) )
REPLICATED_POSPONE_MUTATION_LOG = ( REPLICATED_POSPONE_MUTATION_LOG = (
@ -94,13 +95,13 @@ def test_exponential_backoff_with_merge_tree(started_cluster, node, found_in_log
def test_exponential_backoff_with_replicated_tree(started_cluster): def test_exponential_backoff_with_replicated_tree(started_cluster):
prepare_cluster(True) prepare_cluster(True)
node_no_backoff.query( node_with_backoff.query(
"ALTER TABLE test_mutations DELETE WHERE x IN (SELECT x FROM notexist_table) SETTINGS allow_nondeterministic_mutations=1" "ALTER TABLE test_mutations DELETE WHERE x IN (SELECT x FROM notexist_table) SETTINGS allow_nondeterministic_mutations=1"
) )
time.sleep(5) time.sleep(20)
assert node_no_backoff.contains_in_log(REPLICATED_POSPONE_MUTATION_LOG) == False assert node_no_backoff.contains_in_log(REPLICATED_POSPONE_MUTATION_LOG)
assert node_with_backoff.contains_in_log(REPLICATED_POSPONE_MUTATION_LOG) == True assert node_with_backoff.contains_in_log(REPLICATED_POSPONE_MUTATION_LOG)
@pytest.mark.parametrize( @pytest.mark.parametrize(
@ -112,7 +113,6 @@ def test_exponential_backoff_with_replicated_tree(started_cluster):
def test_exponential_backoff_create_dependent_table(started_cluster, node): def test_exponential_backoff_create_dependent_table(started_cluster, node):
prepare_cluster(False) prepare_cluster(False)
node.query("INSERT INTO test_mutations SELECT * FROM system.numbers LIMIT 10")
# Executing incorrect mutation. # Executing incorrect mutation.
node.query( node.query(
"ALTER TABLE test_mutations DELETE WHERE x IN (SELECT x FROM dep_table) SETTINGS allow_nondeterministic_mutations=1" "ALTER TABLE test_mutations DELETE WHERE x IN (SELECT x FROM dep_table) SETTINGS allow_nondeterministic_mutations=1"
@ -123,3 +123,33 @@ def test_exponential_backoff_create_dependent_table(started_cluster, node):
time.sleep(5) time.sleep(5)
assert node.query("SELECT count() FROM system.mutations WHERE is_done=0") == "0\n" assert node.query("SELECT count() FROM system.mutations WHERE is_done=0") == "0\n"
@pytest.mark.parametrize(
("replicated_table"),
[
(False),
(True),
],
)
def test_backoff_throught_sql_and_restart(started_cluster, replicated_table):
prepare_cluster(replicated_table)
node = node_no_backoff
node.query("INSERT INTO test_mutations SELECT * FROM system.numbers LIMIT 10")
# Executing incorrect mutation.
node.query(
"ALTER TABLE test_mutations DELETE WHERE x IN (SELECT x FROM dep_table) SETTINGS allow_nondeterministic_mutations=1, max_postpone_time_for_failed_mutations=60000"
)
time.sleep(5)
assert node.contains_in_log(
REPLICATED_POSPONE_MUTATION_LOG if replicated_table else POSPONE_MUTATION_LOG
)
node.restart_clickhouse()
node.rotate_logs()
time.sleep(5)
# After the restart should keep the postpone value.
assert node.contains_in_log(
REPLICATED_POSPONE_MUTATION_LOG if replicated_table else POSPONE_MUTATION_LOG
)