Merge pull request #21079 from azat/distributed_ddl_pool_size-zk-fix

Fix various issues in DDLWorker (SIGSEGV and others)
This commit is contained in:
tavplubix 2021-03-02 11:59:30 +03:00 committed by GitHub
commit d305b23338
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
12 changed files with 249 additions and 100 deletions

View File

@ -1017,17 +1017,6 @@ int Server::main(const std::vector<std::string> & /*args*/)
LOG_INFO(log, "Query Profiler and TraceCollector are disabled because they require PHDR cache to be created"
" (otherwise the function 'dl_iterate_phdr' is not lock free and not async-signal safe).");
if (has_zookeeper && config().has("distributed_ddl"))
{
/// DDL worker should be started after all tables were loaded
String ddl_zookeeper_path = config().getString("distributed_ddl.path", "/clickhouse/task_queue/ddl/");
int pool_size = config().getInt("distributed_ddl.pool_size", 1);
if (pool_size < 1)
throw Exception("distributed_ddl.pool_size should be greater then 0", ErrorCodes::ARGUMENT_OUT_OF_BOUND);
global_context->setDDLWorker(std::make_unique<DDLWorker>(pool_size, ddl_zookeeper_path, *global_context, &config(),
"distributed_ddl", "DDLWorker", &CurrentMetrics::MaxDDLEntryID));
}
std::unique_ptr<DNSCacheUpdater> dns_cache_updater;
if (config().has("disable_internal_dns_cache") && config().getInt("disable_internal_dns_cache"))
{
@ -1309,6 +1298,37 @@ int Server::main(const std::vector<std::string> & /*args*/)
std::thread::hardware_concurrency());
}
/// try to load dictionaries immediately, throw on error and die
ext::scope_guard dictionaries_xmls, models_xmls;
try
{
if (!config().getBool("dictionaries_lazy_load", true))
{
global_context->tryCreateEmbeddedDictionaries();
global_context->getExternalDictionariesLoader().enableAlwaysLoadEverything(true);
}
dictionaries_xmls = global_context->getExternalDictionariesLoader().addConfigRepository(
std::make_unique<ExternalLoaderXMLConfigRepository>(config(), "dictionaries_config"));
models_xmls = global_context->getExternalModelsLoader().addConfigRepository(
std::make_unique<ExternalLoaderXMLConfigRepository>(config(), "models_config"));
}
catch (...)
{
LOG_ERROR(log, "Caught exception while loading dictionaries.");
throw;
}
if (has_zookeeper && config().has("distributed_ddl"))
{
/// DDL worker should be started after all tables were loaded
String ddl_zookeeper_path = config().getString("distributed_ddl.path", "/clickhouse/task_queue/ddl/");
int pool_size = config().getInt("distributed_ddl.pool_size", 1);
if (pool_size < 1)
throw Exception("distributed_ddl.pool_size should be greater then 0", ErrorCodes::ARGUMENT_OUT_OF_BOUND);
global_context->setDDLWorker(std::make_unique<DDLWorker>(pool_size, ddl_zookeeper_path, *global_context, &config(),
"distributed_ddl", "DDLWorker", &CurrentMetrics::MaxDDLEntryID));
}
LOG_INFO(log, "Ready for connections.");
SCOPE_EXIT({
@ -1358,26 +1378,6 @@ int Server::main(const std::vector<std::string> & /*args*/)
}
});
/// try to load dictionaries immediately, throw on error and die
ext::scope_guard dictionaries_xmls, models_xmls;
try
{
if (!config().getBool("dictionaries_lazy_load", true))
{
global_context->tryCreateEmbeddedDictionaries();
global_context->getExternalDictionariesLoader().enableAlwaysLoadEverything(true);
}
dictionaries_xmls = global_context->getExternalDictionariesLoader().addConfigRepository(
std::make_unique<ExternalLoaderXMLConfigRepository>(config(), "dictionaries_config"));
models_xmls = global_context->getExternalModelsLoader().addConfigRepository(
std::make_unique<ExternalLoaderXMLConfigRepository>(config(), "models_config"));
}
catch (...)
{
LOG_ERROR(log, "Caught exception while loading dictionaries.");
throw;
}
std::vector<std::unique_ptr<MetricsTransmitter>> metrics_transmitters;
for (const auto & graphite_key : DB::getMultipleKeysFromConfig(config(), "", "graphite"))
{

View File

@ -892,6 +892,19 @@
<!-- Controls how many ON CLUSTER queries can be run simultaneously. -->
<!-- <pool_size>1</pool_size> -->
<!--
Cleanup settings (active tasks will not be removed)
-->
<!-- Controls task TTL (default 1 week) -->
<!-- <task_max_lifetime>604800</task_max_lifetime> -->
<!-- Controls how often cleanup should be performed (in seconds) -->
<!-- <cleanup_delay_period>60</cleanup_delay_period> -->
<!-- Controls how many tasks could be in the queue -->
<!-- <max_tasks_in_queue>1000</max_tasks_in_queue> -->
</distributed_ddl>
<!-- Settings to fine tune MergeTree tables. See documentation in source code, in MergeTreeSettings.h -->

View File

@ -22,7 +22,7 @@ DatabaseReplicatedDDLWorker::DatabaseReplicatedDDLWorker(DatabaseReplicated * db
/// We also need similar graph to load tables on server startup in order of topsort.
}
void DatabaseReplicatedDDLWorker::initializeMainThread()
bool DatabaseReplicatedDDLWorker::initializeMainThread()
{
while (!stop_flag)
{
@ -33,7 +33,7 @@ void DatabaseReplicatedDDLWorker::initializeMainThread()
database->tryConnectToZooKeeperAndInitDatabase(false);
initializeReplication();
initialized = true;
return;
return true;
}
catch (...)
{
@ -41,6 +41,8 @@ void DatabaseReplicatedDDLWorker::initializeMainThread()
sleepForSeconds(5);
}
}
return false;
}
void DatabaseReplicatedDDLWorker::shutdown()
@ -61,7 +63,7 @@ void DatabaseReplicatedDDLWorker::initializeReplication()
if (our_log_ptr == 0 || our_log_ptr + logs_to_keep < max_log_ptr)
database->recoverLostReplica(current_zookeeper, our_log_ptr, max_log_ptr);
else
last_skipped_entry_name.emplace(log_ptr_str);
last_skipped_entry_name.emplace(DDLTaskBase::getLogEntryName(our_log_ptr));
}
String DatabaseReplicatedDDLWorker::enqueueQuery(DDLLogEntry & entry)

View File

@ -30,7 +30,7 @@ public:
void shutdown() override;
private:
void initializeMainThread() override;
bool initializeMainThread() override;
void initializeReplication();
DDLTaskPtr initAndCheckTask(const String & entry_name, String & out_reason, const ZooKeeperPtr & zookeeper) override;

View File

@ -189,7 +189,7 @@ public:
void commit();
~ZooKeeperMetadataTransaction() { assert(isExecuted() || std::uncaught_exception()); }
~ZooKeeperMetadataTransaction() { assert(isExecuted() || std::uncaught_exceptions()); }
};
}

View File

@ -305,20 +305,26 @@ static void filterAndSortQueueNodes(Strings & all_nodes)
std::sort(all_nodes.begin(), all_nodes.end());
}
void DDLWorker::scheduleTasks()
void DDLWorker::scheduleTasks(bool reinitialized)
{
LOG_DEBUG(log, "Scheduling tasks");
auto zookeeper = tryGetZooKeeper();
for (auto & task : current_tasks)
/// Main thread of DDLWorker was restarted, probably due to lost connection with ZooKeeper.
/// We have some unfinished tasks. To avoid duplication of some queries, try to write execution status.
if (reinitialized)
{
/// Main thread of DDLWorker was restarted, probably due to lost connection with ZooKeeper.
/// We have some unfinished tasks. To avoid duplication of some queries, try to write execution status.
bool task_still_exists = zookeeper->exists(task->entry_path);
bool status_written = zookeeper->exists(task->getFinishedNodePath());
if (task->was_executed && !status_written && task_still_exists)
for (auto & task : current_tasks)
{
processTask(*task, zookeeper);
if (task->was_executed)
{
bool task_still_exists = zookeeper->exists(task->entry_path);
bool status_written = zookeeper->exists(task->getFinishedNodePath());
if (!status_written && task_still_exists)
{
processTask(*task, zookeeper);
}
}
}
}
@ -332,19 +338,23 @@ void DDLWorker::scheduleTasks()
else if (max_tasks_in_queue < queue_nodes.size())
cleanup_event->set();
bool server_startup = current_tasks.empty();
/// Detect queue start, using:
/// - skipped tasks
/// - in memory tasks (that are currently active)
auto begin_node = queue_nodes.begin();
if (!server_startup)
UInt64 last_task_id = 0;
if (!current_tasks.empty())
{
/// We will recheck status of last executed tasks. It's useful if main thread was just restarted.
auto & min_task = *std::min_element(current_tasks.begin(), current_tasks.end());
String min_entry_name = last_skipped_entry_name ? std::min(min_task->entry_name, *last_skipped_entry_name) : min_task->entry_name;
begin_node = std::upper_bound(queue_nodes.begin(), queue_nodes.end(), min_entry_name);
current_tasks.clear();
auto & last_task = current_tasks.back();
last_task_id = DDLTaskBase::getLogEntryNumber(last_task->entry_name);
begin_node = std::upper_bound(queue_nodes.begin(), queue_nodes.end(), last_task->entry_name);
}
if (last_skipped_entry_name)
{
UInt64 last_skipped_entry_id = DDLTaskBase::getLogEntryNumber(*last_skipped_entry_name);
if (last_skipped_entry_id > last_task_id)
begin_node = std::upper_bound(queue_nodes.begin(), queue_nodes.end(), *last_skipped_entry_name);
}
assert(current_tasks.empty());
for (auto it = begin_node; it != queue_nodes.end() && !stop_flag; ++it)
{
@ -365,7 +375,7 @@ void DDLWorker::scheduleTasks()
if (worker_pool)
{
worker_pool->scheduleOrThrowOnError([this, &saved_task, &zookeeper]()
worker_pool->scheduleOrThrowOnError([this, &saved_task, zookeeper]()
{
setThreadName("DDLWorkerExec");
processTask(saved_task, zookeeper);
@ -930,11 +940,11 @@ String DDLWorker::enqueueQuery(DDLLogEntry & entry)
}
void DDLWorker::initializeMainThread()
bool DDLWorker::initializeMainThread()
{
assert(!initialized);
setThreadName("DDLWorker");
LOG_DEBUG(log, "Started DDLWorker thread");
LOG_DEBUG(log, "Initializing DDLWorker thread");
while (!stop_flag)
{
@ -943,7 +953,7 @@ void DDLWorker::initializeMainThread()
auto zookeeper = getAndSetZooKeeper();
zookeeper->createAncestors(fs::path(queue_dir) / "");
initialized = true;
return;
return true;
}
catch (const Coordination::Exception & e)
{
@ -964,6 +974,8 @@ void DDLWorker::initializeMainThread()
/// Avoid busy loop when ZooKeeper is not available.
sleepForSeconds(5);
}
return false;
}
void DDLWorker::runMainThread()
@ -989,15 +1001,19 @@ void DDLWorker::runMainThread()
{
try
{
bool reinitialized = !initialized;
/// Reinitialize DDLWorker state (including ZooKeeper connection) if required
if (!initialized)
{
initializeMainThread();
/// Stopped
if (!initializeMainThread())
break;
LOG_DEBUG(log, "Initialized DDLWorker thread");
}
cleanup_event->set();
scheduleTasks();
scheduleTasks(reinitialized);
LOG_DEBUG(log, "Waiting for queue updates");
queue_updated_event->wait();
@ -1007,6 +1023,9 @@ void DDLWorker::runMainThread()
if (Coordination::isHardwareError(e.code))
{
initialized = false;
/// Wait for pending async tasks
if (1 < pool_size)
worker_pool = std::make_unique<ThreadPool>(pool_size);
LOG_INFO(log, "Lost ZooKeeper connection, will try to connect again: {}", getCurrentExceptionMessage(true));
}
else

View File

@ -69,7 +69,7 @@ protected:
ZooKeeperPtr getAndSetZooKeeper();
/// Iterates through queue tasks in ZooKeeper, runs execution of new tasks
void scheduleTasks();
void scheduleTasks(bool reinitialized);
DDLTaskBase & saveTask(DDLTaskPtr && task);
@ -104,7 +104,8 @@ protected:
/// Init task node
void createStatusDirs(const std::string & node_path, const ZooKeeperPtr & zookeeper);
virtual void initializeMainThread();
/// Return false if the worker was stopped (stop_flag = true)
virtual bool initializeMainThread();
void runMainThread();
void runCleanupThread();

View File

@ -0,0 +1,5 @@
<yandex>
<distributed_ddl>
<pool_size replace="1">2</pool_size>
</distributed_ddl>
</yandex>

View File

@ -0,0 +1,5 @@
<yandex>
<distributed_ddl>
<pool_size replace="1">20</pool_size>
</distributed_ddl>
</yandex>

View File

@ -1,26 +1,50 @@
<?xml version="1.0"?>
<yandex>
<dictionary>
<name>slow_dict</name>
<source>
<executable>
<command>sleep 7</command>
<format>TabSeparated</format>
</executable>
</source>
<layout>
<flat/>
</layout>
<structure>
<id>
<name>id</name>
</id>
<attribute>
<name>value</name>
<type>String</type>
<null_value></null_value>
</attribute>
</structure>
<lifetime>0</lifetime>
</dictionary>
<dictionary>
<name>slow_dict_7</name>
<source>
<executable>
<command>sleep 7</command>
<format>TabSeparated</format>
</executable>
</source>
<layout>
<flat/>
</layout>
<structure>
<id>
<name>id</name>
</id>
<attribute>
<name>value</name>
<type>String</type>
<null_value></null_value>
</attribute>
</structure>
<lifetime>0</lifetime>
</dictionary>
<dictionary>
<name>slow_dict_3</name>
<source>
<executable>
<command>sleep 3</command>
<format>TabSeparated</format>
</executable>
</source>
<layout>
<flat/>
</layout>
<structure>
<id>
<name>id</name>
</id>
<attribute>
<name>value</name>
<type>String</type>
<null_value></null_value>
</attribute>
</structure>
<lifetime>0</lifetime>
</dictionary>
</yandex>

View File

@ -1,6 +1,6 @@
<yandex>
<remote_servers>
<cluster>
<cluster_a>
<shard>
<replica>
<host>n1</host>
@ -13,6 +13,20 @@
<port>9000</port>
</replica>
</shard>
</cluster>
</cluster_a>
<cluster_b>
<shard>
<replica>
<host>n3</host>
<port>9000</port>
</replica>
</shard>
<shard>
<replica>
<host>n4</host>
<port>9000</port>
</replica>
</shard>
</cluster_b>
</remote_servers>
</yandex>

View File

@ -10,11 +10,31 @@ from helpers.cluster import ClickHouseCluster
cluster = ClickHouseCluster(__file__)
def add_instance(name):
# By default, exceptions thrown in threads are ignored
# (they do not mark the test as failed; they are only printed to stderr).
#
# Wrap threading.Thread and re-raise the exception on join().
class SafeThread(threading.Thread):
    """Thread wrapper that surfaces failures of its target to the joining thread.

    The target is called in run(); any Exception it raises is stored in
    ``self.exception`` and re-raised from join(), so the caller of join()
    observes the failure instead of it only being printed by the thread.
    """

    def __init__(self, target):
        """Store the zero-argument callable ``target`` to be executed by run()."""
        super().__init__()
        self.target = target
        self.exception = None

    def run(self):
        """Call the target and remember the exception it raised, if any."""
        try:
            self.target()
        except Exception as e:  # pylint: disable=broad-except
            self.exception = e

    def join(self, timeout=None):
        """Wait for the thread and re-raise the exception captured in run().

        Raises TimeoutError when ``timeout`` expires while the thread is still
        running: threading.Thread.join() returns None in that case as well, so
        without this check a hung worker would go unnoticed by the caller.
        """
        super().join(timeout)
        if self.exception:
            raise self.exception
        if self.is_alive():
            raise TimeoutError('thread did not finish within {} seconds'.format(timeout))
def add_instance(name, ddl_config=None):
main_configs=[
'configs/ddl.xml',
'configs/remote_servers.xml',
]
if ddl_config:
main_configs.append(ddl_config)
dictionaries=[
'configs/dict.xml',
]
@ -24,8 +44,12 @@ def add_instance(name):
with_zookeeper=True)
initiator = add_instance('initiator')
n1 = add_instance('n1')
n2 = add_instance('n2')
# distributed_ddl.pool_size = 2
n1 = add_instance('n1', 'configs/ddl_a.xml')
n2 = add_instance('n2', 'configs/ddl_a.xml')
# distributed_ddl.pool_size = 20
n3 = add_instance('n3', 'configs/ddl_b.xml')
n4 = add_instance('n4', 'configs/ddl_b.xml')
@pytest.fixture(scope='module', autouse=True)
def start_cluster():
@ -49,17 +73,32 @@ def longer_then(sec):
return inner
return wrapper
# It takes 7 seconds to load slow_dict.
def thread_reload_dictionary():
initiator.query('SYSTEM RELOAD DICTIONARY ON CLUSTER cluster slow_dict')
# It takes 7 seconds to load slow_dict_7.
def execute_reload_dictionary_slow_dict_7():
    """Ask the initiator to reload slow_dict_7 on cluster_a (distributed_ddl_task_timeout = 60)."""
    query_settings = {'distributed_ddl_task_timeout': 60}
    initiator.query('SYSTEM RELOAD DICTIONARY ON CLUSTER cluster_a slow_dict_7', settings=query_settings)
def execute_reload_dictionary_slow_dict_3():
    """Ask the initiator to reload slow_dict_3 on cluster_b (distributed_ddl_task_timeout = 60)."""
    query_settings = {'distributed_ddl_task_timeout': 60}
    initiator.query('SYSTEM RELOAD DICTIONARY ON CLUSTER cluster_b slow_dict_3', settings=query_settings)
def execute_smoke_query():
    """Run a DROP DATABASE IF EXISTS query on cluster_b via the initiator (distributed_ddl_task_timeout = 60)."""
    query_settings = {'distributed_ddl_task_timeout': 60}
    initiator.query('DROP DATABASE IF EXISTS foo ON CLUSTER cluster_b', settings=query_settings)
def check_log():
    """Assert that no cluster instance logged 'Coordination::Exception: Node exists'."""
    # Ensure that none of the tasks was processed multiple times.
    marker = 'Coordination::Exception: Node exists'
    instances = [instance for _name, instance in cluster.instances.items()]
    for instance in instances:
        assert not instance.contains_in_log(marker)
# NOTE: uses inner function to exclude slow start_cluster() from timeout.
def test_dict_load():
def test_slow_dict_load_7():
@pytest.mark.timeout(10)
@longer_then(7)
def inner_test():
initiator.query('SYSTEM RELOAD DICTIONARY slow_dict')
initiator.query('SYSTEM RELOAD DICTIONARY slow_dict_7')
inner_test()
def test_all_in_parallel():
@ -68,12 +107,13 @@ def test_all_in_parallel():
def inner_test():
threads = []
for _ in range(2):
threads.append(threading.Thread(target=thread_reload_dictionary))
threads.append(SafeThread(target=execute_reload_dictionary_slow_dict_7))
for thread in threads:
thread.start()
for thread in threads:
thread.join()
thread.join(70)
inner_test()
check_log()
def test_two_in_parallel_two_queued():
@pytest.mark.timeout(19)
@ -81,9 +121,35 @@ def test_two_in_parallel_two_queued():
def inner_test():
threads = []
for _ in range(4):
threads.append(threading.Thread(target=thread_reload_dictionary))
threads.append(SafeThread(target=execute_reload_dictionary_slow_dict_7))
for thread in threads:
thread.start()
for thread in threads:
thread.join()
thread.join(70)
inner_test()
check_log()
def test_smoke():
    """Run the smoke query 100 times one after another, then check the logs."""
    remaining = 100
    while remaining > 0:
        execute_smoke_query()
        remaining -= 1
    check_log()
def test_smoke_parallel():
    """Run the smoke query in 100 threads, join each with timeout 70, then check the logs."""
    workers = [SafeThread(target=execute_smoke_query) for _ in range(100)]
    # Start every worker before joining any of them so the queries overlap.
    for worker in workers:
        worker.start()
    for worker in workers:
        worker.join(70)
    check_log()
def test_smoke_parallel_dict_reload():
    """Reload slow_dict_3 from 100 threads, join each with timeout 70, then check the logs."""
    workers = [SafeThread(target=execute_reload_dictionary_slow_dict_3) for _ in range(100)]
    # Start every worker before joining any of them so the reloads overlap.
    for worker in workers:
        worker.start()
    for worker in workers:
        worker.join(70)
    check_log()