mirror of
https://github.com/ClickHouse/ClickHouse.git
synced 2024-11-22 07:31:57 +00:00
Merge pull request #24122 from ClickHouse/fix_table_startup
Set readonly mode if restarting thread failed to activate replica
This commit is contained in:
commit
29726b5510
@ -57,7 +57,7 @@ void ReplicatedMergeTreeRestartingThread::run()
|
|||||||
|
|
||||||
try
|
try
|
||||||
{
|
{
|
||||||
if (first_time || storage.getZooKeeper()->expired())
|
if (first_time || readonly_mode_was_set || storage.getZooKeeper()->expired())
|
||||||
{
|
{
|
||||||
startup_completed = false;
|
startup_completed = false;
|
||||||
|
|
||||||
@ -67,15 +67,15 @@ void ReplicatedMergeTreeRestartingThread::run()
|
|||||||
}
|
}
|
||||||
else
|
else
|
||||||
{
|
{
|
||||||
LOG_WARNING(log, "ZooKeeper session has expired. Switching to a new session.");
|
if (storage.getZooKeeper()->expired())
|
||||||
|
|
||||||
bool old_val = false;
|
|
||||||
if (storage.is_readonly.compare_exchange_strong(old_val, true))
|
|
||||||
{
|
{
|
||||||
incr_readonly = true;
|
LOG_WARNING(log, "ZooKeeper session has expired. Switching to a new session.");
|
||||||
CurrentMetrics::add(CurrentMetrics::ReadonlyReplica);
|
setReadonly();
|
||||||
|
}
|
||||||
|
else if (readonly_mode_was_set)
|
||||||
|
{
|
||||||
|
LOG_WARNING(log, "Table was in readonly mode. Will try to activate it.");
|
||||||
}
|
}
|
||||||
|
|
||||||
partialShutdown();
|
partialShutdown();
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -98,8 +98,14 @@ void ReplicatedMergeTreeRestartingThread::run()
|
|||||||
|
|
||||||
if (!need_stop && !tryStartup())
|
if (!need_stop && !tryStartup())
|
||||||
{
|
{
|
||||||
|
/// We couldn't startup replication. Table must be readonly.
|
||||||
|
/// Otherwise it can have partially initialized queue and other
|
||||||
|
/// strange parts of state.
|
||||||
|
setReadonly();
|
||||||
|
|
||||||
if (first_time)
|
if (first_time)
|
||||||
storage.startup_event.set();
|
storage.startup_event.set();
|
||||||
|
|
||||||
task->scheduleAfter(retry_period_ms);
|
task->scheduleAfter(retry_period_ms);
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
@ -116,7 +122,7 @@ void ReplicatedMergeTreeRestartingThread::run()
|
|||||||
bool old_val = true;
|
bool old_val = true;
|
||||||
if (storage.is_readonly.compare_exchange_strong(old_val, false))
|
if (storage.is_readonly.compare_exchange_strong(old_val, false))
|
||||||
{
|
{
|
||||||
incr_readonly = false;
|
readonly_mode_was_set = false;
|
||||||
CurrentMetrics::sub(CurrentMetrics::ReadonlyReplica);
|
CurrentMetrics::sub(CurrentMetrics::ReadonlyReplica);
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -125,6 +131,8 @@ void ReplicatedMergeTreeRestartingThread::run()
|
|||||||
}
|
}
|
||||||
catch (...)
|
catch (...)
|
||||||
{
|
{
|
||||||
|
/// We couldn't activate table let's set it into readonly mode
|
||||||
|
setReadonly();
|
||||||
storage.startup_event.set();
|
storage.startup_event.set();
|
||||||
tryLogCurrentException(log, __PRETTY_FUNCTION__);
|
tryLogCurrentException(log, __PRETTY_FUNCTION__);
|
||||||
}
|
}
|
||||||
@ -184,7 +192,7 @@ bool ReplicatedMergeTreeRestartingThread::tryStartup()
|
|||||||
}
|
}
|
||||||
catch (const Coordination::Exception & e)
|
catch (const Coordination::Exception & e)
|
||||||
{
|
{
|
||||||
LOG_ERROR(log, "Couldn't start replication: {}. {}", e.what(), DB::getCurrentExceptionMessage(true));
|
LOG_ERROR(log, "Couldn't start replication (table will be in readonly mode): {}. {}", e.what(), DB::getCurrentExceptionMessage(true));
|
||||||
return false;
|
return false;
|
||||||
}
|
}
|
||||||
catch (const Exception & e)
|
catch (const Exception & e)
|
||||||
@ -192,7 +200,7 @@ bool ReplicatedMergeTreeRestartingThread::tryStartup()
|
|||||||
if (e.code() != ErrorCodes::REPLICA_IS_ALREADY_ACTIVE)
|
if (e.code() != ErrorCodes::REPLICA_IS_ALREADY_ACTIVE)
|
||||||
throw;
|
throw;
|
||||||
|
|
||||||
LOG_ERROR(log, "Couldn't start replication: {}. {}", e.what(), DB::getCurrentExceptionMessage(true));
|
LOG_ERROR(log, "Couldn't start replication (table will be in readonly mode): {}. {}", e.what(), DB::getCurrentExceptionMessage(true));
|
||||||
return false;
|
return false;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@ -356,14 +364,24 @@ void ReplicatedMergeTreeRestartingThread::shutdown()
|
|||||||
LOG_TRACE(log, "Restarting thread finished");
|
LOG_TRACE(log, "Restarting thread finished");
|
||||||
|
|
||||||
/// For detach table query, we should reset the ReadonlyReplica metric.
|
/// For detach table query, we should reset the ReadonlyReplica metric.
|
||||||
if (incr_readonly)
|
if (readonly_mode_was_set)
|
||||||
{
|
{
|
||||||
CurrentMetrics::sub(CurrentMetrics::ReadonlyReplica);
|
CurrentMetrics::sub(CurrentMetrics::ReadonlyReplica);
|
||||||
incr_readonly = false;
|
readonly_mode_was_set = false;
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Stop other tasks.
|
/// Stop other tasks.
|
||||||
partialShutdown();
|
partialShutdown();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
void ReplicatedMergeTreeRestartingThread::setReadonly()
|
||||||
|
{
|
||||||
|
bool old_val = false;
|
||||||
|
if (storage.is_readonly.compare_exchange_strong(old_val, true))
|
||||||
|
{
|
||||||
|
readonly_mode_was_set = true;
|
||||||
|
CurrentMetrics::add(CurrentMetrics::ReadonlyReplica);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
|
@ -37,7 +37,7 @@ private:
|
|||||||
std::atomic<bool> need_stop {false};
|
std::atomic<bool> need_stop {false};
|
||||||
|
|
||||||
// We need it besides `storage.is_readonly`, because `shutdown()` may be called many times, that way `storage.is_readonly` will not change.
|
// We need it besides `storage.is_readonly`, because `shutdown()` may be called many times, that way `storage.is_readonly` will not change.
|
||||||
bool incr_readonly = false;
|
bool readonly_mode_was_set = false;
|
||||||
|
|
||||||
/// The random data we wrote into `/replicas/me/is_active`.
|
/// The random data we wrote into `/replicas/me/is_active`.
|
||||||
String active_node_identifier;
|
String active_node_identifier;
|
||||||
@ -62,6 +62,9 @@ private:
|
|||||||
void updateQuorumIfWeHavePart();
|
void updateQuorumIfWeHavePart();
|
||||||
|
|
||||||
void partialShutdown();
|
void partialShutdown();
|
||||||
|
|
||||||
|
/// Set readonly mode for table
|
||||||
|
void setReadonly();
|
||||||
};
|
};
|
||||||
|
|
||||||
|
|
||||||
|
@ -80,3 +80,16 @@ def assert_logs_contain_with_retry(instance, substring, retry_count=20, sleep_ti
|
|||||||
time.sleep(sleep_time)
|
time.sleep(sleep_time)
|
||||||
else:
|
else:
|
||||||
raise AssertionError("'{}' not found in logs".format(substring))
|
raise AssertionError("'{}' not found in logs".format(substring))
|
||||||
|
|
||||||
|
def exec_query_with_retry(instance, query, retry_count=40, sleep_time=0.5, settings={}):
|
||||||
|
exception = None
|
||||||
|
for _ in range(retry_count):
|
||||||
|
try:
|
||||||
|
instance.query(query, timeout=30, settings=settings)
|
||||||
|
break
|
||||||
|
except Exception as ex:
|
||||||
|
exception = ex
|
||||||
|
print("Failed to execute query '", query, "' on instance", instance.name, "will retry")
|
||||||
|
time.sleep(sleep_time)
|
||||||
|
else:
|
||||||
|
raise exception
|
||||||
|
@ -7,7 +7,7 @@ import pytest
|
|||||||
from helpers.cluster import ClickHouseCluster
|
from helpers.cluster import ClickHouseCluster
|
||||||
from helpers.network import PartitionManager
|
from helpers.network import PartitionManager
|
||||||
from helpers.test_tools import TSV
|
from helpers.test_tools import TSV
|
||||||
from helpers.test_tools import assert_eq_with_retry
|
from helpers.test_tools import assert_eq_with_retry, exec_query_with_retry
|
||||||
|
|
||||||
cluster = ClickHouseCluster(__file__)
|
cluster = ClickHouseCluster(__file__)
|
||||||
|
|
||||||
@ -408,8 +408,9 @@ def test_in_memory_wal_rotate(start_cluster):
|
|||||||
|
|
||||||
def test_in_memory_deduplication(start_cluster):
|
def test_in_memory_deduplication(start_cluster):
|
||||||
for i in range(3):
|
for i in range(3):
|
||||||
node9.query("INSERT INTO deduplication_table (date, id, s) VALUES (toDate('2020-03-03'), 1, 'foo')")
|
# table can be in readonly node
|
||||||
node10.query("INSERT INTO deduplication_table (date, id, s) VALUES (toDate('2020-03-03'), 1, 'foo')")
|
exec_query_with_retry(node9, "INSERT INTO deduplication_table (date, id, s) VALUES (toDate('2020-03-03'), 1, 'foo')")
|
||||||
|
exec_query_with_retry(node10, "INSERT INTO deduplication_table (date, id, s) VALUES (toDate('2020-03-03'), 1, 'foo')")
|
||||||
|
|
||||||
node9.query("SYSTEM SYNC REPLICA deduplication_table", timeout=20)
|
node9.query("SYSTEM SYNC REPLICA deduplication_table", timeout=20)
|
||||||
node10.query("SYSTEM SYNC REPLICA deduplication_table", timeout=20)
|
node10.query("SYSTEM SYNC REPLICA deduplication_table", timeout=20)
|
||||||
@ -430,10 +431,10 @@ def test_in_memory_alters(start_cluster):
|
|||||||
node9.restart_clickhouse(kill=True)
|
node9.restart_clickhouse(kill=True)
|
||||||
|
|
||||||
expected = "1\tab\t0\n2\tcd\t0\n"
|
expected = "1\tab\t0\n2\tcd\t0\n"
|
||||||
assert node9.query("SELECT id, s, col1 FROM alters_table") == expected
|
assert node9.query("SELECT id, s, col1 FROM alters_table ORDER BY id") == expected
|
||||||
check_parts_type(1)
|
check_parts_type(1)
|
||||||
|
# After hard restart table can be in readonly mode
|
||||||
node9.query("INSERT INTO alters_table (date, id, col1) VALUES (toDate('2020-10-10'), 3, 100)")
|
exec_query_with_retry(node9, "INSERT INTO alters_table (date, id, col1) VALUES (toDate('2020-10-10'), 3, 100)")
|
||||||
node9.query("ALTER TABLE alters_table MODIFY COLUMN col1 String")
|
node9.query("ALTER TABLE alters_table MODIFY COLUMN col1 String")
|
||||||
node9.query("ALTER TABLE alters_table DROP COLUMN s")
|
node9.query("ALTER TABLE alters_table DROP COLUMN s")
|
||||||
node9.restart_clickhouse(kill=True)
|
node9.restart_clickhouse(kill=True)
|
||||||
@ -442,8 +443,10 @@ def test_in_memory_alters(start_cluster):
|
|||||||
with pytest.raises(Exception):
|
with pytest.raises(Exception):
|
||||||
node9.query("SELECT id, s, col1 FROM alters_table")
|
node9.query("SELECT id, s, col1 FROM alters_table")
|
||||||
|
|
||||||
expected = expected = "1\t0_foo\n2\t0_foo\n3\t100_foo\n"
|
# Values of col1 was not materialized as integers, so they have
|
||||||
assert node9.query("SELECT id, col1 || '_foo' FROM alters_table")
|
# default string values after alter
|
||||||
|
expected = "1\t_foo\n2\t_foo\n3\t100_foo\n"
|
||||||
|
assert node9.query("SELECT id, col1 || '_foo' FROM alters_table ORDER BY id") == expected
|
||||||
|
|
||||||
|
|
||||||
def test_polymorphic_parts_index(start_cluster):
|
def test_polymorphic_parts_index(start_cluster):
|
||||||
|
@ -3,7 +3,7 @@ import time
|
|||||||
import helpers.client as client
|
import helpers.client as client
|
||||||
import pytest
|
import pytest
|
||||||
from helpers.cluster import ClickHouseCluster
|
from helpers.cluster import ClickHouseCluster
|
||||||
from helpers.test_tools import TSV
|
from helpers.test_tools import TSV, exec_query_with_retry
|
||||||
|
|
||||||
cluster = ClickHouseCluster(__file__)
|
cluster = ClickHouseCluster(__file__)
|
||||||
node1 = cluster.add_instance('node1', with_zookeeper=True)
|
node1 = cluster.add_instance('node1', with_zookeeper=True)
|
||||||
@ -392,11 +392,13 @@ def test_ttl_compatibility(started_cluster, node_left, node_right, num_run):
|
|||||||
|
|
||||||
time.sleep(5) # Wait for TTL
|
time.sleep(5) # Wait for TTL
|
||||||
|
|
||||||
node_right.query("OPTIMIZE TABLE test_ttl_delete FINAL")
|
# after restart table can be in readonly mode
|
||||||
|
exec_query_with_retry(node_right, "OPTIMIZE TABLE test_ttl_delete FINAL")
|
||||||
|
|
||||||
node_right.query("OPTIMIZE TABLE test_ttl_group_by FINAL")
|
node_right.query("OPTIMIZE TABLE test_ttl_group_by FINAL")
|
||||||
node_right.query("OPTIMIZE TABLE test_ttl_where FINAL")
|
node_right.query("OPTIMIZE TABLE test_ttl_where FINAL")
|
||||||
|
exec_query_with_retry(node_left, "SYSTEM SYNC REPLICA test_ttl_delete")
|
||||||
|
|
||||||
node_left.query("SYSTEM SYNC REPLICA test_ttl_delete", timeout=20)
|
|
||||||
node_left.query("SYSTEM SYNC REPLICA test_ttl_group_by", timeout=20)
|
node_left.query("SYSTEM SYNC REPLICA test_ttl_group_by", timeout=20)
|
||||||
node_left.query("SYSTEM SYNC REPLICA test_ttl_where", timeout=20)
|
node_left.query("SYSTEM SYNC REPLICA test_ttl_where", timeout=20)
|
||||||
|
|
||||||
|
@ -1,7 +1,8 @@
|
|||||||
import pytest
|
import pytest
|
||||||
|
import time
|
||||||
|
|
||||||
from helpers.cluster import ClickHouseCluster
|
from helpers.cluster import ClickHouseCluster
|
||||||
from helpers.test_tools import assert_eq_with_retry
|
from helpers.test_tools import assert_eq_with_retry, exec_query_with_retry
|
||||||
|
|
||||||
cluster = ClickHouseCluster(__file__)
|
cluster = ClickHouseCluster(__file__)
|
||||||
|
|
||||||
@ -34,12 +35,12 @@ def test_mutate_and_upgrade(start_cluster):
|
|||||||
node1.query("ALTER TABLE mt DELETE WHERE id = 2", settings={"mutations_sync": "2"})
|
node1.query("ALTER TABLE mt DELETE WHERE id = 2", settings={"mutations_sync": "2"})
|
||||||
node2.query("SYSTEM SYNC REPLICA mt", timeout=5)
|
node2.query("SYSTEM SYNC REPLICA mt", timeout=5)
|
||||||
|
|
||||||
node1.restart_with_latest_version()
|
node1.restart_with_latest_version(signal=9)
|
||||||
node2.restart_with_latest_version()
|
node2.restart_with_latest_version(signal=9)
|
||||||
|
|
||||||
node2.query("INSERT INTO mt VALUES ('2020-02-13', 3);")
|
# After hard restart table can be in readonly mode
|
||||||
|
exec_query_with_retry(node2, "INSERT INTO mt VALUES ('2020-02-13', 3)")
|
||||||
node1.query("SYSTEM SYNC REPLICA mt", timeout=5)
|
exec_query_with_retry(node1, "SYSTEM SYNC REPLICA mt")
|
||||||
|
|
||||||
assert node1.query("SELECT COUNT() FROM mt") == "2\n"
|
assert node1.query("SELECT COUNT() FROM mt") == "2\n"
|
||||||
assert node2.query("SELECT COUNT() FROM mt") == "2\n"
|
assert node2.query("SELECT COUNT() FROM mt") == "2\n"
|
||||||
@ -73,12 +74,12 @@ def test_upgrade_while_mutation(start_cluster):
|
|||||||
|
|
||||||
node3.query("INSERT INTO mt1 select '2020-02-13', number from numbers(100000)")
|
node3.query("INSERT INTO mt1 select '2020-02-13', number from numbers(100000)")
|
||||||
|
|
||||||
node3.query("SYSTEM STOP MERGES")
|
node3.query("SYSTEM STOP MERGES mt1")
|
||||||
node3.query("ALTER TABLE mt1 DELETE WHERE id % 2 == 0")
|
node3.query("ALTER TABLE mt1 DELETE WHERE id % 2 == 0")
|
||||||
|
|
||||||
node3.restart_with_latest_version()
|
node3.restart_with_latest_version(signal=9)
|
||||||
|
|
||||||
|
exec_query_with_retry(node3, "ALTER TABLE mt1 DELETE WHERE id > 100000", settings={"mutations_sync": "2"})
|
||||||
# will delete nothing, but previous async mutation will finish with this query
|
# will delete nothing, but previous async mutation will finish with this query
|
||||||
node3.query("ALTER TABLE mt1 DELETE WHERE id > 100000", settings={"mutations_sync": "2"})
|
|
||||||
|
|
||||||
assert_eq_with_retry(node3, "SELECT COUNT() from mt1", "50000\n")
|
assert_eq_with_retry(node3, "SELECT COUNT() from mt1", "50000\n")
|
||||||
|
Loading…
Reference in New Issue
Block a user