Try to fix it in a better way

This commit is contained in:
Alexander Tokmakov 2020-12-30 15:25:00 +03:00
parent f90719fef8
commit f2fca15393
3 changed files with 40 additions and 14 deletions

View File

@ -65,6 +65,7 @@ void ZooKeeper::init(const std::string & implementation_, const std::string & ho
Coordination::ZooKeeper::Nodes nodes;
nodes.reserve(hosts_strings.size());
bool dns_error = false;
for (auto & host_string : hosts_strings)
{
try
@ -76,14 +77,27 @@ void ZooKeeper::init(const std::string & implementation_, const std::string & ho
nodes.emplace_back(Coordination::ZooKeeper::Node{Poco::Net::SocketAddress{host_string}, secure});
}
catch (const Poco::Net::HostNotFoundException & e)
{
/// Most likely it's misconfiguration and wrong hostname was specified
LOG_ERROR(log, "Cannot use ZooKeeper host {}, reason: {}", host_string, e.displayText());
}
catch (const Poco::Net::DNSException & e)
{
LOG_ERROR(log, "Cannot use ZooKeeper host {}, reason: {}", host_string, e.displayText());
/// Most likely DNS is not available now
dns_error = true;
LOG_ERROR(log, "Cannot use ZooKeeper host {} due to DNS error: {}", host_string, e.displayText());
}
}
if (nodes.empty())
throw KeeperException("Cannot use any of provided ZooKeeper nodes", Coordination::Error::ZBADARGUMENTS);
{
/// For DNS errors we throw exception with ZCONNECTIONLOSS code, so it will be considered as hardware error, not user error
if (dns_error)
throw KeeperException("Cannot resolve any of provided ZooKeeper hosts due to DNS error", Coordination::Error::ZCONNECTIONLOSS);
else
throw KeeperException("Cannot use any of provided ZooKeeper nodes", Coordination::Error::ZBADARGUMENTS);
}
impl = std::make_unique<Coordination::ZooKeeper>(
nodes,

View File

@ -315,7 +315,7 @@ DDLWorker::DDLWorker(int pool_size_, const std::string & zk_root_dir, Context &
: context(context_)
, log(&Poco::Logger::get("DDLWorker"))
, pool_size(pool_size_)
, worker_pool(pool_size_)
, worker_pool(std::make_unique<ThreadPool>(pool_size))
{
CurrentMetrics::set(CurrentMetrics::MaxDDLEntryID, 0);
last_tasks.reserve(pool_size);
@ -352,7 +352,7 @@ DDLWorker::~DDLWorker()
stop_flag = true;
queue_updated_event->set();
cleanup_event->set();
worker_pool.wait();
worker_pool.reset();
main_thread.join();
cleanup_thread.join();
}
@ -517,7 +517,7 @@ void DDLWorker::scheduleTasks()
if (!already_processed)
{
worker_pool.scheduleOrThrowOnError([this, task_ptr = task.release()]()
worker_pool->scheduleOrThrowOnError([this, task_ptr = task.release()]()
{
setThreadName("DDLWorkerExec");
enqueueTask(DDLTaskPtr(task_ptr));
@ -1131,6 +1131,17 @@ String DDLWorker::enqueueQuery(DDLLogEntry & entry)
void DDLWorker::runMainThread()
{
/// Drops the worker's in-memory state; invoked from the error handlers further down in this function.
/// With reset_pool == true (the default) the thread pool is replaced as well;
/// callers pass false to keep the current pool untouched.
auto reset_state = [&](bool reset_pool = true)
{
/// Assigning a new pool releases the old one: it will wait for all threads in the old pool to finish
/// and will not rethrow exceptions (if any). A new thread pool is created to forget previous exceptions.
if (reset_pool)
worker_pool = std::make_unique<ThreadPool>(pool_size);
/// Clear the remaining in-memory state, as if the server had just started.
last_tasks.clear();
max_id = 0;
};
setThreadName("DDLWorker");
LOG_DEBUG(log, "Started DDLWorker thread");
@ -1148,8 +1159,9 @@ void DDLWorker::runMainThread()
if (!Coordination::isHardwareError(e.code))
{
/// A logical error.
LOG_FATAL(log, "ZooKeeper error: {}. Failed to start DDLWorker.",getCurrentExceptionMessage(true));
abort();
LOG_ERROR(log, "ZooKeeper error: {}. Failed to start DDLWorker.",getCurrentExceptionMessage(true));
reset_state(false);
assert(false); /// Catch such failures in tests with debug build
}
tryLogCurrentException(__PRETTY_FUNCTION__);
@ -1159,8 +1171,8 @@ void DDLWorker::runMainThread()
}
catch (...)
{
tryLogCurrentException(log, "Terminating. Cannot initialize DDL queue.");
return;
tryLogCurrentException(log, "Cannot initialize DDL queue.");
reset_state(false);
}
}
while (!initialized && !stop_flag);
@ -1189,14 +1201,14 @@ void DDLWorker::runMainThread()
}
else
{
LOG_ERROR(log, "Unexpected ZooKeeper error: {}. Terminating.", getCurrentExceptionMessage(true));
return;
LOG_ERROR(log, "Unexpected ZooKeeper error: {}", getCurrentExceptionMessage(true));
reset_state();
}
}
catch (...)
{
tryLogCurrentException(log, "Unexpected error, will terminate:");
return;
tryLogCurrentException(log, "Unexpected error:");
reset_state();
}
}
}

View File

@ -127,7 +127,7 @@ private:
/// Size of the pool for query execution.
size_t pool_size = 1;
ThreadPool worker_pool;
std::unique_ptr<ThreadPool> worker_pool;
/// Cleaning starts after new node event is received if the last cleaning wasn't made sooner than N seconds ago
Int64 cleanup_delay_period = 60; // minute (in seconds)