dbms: added Dictionaries [#CONV-2944].

This commit is contained in:
Alexey Milovidov 2012-12-19 20:15:15 +00:00
parent 28066c594c
commit 59e197bc88
5 changed files with 128 additions and 12 deletions

View File

@ -16,6 +16,7 @@
#include <DB/DataTypes/DataTypeFactory.h>
#include <DB/Storages/StorageFactory.h>
#include <DB/Interpreters/Settings.h>
#include <DB/Interpreters/Dictionaries.h>
namespace DB
@ -42,6 +43,7 @@ struct ContextShared
DataTypeFactory data_type_factory; /// Типы данных.
StorageFactory storage_factory; /// Движки таблиц.
FormatFactory format_factory; /// Форматы.
SharedPtr<Dictionaries> dictionaries; /// Словари Метрики.
Logger * log; /// Логгер.
mutable Poco::Mutex mutex; /// Для доступа и модификации разделяемых объектов.
@ -104,6 +106,7 @@ public:
const DataTypeFactory & getDataTypeFactory() const { return shared->data_type_factory; }
const StorageFactory & getStorageFactory() const { return shared->storage_factory; }
const FormatFactory & getFormatFactory() const { return shared->format_factory; }
const Dictionaries & getDictionaries();
/// Получить запрос на CREATE таблицы.
ASTPtr getCreateQuery(const String & database_name, const String & table_name) const;

View File

@ -0,0 +1,105 @@
#pragma once
#include <boost/thread.hpp>
#include <Poco/SharedPtr.h>
#include <Yandex/MultiVersion.h>
#include <Yandex/logger_useful.h>
#include <statdaemons/RegionsHierarchy.h>
#include <statdaemons/TechDataHierarchy.h>
namespace DB
{
using Poco::SharedPtr;
/// Словари Метрики, которые могут использоваться в функциях.
class Dictionaries
{
private:
Yandex::MultiVersion<RegionsHierarchy> regions_hierarchy;
Yandex::MultiVersion<TechDataHierarchy> tech_data_hierarchy;
/// Периодичность обновления справочников, в секундах.
int reload_period;
boost::thread reloading_thread;
Poco::Event destroy;
Logger * log;
/// Обновляет справочники.
void reloadImpl()
{
/** Если не удаётся обновить справочники, то несмотря на это, не кидаем исключение (используем старые справочники).
* Если старых корректных справочников нет, то при использовании функций, которые от них зависят,
* будет кидаться исключение.
*/
try
{
LOG_INFO(log, "Loading dictionaries.");
Yandex::MultiVersion<TechDataHierarchy>::Version new_tech_data_hierarchy = new TechDataHierarchy;
Yandex::MultiVersion<RegionsHierarchy>::Version new_regions_hierarchy = new RegionsHierarchy;
new_regions_hierarchy->reload();
tech_data_hierarchy.set(new_tech_data_hierarchy);
regions_hierarchy.set(new_regions_hierarchy);
}
catch (const Poco::Exception & e)
{
LOG_ERROR(log, "Cannot load dictionaries! You must resolve this manually. " << e.displayText());
return;
}
catch (...)
{
LOG_ERROR(log, "Cannot load dictionaries! You must resolve this manually.");
return;
}
LOG_INFO(log, "Loaded dictionaries.");
}
/// Обновляет каждые reload_period секунд.
void reloadPeriodically()
{
while (true)
{
reloadImpl();
if (destroy.tryWait(reload_period * 1000))
return;
}
}
public:
/// Справочники будут обновляться в отдельном потоке, каждые reload_period секунд.
Dictionaries(int reload_period_ = 3600)
: reload_period(reload_period_),
reloading_thread(&Dictionaries::reloadPeriodically, this),
log(&Logger::get("Dictionaries"))
{
}
~Dictionaries()
{
destroy.set();
reloading_thread.join();
}
Yandex::MultiVersion<RegionsHierarchy>::Version getRegionsHierarchy() const
{
return regions_hierarchy.get();
}
Yandex::MultiVersion<TechDataHierarchy>::Version getTechDataHierarchy() const
{
return tech_data_hierarchy.get();
}
};
}

View File

@ -257,4 +257,15 @@ Context & Context::getGlobalContext()
return *global_context;
}
const Dictionaries & Context::getDictionaries()
{
Poco::ScopedLock<Poco::Mutex> lock(shared->mutex);
if (!shared->dictionaries)
shared->dictionaries = new Dictionaries;
return *shared->dictionaries;
}
}

View File

@ -185,7 +185,7 @@ void TCPHandler::processOrdinaryQuery()
while (true)
{
if (async_in.poll(state.context.getSettingsRef().interactive_delay / 1000))
if (async_in.poll(connection_context.getSettingsRef().interactive_delay / 1000))
{
block = async_in.read();
break;
@ -321,8 +321,7 @@ void TCPHandler::receiveQuery()
readStringBinary(state.query, *in);
state.context = connection_context;
state.io = executeQuery(state.query, state.context, state.stage);
state.io = executeQuery(state.query, connection_context, state.stage);
}
@ -335,12 +334,12 @@ bool TCPHandler::receiveData()
else
state.maybe_compressed_in = in;
state.block_in = state.context.getFormatFactory().getInput(
state.block_in = connection_context.getFormatFactory().getInput(
"Native",
*state.maybe_compressed_in,
state.io.out_sample,
state.context.getSettingsRef().max_block_size,
state.context.getDataTypeFactory());
connection_context.getSettingsRef().max_block_size,
connection_context.getDataTypeFactory());
}
/// Прочитать из сети один блок и засунуть его в state.io.out (данные для INSERT-а)
@ -360,7 +359,7 @@ bool TCPHandler::isQueryCancelled()
if (state.is_cancelled || state.sent_all_data)
return true;
if (after_check_cancelled.elapsed() / 1000 < state.context.getSettingsRef().interactive_delay)
if (after_check_cancelled.elapsed() / 1000 < connection_context.getSettingsRef().interactive_delay)
return false;
after_check_cancelled.restart();
@ -400,7 +399,7 @@ void TCPHandler::sendData(Block & block)
else
state.maybe_compressed_out = out;
state.block_out = state.context.getFormatFactory().getOutput(
state.block_out = connection_context.getFormatFactory().getOutput(
"Native",
*state.maybe_compressed_out,
state.io.in_sample);
@ -445,7 +444,7 @@ void TCPHandler::sendProgress(size_t rows, size_t bytes)
if (state.sent_all_data)
return;
if (after_send_progress.elapsed() / 1000 < state.context.getSettingsRef().interactive_delay)
if (after_send_progress.elapsed() / 1000 < connection_context.getSettingsRef().interactive_delay)
return;
after_send_progress.restart();

View File

@ -41,8 +41,6 @@ struct QueryState
/// Потоки блоков, с помощью которых выполнять запрос.
BlockIO io;
Context context;
bool is_cancelled;
/// Данные были отправлены.
bool sent_all_data;
@ -72,7 +70,7 @@ class TCPHandler : public Poco::Net::TCPServerConnection
public:
TCPHandler(Server & server_, const Poco::Net::StreamSocket & socket_)
: Poco::Net::TCPServerConnection(socket_), server(server_)
, log(&Logger::get("TCPHandler"))
, log(&Logger::get("TCPHandler")), connection_context(server.global_context)
{
LOG_TRACE(log, "In constructor.");
}