Make shutdown of replicated tables softer

alesapin 2023-07-05 18:11:25 +02:00
parent 1ea5261012
commit baee73fd96
19 changed files with 253 additions and 81 deletions
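In outline: IStorage::flush() becomes flushAndPrepareForShutdown(), and shutting down a ReplicatedMergeTree table is split into a prepare phase and a final phase. Between the two, the replica waits for parts that exist only locally ("unique" parts) to be fetched by other replicas, bounded by the new MergeTree setting wait_for_unique_parts_send_before_shutdown_ms (0 disables the wait), and the interserver endpoints that serve those fetches now start before tables are attached and stop after they are shut down. A condensed sketch of the new shutdown path, assembled from the StorageReplicatedMergeTree hunks below (not a standalone program):

void StorageReplicatedMergeTree::shutdown()
{
    if (shutdown_called.exchange(true))
        return;

    /// The prepare phase may already have run via the database-level shutdown.
    if (!shutdown_prepared_called.load())
        flushAndPrepareForShutdown();

    /// Give other replicas a chance to fetch parts that only this replica has,
    /// for at most wait_for_unique_parts_send_before_shutdown_ms (0 means skip the wait).
    auto settings_ptr = getSettings();
    waitForUniquePartsToBeFetchedByOtherReplicas(
        settings_ptr->wait_for_unique_parts_send_before_shutdown_ms.totalMilliseconds());

    /// Only now does the replica drop its is_active node and the parts exchange endpoint.
    replica_is_active_node = nullptr;
}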


@ -1286,7 +1286,7 @@ try
global_context->reloadAuxiliaryZooKeepersConfigIfChanged(config);
std::lock_guard lock(servers_lock);
updateServers(*config, server_pool, async_metrics, servers);
updateServers(*config, server_pool, async_metrics, servers, servers_to_start_before_tables);
}
global_context->updateStorageConfiguration(*config);
@ -1388,10 +1388,15 @@ try
}
for (auto & server : servers_to_start_before_tables)
{
server.start();
LOG_INFO(log, "Listening for {}", server.getDescription());
std::lock_guard lock(servers_lock);
createInterserverServers(config(), interserver_listen_hosts, listen_try, server_pool, async_metrics, servers_to_start_before_tables, /* start_servers= */ false);
for (auto & server : servers_to_start_before_tables)
{
server.start();
LOG_INFO(log, "Listening for {}", server.getDescription());
}
}
/// Initialize access storages.
@ -1688,7 +1693,7 @@ try
{
std::lock_guard lock(servers_lock);
createServers(config(), listen_hosts, interserver_listen_hosts, listen_try, server_pool, async_metrics, servers);
createServers(config(), listen_hosts, listen_try, server_pool, async_metrics, servers);
if (servers.empty())
throw Exception(ErrorCodes::NO_ELEMENTS_IN_CONFIG,
"No servers started (add valid listen_host and 'tcp_port' or 'http_port' "
@ -1954,7 +1959,6 @@ HTTPContextPtr Server::httpContext() const
void Server::createServers(
Poco::Util::AbstractConfiguration & config,
const Strings & listen_hosts,
const Strings & interserver_listen_hosts,
bool listen_try,
Poco::ThreadPool & server_pool,
AsynchronousMetrics & async_metrics,
@ -2176,6 +2180,23 @@ void Server::createServers(
httpContext(), createHandlerFactory(*this, config, async_metrics, "PrometheusHandler-factory"), server_pool, socket, http_params));
});
}
}
void Server::createInterserverServers(
Poco::Util::AbstractConfiguration & config,
const Strings & interserver_listen_hosts,
bool listen_try,
Poco::ThreadPool & server_pool,
AsynchronousMetrics & async_metrics,
std::vector<ProtocolServerAdapter> & servers,
bool start_servers)
{
const Settings & settings = global_context->getSettingsRef();
Poco::Timespan keep_alive_timeout(config.getUInt("keep_alive_timeout", 10), 0);
Poco::Net::HTTPServerParams::Ptr http_params = new Poco::Net::HTTPServerParams;
http_params->setTimeout(settings.http_receive_timeout);
http_params->setKeepAliveTimeout(keep_alive_timeout);
/// Now iterate over interserver_listen_hosts
for (const auto & interserver_listen_host : interserver_listen_hosts)
@ -2224,14 +2245,14 @@ void Server::createServers(
#endif
});
}
}
void Server::updateServers(
Poco::Util::AbstractConfiguration & config,
Poco::ThreadPool & server_pool,
AsynchronousMetrics & async_metrics,
std::vector<ProtocolServerAdapter> & servers)
std::vector<ProtocolServerAdapter> & servers,
std::vector<ProtocolServerAdapter> & servers_to_start_before_tables)
{
Poco::Logger * log = &logger();
@ -2256,12 +2277,18 @@ void Server::updateServers(
std::erase_if(servers, std::bind_front(check_server, " (from one of previous reload)"));
Poco::Util::AbstractConfiguration & previous_config = latest_config ? *latest_config : this->config();
std::vector<ProtocolServerAdapter *> all_servers;
for (auto & server : servers)
all_servers.push_back(&server);
for (auto & server : servers_to_start_before_tables)
all_servers.push_back(&server);
for (auto * server : all_servers)
{
if (!server.isStopping())
if (!server->isStopping())
{
std::string port_name = server.getPortName();
std::string port_name = server->getPortName();
bool has_host = false;
bool is_http = false;
if (port_name.starts_with("protocols."))
@ -2299,25 +2326,26 @@ void Server::updateServers(
/// NOTE: better to compare using getPortName() over using
/// dynamic_cast<> since HTTPServer is also used for prometheus and
/// internal replication communications.
is_http = server.getPortName() == "http_port" || server.getPortName() == "https_port";
is_http = server->getPortName() == "http_port" || server->getPortName() == "https_port";
}
if (!has_host)
has_host = std::find(listen_hosts.begin(), listen_hosts.end(), server.getListenHost()) != listen_hosts.end();
has_host = std::find(listen_hosts.begin(), listen_hosts.end(), server->getListenHost()) != listen_hosts.end();
bool has_port = !config.getString(port_name, "").empty();
bool force_restart = is_http && !isSameConfiguration(previous_config, config, "http_handlers");
if (force_restart)
LOG_TRACE(log, "<http_handlers> had been changed, will reload {}", server.getDescription());
LOG_TRACE(log, "<http_handlers> had been changed, will reload {}", server->getDescription());
if (!has_host || !has_port || config.getInt(server.getPortName()) != server.portNumber() || force_restart)
if (!has_host || !has_port || config.getInt(server->getPortName()) != server->portNumber() || force_restart)
{
server.stop();
LOG_INFO(log, "Stopped listening for {}", server.getDescription());
server->stop();
LOG_INFO(log, "Stopped listening for {}", server->getDescription());
}
}
}
createServers(config, listen_hosts, interserver_listen_hosts, listen_try, server_pool, async_metrics, servers, /* start_servers= */ true);
createServers(config, listen_hosts, listen_try, server_pool, async_metrics, servers, /* start_servers= */ true);
createInterserverServers(config, interserver_listen_hosts, listen_try, server_pool, async_metrics, servers, /* start_servers= */ true);
std::erase_if(servers, std::bind_front(check_server, ""));
}
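On the server side the same ordering concern shows up: the interserver listeners that replicas fetch parts from are now created by a separate createInterserverServers() and tracked in servers_to_start_before_tables, so they are listening before tables are attached and are also covered by updateServers() on config reload. Roughly, using the names from the hunks above (not compilable on its own):

/// Interserver (replication) endpoints come up first, before any table is attached.
createInterserverServers(config(), interserver_listen_hosts, listen_try,
                         server_pool, async_metrics,
                         servers_to_start_before_tables, /* start_servers= */ false);
for (auto & server : servers_to_start_before_tables)
    server.start();

/// ... databases and tables (including ReplicatedMergeTree) are attached here ...

/// Client-facing protocol servers come up last.
createServers(config(), listen_hosts, listen_try, server_pool, async_metrics, servers);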


@ -102,6 +102,14 @@ private:
void createServers(
Poco::Util::AbstractConfiguration & config,
const Strings & listen_hosts,
bool listen_try,
Poco::ThreadPool & server_pool,
AsynchronousMetrics & async_metrics,
std::vector<ProtocolServerAdapter> & servers,
bool start_servers = false);
void createInterserverServers(
Poco::Util::AbstractConfiguration & config,
const Strings & interserver_listen_hosts,
bool listen_try,
Poco::ThreadPool & server_pool,
@ -113,7 +121,8 @@ private:
Poco::Util::AbstractConfiguration & config,
Poco::ThreadPool & server_pool,
AsynchronousMetrics & async_metrics,
std::vector<ProtocolServerAdapter> & servers);
std::vector<ProtocolServerAdapter> & servers,
std::vector<ProtocolServerAdapter> & servers_to_start_before_tables);
};
}


@ -292,7 +292,7 @@ void DatabaseWithOwnTablesBase::shutdown()
for (const auto & kv : tables_snapshot)
{
kv.second->flush();
kv.second->flushAndPrepareForShutdown();
}
for (const auto & kv : tables_snapshot)


@ -361,7 +361,7 @@ BlockIO InterpreterDropQuery::executeToDatabaseImpl(const ASTDropQuery & query,
std::vector<std::pair<String, bool>> tables_to_drop;
for (auto iterator = database->getTablesIterator(table_context); iterator->isValid(); iterator->next())
{
iterator->table()->flush();
iterator->table()->flushAndPrepareForShutdown();
tables_to_drop.push_back({iterator->name(), iterator->table()->isDictionary()});
}


@ -549,15 +549,15 @@ public:
/**
* If the storage requires some complicated work on destroying,
* then you have two virtual methods:
* - flush()
* - flushAndPrepareForShutdown()
* - shutdown()
*
* @see shutdown()
* @see flush()
* @see flushAndPrepareForShutdown()
*/
void flushAndShutdown()
{
flush();
flushAndPrepareForShutdown();
shutdown();
}
@ -570,7 +570,7 @@ public:
/// Called before shutdown() to flush data to underlying storage
/// Data in memory needs to be persistent
virtual void flush() {}
virtual void flushAndPrepareForShutdown() {}
/// Asks table to stop executing some action identified by action_type
/// If the table does not support such a type of lock, an empty lock is returned
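The contract for storages is now: flushAndPrepareForShutdown() makes in-memory data persistent and stops producing new work, shutdown() releases whatever is left, and flushAndShutdown() calls the two in that order. A minimal self-contained illustration of the two-phase shape (a toy base class, not ClickHouse's real IStorage):

#include <iostream>

struct TwoPhaseStorage
{
    virtual ~TwoPhaseStorage() = default;
    virtual void flushAndPrepareForShutdown() {}   /// persist in-memory data, stop producing new work
    virtual void shutdown() {}                     /// release the remaining resources
    void flushAndShutdown()                        /// same shape as IStorage::flushAndShutdown() above
    {
        flushAndPrepareForShutdown();
        shutdown();
    }
};

struct BufferLikeStorage : TwoPhaseStorage
{
    void flushAndPrepareForShutdown() override { std::cout << "flush buffers, stop the background flush thread\n"; }
    void shutdown() override { std::cout << "detach from the destination table\n"; }
};

int main()
{
    BufferLikeStorage storage;
    storage.flushAndShutdown();   /// flush first, then shut down
}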


@ -204,7 +204,7 @@ void Service::processQuery(const HTMLForm & params, ReadBuffer & /*body*/, Write
else
sendPartFromDisk(part, out, client_protocol_version, false, send_projections);
data.addLastSentPart(part->name);
data.addLastSentPart(part->info);
}
catch (const NetException &)
{


@ -119,6 +119,7 @@ struct Settings;
M(Bool, detach_not_byte_identical_parts, false, "Do not remove non byte-identical parts for ReplicatedMergeTree, instead detach them (maybe useful for further analysis).", 0) \
M(UInt64, max_replicated_fetches_network_bandwidth, 0, "The maximum speed of data exchange over the network in bytes per second for replicated fetches. Zero means unlimited.", 0) \
M(UInt64, max_replicated_sends_network_bandwidth, 0, "The maximum speed of data exchange over the network in bytes per second for replicated sends. Zero means unlimited.", 0) \
M(Milliseconds, wait_for_unique_parts_send_before_shutdown_ms, 0, "Before shutdown, the table will wait for the required amount of time for unique parts (which exist only on the current replica) to be fetched by other replicas (0 means disabled).", 0) \
\
/** Check delay of replicas settings. */ \
M(UInt64, min_relative_delay_to_measure, 120, "Calculate relative replica delay only if absolute delay is not less that this value.", 0) \


@ -330,7 +330,7 @@ void ReplicatedMergeTreeRestartingThread::activateReplica()
void ReplicatedMergeTreeRestartingThread::partialShutdown(bool part_of_full_shutdown)
{
setReadonly(part_of_full_shutdown);
storage.partialShutdown();
storage.partialShutdown(part_of_full_shutdown);
}


@ -682,7 +682,7 @@ void StorageBuffer::startup()
}
void StorageBuffer::flush()
void StorageBuffer::flushAndPrepareForShutdown()
{
if (!flush_handle)
return;


@ -92,7 +92,7 @@ public:
void startup() override;
/// Flush all buffers into the subordinate table and stop background thread.
void flush() override;
void flushAndPrepareForShutdown() override;
bool optimize(
const ASTPtr & query,
const StorageMetadataPtr & metadata_snapshot,


@ -1427,7 +1427,7 @@ ActionLock StorageDistributed::getActionLock(StorageActionBlockType type)
return {};
}
void StorageDistributed::flush()
void StorageDistributed::flushAndPrepareForShutdown()
{
try
{


@ -135,7 +135,7 @@ public:
void initializeFromDisk();
void shutdown() override;
void flush() override;
void flushAndPrepareForShutdown() override;
void drop() override;
bool storesDataOnDisk() const override { return data_volume != nullptr; }


@ -139,7 +139,7 @@ public:
void startup() override { getNested()->startup(); }
void shutdown() override { getNested()->shutdown(); }
void flush() override { getNested()->flush(); }
void flushAndPrepareForShutdown() override { getNested()->flushAndPrepareForShutdown(); }
ActionLock getActionLock(StorageActionBlockType action_type) override { return getNested()->getActionLock(action_type); }


@ -2,6 +2,7 @@
#include <cstddef>
#include <ranges>
#include <chrono>
#include <base/hex.h>
#include <base/interpolate.h>
@ -3933,6 +3934,7 @@ void StorageReplicatedMergeTree::addLastSentPart(const MergeTreePartInfo & info)
{
std::lock_guard lock(last_sent_parts_mutex);
last_sent_parts.emplace_back(info);
static constexpr size_t LAST_SENT_PARS_WINDOW_SIZE = 1000;
while (last_sent_parts.size() > LAST_SENT_PARS_WINDOW_SIZE)
last_sent_parts.pop_front();
}
@ -3950,24 +3952,32 @@ void StorageReplicatedMergeTree::waitForUniquePartsToBeFetchedByOtherReplicas(si
auto zookeeper = getZooKeeper();
auto unique_parts_set = findReplicaUniqueParts(replica_name, zookeeper_path, format_version, zookeeper);
auto unique_parts_set = findReplicaUniqueParts(replica_name, zookeeper_path, format_version, zookeeper, log);
if (unique_parts_set.empty())
{
LOG_INFO(log, "Will not wait for unique parts to be fetched because we don't have any unique parts");
return;
}
else
{
LOG_INFO(log, "Will wait for {} unique parts to be fetched", unique_parts_set.size());
}
auto wait_predicate = [&] () -> void
auto wait_predicate = [&] () -> bool
{
bool all_fetched = true;
for (const auto & part : unique_parts_set)
for (auto it = unique_parts_set.begin(); it != unique_parts_set.end();)
{
const auto & part = *it;
bool found = false;
for (const auto & sent_part : last_sent_parts)
for (const auto & sent_part : last_sent_parts | std::views::reverse)
{
if (sent_part.contains(part))
{
LOG_TRACE(log, "Part {} was fetched by some replica", part.getPartNameForLogs());
found = true;
it = unique_parts_set.erase(it);
break;
}
}
@ -3981,14 +3991,19 @@ void StorageReplicatedMergeTree::waitForUniquePartsToBeFetchedByOtherReplicas(si
};
std::unique_lock lock(last_sent_parts_mutex);
if (!last_sent_parts_cv.wait_for(last_sent_parts_cv, std::chrono::duration_cast<std::chrono::milliseconds>(wait_ms), wait_predicate))
LOG_WARNING(log, "Failed to wait for unqiue parts to be fetched in {} ms, {} parts can be left on this replica", wait_ms, unqiue_parts_set.size());
if (!last_sent_parts_cv.wait_for(lock, std::chrono::milliseconds(wait_ms), wait_predicate))
LOG_WARNING(log, "Failed to wait for unique parts to be fetched in {} ms, {} parts can be left on this replica", wait_ms, unique_parts_set.size());
else
LOG_INFO(log, "Successfully waited for all the parts");
}
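The wait itself is a plain condition-variable handshake: the parts exchange service records every part it sends via addLastSentPart(), which appends to the bounded last_sent_parts window (and, in the full source, wakes the waiter), while the shutting-down thread waits on last_sent_parts_cv with a predicate that erases every unique part already covered by something in that window. A simplified, self-contained model of the pattern (strings stand in for part infos; the notify call is assumed, since the excerpt above does not show it):

#include <algorithm>
#include <chrono>
#include <condition_variable>
#include <deque>
#include <mutex>
#include <set>
#include <string>

std::mutex last_sent_parts_mutex;
std::condition_variable last_sent_parts_cv;
std::deque<std::string> last_sent_parts;

/// Called by the parts exchange service after a part has been sent to another replica.
void addLastSentPart(const std::string & part)
{
    {
        std::lock_guard lock(last_sent_parts_mutex);
        last_sent_parts.push_back(part);
        while (last_sent_parts.size() > 1000)   /// bounded window, as in the commit
            last_sent_parts.pop_front();
    }
    last_sent_parts_cv.notify_all();
}

/// Called from shutdown(): returns true if every unique part was sent out within the timeout.
bool waitForUniqueParts(std::set<std::string> unique_parts, std::chrono::milliseconds timeout)
{
    std::unique_lock lock(last_sent_parts_mutex);
    return last_sent_parts_cv.wait_for(lock, timeout, [&]
    {
        for (auto it = unique_parts.begin(); it != unique_parts.end();)
        {
            bool sent = std::find(last_sent_parts.begin(), last_sent_parts.end(), *it) != last_sent_parts.end();
            it = sent ? unique_parts.erase(it) : std::next(it);
        }
        return unique_parts.empty();   /// predicate true -> stop waiting
    });
}

int main()
{
    addLastSentPart("all_0_0_0");
    return waitForUniqueParts({"all_0_0_0"}, std::chrono::milliseconds(100)) ? 0 : 1;
}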
std::vector<MergeTreePartInfo> StorageReplicatedMergeTree::findReplicaUniqueParts(const String & replica_name_, const String & zookeeper_path_, MergeTreeDataFormatVersion format_version_, zkutil::ZooKeeper::Ptr zookeeper_)
std::vector<MergeTreePartInfo> StorageReplicatedMergeTree::findReplicaUniqueParts(const String & replica_name_, const String & zookeeper_path_, MergeTreeDataFormatVersion format_version_, zkutil::ZooKeeper::Ptr zookeeper_, Poco::Logger * log_)
{
if (zookeeper_->exists(fs::path(zookeeper_path_) / "replicas" / replica_name_ / "is_active"))
if (!zookeeper_->exists(fs::path(zookeeper_path_) / "replicas" / replica_name_ / "is_active"))
{
LOG_INFO(log_, "Our replica is not active, nobody will try to fetch anything");
return {};
}
Strings replicas = zookeeper_->getChildren(fs::path(zookeeper_path_) / "replicas");
Strings our_parts;
@ -3996,40 +4011,54 @@ std::vector<MergeTreePartInfo> StorageReplicatedMergeTree::findReplicaUniquePart
for (const String & replica : replicas)
{
if (!zookeeper_->exists(fs::path(zookeeper_path_) / "replicas" / replica / "is_active"))
{
LOG_TRACE(log_, "Replica {} is not active, skipping", replica);
continue;
}
Strings parts = zookeeper_->getChildren(fs::path(zookeeper_path_) / "replicas" / replica / "parts");
if (replica == replica_name_)
{
LOG_TRACE(log_, "Our replica parts collected {}", replica);
our_parts = parts;
}
else
{
LOG_TRACE(log_, "Fetching parts for replica {}", replica);
data_parts_on_replicas.emplace_back(format_version_);
for (const auto & part : parts)
{
if (!data_parts_on_replicas.back().getContainingPart(part).empty())
if (data_parts_on_replicas.back().getContainingPart(part).empty())
data_parts_on_replicas.back().add(part);
}
}
}
NameSet our_unique_parts;
std::vector<MergeTreePartInfo> our_unique_parts;
for (const auto & part : our_parts)
{
LOG_TRACE(log_, "Looking for part {}", part);
bool found = false;
for (const auto & active_parts_set : data_parts_on_replicas)
{
if (!active_parts_set.getContainingPart(part).empty())
{
LOG_TRACE(log_, "Part {} found", part);
found = true;
break;
}
}
if (!found)
our_unique_parts.insert(MergeTreePartInfo::fromPartName(part, format_version));
{
LOG_TRACE(log_, "Part {} not found", part);
our_unique_parts.emplace_back(MergeTreePartInfo::fromPartName(part, format_version_));
}
}
if (!our_parts.empty() && our_unique_parts.empty())
LOG_TRACE(log_, "All parts found on replica");
return our_unique_parts;
}
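findReplicaUniqueParts() reduces to one question per local part: does any other active replica already hold a part that covers it? Each active replica's parts are loaded into an ActiveDataPartSet and queried with getContainingPart(); the parts nobody else covers are the ones the shutdown wait above protects. Reduced to its core (names as in the hunk above, logging omitted):

std::vector<MergeTreePartInfo> our_unique_parts;
for (const auto & part : our_parts)
{
    bool covered_elsewhere = false;
    for (const auto & active_parts_set : data_parts_on_replicas)   /// one ActiveDataPartSet per active replica
    {
        if (!active_parts_set.getContainingPart(part).empty())     /// some other replica already has this data
        {
            covered_elsewhere = true;
            break;
        }
    }
    if (!covered_elsewhere)
        our_unique_parts.emplace_back(MergeTreePartInfo::fromPartName(part, format_version_));
}
return our_unique_parts;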
@ -4799,39 +4828,9 @@ void StorageReplicatedMergeTree::startupImpl(bool from_attach_thread)
}
void StorageReplicatedMergeTree::partialShutdown()
void StorageReplicatedMergeTree::flushAndPrepareForShutdown()
{
ProfileEvents::increment(ProfileEvents::ReplicaPartialShutdown);
partial_shutdown_called = true;
partial_shutdown_event.set();
queue.notifySubscribersOnPartialShutdown();
replica_is_active_node = nullptr;
LOG_TRACE(log, "Waiting for threads to finish");
merge_selecting_task->deactivate();
queue_updating_task->deactivate();
mutations_updating_task->deactivate();
mutations_finalizing_task->deactivate();
cleanup_thread.stop();
async_block_ids_cache.stop();
part_check_thread.stop();
/// Stop queue processing
{
auto fetch_lock = fetcher.blocker.cancel();
auto merge_lock = merger_mutator.merges_blocker.cancel();
auto move_lock = parts_mover.moves_blocker.cancel();
background_operations_assignee.finish();
}
LOG_TRACE(log, "Threads finished");
}
void StorageReplicatedMergeTree::shutdown()
{
if (shutdown_called.exchange(true))
if (shutdown_prepared_called.exchange(true))
return;
session_expired_callback_handler.reset();
@ -4860,6 +4859,58 @@ void StorageReplicatedMergeTree::shutdown()
}
background_moves_assignee.finish();
}
void StorageReplicatedMergeTree::partialShutdown(bool part_of_full_shutdown)
{
ProfileEvents::increment(ProfileEvents::ReplicaPartialShutdown);
partial_shutdown_called = true;
partial_shutdown_event.set();
queue.notifySubscribersOnPartialShutdown();
if (!part_of_full_shutdown)
{
LOG_DEBUG(log, "Reset active node, replica will be inactive");
replica_is_active_node = nullptr;
}
else
LOG_DEBUG(log, "Will not reset active node, it will be reset completely during full shutdown");
LOG_TRACE(log, "Waiting for threads to finish");
merge_selecting_task->deactivate();
queue_updating_task->deactivate();
mutations_updating_task->deactivate();
mutations_finalizing_task->deactivate();
cleanup_thread.stop();
async_block_ids_cache.stop();
part_check_thread.stop();
/// Stop queue processing
{
auto fetch_lock = fetcher.blocker.cancel();
auto merge_lock = merger_mutator.merges_blocker.cancel();
auto move_lock = parts_mover.moves_blocker.cancel();
background_operations_assignee.finish();
}
LOG_TRACE(log, "Threads finished");
}
void StorageReplicatedMergeTree::shutdown()
{
if (shutdown_called.exchange(true))
return;
if (!shutdown_prepared_called.load())
flushAndPrepareForShutdown();
auto settings_ptr = getSettings();
LOG_DEBUG(log, "Data parts exchange still exists {}", data_parts_exchange_endpoint != nullptr);
waitForUniquePartsToBeFetchedByOtherReplicas(settings_ptr->wait_for_unique_parts_send_before_shutdown_ms.totalMilliseconds());
replica_is_active_node = nullptr;
auto data_parts_exchange_ptr = std::atomic_exchange(&data_parts_exchange_endpoint, InterserverIOEndpointPtr{});
if (data_parts_exchange_ptr)
{


@ -113,7 +113,10 @@ public:
void startup() override;
void shutdown() override;
void partialShutdown();
void flushAndPrepareForShutdown() override;
void partialShutdown(bool part_of_full_shutdown);
~StorageReplicatedMergeTree() override;
static String getDefaultZooKeeperPath(const Poco::Util::AbstractConfiguration & config);
@ -453,9 +456,9 @@ private:
Poco::Event partial_shutdown_event {false}; /// Poco::Event::EVENT_MANUALRESET
std::atomic<bool> shutdown_called {false};
std::atomic<bool> shutdown_prepared_called {false};
static constexpr size_t LAST_SENT_PARS_WINDOW_SIZE = 1000;
std::mutex last_sent_parts_mutex;
mutable std::mutex last_sent_parts_mutex;
std::condition_variable last_sent_parts_cv;
std::deque<MergeTreePartInfo> last_sent_parts;
@ -711,7 +714,7 @@ private:
*/
String findReplicaHavingCoveringPart(LogEntry & entry, bool active);
String findReplicaHavingCoveringPart(const String & part_name, bool active, String & found_part_name);
static std::vector<MergeTreePartInfo> findReplicaUniqueParts(const String & replica_name_, const String & zookeeper_path_, MergeTreeDataFormatVersion format_version_, zkutil::ZooKeeper::Ptr zookeeper_);
static std::vector<MergeTreePartInfo> findReplicaUniqueParts(const String & replica_name_, const String & zookeeper_path_, MergeTreeDataFormatVersion format_version_, zkutil::ZooKeeper::Ptr zookeeper_, Poco::Logger * log_);
/** Download the specified part from the specified replica.
* If `to_detached`, the part is placed in the `detached` directory.


@ -79,11 +79,11 @@ public:
nested->shutdown();
}
void flush() override
void flushAndPrepareForShutdown() override
{
std::lock_guard lock{nested_mutex};
if (nested)
nested->flush();
nested->flushAndPrepareForShutdown();
}
void drop() override


@ -0,0 +1 @@
#!/usr/bin/env python3


@ -0,0 +1,5 @@
<clickhouse>
    <merge_tree>
        <wait_for_unique_parts_send_before_shutdown_ms>30000</wait_for_unique_parts_send_before_shutdown_ms>
    </merge_tree>
</clickhouse>


@ -0,0 +1,74 @@
#!/usr/bin/env python3
import pytest
from helpers.cluster import ClickHouseCluster
from helpers.network import PartitionManager
from multiprocessing.dummy import Pool
import time
cluster = ClickHouseCluster(__file__)
node1 = cluster.add_instance(
    "node1", main_configs=["config/merge_tree_conf.xml"], with_zookeeper=True, stay_alive=True
)
node2 = cluster.add_instance(
    "node2", main_configs=["config/merge_tree_conf.xml"], with_zookeeper=True, stay_alive=True
)


@pytest.fixture(scope="module")
def start_cluster():
    try:
        cluster.start()
        yield cluster
    finally:
        cluster.shutdown()


def test_shutdown_and_wait(start_cluster):
    for i, node in enumerate([node1, node2]):
        node.query(f"CREATE TABLE test_table (value UInt64) ENGINE=ReplicatedMergeTree('/test/table', 'r{i}') ORDER BY tuple()")

    node1.query("INSERT INTO test_table VALUES (0)")
    node2.query("SYSTEM SYNC REPLICA test_table")

    assert node1.query("SELECT * FROM test_table") == "0\n"
    assert node2.query("SELECT * FROM test_table") == "0\n"

    def soft_shutdown(node):
        node.stop_clickhouse(kill=False, stop_wait_sec=60)

    p = Pool(50)
    pm = PartitionManager()
    pm.partition_instances(node1, node2)

    def insert(value):
        node1.query(f"INSERT INTO test_table VALUES ({value})")

    p.map(insert, range(1, 50))

    # Start the shutdown asynchronously
    waiter = p.apply_async(soft_shutdown, (node1,))
    # to be sure that the shutdown has started
    time.sleep(5)

    # node2 is partitioned and doesn't see any new data
    assert node2.query("SELECT * FROM test_table") == "0\n"

    # Restore the network
    pm.heal_all()
    # wait for the shutdown to finish
    waiter.get()

    node2.query("SYSTEM SYNC REPLICA test_table", timeout=5)

    # check that the second replica has all the data
    assert node2.query("SELECT sum(value) FROM test_table") == "1225\n"
    # and nothing left in the replication queue
    assert node2.query("SELECT count() FROM system.replication_queue") == "0\n"

    # It can happen that the second replica is superfast
    assert node1.contains_in_log("Successfully waited for all the parts") or node1.contains_in_log("All parts found on replica")