2015-10-05 01:26:43 +00:00
|
|
|
|
#pragma once
|
|
|
|
|
|
|
|
|
|
#include <time.h>
|
|
|
|
|
#include <DB/Common/PoolBase.h>
|
|
|
|
|
#include <DB/Common/ProfileEvents.h>
|
|
|
|
|
#include <DB/Common/NetException.h>
|
2015-10-05 01:35:28 +00:00
|
|
|
|
#include <DB/Common/Exception.h>
|
2015-10-05 01:26:43 +00:00
|
|
|
|
#include <DB/Core/ErrorCodes.h>
|
|
|
|
|
#include <DB/Interpreters/Settings.h>
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
namespace
|
|
|
|
|
{
|
|
|
|
|
/** Класс, который употребляется для того, чтобы оптимизировать выделение
|
|
|
|
|
* нескольких ресурсов в PoolWithFailoverBase. Проверки границ не проводятся,
|
|
|
|
|
* потому что мы предполагаем, что PoolWithFailoverBase делает все нужные
|
|
|
|
|
* проверки.
|
|
|
|
|
*/
|
|
|
|
|
class ResourceTracker
|
|
|
|
|
{
|
|
|
|
|
public:
|
|
|
|
|
ResourceTracker(size_t s)
|
|
|
|
|
: handles(s), unallocated_size(s)
|
|
|
|
|
{
|
|
|
|
|
size_t i = 0;
|
|
|
|
|
for (auto & index : handles)
|
|
|
|
|
{
|
|
|
|
|
index = i;
|
|
|
|
|
++i;
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
size_t getHandle(size_t i) const
|
|
|
|
|
{
|
|
|
|
|
return handles[i];
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
size_t getUnallocatedSize() const
|
|
|
|
|
{
|
|
|
|
|
return unallocated_size;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
void markAsAllocated(size_t i)
|
|
|
|
|
{
|
|
|
|
|
std::swap(handles[i], handles[unallocated_size - 1]);
|
|
|
|
|
--unallocated_size;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
private:
|
|
|
|
|
std::vector<size_t> handles;
|
|
|
|
|
size_t unallocated_size;
|
|
|
|
|
};
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
/** Класс, от которого можно унаследоваться и получить пул с отказоустойчивостью. Используется для пулов соединений с реплицированной БД.
|
|
|
|
|
* Инициализируется несколькими другими PoolBase-ами.
|
|
|
|
|
* При получении соединения, пытается создать или выбрать живое соединение из какого-нибудь пула,
|
|
|
|
|
* перебирая их в некотором порядке, используя не более указанного количества попыток.
|
|
|
|
|
* Пулы перебираются в порядке лексикографического возрастания тройки (приоритет, число ошибок, случайное число).
|
|
|
|
|
*
|
|
|
|
|
* Замечание: если один из вложенных пулов заблокируется из-за переполнения, то этот пул тоже заблокируется.
|
|
|
|
|
*
|
|
|
|
|
* Наследник должен предоставить метод, достающий соединение из вложенного пула.
|
|
|
|
|
* Еще наследник можнет назначать приоритеты вложенным пулам.
|
|
|
|
|
*/
|
|
|
|
|
|
|
|
|
|
template <typename TNestedPool>
|
|
|
|
|
class PoolWithFailoverBase : private boost::noncopyable
|
|
|
|
|
{
|
|
|
|
|
public:
|
|
|
|
|
typedef TNestedPool NestedPool;
|
|
|
|
|
typedef Poco::SharedPtr<NestedPool> NestedPoolPtr;
|
|
|
|
|
typedef typename NestedPool::Entry Entry;
|
|
|
|
|
typedef std::vector<NestedPoolPtr> NestedPools;
|
|
|
|
|
|
|
|
|
|
virtual ~PoolWithFailoverBase() {}
|
|
|
|
|
|
|
|
|
|
PoolWithFailoverBase(NestedPools & nested_pools_,
|
|
|
|
|
size_t max_tries_,
|
|
|
|
|
time_t decrease_error_period_,
|
|
|
|
|
Logger * log_)
|
|
|
|
|
: nested_pools(nested_pools_.begin(), nested_pools_.end(), decrease_error_period_), max_tries(max_tries_),
|
|
|
|
|
log(log_)
|
|
|
|
|
{
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
/** Выделяет соединение для работы. */
|
|
|
|
|
Entry get(const DB::Settings * settings)
|
|
|
|
|
{
|
|
|
|
|
Entry entry;
|
|
|
|
|
std::stringstream fail_messages;
|
|
|
|
|
|
|
|
|
|
bool skip_unavailable = settings ? UInt64(settings->skip_unavailable_shards) : false;
|
|
|
|
|
|
|
|
|
|
if (getResource(entry, fail_messages, nullptr, settings))
|
|
|
|
|
return entry;
|
|
|
|
|
else if (!skip_unavailable)
|
|
|
|
|
throw DB::NetException("All connection tries failed. Log: \n\n" + fail_messages.str() + "\n", DB::ErrorCodes::ALL_CONNECTION_TRIES_FAILED);
|
|
|
|
|
else
|
|
|
|
|
return {};
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
/** Выделяет до указанного количества соединений для работы
|
|
|
|
|
* Соединения предоставляют доступ к разным репликам одного шарда.
|
|
|
|
|
*/
|
2015-10-12 14:53:16 +00:00
|
|
|
|
std::vector<Entry> getMany(const DB::Settings * settings, bool get_all)
|
2015-10-05 01:26:43 +00:00
|
|
|
|
{
|
|
|
|
|
ResourceTracker resource_tracker{nested_pools.size()};
|
|
|
|
|
|
2015-10-12 14:53:16 +00:00
|
|
|
|
UInt64 max_connections;
|
|
|
|
|
if (get_all)
|
|
|
|
|
max_connections = nested_pools.size();
|
|
|
|
|
else
|
|
|
|
|
max_connections = settings ? UInt64(settings->max_parallel_replicas) : 1;
|
|
|
|
|
|
2015-10-05 01:26:43 +00:00
|
|
|
|
bool skip_unavailable = settings ? UInt64(settings->skip_unavailable_shards) : false;
|
|
|
|
|
|
|
|
|
|
std::vector<Entry> connections;
|
|
|
|
|
connections.reserve(max_connections);
|
|
|
|
|
|
|
|
|
|
for (UInt64 i = 0; i < max_connections; ++i)
|
|
|
|
|
{
|
|
|
|
|
Entry entry;
|
|
|
|
|
std::stringstream fail_messages;
|
|
|
|
|
|
|
|
|
|
if (getResource(entry, fail_messages, &resource_tracker, settings))
|
|
|
|
|
connections.push_back(entry);
|
2015-10-12 14:53:16 +00:00
|
|
|
|
else if (get_all || ((i == 0) && !skip_unavailable))
|
2015-10-05 01:26:43 +00:00
|
|
|
|
throw DB::NetException("All connection tries failed. Log: \n\n" + fail_messages.str() + "\n", DB::ErrorCodes::ALL_CONNECTION_TRIES_FAILED);
|
|
|
|
|
else
|
|
|
|
|
break;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
return connections;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
protected:
|
|
|
|
|
struct PoolWithErrorCount
|
|
|
|
|
{
|
|
|
|
|
public:
|
|
|
|
|
PoolWithErrorCount(const NestedPoolPtr & pool_) : pool(pool_)
|
|
|
|
|
{
|
|
|
|
|
struct timespec times;
|
|
|
|
|
if (clock_gettime(CLOCK_THREAD_CPUTIME_ID, ×))
|
|
|
|
|
DB::throwFromErrno("Cannot clock_gettime.", DB::ErrorCodes::CANNOT_CLOCK_GETTIME);
|
|
|
|
|
|
|
|
|
|
srand48_r(reinterpret_cast<intptr_t>(this) ^ times.tv_nsec, &rand_state);
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
void randomize()
|
|
|
|
|
{
|
|
|
|
|
long int rand_res;
|
|
|
|
|
lrand48_r(&rand_state, &rand_res);
|
|
|
|
|
state.random = rand_res;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
public:
|
|
|
|
|
struct State
|
|
|
|
|
{
|
|
|
|
|
static bool compare(const State & lhs, const State & rhs)
|
|
|
|
|
{
|
|
|
|
|
return std::tie(lhs.priority, lhs.error_count, lhs.random)
|
|
|
|
|
< std::tie(rhs.priority, rhs.error_count, rhs.random);
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
Int64 priority = 0;
|
|
|
|
|
UInt64 error_count = 0;
|
|
|
|
|
UInt32 random = 0;
|
|
|
|
|
};
|
|
|
|
|
|
|
|
|
|
public:
|
|
|
|
|
NestedPoolPtr pool;
|
|
|
|
|
State state;
|
|
|
|
|
drand48_data rand_state;
|
|
|
|
|
};
|
|
|
|
|
|
|
|
|
|
using States = std::vector<typename PoolWithErrorCount::State>;
|
|
|
|
|
|
|
|
|
|
class PoolsWithErrorCount : public std::vector<PoolWithErrorCount>
|
|
|
|
|
{
|
|
|
|
|
public:
|
|
|
|
|
PoolsWithErrorCount(typename NestedPools::iterator begin_, typename NestedPools::iterator end_,
|
|
|
|
|
time_t decrease_error_period_)
|
|
|
|
|
: std::vector<PoolWithErrorCount>(begin_, end_),
|
|
|
|
|
decrease_error_period(decrease_error_period_)
|
|
|
|
|
{
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
/// Эта функция возвращает собственную копию состояния каждого пула, чтобы не возникло
|
|
|
|
|
/// состояния гонки при выделении соединений.
|
|
|
|
|
States update()
|
|
|
|
|
{
|
|
|
|
|
States states;
|
|
|
|
|
states.reserve(this->size());
|
|
|
|
|
|
|
|
|
|
{
|
|
|
|
|
Poco::ScopedLock<Poco::FastMutex> lock(mutex);
|
|
|
|
|
|
|
|
|
|
for (auto & pool : *this)
|
|
|
|
|
pool.randomize();
|
|
|
|
|
|
|
|
|
|
/// Каждые N секунд уменьшаем количество ошибок в 2 раза
|
|
|
|
|
time_t current_time = time(0);
|
|
|
|
|
|
|
|
|
|
if (last_decrease_time)
|
|
|
|
|
{
|
|
|
|
|
time_t delta = current_time - last_decrease_time;
|
|
|
|
|
|
|
|
|
|
if (delta >= 0)
|
|
|
|
|
{
|
|
|
|
|
/// Каждые decrease_error_period секунд, делим количество ошибок на два.
|
|
|
|
|
size_t shift_amount = delta / decrease_error_period;
|
|
|
|
|
/// обновляем время, не чаще раз в период
|
|
|
|
|
/// в противном случае при частых вызовах счетчик никогда не будет уменьшаться
|
|
|
|
|
if (shift_amount)
|
|
|
|
|
last_decrease_time = current_time;
|
|
|
|
|
|
|
|
|
|
if (shift_amount >= sizeof(UInt64))
|
|
|
|
|
{
|
|
|
|
|
for (auto & pool : *this)
|
|
|
|
|
pool.state.error_count = 0;
|
|
|
|
|
}
|
|
|
|
|
else if (shift_amount)
|
|
|
|
|
{
|
|
|
|
|
for (auto & pool : *this)
|
|
|
|
|
pool.state.error_count >>= shift_amount;
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
else
|
|
|
|
|
last_decrease_time = current_time;
|
|
|
|
|
|
|
|
|
|
for (auto & pool : *this)
|
|
|
|
|
states.push_back(pool.state);
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
return states;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
private:
|
|
|
|
|
/// Время, когда последний раз уменьшался счётчик ошибок.
|
|
|
|
|
time_t last_decrease_time = 0;
|
|
|
|
|
time_t decrease_error_period;
|
|
|
|
|
Poco::FastMutex mutex;
|
|
|
|
|
};
|
|
|
|
|
|
|
|
|
|
PoolsWithErrorCount nested_pools;
|
|
|
|
|
size_t max_tries;
|
|
|
|
|
Logger * log;
|
|
|
|
|
|
|
|
|
|
virtual bool tryGet(NestedPoolPtr pool, const DB::Settings * settings, Entry & out_entry, std::stringstream & fail_message) = 0;
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
private:
|
|
|
|
|
/** Выделяет соединение из одной реплики для работы. */
|
|
|
|
|
bool getResource(Entry & entry, std::stringstream & fail_messages, ResourceTracker * resource_tracker, const DB::Settings * settings)
|
|
|
|
|
{
|
|
|
|
|
/// Обновление случайных чисел, а также счётчиков ошибок.
|
|
|
|
|
States states = nested_pools.update();
|
|
|
|
|
|
|
|
|
|
struct IndexedPoolWithErrorCount
|
|
|
|
|
{
|
|
|
|
|
PoolWithErrorCount * pool;
|
|
|
|
|
typename PoolWithErrorCount::State * state;
|
|
|
|
|
size_t index;
|
|
|
|
|
};
|
|
|
|
|
|
|
|
|
|
using PoolPtrs = std::vector<IndexedPoolWithErrorCount>;
|
|
|
|
|
|
|
|
|
|
size_t pools_size = resource_tracker ? resource_tracker->getUnallocatedSize() : nested_pools.size();
|
|
|
|
|
PoolPtrs pool_ptrs(pools_size);
|
|
|
|
|
|
|
|
|
|
for (size_t i = 0; i < pools_size; ++i)
|
|
|
|
|
{
|
|
|
|
|
auto & record = pool_ptrs[i];
|
|
|
|
|
size_t pool_index = resource_tracker ? resource_tracker->getHandle(i) : i;
|
|
|
|
|
record.pool = &nested_pools[pool_index];
|
|
|
|
|
record.state = &states[pool_index];
|
|
|
|
|
record.index = i;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
std::sort(pool_ptrs.begin(), pool_ptrs.end(),
|
|
|
|
|
[](const IndexedPoolWithErrorCount & lhs, const IndexedPoolWithErrorCount & rhs)
|
|
|
|
|
{
|
|
|
|
|
return PoolWithErrorCount::State::compare(*(lhs.state), *(rhs.state));
|
|
|
|
|
});
|
|
|
|
|
|
|
|
|
|
for (size_t try_no = 0; try_no < max_tries; ++try_no)
|
|
|
|
|
{
|
|
|
|
|
for (size_t i = 0; i < pools_size; ++i)
|
|
|
|
|
{
|
|
|
|
|
std::stringstream fail_message;
|
|
|
|
|
|
|
|
|
|
if (tryGet(pool_ptrs[i].pool->pool, settings, entry, fail_message))
|
|
|
|
|
{
|
|
|
|
|
if (resource_tracker)
|
|
|
|
|
resource_tracker->markAsAllocated(pool_ptrs[i].index);
|
|
|
|
|
return true;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
ProfileEvents::increment(ProfileEvents::DistributedConnectionFailTry);
|
|
|
|
|
|
|
|
|
|
LOG_WARNING(log, "Connection failed at try №"
|
|
|
|
|
<< (try_no + 1) << ", reason: " << fail_message.str());
|
|
|
|
|
|
|
|
|
|
fail_messages << fail_message.str() << std::endl;
|
|
|
|
|
|
|
|
|
|
__sync_fetch_and_add(&pool_ptrs[i].pool->state.error_count, 1);
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
ProfileEvents::increment(ProfileEvents::DistributedConnectionFailAtAll);
|
|
|
|
|
return false;
|
|
|
|
|
}
|
|
|
|
|
};
|