Merge remote-tracking branch 'origin/fix_table_startup' into integration-2

This commit is contained in:
Yatsishin Ilya 2021-05-14 17:12:45 +03:00
commit f922ef4bbf
4 changed files with 99 additions and 23 deletions

View File

@ -57,7 +57,7 @@ void ReplicatedMergeTreeRestartingThread::run()
try
{
if (first_time || storage.getZooKeeper()->expired())
if (first_time || readonly_mode_was_set || storage.getZooKeeper()->expired())
{
startup_completed = false;
@ -67,15 +67,15 @@ void ReplicatedMergeTreeRestartingThread::run()
}
else
{
LOG_WARNING(log, "ZooKeeper session has expired. Switching to a new session.");
bool old_val = false;
if (storage.is_readonly.compare_exchange_strong(old_val, true))
if (storage.getZooKeeper()->expired())
{
incr_readonly = true;
CurrentMetrics::add(CurrentMetrics::ReadonlyReplica);
LOG_WARNING(log, "ZooKeeper session has expired. Switching to a new session.");
setReadonly();
}
else if (readonly_mode_was_set)
{
LOG_WARNING(log, "Table was in readonly mode. Will try to activate it.");
}
partialShutdown();
}
@ -98,8 +98,14 @@ void ReplicatedMergeTreeRestartingThread::run()
if (!need_stop && !tryStartup())
{
/// We couldn't startup replication. Table must be readonly.
/// Otherwise it can have partially initialized queue and other
/// strange parts of state.
setReadonly();
if (first_time)
storage.startup_event.set();
task->scheduleAfter(retry_period_ms);
return;
}
@ -116,7 +122,7 @@ void ReplicatedMergeTreeRestartingThread::run()
bool old_val = true;
if (storage.is_readonly.compare_exchange_strong(old_val, false))
{
incr_readonly = false;
readonly_mode_was_set = false;
CurrentMetrics::sub(CurrentMetrics::ReadonlyReplica);
}
@ -125,6 +131,8 @@ void ReplicatedMergeTreeRestartingThread::run()
}
catch (...)
{
/// We couldn't activate table let's set it into readonly mode
setReadonly();
storage.startup_event.set();
tryLogCurrentException(log, __PRETTY_FUNCTION__);
}
@ -184,7 +192,7 @@ bool ReplicatedMergeTreeRestartingThread::tryStartup()
}
catch (const Coordination::Exception & e)
{
LOG_ERROR(log, "Couldn't start replication: {}. {}", e.what(), DB::getCurrentExceptionMessage(true));
LOG_ERROR(log, "Couldn't start replication (table will be in readonly mode): {}. {}", e.what(), DB::getCurrentExceptionMessage(true));
return false;
}
catch (const Exception & e)
@ -192,7 +200,7 @@ bool ReplicatedMergeTreeRestartingThread::tryStartup()
if (e.code() != ErrorCodes::REPLICA_IS_ALREADY_ACTIVE)
throw;
LOG_ERROR(log, "Couldn't start replication: {}. {}", e.what(), DB::getCurrentExceptionMessage(true));
LOG_ERROR(log, "Couldn't start replication (table will be in readonly mode): {}. {}", e.what(), DB::getCurrentExceptionMessage(true));
return false;
}
}
@ -356,14 +364,24 @@ void ReplicatedMergeTreeRestartingThread::shutdown()
LOG_TRACE(log, "Restarting thread finished");
/// For detach table query, we should reset the ReadonlyReplica metric.
if (incr_readonly)
if (readonly_mode_was_set)
{
CurrentMetrics::sub(CurrentMetrics::ReadonlyReplica);
incr_readonly = false;
readonly_mode_was_set = false;
}
/// Stop other tasks.
partialShutdown();
}
void ReplicatedMergeTreeRestartingThread::setReadonly()
{
bool old_val = false;
if (storage.is_readonly.compare_exchange_strong(old_val, true))
{
readonly_mode_was_set = true;
CurrentMetrics::add(CurrentMetrics::ReadonlyReplica);
}
}
}

View File

@ -37,7 +37,7 @@ private:
std::atomic<bool> need_stop {false};
// We need it besides `storage.is_readonly`, because `shutdown()` may be called many times, that way `storage.is_readonly` will not change.
bool incr_readonly = false;
bool readonly_mode_was_set = false;
/// The random data we wrote into `/replicas/me/is_active`.
String active_node_identifier;
@ -62,6 +62,9 @@ private:
void updateQuorumIfWeHavePart();
void partialShutdown();
/// Set readonly mode for table
void setReadonly();
};

View File

@ -392,11 +392,33 @@ def test_ttl_compatibility(started_cluster, node_left, node_right, num_run):
time.sleep(5) # Wait for TTL
node_right.query("OPTIMIZE TABLE test_ttl_delete FINAL")
# after restart table can be in readonly mode
exception = None
for _ in range(40):
try:
node_right.query("OPTIMIZE TABLE test_ttl_delete FINAL")
break
except Exception as ex:
print("Cannot optimaze table on node", node_right.name, "exception", ex)
time.sleep(0.5)
exception = ex
else:
raise ex
node_right.query("OPTIMIZE TABLE test_ttl_group_by FINAL")
node_right.query("OPTIMIZE TABLE test_ttl_where FINAL")
node_left.query("SYSTEM SYNC REPLICA test_ttl_delete", timeout=20)
for _ in range(40):
try:
node_left.query("SYSTEM SYNC REPLICA test_ttl_delete", timeout=20)
break
except Exception as ex:
print("Cannot sync replica table on node", node_left.name, "exception", ex)
time.sleep(0.5)
exception = ex
else:
raise ex
node_left.query("SYSTEM SYNC REPLICA test_ttl_group_by", timeout=20)
node_left.query("SYSTEM SYNC REPLICA test_ttl_where", timeout=20)

View File

@ -1,4 +1,5 @@
import pytest
import time
from helpers.cluster import ClickHouseCluster
from helpers.test_tools import assert_eq_with_retry
@ -34,12 +35,32 @@ def test_mutate_and_upgrade(start_cluster):
node1.query("ALTER TABLE mt DELETE WHERE id = 2", settings={"mutations_sync": "2"})
node2.query("SYSTEM SYNC REPLICA mt", timeout=15)
node1.restart_with_latest_version()
node2.restart_with_latest_version()
node1.restart_with_latest_version(signal=9)
node2.restart_with_latest_version(signal=9)
node2.query("INSERT INTO mt VALUES ('2020-02-13', 3);")
exception = None
# After hard restart table can be in readonly mode
for _ in range(40):
try:
node2.query("INSERT INTO mt VALUES ('2020-02-13', 3);")
break
except Exception as ex:
print("Cannot insert into node2 with error {}", ex)
time.sleep(0.5)
exception = ex
else:
raise exception
node1.query("SYSTEM SYNC REPLICA mt", timeout=15)
for _ in range(40):
try:
node1.query("SYSTEM SYNC REPLICA mt", timeout=5)
break
except Exception as ex:
print("Cannot sync node1 with error {}", ex)
time.sleep(0.5)
exception = ex
else:
raise exception
assert node1.query("SELECT COUNT() FROM mt") == "2\n"
assert node2.query("SELECT COUNT() FROM mt") == "2\n"
@ -73,12 +94,24 @@ def test_upgrade_while_mutation(start_cluster):
node3.query("INSERT INTO mt1 select '2020-02-13', number from numbers(100000)")
node3.query("SYSTEM STOP MERGES")
node3.query("SYSTEM STOP MERGES mt1")
node3.query("ALTER TABLE mt1 DELETE WHERE id % 2 == 0")
node3.restart_with_latest_version()
node3.restart_with_latest_version(signal=9)
# After hard restart table can be in readonly mode
exception = None
for _ in range(40):
try:
node3.query("ALTER TABLE mt1 DELETE WHERE id > 100000", settings={"mutations_sync": "2"})
break
except Exception as ex:
print("Cannot alter node3 with error {}", ex)
time.sleep(0.5)
exception = ex
else:
raise exception
# will delete nothing, but previous async mutation will finish with this query
node3.query("ALTER TABLE mt1 DELETE WHERE id > 100000", settings={"mutations_sync": "2"})
assert_eq_with_retry(node3, "SELECT COUNT() from mt1", "50000\n")