From 67e43937694352cdee6e306e705fd7e6ffc36b9f Mon Sep 17 00:00:00 2001 From: alesapin Date: Fri, 14 May 2021 11:32:41 +0300 Subject: [PATCH 1/4] If table was not active set readonly mode --- .../ReplicatedMergeTreeRestartingThread.cpp | 44 ++++++++++++----- .../ReplicatedMergeTreeRestartingThread.h | 5 +- .../test.py | 47 ++++++++++++++++--- 3 files changed, 75 insertions(+), 21 deletions(-) diff --git a/src/Storages/MergeTree/ReplicatedMergeTreeRestartingThread.cpp b/src/Storages/MergeTree/ReplicatedMergeTreeRestartingThread.cpp index ca6ea3103d1..b43770f0923 100644 --- a/src/Storages/MergeTree/ReplicatedMergeTreeRestartingThread.cpp +++ b/src/Storages/MergeTree/ReplicatedMergeTreeRestartingThread.cpp @@ -57,7 +57,7 @@ void ReplicatedMergeTreeRestartingThread::run() try { - if (first_time || storage.getZooKeeper()->expired()) + if (first_time || readonly_mode_was_set || storage.getZooKeeper()->expired()) { startup_completed = false; @@ -67,15 +67,15 @@ void ReplicatedMergeTreeRestartingThread::run() } else { - LOG_WARNING(log, "ZooKeeper session has expired. Switching to a new session."); - - bool old_val = false; - if (storage.is_readonly.compare_exchange_strong(old_val, true)) + if (!readonly_mode_was_set) { - incr_readonly = true; - CurrentMetrics::add(CurrentMetrics::ReadonlyReplica); + LOG_WARNING(log, "ZooKeeper session has expired. Switching to a new session."); + setReadonly(); + } + else + { + LOG_WARNING(log, "Table was in readonly mode. Will try to activate it."); } - partialShutdown(); } @@ -98,8 +98,14 @@ void ReplicatedMergeTreeRestartingThread::run() if (!need_stop && !tryStartup()) { + /// We couldn't startup replication. Table must be readonly. + /// Otherwise it can have partially initialized queue and other + /// strange parts of state. + setReadonly(); + if (first_time) storage.startup_event.set(); + task->scheduleAfter(retry_period_ms); return; } @@ -116,7 +122,7 @@ void ReplicatedMergeTreeRestartingThread::run() bool old_val = true; if (storage.is_readonly.compare_exchange_strong(old_val, false)) { - incr_readonly = false; + readonly_mode_was_set = false; CurrentMetrics::sub(CurrentMetrics::ReadonlyReplica); } @@ -125,6 +131,8 @@ void ReplicatedMergeTreeRestartingThread::run() } catch (...) { + /// We couldn't activate table let's set it into readonly mode + setReadonly(); storage.startup_event.set(); tryLogCurrentException(log, __PRETTY_FUNCTION__); } @@ -184,7 +192,7 @@ bool ReplicatedMergeTreeRestartingThread::tryStartup() } catch (const Coordination::Exception & e) { - LOG_ERROR(log, "Couldn't start replication: {}. {}", e.what(), DB::getCurrentExceptionMessage(true)); + LOG_ERROR(log, "Couldn't start replication (table will be in readonly mode): {}. {}", e.what(), DB::getCurrentExceptionMessage(true)); return false; } catch (const Exception & e) @@ -192,7 +200,7 @@ bool ReplicatedMergeTreeRestartingThread::tryStartup() if (e.code() != ErrorCodes::REPLICA_IS_ALREADY_ACTIVE) throw; - LOG_ERROR(log, "Couldn't start replication: {}. {}", e.what(), DB::getCurrentExceptionMessage(true)); + LOG_ERROR(log, "Couldn't start replication (table will be in readonly mode): {}. {}", e.what(), DB::getCurrentExceptionMessage(true)); return false; } } @@ -356,14 +364,24 @@ void ReplicatedMergeTreeRestartingThread::shutdown() LOG_TRACE(log, "Restarting thread finished"); /// For detach table query, we should reset the ReadonlyReplica metric. - if (incr_readonly) + if (readonly_mode_was_set) { CurrentMetrics::sub(CurrentMetrics::ReadonlyReplica); - incr_readonly = false; + readonly_mode_was_set = false; } /// Stop other tasks. partialShutdown(); } +void ReplicatedMergeTreeRestartingThread::setReadonly() +{ + bool old_val = false; + if (storage.is_readonly.compare_exchange_strong(old_val, true)) + { + readonly_mode_was_set = true; + CurrentMetrics::add(CurrentMetrics::ReadonlyReplica); + } +} + } diff --git a/src/Storages/MergeTree/ReplicatedMergeTreeRestartingThread.h b/src/Storages/MergeTree/ReplicatedMergeTreeRestartingThread.h index 824ed73c171..cb10d628349 100644 --- a/src/Storages/MergeTree/ReplicatedMergeTreeRestartingThread.h +++ b/src/Storages/MergeTree/ReplicatedMergeTreeRestartingThread.h @@ -37,7 +37,7 @@ private: std::atomic need_stop {false}; // We need it besides `storage.is_readonly`, because `shutdown()` may be called many times, that way `storage.is_readonly` will not change. - bool incr_readonly = false; + bool readonly_mode_was_set = false; /// The random data we wrote into `/replicas/me/is_active`. String active_node_identifier; @@ -62,6 +62,9 @@ private: void updateQuorumIfWeHavePart(); void partialShutdown(); + + /// Set readonly mode for table + void setReadonly(); }; diff --git a/tests/integration/test_version_update_after_mutation/test.py b/tests/integration/test_version_update_after_mutation/test.py index 79e5dece174..2549dc0a8cf 100644 --- a/tests/integration/test_version_update_after_mutation/test.py +++ b/tests/integration/test_version_update_after_mutation/test.py @@ -1,4 +1,5 @@ import pytest +import time from helpers.cluster import ClickHouseCluster from helpers.test_tools import assert_eq_with_retry @@ -34,12 +35,32 @@ def test_mutate_and_upgrade(start_cluster): node1.query("ALTER TABLE mt DELETE WHERE id = 2", settings={"mutations_sync": "2"}) node2.query("SYSTEM SYNC REPLICA mt", timeout=5) - node1.restart_with_latest_version() - node2.restart_with_latest_version() + node1.restart_with_latest_version(signal=9) + node2.restart_with_latest_version(signal=9) - node2.query("INSERT INTO mt VALUES ('2020-02-13', 3);") + exception = None + # After hard restart table can be in readonly mode + for _ in range(40): + try: + node2.query("INSERT INTO mt VALUES ('2020-02-13', 3);") + break + except Exception as ex: + print("Cannot insert into node2 with error {}", ex) + time.sleep(0.5) + exception = ex + else: + raise exception - node1.query("SYSTEM SYNC REPLICA mt", timeout=5) + for _ in range(40): + try: + node1.query("SYSTEM SYNC REPLICA mt", timeout=5) + break + except Exception as ex: + print("Cannot sync node1 with error {}", ex) + time.sleep(0.5) + exception = ex + else: + raise exception assert node1.query("SELECT COUNT() FROM mt") == "2\n" assert node2.query("SELECT COUNT() FROM mt") == "2\n" @@ -73,12 +94,24 @@ def test_upgrade_while_mutation(start_cluster): node3.query("INSERT INTO mt1 select '2020-02-13', number from numbers(100000)") - node3.query("SYSTEM STOP MERGES") + node3.query("SYSTEM STOP MERGES mt1") node3.query("ALTER TABLE mt1 DELETE WHERE id % 2 == 0") - node3.restart_with_latest_version() + node3.restart_with_latest_version(signal=9) + + # After hard restart table can be in readonly mode + exception = None + for _ in range(40): + try: + node3.query("ALTER TABLE mt1 DELETE WHERE id > 100000", settings={"mutations_sync": "2"}) + break + except Exception as ex: + print("Cannot alter node3 with error {}", ex) + time.sleep(0.5) + exception = ex + else: + raise exception # will delete nothing, but previous async mutation will finish with this query - node3.query("ALTER TABLE mt1 DELETE WHERE id > 100000", settings={"mutations_sync": "2"}) assert_eq_with_retry(node3, "SELECT COUNT() from mt1", "50000\n") From d4c6a5a05e4e62a5ddb9aa9d2c610a4efcba96c5 Mon Sep 17 00:00:00 2001 From: alesapin Date: Fri, 14 May 2021 11:38:53 +0300 Subject: [PATCH 2/4] Better logging --- .../MergeTree/ReplicatedMergeTreeRestartingThread.cpp | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/Storages/MergeTree/ReplicatedMergeTreeRestartingThread.cpp b/src/Storages/MergeTree/ReplicatedMergeTreeRestartingThread.cpp index b43770f0923..6b7fb3bf17f 100644 --- a/src/Storages/MergeTree/ReplicatedMergeTreeRestartingThread.cpp +++ b/src/Storages/MergeTree/ReplicatedMergeTreeRestartingThread.cpp @@ -67,12 +67,12 @@ void ReplicatedMergeTreeRestartingThread::run() } else { - if (!readonly_mode_was_set) + if (storage.getZooKeeper()->expired()) { LOG_WARNING(log, "ZooKeeper session has expired. Switching to a new session."); setReadonly(); } - else + else if (readonly_mode_was_set) { LOG_WARNING(log, "Table was in readonly mode. Will try to activate it."); } From 340cd2027445927f87f30bff6c9186f839f58bd4 Mon Sep 17 00:00:00 2001 From: alesapin Date: Fri, 14 May 2021 11:55:51 +0300 Subject: [PATCH 3/4] Fix flaky TTL replicated test --- tests/integration/test_ttl_replicated/test.py | 26 +++++++++++++++++-- 1 file changed, 24 insertions(+), 2 deletions(-) diff --git a/tests/integration/test_ttl_replicated/test.py b/tests/integration/test_ttl_replicated/test.py index 67614b88029..c18831e5d9d 100644 --- a/tests/integration/test_ttl_replicated/test.py +++ b/tests/integration/test_ttl_replicated/test.py @@ -392,11 +392,33 @@ def test_ttl_compatibility(started_cluster, node_left, node_right, num_run): time.sleep(5) # Wait for TTL - node_right.query("OPTIMIZE TABLE test_ttl_delete FINAL") + # after restart table can be in readonly mode + exception = None + for _ in range(40): + try: + node_right.query("OPTIMIZE TABLE test_ttl_delete FINAL") + break + except Exception as ex: + print("Cannot optimaze table on node", node_right.name, "exception", ex) + time.sleep(0.5) + exception = ex + else: + raise ex + node_right.query("OPTIMIZE TABLE test_ttl_group_by FINAL") node_right.query("OPTIMIZE TABLE test_ttl_where FINAL") - node_left.query("SYSTEM SYNC REPLICA test_ttl_delete", timeout=20) + for _ in range(40): + try: + node_left.query("SYSTEM SYNC REPLICA test_ttl_delete", timeout=20) + break + except Exception as ex: + print("Cannot sync replica table on node", node_left.name, "exception", ex) + time.sleep(0.5) + exception = ex + else: + raise ex + node_left.query("SYSTEM SYNC REPLICA test_ttl_group_by", timeout=20) node_left.query("SYSTEM SYNC REPLICA test_ttl_where", timeout=20) From 33e9f1bcf12c0a5f9fa838a7117104f2f15cb6cb Mon Sep 17 00:00:00 2001 From: alesapin Date: Sat, 15 May 2021 15:33:01 +0300 Subject: [PATCH 4/4] Better tests --- tests/integration/helpers/test_tools.py | 13 ++++++ .../test_polymorphic_parts/test.py | 19 +++++---- tests/integration/test_ttl_replicated/test.py | 26 ++---------- .../test.py | 40 ++----------------- 4 files changed, 31 insertions(+), 67 deletions(-) diff --git a/tests/integration/helpers/test_tools.py b/tests/integration/helpers/test_tools.py index 5fedadd3380..93478c4dd49 100644 --- a/tests/integration/helpers/test_tools.py +++ b/tests/integration/helpers/test_tools.py @@ -80,3 +80,16 @@ def assert_logs_contain_with_retry(instance, substring, retry_count=20, sleep_ti time.sleep(sleep_time) else: raise AssertionError("'{}' not found in logs".format(substring)) + +def exec_query_with_retry(instance, query, retry_count=40, sleep_time=0.5, settings={}): + exception = None + for _ in range(retry_count): + try: + instance.query(query, timeout=30, settings=settings) + break + except Exception as ex: + exception = ex + print("Failed to execute query '", query, "' on instance", instance.name, "will retry") + time.sleep(sleep_time) + else: + raise exception diff --git a/tests/integration/test_polymorphic_parts/test.py b/tests/integration/test_polymorphic_parts/test.py index dc16bab0ca4..9fe3ef77da8 100644 --- a/tests/integration/test_polymorphic_parts/test.py +++ b/tests/integration/test_polymorphic_parts/test.py @@ -7,7 +7,7 @@ import pytest from helpers.cluster import ClickHouseCluster from helpers.network import PartitionManager from helpers.test_tools import TSV -from helpers.test_tools import assert_eq_with_retry +from helpers.test_tools import assert_eq_with_retry, exec_query_with_retry cluster = ClickHouseCluster(__file__) @@ -408,8 +408,9 @@ def test_in_memory_wal_rotate(start_cluster): def test_in_memory_deduplication(start_cluster): for i in range(3): - node9.query("INSERT INTO deduplication_table (date, id, s) VALUES (toDate('2020-03-03'), 1, 'foo')") - node10.query("INSERT INTO deduplication_table (date, id, s) VALUES (toDate('2020-03-03'), 1, 'foo')") + # table can be in readonly node + exec_query_with_retry(node9, "INSERT INTO deduplication_table (date, id, s) VALUES (toDate('2020-03-03'), 1, 'foo')") + exec_query_with_retry(node10, "INSERT INTO deduplication_table (date, id, s) VALUES (toDate('2020-03-03'), 1, 'foo')") node9.query("SYSTEM SYNC REPLICA deduplication_table", timeout=20) node10.query("SYSTEM SYNC REPLICA deduplication_table", timeout=20) @@ -430,10 +431,10 @@ def test_in_memory_alters(start_cluster): node9.restart_clickhouse(kill=True) expected = "1\tab\t0\n2\tcd\t0\n" - assert node9.query("SELECT id, s, col1 FROM alters_table") == expected + assert node9.query("SELECT id, s, col1 FROM alters_table ORDER BY id") == expected check_parts_type(1) - - node9.query("INSERT INTO alters_table (date, id, col1) VALUES (toDate('2020-10-10'), 3, 100)") + # After hard restart table can be in readonly mode + exec_query_with_retry(node9, "INSERT INTO alters_table (date, id, col1) VALUES (toDate('2020-10-10'), 3, 100)") node9.query("ALTER TABLE alters_table MODIFY COLUMN col1 String") node9.query("ALTER TABLE alters_table DROP COLUMN s") node9.restart_clickhouse(kill=True) @@ -442,8 +443,10 @@ def test_in_memory_alters(start_cluster): with pytest.raises(Exception): node9.query("SELECT id, s, col1 FROM alters_table") - expected = expected = "1\t0_foo\n2\t0_foo\n3\t100_foo\n" - assert node9.query("SELECT id, col1 || '_foo' FROM alters_table") + # Values of col1 was not materialized as integers, so they have + # default string values after alter + expected = "1\t_foo\n2\t_foo\n3\t100_foo\n" + assert node9.query("SELECT id, col1 || '_foo' FROM alters_table ORDER BY id") == expected def test_polymorphic_parts_index(start_cluster): diff --git a/tests/integration/test_ttl_replicated/test.py b/tests/integration/test_ttl_replicated/test.py index c18831e5d9d..f32edc36a71 100644 --- a/tests/integration/test_ttl_replicated/test.py +++ b/tests/integration/test_ttl_replicated/test.py @@ -3,7 +3,7 @@ import time import helpers.client as client import pytest from helpers.cluster import ClickHouseCluster -from helpers.test_tools import TSV +from helpers.test_tools import TSV, exec_query_with_retry cluster = ClickHouseCluster(__file__) node1 = cluster.add_instance('node1', with_zookeeper=True) @@ -393,31 +393,11 @@ def test_ttl_compatibility(started_cluster, node_left, node_right, num_run): time.sleep(5) # Wait for TTL # after restart table can be in readonly mode - exception = None - for _ in range(40): - try: - node_right.query("OPTIMIZE TABLE test_ttl_delete FINAL") - break - except Exception as ex: - print("Cannot optimaze table on node", node_right.name, "exception", ex) - time.sleep(0.5) - exception = ex - else: - raise ex + exec_query_with_retry(node_right, "OPTIMIZE TABLE test_ttl_delete FINAL") node_right.query("OPTIMIZE TABLE test_ttl_group_by FINAL") node_right.query("OPTIMIZE TABLE test_ttl_where FINAL") - - for _ in range(40): - try: - node_left.query("SYSTEM SYNC REPLICA test_ttl_delete", timeout=20) - break - except Exception as ex: - print("Cannot sync replica table on node", node_left.name, "exception", ex) - time.sleep(0.5) - exception = ex - else: - raise ex + exec_query_with_retry(node_left, "SYSTEM SYNC REPLICA test_ttl_delete") node_left.query("SYSTEM SYNC REPLICA test_ttl_group_by", timeout=20) node_left.query("SYSTEM SYNC REPLICA test_ttl_where", timeout=20) diff --git a/tests/integration/test_version_update_after_mutation/test.py b/tests/integration/test_version_update_after_mutation/test.py index 2549dc0a8cf..1ef65512959 100644 --- a/tests/integration/test_version_update_after_mutation/test.py +++ b/tests/integration/test_version_update_after_mutation/test.py @@ -2,7 +2,7 @@ import pytest import time from helpers.cluster import ClickHouseCluster -from helpers.test_tools import assert_eq_with_retry +from helpers.test_tools import assert_eq_with_retry, exec_query_with_retry cluster = ClickHouseCluster(__file__) @@ -38,29 +38,9 @@ def test_mutate_and_upgrade(start_cluster): node1.restart_with_latest_version(signal=9) node2.restart_with_latest_version(signal=9) - exception = None # After hard restart table can be in readonly mode - for _ in range(40): - try: - node2.query("INSERT INTO mt VALUES ('2020-02-13', 3);") - break - except Exception as ex: - print("Cannot insert into node2 with error {}", ex) - time.sleep(0.5) - exception = ex - else: - raise exception - - for _ in range(40): - try: - node1.query("SYSTEM SYNC REPLICA mt", timeout=5) - break - except Exception as ex: - print("Cannot sync node1 with error {}", ex) - time.sleep(0.5) - exception = ex - else: - raise exception + exec_query_with_retry(node2, "INSERT INTO mt VALUES ('2020-02-13', 3)") + exec_query_with_retry(node1, "SYSTEM SYNC REPLICA mt") assert node1.query("SELECT COUNT() FROM mt") == "2\n" assert node2.query("SELECT COUNT() FROM mt") == "2\n" @@ -99,19 +79,7 @@ def test_upgrade_while_mutation(start_cluster): node3.restart_with_latest_version(signal=9) - # After hard restart table can be in readonly mode - exception = None - for _ in range(40): - try: - node3.query("ALTER TABLE mt1 DELETE WHERE id > 100000", settings={"mutations_sync": "2"}) - break - except Exception as ex: - print("Cannot alter node3 with error {}", ex) - time.sleep(0.5) - exception = ex - else: - raise exception - + exec_query_with_retry(node3, "ALTER TABLE mt1 DELETE WHERE id > 100000", settings={"mutations_sync": "2"}) # will delete nothing, but previous async mutation will finish with this query assert_eq_with_retry(node3, "SELECT COUNT() from mt1", "50000\n")