Merge pull request #113 from yandex/metrika-sync-3

Metrika sync 3
This commit is contained in:
alexey-milovidov 2016-09-22 00:55:43 +04:00 committed by GitHub
commit f252110afd
5 changed files with 56 additions and 9 deletions

View File

@ -6,6 +6,7 @@
#include <boost/noncopyable.hpp>
#include <common/logger_useful.h>
#include <DB/Common/Exception.h>
/// This type specifies the possible behaviors of an object pool allocator.
enum class PoolMode
@ -36,14 +37,14 @@ private:
/** Объект с флагом, используется ли он сейчас. */
struct PooledObject
{
PooledObject(std::condition_variable & available_, ObjectPtr object_)
: object(object_), available(available_)
PooledObject(ObjectPtr object_, PoolBase & pool_)
: object(object_), pool(pool_)
{
}
ObjectPtr object;
bool in_use = false;
std::condition_variable & available;
PoolBase & pool;
};
using Objects = std::vector<std::shared_ptr<PooledObject>>;
@ -54,7 +55,12 @@ private:
struct PoolEntryHelper
{
PoolEntryHelper(PooledObject & data_) : data(data_) { data.in_use = true; }
~PoolEntryHelper() { data.in_use = false; data.available.notify_one(); }
~PoolEntryHelper()
{
std::unique_lock<std::mutex> lock(data.pool.mutex);
data.in_use = false;
data.pool.available.notify_one();
}
PooledObject & data;
};
@ -86,6 +92,13 @@ public:
bool isNull() const { return data == nullptr; }
PoolBase * getPool() const
{
if (!data)
throw DB::Exception("attempt to get pool from uninitialized entry");
return &data->data.pool;
}
private:
std::shared_ptr<PoolEntryHelper> data;
@ -108,7 +121,7 @@ public:
if (items.size() < max_items)
{
ObjectPtr object = allocObject();
items.emplace_back(std::make_shared<PooledObject>(available, object));
items.emplace_back(std::make_shared<PooledObject>(object, *this));
return Entry(*items.back());
}
@ -126,7 +139,7 @@ public:
std::lock_guard<std::mutex> lock(mutex);
while (items.size() < count)
items.emplace_back(std::make_shared<PooledObject>(available, allocObject()));
items.emplace_back(std::make_shared<PooledObject>(allocObject(), *this));
}
private:

View File

@ -110,6 +110,19 @@ public:
return {};
}
void reportError(const Entry & entry)
{
for (auto & pool : nested_pools)
{
if (pool.pool->contains(entry))
{
++pool.state.error_count;
return;
}
}
throw DB::Exception("Can't find pool to report error.");
}
/** Выделяет до указанного количества соединений для работы
* Соединения предоставляют доступ к разным репликам одного шарда.
*/

View File

@ -2,7 +2,6 @@
#include <DB/Parsers/IAST.h>
#include <DB/Storages/IStorage.h>
#include <DB/Client/ConnectionPool.h>
namespace DB
{

View File

@ -37,8 +37,6 @@
#include <DB/Parsers/ASTCreateQuery.h>
#include <DB/Parsers/ParserCreateQuery.h>
#include <DB/Parsers/parseQuery.h>
#include <DB/Client/ConnectionPool.h>
#include <DB/Client/ConnectionPoolWithFailover.h>
#include <DB/Databases/IDatabase.h>
#include <DB/Common/ConfigProcessor.h>

View File

@ -91,6 +91,8 @@ public:
return dynamic_cast<BaseDaemon &>(Poco::Util::Application::instance());
}
static boost::optional<BaseDaemon &> tryGetInstance() { return tryGetInstance<BaseDaemon>(); }
/// Спит заданное количество секунд или до события wakeup
void sleep(double seconds);
@ -142,6 +144,9 @@ protected:
/// thread safe
virtual void onInterruptSignals(int signal_id);
template <class Daemon>
static boost::optional<Daemon &> tryGetInstance();
std::unique_ptr<Poco::TaskManager> task_manager;
/// Создание и автоматическое удаление pid файла.
@ -194,3 +199,22 @@ protected:
std::atomic_size_t terminate_signals_counter{0};
std::atomic_size_t sigint_signals_counter{0};
};
template <class Daemon>
boost::optional<Daemon &> BaseDaemon::tryGetInstance()
{
Daemon * ptr = nullptr;
try
{
ptr = dynamic_cast<Daemon *>(&Poco::Util::Application::instance());
}
catch (const Poco::NullPointerException &)
{
}
if (ptr)
return boost::optional<Daemon &>(*ptr);
else
return boost::none;
}