Make wait endless

This commit is contained in:
alesapin 2019-12-19 18:27:56 +03:00
parent 88033a4333
commit 9fea941009
10 changed files with 95 additions and 86 deletions

View File

@ -388,7 +388,7 @@ struct Settings : public SettingsCollection<Settings>
\
M(SettingBool, enable_scalar_subquery_optimization, true, "If it is set to true, prevent scalar subqueries from (de)serializing large scalar values and possibly avoid running the same subquery more than once.", 0) \
M(SettingBool, optimize_trivial_count_query, true, "Process trivial 'SELECT count() FROM table' query from metadata.", 0) \
M(SettingUInt64, mutation_synchronous_wait_timeout, 0, "Seconds to wait for synchronous execution of ALTER TABLE UPDATE/DELETE queries (mutations). After execute asynchronously. 0 - execute asynchronously from the start.", 0) \
M(SettingUInt64, mutations_sync, 0, "Wait for synchronous execution of ALTER TABLE UPDATE/DELETE queries (mutations). 0 - execute asynchronously. 1 - wait current server. 2 - wait all replicas if they exist.", 0) \
\
/** Obsolete settings that do nothing but left for compatibility reasons. Remove each one after half a year of obsolescence. */ \
\

View File

@ -25,6 +25,7 @@ struct MergeTreeMutationEntry
MergeTreePartInfo latest_failed_part_info;
time_t latest_fail_time = 0;
String latest_fail_reason;
int latest_fail_error_code = 0;
/// Create a new entry and write it to a temporary file.
MergeTreeMutationEntry(MutationCommands commands_, const String & path_prefix_, Int64 tmp_number);

View File

@ -447,15 +447,14 @@ void StorageMergeTree::mutate(const MutationCommands & commands, const Context &
LOG_INFO(log, "Added mutation: " << file_name);
merging_mutating_task_handle->wake();
size_t timeout = query_context.getSettingsRef().mutation_synchronous_wait_timeout;
/// If timeout is set, than we can wait
if (timeout != 0)
/// We have to wait mutation end
if (query_context.getSettingsRef().mutations_sync > 0)
{
LOG_INFO(log, "Waiting mutation: " << file_name << " for " << timeout << " seconds");
LOG_INFO(log, "Waiting mutation: " << file_name);
auto check = [version, this]() { return isMutationDone(version); };
std::unique_lock lock(mutation_wait_mutex);
if (!mutation_wait_event.wait_for(lock, std::chrono::seconds{timeout}, check))
throw Exception("Mutation " + file_name + " is not finished. Will be done asynchronously", ErrorCodes::UNFINISHED);
mutation_wait_event.wait(lock, check);
}
}
@ -479,6 +478,10 @@ bool StorageMergeTree::isMutationDone(Int64 mutation_version) const
{
std::lock_guard lock(currently_processing_in_background_mutex);
/// Killed
if (!current_mutations_by_version.count(mutation_version))
return true;
auto data_parts = getDataPartsVector();
for (const auto & data_part : data_parts)
if (data_part->info.getDataVersion() < mutation_version)
@ -559,6 +562,7 @@ CancellationCode StorageMergeTree::killMutation(const String & mutation_id)
global_context.getMergeList().cancelPartMutations({}, to_kill->block_number);
to_kill->removeFile();
LOG_TRACE(log, "Cancelled part mutations and removed mutation file " << mutation_id);
mutation_wait_event.notify_all();
/// Maybe there is another mutation that was blocked by the killed one. Try to execute it immediately.
merging_mutating_task_handle->wake();

View File

@ -310,87 +310,73 @@ bool StorageReplicatedMergeTree::checkFixedGranualrityInZookeeper()
}
void StorageReplicatedMergeTree::waitForAllReplicasToStatisfyNodeCondition(
size_t timeout, const String & name_for_logging,
const String & replica_relative_node_path, CheckNodeCallback callback) const
void StorageReplicatedMergeTree::waitMutationToFinishOnReplicas(
const Strings & replicas, const String & mutation_id) const
{
const auto operation_start = std::chrono::system_clock::now();
std::chrono::milliseconds total_time{timeout * 1000};
if (replicas.empty())
return;
zkutil::EventPtr wait_event = std::make_shared<Poco::Event>();
Strings replicas = getZooKeeper()->getChildren(zookeeper_path + "/replicas");
std::set<String> inactive_replicas;
std::set<String> timed_out_replicas;
for (const String & replica : replicas)
{
LOG_DEBUG(log, "Waiting for " << replica << " to apply " + name_for_logging);
bool operation_is_processed_by_relica = false;
LOG_DEBUG(log, "Waiting for " << replica << " to apply mutation " + mutation_id);
while (!partial_shutdown_called)
{
/// Mutation maybe killed or whole replica was deleted.
/// Wait event will unblock at this moment.
Coordination::Stat exists_stat;
if (!getZooKeeper()->exists(zookeeper_path + "/mutations/" + mutation_id, &exists_stat, wait_event))
{
LOG_WARNING(log, "Mutation " << mutation_id << " was killed or manually removed. Nothing to wait.");
return;
}
auto zookeeper = getZooKeeper();
/// Replica could be inactive.
if (!zookeeper->exists(zookeeper_path + "/replicas/" + replica + "/is_active"))
{
LOG_WARNING(log, "Replica " << replica << " is not active during mutation query."
<< name_for_logging << " will be done asynchronously when replica becomes active.");
LOG_WARNING(log, "Replica " << replica << " is not active during mutation. "
"Mutation will be done asynchronously when replica becomes active.");
inactive_replicas.emplace(replica);
break;
}
String node_for_check = zookeeper_path + "/replicas/" + replica + "/" + replica_relative_node_path;
std::string node_for_check_value;
Coordination::Stat stat;
String mutation_pointer = zookeeper_path + "/replicas/" + replica + "/mutation_pointer";
std::string mutation_pointer_value;
Coordination::Stat get_stat;
/// Replica could be removed
if (!zookeeper->tryGet(node_for_check, node_for_check_value, &stat, wait_event))
if (!zookeeper->tryGet(mutation_pointer, mutation_pointer_value, &get_stat, wait_event))
{
LOG_WARNING(log, replica << " was removed");
operation_is_processed_by_relica = true;
break;
}
else /// in other case check required node
{
if (callback(node_for_check_value))
{
operation_is_processed_by_relica = true;
break; /// operation is done
}
}
else if (mutation_pointer_value >= mutation_id) /// Maybe we already processed more fresh mutation
break; /// (numbers like 0000000000 and 0000000001)
std::chrono::milliseconds time_spent =
std::chrono::duration_cast<std::chrono::milliseconds>(std::chrono::system_clock::now() - operation_start);
std::chrono::milliseconds time_left = total_time - time_spent;
/// We have some time to wait
if (time_left.count() > 0)
wait_event->tryWait(time_left.count());
else /// Otherwise time is up
break;
/// We wait without timeout.
wait_event->wait();
}
if (partial_shutdown_called)
throw Exception(name_for_logging + " is not finished because table shutdown was called. " + name_for_logging + " will be done after table restart.",
throw Exception("Mutation is not finished because table shutdown was called. It will be done after table restart.",
ErrorCodes::UNFINISHED);
if (!operation_is_processed_by_relica && !inactive_replicas.count(replica))
timed_out_replicas.emplace(replica);
}
if (!inactive_replicas.empty() || !timed_out_replicas.empty())
if (!inactive_replicas.empty())
{
std::stringstream exception_message;
exception_message << name_for_logging << " is not finished because";
exception_message << "Mutation is not finished because";
if (!inactive_replicas.empty())
exception_message << " some replicas are inactive right now: " << boost::algorithm::join(inactive_replicas, ", ");
if (!timed_out_replicas.empty() && !inactive_replicas.empty())
exception_message << " and";
if (!timed_out_replicas.empty())
exception_message << " timeout when waiting for some replicas: " << boost::algorithm::join(timed_out_replicas, ", ");
exception_message << ". " << name_for_logging << " will be done asynchronously";
exception_message << ". Mutation will be done asynchronously";
throw Exception(exception_message.str(), ErrorCodes::UNFINISHED);
}
@ -3382,7 +3368,7 @@ void StorageReplicatedMergeTree::alter(
time_t replication_alter_columns_timeout = query_context.getSettingsRef().replication_alter_columns_timeout;
/// This code is quite similar with waitForAllReplicasToStatisfyNodeCondition
/// This code is quite similar with waitMutationToFinishOnReplicas
/// but contains more complicated details (versions manipulations, multiple nodes, etc.).
/// It will be removed soon in favor of alter-modify implementation on top of mutations.
/// TODO (alesap)
@ -4603,17 +4589,21 @@ void StorageReplicatedMergeTree::mutate(const MutationCommands & commands, const
throw Coordination::Exception("Unable to create a mutation znode", rc);
}
if (query_context.getSettingsRef().mutation_synchronous_wait_timeout != 0) /// some timeout specified
/// we have to wait
if (query_context.getSettingsRef().mutations_sync != 0)
{
auto check_callback = [mutation_number = entry.znode_name](const String & zk_value)
{
/// Maybe we already processed more fresh mutation
/// We can compare their znode names (numbers like 0000000000 and 0000000001).
return zk_value >= mutation_number;
};
waitForAllReplicasToStatisfyNodeCondition(
query_context.getSettingsRef().mutation_synchronous_wait_timeout, "Mutation", "mutation_pointer", check_callback);
Strings replicas;
if (query_context.getSettingsRef().mutations_sync == 2) /// wait for all replicas
replicas = getZooKeeper()->getChildren(zookeeper_path + "/replicas");
else if (query_context.getSettingsRef().mutations_sync == 1) /// just wait for ourself
replicas.push_back(replica_path);
waitMutationToFinishOnReplicas(replicas, entry.znode_name);
}
}

View File

@ -532,14 +532,9 @@ private:
/// return true if it's fixed
bool checkFixedGranualrityInZookeeper();
using CheckNodeCallback = std::function<bool(const String & nodevalue_from_zookeeper)>;
/// Wait for timeout seconds when condition became true for node
/// /replicas/{replica}/replica_replative_node_path value for all replicas.
/// operation_name_for_logging used for logging about errors.
void waitForAllReplicasToStatisfyNodeCondition(
size_t timeout, const String & operaton_name_for_logging,
const String & replica_relative_node_path, CheckNodeCallback condition) const;
/// Wait for timeout seconds mutation is finished on replicas
void waitMutationToFinishOnReplicas(
const Strings & replicas, const String & mutation_id) const;
protected:
/** If not 'attach', either creates a new table in ZK, or adds a replica to an existing table.

View File

@ -14,27 +14,29 @@ ${CLICKHOUSE_CLIENT} --query="INSERT INTO test.kill_mutation VALUES ('2001-01-01
${CLICKHOUSE_CLIENT} --query="SELECT '*** Create and kill a single invalid mutation ***'"
${CLICKHOUSE_CLIENT} --query="ALTER TABLE test.kill_mutation DELETE WHERE toUInt32(s) = 1"
${CLICKHOUSE_CLIENT} --query="ALTER TABLE test.kill_mutation DELETE WHERE toUInt32(s) = 1 SETTINGS mutations_sync = 1" &
sleep 0.1
${CLICKHOUSE_CLIENT} --query="SELECT mutation_id, latest_failed_part IN ('20000101_1_1_0', '20010101_2_2_0'), latest_fail_time != 0, substr(latest_fail_reason, 1, 8) FROM system.mutations WHERE database = 'test' AND table = 'kill_mutation'"
${CLICKHOUSE_CLIENT} --query="KILL MUTATION WHERE database = 'test' AND table = 'kill_mutation'"
wait
${CLICKHOUSE_CLIENT} --query="SELECT mutation_id FROM system.mutations WHERE database = 'test' AND table = 'kill_mutation'"
${CLICKHOUSE_CLIENT} --query="SELECT '*** Create and kill invalid mutation that blocks another mutation ***'"
${CLICKHOUSE_CLIENT} --query="ALTER TABLE test.kill_mutation DELETE WHERE toUInt32(s) = 1"
${CLICKHOUSE_CLIENT} --query="ALTER TABLE test.kill_mutation DELETE WHERE x = 1"
${CLICKHOUSE_CLIENT} --query="ALTER TABLE test.kill_mutation DELETE WHERE x = 1 SETTINGS mutations_sync = 1" &
${CLICKHOUSE_CLIENT} --query="SELECT mutation_id, latest_failed_part IN ('20000101_1_1_0', '20010101_2_2_0'), latest_fail_time != 0, substr(latest_fail_reason, 1, 8) FROM system.mutations WHERE database = 'test' AND table = 'kill_mutation' AND mutation_id = 'mutation_4.txt'"
sleep 0.1
${CLICKHOUSE_CLIENT} --query="KILL MUTATION WHERE database = 'test' AND table = 'kill_mutation' AND mutation_id = 'mutation_4.txt'"
wait_for_mutation "kill_mutation" "mutation_5.txt" "test"
wait
${CLICKHOUSE_CLIENT} --query="SELECT * FROM test.kill_mutation"

View File

@ -1,6 +1,7 @@
*** Create and kill a single invalid mutation ***
0000000000 1 1 Code: 6,
waiting test kill_mutation_r1 0000000000
Mutation 0000000000 was killed
*** Create and kill invalid mutation that blocks another mutation ***
0000000001 1 1 Code: 6,
waiting test kill_mutation_r1 0000000001

View File

@ -18,12 +18,24 @@ ${CLICKHOUSE_CLIENT} --query="INSERT INTO test.kill_mutation_r1 VALUES ('2001-01
${CLICKHOUSE_CLIENT} --query="SELECT '*** Create and kill a single invalid mutation ***'"
# wrong mutation
${CLICKHOUSE_CLIENT} --query="ALTER TABLE test.kill_mutation_r1 DELETE WHERE toUInt32(s) = 1 SETTINGS mutation_synchronous_wait_timeout=2" 2>/dev/null
${CLICKHOUSE_CLIENT} --query="ALTER TABLE test.kill_mutation_r1 DELETE WHERE toUInt32(s) = 1 SETTINGS mutations_sync=2" 2>&1 | grep -o "Mutation 0000000000 was killed" &
${CLICKHOUSE_CLIENT} --query="SELECT mutation_id, latest_failed_part IN ('20000101_0_0_0', '20010101_0_0_0'), latest_fail_time != 0, substr(latest_fail_reason, 1, 8) FROM system.mutations WHERE database = 'test' AND table = 'kill_mutation_r1'"
check_query1="SELECT mutation_id, latest_failed_part IN ('20000101_0_0_0', '20010101_0_0_0'), latest_fail_time != 0, substr(latest_fail_reason, 1, 8) FROM system.mutations WHERE database = 'test' AND table = 'kill_mutation_r1'"
query_result=`$CLICKHOUSE_CLIENT --query="$check_query1" 2>&1`
while [ -z "$query_result" ]
do
query_result=`$CLICKHOUSE_CLIENT --query="$check_query1" 2>&1`
sleep 0.1
done
$CLICKHOUSE_CLIENT --query="$check_query1"
${CLICKHOUSE_CLIENT} --query="KILL MUTATION WHERE database = 'test' AND table = 'kill_mutation_r1'"
wait
${CLICKHOUSE_CLIENT} --query="SELECT mutation_id FROM system.mutations WHERE database = 'test' AND table = 'kill_mutation_r1'"
@ -32,13 +44,24 @@ ${CLICKHOUSE_CLIENT} --query="SELECT '*** Create and kill invalid mutation that
${CLICKHOUSE_CLIENT} --query="SYSTEM SYNC REPLICA test.kill_mutation_r1"
${CLICKHOUSE_CLIENT} --query="ALTER TABLE test.kill_mutation_r1 DELETE WHERE toUInt32(s) = 1"
${CLICKHOUSE_CLIENT} --query="ALTER TABLE test.kill_mutation_r1 DELETE WHERE x = 1 SETTINGS mutation_synchronous_wait_timeout=2" 2>/dev/null
# good mutation, but blocked with wrong mutation
${CLICKHOUSE_CLIENT} --query="ALTER TABLE test.kill_mutation_r1 DELETE WHERE x = 1 SETTINGS mutations_sync=2" &
${CLICKHOUSE_CLIENT} --query="SELECT mutation_id, latest_failed_part IN ('20000101_0_0_0_1', '20010101_0_0_0_1'), latest_fail_time != 0, substr(latest_fail_reason, 1, 8) FROM system.mutations WHERE database = 'test' AND table = 'kill_mutation_r1' AND mutation_id = '0000000001'"
check_query2="SELECT mutation_id, latest_failed_part IN ('20000101_0_0_0_1', '20010101_0_0_0_1'), latest_fail_time != 0, substr(latest_fail_reason, 1, 8) FROM system.mutations WHERE database = 'test' AND table = 'kill_mutation_r1' AND mutation_id = '0000000001'"
query_result=`$CLICKHOUSE_CLIENT --query="$check_query2" 2>&1`
while [ -z "$query_result" ]
do
query_result=`$CLICKHOUSE_CLIENT --query="$check_query2" 2>&1`
sleep 0.1
done
$CLICKHOUSE_CLIENT --query="$check_query2"
${CLICKHOUSE_CLIENT} --query="KILL MUTATION WHERE database = 'test' AND table = 'kill_mutation_r1' AND mutation_id = '0000000001'"
wait_for_mutation "kill_mutation_r2" "0000000002" "test"
wait
${CLICKHOUSE_CLIENT} --query="SELECT * FROM test.kill_mutation_r2"

View File

@ -2,9 +2,7 @@ Replicated
1
1
1
1
Normal
1
1
1
1

View File

@ -11,14 +11,12 @@ INSERT INTO table_for_synchronous_mutations1 select number, number from numbers(
SYSTEM SYNC REPLICA table_for_synchronous_mutations2;
ALTER TABLE table_for_synchronous_mutations1 UPDATE v1 = v1 + 1 WHERE 1 SETTINGS mutation_synchronous_wait_timeout = 10;
ALTER TABLE table_for_synchronous_mutations1 UPDATE v1 = v1 + 1 WHERE 1 SETTINGS mutations_sync = 2;
SELECT is_done FROM system.mutations where table = 'table_for_synchronous_mutations1';
ALTER TABLE table_for_synchronous_mutations1 UPDATE v1 = 1 WHERE ignore(sleep(3)) SETTINGS mutation_synchronous_wait_timeout = 2; --{serverError 341}
-- Another mutation, just to be sure, that previous finished
ALTER TABLE table_for_synchronous_mutations1 UPDATE v1 = v1 + 1 WHERE 1 SETTINGS mutation_synchronous_wait_timeout = 15;
ALTER TABLE table_for_synchronous_mutations1 UPDATE v1 = v1 + 1 WHERE 1 SETTINGS mutations_sync = 2;
SELECT is_done FROM system.mutations where table = 'table_for_synchronous_mutations1';
@ -33,16 +31,13 @@ CREATE TABLE table_for_synchronous_mutations_no_replication(k UInt32, v1 UInt64)
INSERT INTO table_for_synchronous_mutations_no_replication select number, number from numbers(100000);
ALTER TABLE table_for_synchronous_mutations_no_replication UPDATE v1 = v1 + 1 WHERE 1 SETTINGS mutation_synchronous_wait_timeout = 10;
ALTER TABLE table_for_synchronous_mutations_no_replication UPDATE v1 = v1 + 1 WHERE 1 SETTINGS mutations_sync = 2;
SELECT is_done FROM system.mutations where table = 'table_for_synchronous_mutations_no_replication';
ALTER TABLE table_for_synchronous_mutations_no_replication UPDATE v1 = 1 WHERE ignore(sleep(3)) SETTINGS mutation_synchronous_wait_timeout = 2; --{serverError 341}
-- Another mutation, just to be sure, that previous finished
ALTER TABLE table_for_synchronous_mutations_no_replication UPDATE v1 = v1 + 1 WHERE 1 SETTINGS mutation_synchronous_wait_timeout = 15;
ALTER TABLE table_for_synchronous_mutations_no_replication UPDATE v1 = v1 + 1 WHERE 1 SETTINGS mutations_sync = 2;
SELECT is_done FROM system.mutations where table = 'table_for_synchronous_mutations_no_replication';
DROP TABLE IF EXISTS table_for_synchronous_mutations_no_replication;