Merge pull request #51851 from ClickHouse/add_delay_for_replicated

Make shutdown of `ReplicatedMergeTree` tables more soft
This commit is contained in:
Alexander Tokmakov 2023-07-26 12:59:37 +03:00 committed by GitHub
commit 4d03c23166
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
25 changed files with 394 additions and 33 deletions

View File

@ -62,6 +62,7 @@ configure
# it contains some new settings, but we can safely remove it
rm /etc/clickhouse-server/config.d/merge_tree.xml
rm /etc/clickhouse-server/config.d/enable_wait_for_shutdown_replicated_tables.xml
rm /etc/clickhouse-server/users.d/nonconst_timezone.xml
start
@ -91,6 +92,7 @@ sudo chgrp clickhouse /etc/clickhouse-server/config.d/s3_storage_policy_by_defau
# it contains some new settings, but we can safely remove it
rm /etc/clickhouse-server/config.d/merge_tree.xml
rm /etc/clickhouse-server/config.d/enable_wait_for_shutdown_replicated_tables.xml
rm /etc/clickhouse-server/users.d/nonconst_timezone.xml
start

View File

@ -747,6 +747,7 @@ try
std::lock_guard lock(servers_lock);
metrics.reserve(servers_to_start_before_tables.size() + servers.size());
for (const auto & server : servers_to_start_before_tables)
metrics.emplace_back(ProtocolServerMetrics{server.getPortName(), server.currentThreads()});

View File

@ -292,7 +292,7 @@ void DatabaseWithOwnTablesBase::shutdown()
for (const auto & kv : tables_snapshot)
{
kv.second->flush();
kv.second->flushAndPrepareForShutdown();
}
for (const auto & kv : tables_snapshot)

View File

@ -361,7 +361,7 @@ BlockIO InterpreterDropQuery::executeToDatabaseImpl(const ASTDropQuery & query,
std::vector<std::pair<String, bool>> tables_to_drop;
for (auto iterator = database->getTablesIterator(table_context); iterator->isValid(); iterator->next())
{
iterator->table()->flush();
iterator->table()->flushAndPrepareForShutdown();
tables_to_drop.push_back({iterator->name(), iterator->table()->isDictionary()});
}

View File

@ -553,15 +553,15 @@ public:
/**
* If the storage requires some complicated work on destroying,
* then you have two virtual methods:
* - flush()
* - flushAndPrepareForShutdown()
* - shutdown()
*
* @see shutdown()
* @see flush()
* @see flushAndPrepareForShutdown()
*/
void flushAndShutdown()
{
flush();
flushAndPrepareForShutdown();
shutdown();
}
@ -574,7 +574,7 @@ public:
/// Called before shutdown() to flush data to underlying storage
/// Data in memory need to be persistent
virtual void flush() {}
virtual void flushAndPrepareForShutdown() {}
/// Asks table to stop executing some action identified by action_type
/// If table does not support such type of lock, and empty lock is returned

View File

@ -203,6 +203,8 @@ void Service::processQuery(const HTMLForm & params, ReadBuffer & /*body*/, Write
sendPartFromMemory(part, out, send_projections);
else
sendPartFromDisk(part, out, client_protocol_version, false, send_projections);
data.addLastSentPart(part->info);
}
catch (const NetException &)
{

View File

@ -119,6 +119,7 @@ struct Settings;
M(Bool, detach_not_byte_identical_parts, false, "Do not remove non byte-idential parts for ReplicatedMergeTree, instead detach them (maybe useful for further analysis).", 0) \
M(UInt64, max_replicated_fetches_network_bandwidth, 0, "The maximum speed of data exchange over the network in bytes per second for replicated fetches. Zero means unlimited.", 0) \
M(UInt64, max_replicated_sends_network_bandwidth, 0, "The maximum speed of data exchange over the network in bytes per second for replicated sends. Zero means unlimited.", 0) \
M(Milliseconds, wait_for_unique_parts_send_before_shutdown_ms, 0, "Before shutdown table will wait for required amount time for unique parts (exist only on current replica) to be fetched by other replicas (0 means disabled).", 0) \
\
/** Check delay of replicas settings. */ \
M(UInt64, min_relative_delay_to_measure, 120, "Calculate relative replica delay only if absolute delay is not less that this value.", 0) \

View File

@ -576,7 +576,7 @@ int32_t ReplicatedMergeTreeQueue::pullLogsToQueue(zkutil::ZooKeeperPtr zookeeper
/// It's ok if replica became readonly due to connection loss after we got current zookeeper (in this case zookeeper must be expired).
/// And it's ok if replica became readonly after shutdown.
/// In other cases it's likely that someone called pullLogsToQueue(...) when queue is not initialized yet by RestartingThread.
bool not_completely_initialized = storage.is_readonly && !zookeeper->expired() && !storage.shutdown_called;
bool not_completely_initialized = storage.is_readonly && !zookeeper->expired() && !storage.shutdown_prepared_called;
if (not_completely_initialized)
throw Exception(ErrorCodes::LOGICAL_ERROR, "Tried to pull logs to queue (reason: {}) on readonly replica {}, it's a bug",
reason, storage.getStorageID().getNameForLogs());

View File

@ -329,7 +329,7 @@ void ReplicatedMergeTreeRestartingThread::activateReplica()
void ReplicatedMergeTreeRestartingThread::partialShutdown(bool part_of_full_shutdown)
{
setReadonly(part_of_full_shutdown);
setReadonly(/* on_shutdown = */ part_of_full_shutdown);
storage.partialShutdown();
}
@ -339,10 +339,15 @@ void ReplicatedMergeTreeRestartingThread::shutdown(bool part_of_full_shutdown)
/// Stop restarting_thread before stopping other tasks - so that it won't restart them again.
need_stop = true;
task->deactivate();
/// Explicitly set the event, because the restarting thread will not set it again
if (part_of_full_shutdown)
storage.startup_event.set();
LOG_TRACE(log, "Restarting thread finished");
/// Stop other tasks.
partialShutdown(part_of_full_shutdown);
setReadonly(part_of_full_shutdown);
}
void ReplicatedMergeTreeRestartingThread::setReadonly(bool on_shutdown)

View File

@ -5,6 +5,7 @@
#include <base/types.h>
#include <thread>
#include <atomic>
#include <Common/logger_useful.h>
namespace DB
@ -25,6 +26,7 @@ public:
void start(bool schedule = true)
{
LOG_TRACE(log, "Starting restating thread, schedule: {}", schedule);
if (schedule)
task->activateAndSchedule();
else
@ -36,6 +38,7 @@ public:
void shutdown(bool part_of_full_shutdown);
void run();
private:
StorageReplicatedMergeTree & storage;
String log_name;

View File

@ -682,7 +682,7 @@ void StorageBuffer::startup()
}
void StorageBuffer::flush()
void StorageBuffer::flushAndPrepareForShutdown()
{
if (!flush_handle)
return;

View File

@ -92,7 +92,7 @@ public:
void startup() override;
/// Flush all buffers into the subordinate table and stop background thread.
void flush() override;
void flushAndPrepareForShutdown() override;
bool optimize(
const ASTPtr & query,
const StorageMetadataPtr & metadata_snapshot,

View File

@ -1432,7 +1432,7 @@ ActionLock StorageDistributed::getActionLock(StorageActionBlockType type)
return {};
}
void StorageDistributed::flush()
void StorageDistributed::flushAndPrepareForShutdown()
{
try
{

View File

@ -135,7 +135,7 @@ public:
void initializeFromDisk();
void shutdown() override;
void flush() override;
void flushAndPrepareForShutdown() override;
void drop() override;
bool storesDataOnDisk() const override { return data_volume != nullptr; }

View File

@ -139,7 +139,7 @@ public:
void startup() override { getNested()->startup(); }
void shutdown() override { getNested()->shutdown(); }
void flush() override { getNested()->flush(); }
void flushAndPrepareForShutdown() override { getNested()->flushAndPrepareForShutdown(); }
ActionLock getActionLock(StorageActionBlockType action_type) override { return getNested()->getActionLock(action_type); }

View File

@ -2,6 +2,7 @@
#include <cstddef>
#include <ranges>
#include <chrono>
#include <base/hex.h>
#include <base/interpolate.h>
@ -185,6 +186,7 @@ namespace ErrorCodes
extern const int CHECKSUM_DOESNT_MATCH;
extern const int NOT_INITIALIZED;
extern const int TOO_LARGE_DISTRIBUTED_DEPTH;
extern const int TABLE_IS_DROPPED;
}
namespace ActionLocks
@ -3921,7 +3923,10 @@ void StorageReplicatedMergeTree::startBeingLeader()
void StorageReplicatedMergeTree::stopBeingLeader()
{
if (!is_leader)
{
LOG_TRACE(log, "stopBeingLeader called but we are not a leader already");
return;
}
LOG_INFO(log, "Stopped being leader");
is_leader = false;
@ -3978,6 +3983,153 @@ String StorageReplicatedMergeTree::findReplicaHavingPart(const String & part_nam
return {};
}
void StorageReplicatedMergeTree::addLastSentPart(const MergeTreePartInfo & info)
{
{
std::lock_guard lock(last_sent_parts_mutex);
last_sent_parts.emplace_back(info);
static constexpr size_t LAST_SENT_PARS_WINDOW_SIZE = 1000;
while (last_sent_parts.size() > LAST_SENT_PARS_WINDOW_SIZE)
last_sent_parts.pop_front();
}
last_sent_parts_cv.notify_all();
}
void StorageReplicatedMergeTree::waitForUniquePartsToBeFetchedByOtherReplicas(StorageReplicatedMergeTree::ShutdownDeadline shutdown_deadline_)
{
/// Will be true in case in case of query
if (CurrentThread::isInitialized() && CurrentThread::get().getQueryContext() != nullptr)
{
LOG_TRACE(log, "Will not wait for unique parts to be fetched by other replicas because shutdown called from DROP/DETACH query");
return;
}
if (!shutdown_called.load())
throw Exception(ErrorCodes::LOGICAL_ERROR, "Called waitForUniquePartsToBeFetchedByOtherReplicas before shutdown, it's a bug");
auto settings_ptr = getSettings();
auto wait_ms = settings_ptr->wait_for_unique_parts_send_before_shutdown_ms.totalMilliseconds();
if (wait_ms == 0)
{
LOG_INFO(log, "Will not wait for unique parts to be fetched by other replicas because wait time is zero");
return;
}
if (shutdown_deadline_ <= std::chrono::system_clock::now())
{
LOG_INFO(log, "Will not wait for unique parts to be fetched by other replicas because shutdown_deadline already passed");
return;
}
auto zookeeper = getZooKeeperIfTableShutDown();
auto unique_parts_set = findReplicaUniqueParts(replica_name, zookeeper_path, format_version, zookeeper, log);
if (unique_parts_set.empty())
{
LOG_INFO(log, "Will not wait for unique parts to be fetched because we don't have any unique parts");
return;
}
else
{
LOG_INFO(log, "Will wait for {} unique parts to be fetched", unique_parts_set.size());
}
auto wait_predicate = [&] () -> bool
{
for (auto it = unique_parts_set.begin(); it != unique_parts_set.end();)
{
const auto & part = *it;
bool found = false;
for (const auto & sent_part : last_sent_parts | std::views::reverse)
{
if (sent_part.contains(part))
{
LOG_TRACE(log, "Part {} was fetched by some replica", part.getPartNameForLogs());
found = true;
it = unique_parts_set.erase(it);
break;
}
}
if (!found)
break;
}
return unique_parts_set.empty();
};
std::unique_lock lock(last_sent_parts_mutex);
if (!last_sent_parts_cv.wait_until(lock, shutdown_deadline_, wait_predicate))
LOG_INFO(log, "Failed to wait for unique parts to be fetched in {} ms, {} parts can be left on this replica", wait_ms, unique_parts_set.size());
else
LOG_INFO(log, "Successfully waited all the parts");
}
std::set<MergeTreePartInfo> StorageReplicatedMergeTree::findReplicaUniqueParts(const String & replica_name_, const String & zookeeper_path_, MergeTreeDataFormatVersion format_version_, zkutil::ZooKeeper::Ptr zookeeper_, Poco::Logger * log_)
{
if (!zookeeper_->exists(fs::path(zookeeper_path_) / "replicas" / replica_name_ / "is_active"))
{
LOG_INFO(log_, "Our replica is not active, nobody will try to fetch anything");
return {};
}
Strings replicas = zookeeper_->getChildren(fs::path(zookeeper_path_) / "replicas");
Strings our_parts;
std::vector<ActiveDataPartSet> data_parts_on_replicas;
for (const String & replica : replicas)
{
if (!zookeeper_->exists(fs::path(zookeeper_path_) / "replicas" / replica / "is_active"))
{
LOG_TRACE(log_, "Replica {} is not active, skipping", replica);
continue;
}
Strings parts = zookeeper_->getChildren(fs::path(zookeeper_path_) / "replicas" / replica / "parts");
if (replica == replica_name_)
{
LOG_TRACE(log_, "Our replica parts collected {}", replica);
our_parts = parts;
}
else
{
LOG_TRACE(log_, "Fetching parts for replica {}: [{}]", replica, fmt::join(parts, ", "));
data_parts_on_replicas.emplace_back(format_version_, parts);
}
}
if (data_parts_on_replicas.empty())
{
LOG_TRACE(log_, "Has no active replicas, will no try to wait for fetch");
return {};
}
std::set<MergeTreePartInfo> our_unique_parts;
for (const auto & part : our_parts)
{
bool found = false;
for (const auto & active_parts_set : data_parts_on_replicas)
{
if (!active_parts_set.getContainingPart(part).empty())
{
found = true;
break;
}
}
if (!found)
{
LOG_TRACE(log_, "Part not {} found on other replicas", part);
our_unique_parts.emplace(MergeTreePartInfo::fromPartName(part, format_version_));
}
}
if (!our_parts.empty() && our_unique_parts.empty())
LOG_TRACE(log_, "All parts found on replicas");
return our_unique_parts;
}
String StorageReplicatedMergeTree::findReplicaHavingCoveringPart(LogEntry & entry, bool active)
{
auto zookeeper = getZooKeeper();
@ -4637,6 +4789,7 @@ MergeTreeData::MutableDataPartPtr StorageReplicatedMergeTree::fetchExistsPart(
void StorageReplicatedMergeTree::startup()
{
LOG_TRACE(log, "Starting up table");
startOutdatedDataPartsLoadingTask();
if (attach_thread)
{
@ -4658,6 +4811,8 @@ void StorageReplicatedMergeTree::startupImpl(bool from_attach_thread)
since_metadata_err_incr_readonly_metric = true;
CurrentMetrics::add(CurrentMetrics::ReadonlyReplica);
}
LOG_TRACE(log, "No connection to ZooKeeper or no metadata in ZooKeeper, will not startup");
return;
}
@ -4692,6 +4847,7 @@ void StorageReplicatedMergeTree::startupImpl(bool from_attach_thread)
if (from_attach_thread)
{
LOG_TRACE(log, "Trying to startup table from right now");
/// Try activating replica in current thread.
restarting_thread.run();
}
@ -4701,9 +4857,18 @@ void StorageReplicatedMergeTree::startupImpl(bool from_attach_thread)
/// NOTE It does not mean that replication is actually started after receiving this event.
/// It only means that an attempt to startup replication was made.
/// Table may be still in readonly mode if this attempt failed for any reason.
startup_event.wait();
while (!startup_event.tryWait(10 * 1000))
LOG_TRACE(log, "Waiting for RestartingThread to startup table");
}
auto lock = std::unique_lock<std::mutex>(flush_and_shutdown_mutex, std::defer_lock);
do
{
if (shutdown_prepared_called.load() || shutdown_called.load())
throw Exception(ErrorCodes::TABLE_IS_DROPPED, "Cannot startup table because it is dropped");
}
while (!lock.try_lock());
/// And this is just a callback
session_expired_callback_handler = EventNotifier::instance().subscribe(Coordination::Error::ZSESSIONEXPIRED, [this]()
{
@ -4744,6 +4909,37 @@ void StorageReplicatedMergeTree::startupImpl(bool from_attach_thread)
}
void StorageReplicatedMergeTree::flushAndPrepareForShutdown()
{
std::lock_guard lock{flush_and_shutdown_mutex};
if (shutdown_prepared_called.exchange(true))
return;
try
{
auto settings_ptr = getSettings();
/// Cancel fetches, merges and mutations to force the queue_task to finish ASAP.
fetcher.blocker.cancelForever();
merger_mutator.merges_blocker.cancelForever();
parts_mover.moves_blocker.cancelForever();
stopBeingLeader();
if (attach_thread)
attach_thread->shutdown();
restarting_thread.shutdown(/* part_of_full_shutdown */true);
/// Explicitly set the event, because the restarting thread will not set it again
startup_event.set();
shutdown_deadline.emplace(std::chrono::system_clock::now() + std::chrono::milliseconds(settings_ptr->wait_for_unique_parts_send_before_shutdown_ms.totalMilliseconds()));
}
catch (...)
{
/// Don't wait anything in case of improper prepare for shutdown
shutdown_deadline.emplace(std::chrono::system_clock::now());
throw;
}
}
void StorageReplicatedMergeTree::partialShutdown()
{
ProfileEvents::increment(ProfileEvents::ReplicaPartialShutdown);
@ -4779,21 +4975,28 @@ void StorageReplicatedMergeTree::shutdown()
if (shutdown_called.exchange(true))
return;
flushAndPrepareForShutdown();
if (!shutdown_deadline.has_value())
throw Exception(ErrorCodes::LOGICAL_ERROR, "Shutdown deadline is not set in shutdown");
try
{
waitForUniquePartsToBeFetchedByOtherReplicas(*shutdown_deadline);
}
catch (const Exception & ex)
{
if (ex.code() == ErrorCodes::LOGICAL_ERROR)
throw;
tryLogCurrentException(log, __PRETTY_FUNCTION__);
}
session_expired_callback_handler.reset();
stopOutdatedDataPartsLoadingTask();
/// Cancel fetches, merges and mutations to force the queue_task to finish ASAP.
fetcher.blocker.cancelForever();
merger_mutator.merges_blocker.cancelForever();
parts_mover.moves_blocker.cancelForever();
mutations_finalizing_task->deactivate();
stopBeingLeader();
partialShutdown();
if (attach_thread)
attach_thread->shutdown();
restarting_thread.shutdown(/* part_of_full_shutdown */true);
background_operations_assignee.finish();
part_moves_between_shards_orchestrator.shutdown();
{
@ -6167,7 +6370,7 @@ bool StorageReplicatedMergeTree::tryWaitForReplicaToProcessLogEntry(
const auto & stop_waiting = [&]()
{
bool stop_waiting_itself = waiting_itself && partial_shutdown_called;
bool stop_waiting_itself = waiting_itself && (partial_shutdown_called || shutdown_prepared_called || shutdown_called);
bool timeout_exceeded = check_timeout && wait_for_inactive_timeout < time_waiting.elapsedSeconds();
bool stop_waiting_inactive = (!wait_for_inactive || timeout_exceeded)
&& !getZooKeeper()->exists(fs::path(table_zookeeper_path) / "replicas" / replica / "is_active");

View File

@ -112,8 +112,35 @@ public:
bool need_check_structure);
void startup() override;
void shutdown() override;
/// To many shutdown methods....
///
/// Partial shutdown called if we loose connection to zookeeper.
/// Table can also recover after partial shutdown and continue
/// to work. This method can be called regularly.
void partialShutdown();
/// These two methods are called during final table shutdown (DROP/DETACH/overall server shutdown).
/// The shutdown process is split into two methods to make it more soft and fast. In database shutdown()
/// looks like:
/// for (table : tables)
/// table->flushAndPrepareForShutdown()
///
/// for (table : tables)
/// table->shutdown()
///
/// So we stop producing all the parts first for all tables (fast operation). And after we can wait in shutdown()
/// for other replicas to download parts.
///
/// In flushAndPrepareForShutdown we cancel all part-producing operations:
/// merges, fetches, moves and so on. If it wasn't called before shutdown() -- shutdown() will
/// call it (defensive programming).
void flushAndPrepareForShutdown() override;
/// In shutdown we completely terminate table -- remove
/// is_active node and interserver handler. Also optionally
/// wait until other replicas will download some parts from our replica.
void shutdown() override;
~StorageReplicatedMergeTree() override;
static String getDefaultZooKeeperPath(const Poco::Util::AbstractConfiguration & config);
@ -340,6 +367,13 @@ public:
/// Get a sequential consistent view of current parts.
ReplicatedMergeTreeQuorumAddedParts::PartitionIdToMaxBlock getMaxAddedBlocks() const;
void addLastSentPart(const MergeTreePartInfo & info);
/// Wait required amount of milliseconds to give other replicas a chance to
/// download unique parts from our replica
using ShutdownDeadline = std::chrono::time_point<std::chrono::system_clock>;
void waitForUniquePartsToBeFetchedByOtherReplicas(ShutdownDeadline shutdown_deadline);
private:
std::atomic_bool are_restoring_replica {false};
@ -444,9 +478,19 @@ private:
Poco::Event partial_shutdown_event {false}; /// Poco::Event::EVENT_MANUALRESET
std::atomic<bool> shutdown_called {false};
std::atomic<bool> flush_called {false};
std::atomic<bool> shutdown_prepared_called {false};
std::optional<ShutdownDeadline> shutdown_deadline;
/// We call flushAndPrepareForShutdown before acquiring DDLGuard, so we can shutdown a table that is being created right now
mutable std::mutex flush_and_shutdown_mutex;
mutable std::mutex last_sent_parts_mutex;
std::condition_variable last_sent_parts_cv;
std::deque<MergeTreePartInfo> last_sent_parts;
/// Threads.
///
/// A task that keeps track of the updates in the logs of all replicas and loads them into the queue.
bool queue_update_in_progress = false;
@ -729,6 +773,7 @@ private:
*/
String findReplicaHavingCoveringPart(LogEntry & entry, bool active);
String findReplicaHavingCoveringPart(const String & part_name, bool active, String & found_part_name);
static std::set<MergeTreePartInfo> findReplicaUniqueParts(const String & replica_name_, const String & zookeeper_path_, MergeTreeDataFormatVersion format_version_, zkutil::ZooKeeper::Ptr zookeeper_, Poco::Logger * log_);
/** Download the specified part from the specified replica.
* If `to_detached`, the part is placed in the `detached` directory.

View File

@ -79,11 +79,11 @@ public:
nested->shutdown();
}
void flush() override
void flushAndPrepareForShutdown() override
{
std::lock_guard lock{nested_mutex};
if (nested)
nested->flush();
nested->flushAndPrepareForShutdown();
}
void drop() override

View File

@ -0,0 +1,5 @@
<clickhouse>
<merge_tree>
<wait_for_unique_parts_send_before_shutdown_ms>3000</wait_for_unique_parts_send_before_shutdown_ms>
</merge_tree>
</clickhouse>

View File

@ -58,6 +58,7 @@ ln -sf $SRC_PATH/config.d/display_name.xml $DEST_SERVER_PATH/config.d/
ln -sf $SRC_PATH/config.d/reverse_dns_query_function.xml $DEST_SERVER_PATH/config.d/
ln -sf $SRC_PATH/config.d/compressed_marks_and_index.xml $DEST_SERVER_PATH/config.d/
ln -sf $SRC_PATH/config.d/disable_s3_env_credentials.xml $DEST_SERVER_PATH/config.d/
ln -sf $SRC_PATH/config.d/enable_wait_for_shutdown_replicated_tables.xml $DEST_SERVER_PATH/config.d/
ln -sf $SRC_PATH/config.d/backups.xml $DEST_SERVER_PATH/config.d/
# Not supported with fasttest.

View File

@ -69,6 +69,8 @@
"test_server_reload/test.py::test_remove_tcp_port",
"test_keeper_map/test.py::test_keeper_map_without_zk",
"test_replicated_merge_tree_wait_on_shutdown/test.py::test_shutdown_and_wait",
"test_http_failover/test.py::test_url_destination_host_with_multiple_addrs",
"test_http_failover/test.py::test_url_invalid_hostname",

View File

@ -0,0 +1 @@
#!/usr/bin/env python3

View File

@ -0,0 +1,5 @@
<clickhouse>
<merge_tree>
<wait_for_unique_parts_send_before_shutdown_ms>30000</wait_for_unique_parts_send_before_shutdown_ms>
</merge_tree>
</clickhouse>

View File

@ -0,0 +1,85 @@
#!/usr/bin/env python3
import pytest
from helpers.cluster import ClickHouseCluster
from helpers.network import PartitionManager
from helpers.test_tools import assert_eq_with_retry
from multiprocessing.dummy import Pool
import time
cluster = ClickHouseCluster(__file__)
node1 = cluster.add_instance(
"node1",
main_configs=["config/merge_tree_conf.xml"],
with_zookeeper=True,
stay_alive=True,
)
node2 = cluster.add_instance(
"node2",
main_configs=["config/merge_tree_conf.xml"],
with_zookeeper=True,
stay_alive=True,
)
@pytest.fixture(scope="module")
def start_cluster():
try:
cluster.start()
yield cluster
finally:
cluster.shutdown()
def test_shutdown_and_wait(start_cluster):
for i, node in enumerate([node1, node2]):
node.query(
f"CREATE TABLE test_table (value UInt64) ENGINE=ReplicatedMergeTree('/test/table', 'r{i}') ORDER BY tuple()"
)
node1.query("INSERT INTO test_table VALUES (0)")
node2.query("SYSTEM SYNC REPLICA test_table")
assert node1.query("SELECT * FROM test_table") == "0\n"
assert node2.query("SELECT * FROM test_table") == "0\n"
def soft_shutdown(node):
node.stop_clickhouse(kill=False, stop_wait_sec=60)
p = Pool(50)
def insert(value):
node1.query(f"INSERT INTO test_table VALUES ({value})")
with PartitionManager() as pm:
pm.partition_instances(node1, node2)
p.map(insert, range(1, 50))
# Start shutdown async
waiter = p.apply_async(soft_shutdown, (node1,))
# to be sure that shutdown started
time.sleep(5)
# node 2 partitioned and don't see any data
assert node2.query("SELECT * FROM test_table") == "0\n"
# Restore network
pm.heal_all()
# wait for shutdown to finish
waiter.get()
node2.query("SYSTEM SYNC REPLICA test_table", timeout=5)
# check second replica has all data
assert node2.query("SELECT sum(value) FROM test_table") == "1225\n"
# and nothing in queue
assert node2.query("SELECT count() FROM system.replication_queue") == "0\n"
# It can happend that the second replica is superfast
assert node1.contains_in_log(
"Successfully waited all the parts"
) or node1.contains_in_log("All parts found on replica")

View File

@ -56,7 +56,7 @@ function create_table()
if [ -z "$database" ]; then continue; fi
$CLICKHOUSE_CLIENT --distributed_ddl_task_timeout=0 -q \
"create table $database.rmt_${RANDOM}_${RANDOM}_${RANDOM} (n int) engine=ReplicatedMergeTree order by tuple() -- suppress $CLICKHOUSE_TEST_ZOOKEEPER_PREFIX" \
2>&1| grep -Fa "Exception: " | grep -Fv "Macro 'uuid' and empty arguments" | grep -Fv "Cannot enqueue query" | grep -Fv "ZooKeeper session expired" | grep -Fv UNKNOWN_DATABASE
2>&1| grep -Fa "Exception: " | grep -Fv "Macro 'uuid' and empty arguments" | grep -Fv "Cannot enqueue query" | grep -Fv "ZooKeeper session expired" | grep -Fv UNKNOWN_DATABASE | grep -Fv TABLE_IS_DROPPED
sleep 0.$RANDOM
done
}