This commit is contained in:
Alexey Milovidov 2014-12-30 06:04:15 +03:00
commit bcf17501cd

View File

@ -13,6 +13,9 @@ namespace DB
namespace namespace
{ {
static constexpr const std::chrono::seconds max_sleep_time{30};
static constexpr const std::chrono::minutes decrease_error_count_period{5};
template <typename PoolFactory> template <typename PoolFactory>
ConnectionPools createPoolsForAddresses(const std::string & name, PoolFactory && factory) ConnectionPools createPoolsForAddresses(const std::string & name, PoolFactory && factory)
{ {
@ -57,7 +60,8 @@ class StorageDistributed::DirectoryMonitor
public: public:
DirectoryMonitor(StorageDistributed & storage, const std::string & name) DirectoryMonitor(StorageDistributed & storage, const std::string & name)
: storage(storage), pool{createPool(name)}, path{storage.path + name + '/'} : storage(storage), pool{createPool(name)}, path{storage.path + name + '/'}
, sleep_time{storage.context.getSettingsRef().distributed_directory_monitor_sleep_time_ms.totalMilliseconds()} , default_sleep_time{storage.context.getSettingsRef().distributed_directory_monitor_sleep_time_ms.totalMilliseconds()}
, sleep_time{default_sleep_time}
, log{&Logger::get(getLoggerName())} , log{&Logger::get(getLoggerName())}
{ {
} }
@ -90,11 +94,22 @@ private:
catch (...) catch (...)
{ {
do_sleep = true; do_sleep = true;
++error_count;
sleep_time = std::min(
std::chrono::milliseconds{std::int64_t(default_sleep_time.count() * std::exp2(error_count))},
std::chrono::milliseconds{max_sleep_time});
tryLogCurrentException(getLoggerName().data()); tryLogCurrentException(getLoggerName().data());
} };
if (do_sleep) if (do_sleep)
cond.wait_for(lock, sleep_time, quit_requested); cond.wait_for(lock, sleep_time, quit_requested);
const auto now = std::chrono::system_clock::now();
if (now - last_decrease_time > decrease_error_count_period)
{
error_count /= 2;
last_decrease_time = now;
}
} }
} }
@ -196,7 +211,12 @@ private:
StorageDistributed & storage; StorageDistributed & storage;
ConnectionPoolPtr pool; ConnectionPoolPtr pool;
std::string path; std::string path;
std::size_t error_count{};
std::chrono::milliseconds default_sleep_time;
std::chrono::milliseconds sleep_time; std::chrono::milliseconds sleep_time;
std::chrono::time_point<std::chrono::system_clock> last_decrease_time{
std::chrono::system_clock::now()
};
bool quit{false}; bool quit{false};
std::mutex mutex; std::mutex mutex;
std::condition_variable cond; std::condition_variable cond;