ClickHouse/dbms/src/Interpreters/Context.cpp

880 lines
27 KiB
C++
Raw Normal View History

2015-04-16 06:12:35 +00:00
#include <map>
#include <set>
2015-05-07 10:31:50 +00:00
#include <chrono>
2015-04-16 06:12:35 +00:00
#include <Poco/SharedPtr.h>
#include <Poco/Mutex.h>
2012-08-17 19:53:11 +00:00
#include <Poco/File.h>
2015-04-16 06:12:35 +00:00
#include <Yandex/logger_useful.h>
#include <DB/Common/Macros.h>
2012-08-17 19:53:11 +00:00
#include <DB/Common/escapeForFileName.h>
2015-04-16 06:12:35 +00:00
#include <DB/DataStreams/FormatFactory.h>
#include <DB/AggregateFunctions/AggregateFunctionFactory.h>
#include <DB/TableFunctions/TableFunctionFactory.h>
#include <DB/Storages/IStorage.h>
#include <DB/Storages/MarkCache.h>
#include <DB/Storages/MergeTree/BackgroundProcessingPool.h>
#include <DB/Storages/MergeTree/MergeList.h>
#include <DB/Storages/CompressionMethodSelector.h>
2015-04-16 06:12:35 +00:00
#include <DB/Interpreters/Settings.h>
#include <DB/Interpreters/Users.h>
#include <DB/Interpreters/Quota.h>
#include <DB/Interpreters/Dictionaries.h>
#include <DB/Interpreters/ExternalDictionaries.h>
#include <DB/Interpreters/ProcessList.h>
#include <DB/Interpreters/Cluster.h>
#include <DB/Interpreters/InterserverIOHandler.h>
#include <DB/Interpreters/Compiler.h>
#include <DB/Interpreters/QueryLog.h>
2015-04-16 06:12:35 +00:00
#include <DB/Interpreters/Context.h>
2012-08-17 19:53:11 +00:00
#include <DB/IO/ReadBufferFromFile.h>
#include <DB/IO/WriteBufferFromString.h>
#include <DB/IO/copyData.h>
2015-04-16 06:12:35 +00:00
#include <DB/IO/UncompressedCache.h>
2012-08-17 19:53:11 +00:00
#include <DB/Parsers/ASTCreateQuery.h>
#include <DB/Parsers/ParserCreateQuery.h>
#include <DB/Parsers/parseQuery.h>
2015-04-16 06:12:35 +00:00
#include <DB/Client/ConnectionPool.h>
2013-12-07 16:51:29 +00:00
#include <DB/Client/ConnectionPoolWithFailover.h>
2012-08-02 17:33:31 +00:00
2015-04-16 06:12:35 +00:00
#include <statdaemons/ConfigProcessor.h>
#include <zkutil/ZooKeeper.h>
2012-08-02 17:33:31 +00:00
namespace DB
{
2015-04-16 06:12:35 +00:00
class TableFunctionFactory;
using Poco::SharedPtr;
/** Набор известных объектов, которые могут быть использованы в запросе.
* Разделяемая часть. Порядок членов (порядок их уничтожения) очень важен.
*/
struct ContextShared
{
Logger * log = &Logger::get("Context"); /// Логгер.
mutable Poco::Mutex mutex; /// Для доступа и модификации разделяемых объектов.
mutable Poco::Mutex external_dictionaries_mutex; /// Для доступа к внешним словарям. Отдельный мьютекс, чтобы избежать локов при обращении сервера к самому себе.
2015-04-16 06:12:35 +00:00
mutable zkutil::ZooKeeperPtr zookeeper; /// Клиент для ZooKeeper.
String interserver_io_host; /// Имя хоста по которым это сервер доступен для других серверов.
int interserver_io_port; /// и порт,
String path; /// Путь к директории с данными, со слешем на конце.
String tmp_path; /// Путь ко временным файлам, возникающим при обработке запроса.
Databases databases; /// Список БД и таблиц в них.
TableFunctionFactory table_function_factory; /// Табличные функции.
AggregateFunctionFactory aggregate_function_factory; /// Агрегатные функции.
FormatFactory format_factory; /// Форматы.
mutable SharedPtr<Dictionaries> dictionaries; /// Словари Метрики. Инициализируются лениво.
mutable SharedPtr<ExternalDictionaries> external_dictionaries;
Users users; /// Известные пользователи.
Quotas quotas; /// Известные квоты на использование ресурсов.
mutable UncompressedCachePtr uncompressed_cache; /// Кэш разжатых блоков.
mutable MarkCachePtr mark_cache; /// Кэш засечек в сжатых файлах.
ProcessList process_list; /// Исполняющиеся в данный момент запросы.
MergeList merge_list; /// Список выполняемых мерджей (для (Replicated)?MergeTree)
ViewDependencies view_dependencies; /// Текущие зависимости
ConfigurationPtr users_config; /// Конфиг с секциями users, profiles и quotas.
InterserverIOHandler interserver_io_handler; /// Обработчик для межсерверной передачи данных.
BackgroundProcessingPoolPtr background_pool; /// Пул потоков для фоновой работы, выполняемой таблицами.
Macros macros; /// Подстановки из конфига.
std::unique_ptr<Compiler> compiler; /// Для динамической компиляции частей запроса, при необходимости.
std::unique_ptr<QueryLog> query_log; /// Для логгирования запросов.
mutable std::unique_ptr<CompressionMethodSelector> compression_method_selector; /// Правила для выбора метода сжатия в зависимости от размера куска.
2015-04-16 06:12:35 +00:00
/// Кластеры для distributed таблиц
/// Создаются при создании Distributed таблиц, так как нужно дождаться пока будут выставлены Settings
Poco::SharedPtr<Clusters> clusters;
bool shutdown_called = false;
~ContextShared()
{
try
{
shutdown();
}
catch (...)
{
tryLogCurrentException(__PRETTY_FUNCTION__);
}
}
/** Выполнить сложную работу по уничтожению объектов заранее.
*/
void shutdown()
{
if (shutdown_called)
return;
shutdown_called = true;
/** В этот момент, некоторые таблицы могут иметь потоки,
* которые модифицируют список таблиц, и блокируют наш mutex (см. StorageChunkMerger).
* Чтобы корректно их завершить, скопируем текущий список таблиц,
* и попросим их всех закончить свою работу.
* Потом удалим все объекты с таблицами.
*/
Databases current_databases;
{
Poco::ScopedLock<Poco::Mutex> lock(mutex);
current_databases = databases;
}
for (Databases::iterator it = current_databases.begin(); it != current_databases.end(); ++it)
for (Tables::iterator jt = it->second.begin(); jt != it->second.end(); ++jt)
jt->second->shutdown();
{
Poco::ScopedLock<Poco::Mutex> lock(mutex);
databases.clear();
}
}
};
Context::Context()
: shared(new ContextShared),
quota(new QuotaForIntervals)
{
}
Context::~Context() = default;
const TableFunctionFactory & Context::getTableFunctionFactory() const { return shared->table_function_factory; }
const AggregateFunctionFactory & Context::getAggregateFunctionFactory() const { return shared->aggregate_function_factory; }
const FormatFactory & Context::getFormatFactory() const { return shared->format_factory; }
InterserverIOHandler & Context::getInterserverIOHandler() { return shared->interserver_io_handler; }
Poco::Mutex & Context::getMutex() const { return shared->mutex; }
const Databases & Context::getDatabases() const { return shared->databases; }
Databases & Context::getDatabases() { return shared->databases; }
ProcessList & Context::getProcessList() { return shared->process_list; }
const ProcessList & Context::getProcessList() const { return shared->process_list; }
MergeList & Context::getMergeList() { return shared->merge_list; }
const MergeList & Context::getMergeList() const { return shared->merge_list; }
2012-08-02 17:33:31 +00:00
String Context::getPath() const
{
Poco::ScopedLock<Poco::Mutex> lock(shared->mutex);
return shared->path;
}
String Context::getTemporaryPath() const
{
Poco::ScopedLock<Poco::Mutex> lock(shared->mutex);
return shared->tmp_path;
}
2012-08-02 17:33:31 +00:00
void Context::setPath(const String & path)
{
Poco::ScopedLock<Poco::Mutex> lock(shared->mutex);
shared->path = path;
}
void Context::setTemporaryPath(const String & path)
{
Poco::ScopedLock<Poco::Mutex> lock(shared->mutex);
shared->tmp_path = path;
}
2012-08-02 17:33:31 +00:00
void Context::setUsersConfig(ConfigurationPtr config)
{
Poco::ScopedLock<Poco::Mutex> lock(shared->mutex);
shared->users_config = config;
shared->users.loadFromConfig(*shared->users_config);
shared->quotas.loadFromConfig(*shared->users_config);
}
ConfigurationPtr Context::getUsersConfig()
{
Poco::ScopedLock<Poco::Mutex> lock(shared->mutex);
return shared->users_config;
}
void Context::setUser(const String & name, const String & password, const Poco::Net::IPAddress & address, const String & quota_key)
{
Poco::ScopedLock<Poco::Mutex> lock(shared->mutex);
const User & user_props = shared->users.get(name, password, address);
setSetting("profile", user_props.profile);
setQuota(user_props.quota, quota_key, name, address);
user = name;
ip_address = address;
}
void Context::setQuota(const String & name, const String & quota_key, const String & user_name, const Poco::Net::IPAddress & address)
{
Poco::ScopedLock<Poco::Mutex> lock(shared->mutex);
quota = shared->quotas.get(name, quota_key, user_name, address);
}
QuotaForIntervals & Context::getQuota()
{
Poco::ScopedLock<Poco::Mutex> lock(shared->mutex);
return *quota;
}
void Context::addDependency(const DatabaseAndTableName & from, const DatabaseAndTableName & where)
{
Poco::ScopedLock<Poco::Mutex> lock(shared->mutex);
shared->view_dependencies[from].insert(where);
}
void Context::removeDependency(const DatabaseAndTableName & from, const DatabaseAndTableName & where)
{
Poco::ScopedLock<Poco::Mutex> lock(shared->mutex);
shared->view_dependencies[from].erase(where);
}
Dependencies Context::getDependencies(const String & database_name, const String & table_name) const
{
Poco::ScopedLock<Poco::Mutex> lock(shared->mutex);
String db = database_name.empty() ? current_database : database_name;
ViewDependencies::const_iterator iter = shared->view_dependencies.find(DatabaseAndTableName(db, table_name));
if (iter == shared->view_dependencies.end())
return {};
return Dependencies(iter->second.begin(), iter->second.end());
}
2012-08-02 17:33:31 +00:00
bool Context::isTableExist(const String & database_name, const String & table_name) const
{
Poco::ScopedLock<Poco::Mutex> lock(shared->mutex);
String db = database_name.empty() ? current_database : database_name;
Databases::const_iterator it;
return shared->databases.end() != (it = shared->databases.find(db))
&& it->second.end() != it->second.find(table_name);
}
bool Context::isDatabaseExist(const String & database_name) const
{
Poco::ScopedLock<Poco::Mutex> lock(shared->mutex);
String db = database_name.empty() ? current_database : database_name;
return shared->databases.end() != shared->databases.find(db);
}
void Context::assertTableExists(const String & database_name, const String & table_name) const
{
Poco::ScopedLock<Poco::Mutex> lock(shared->mutex);
String db = database_name.empty() ? current_database : database_name;
Databases::const_iterator it = shared->databases.find(db);
if (shared->databases.end() == it)
2012-08-02 17:33:31 +00:00
throw Exception("Database " + db + " doesn't exist", ErrorCodes::UNKNOWN_DATABASE);
if (it->second.end() == it->second.find(table_name))
throw Exception("Table " + db + "." + table_name + " doesn't exist.", ErrorCodes::UNKNOWN_TABLE);
}
void Context::assertTableDoesntExist(const String & database_name, const String & table_name) const
{
Poco::ScopedLock<Poco::Mutex> lock(shared->mutex);
String db = database_name.empty() ? current_database : database_name;
Databases::const_iterator it;
if (shared->databases.end() != (it = shared->databases.find(db))
&& it->second.end() != it->second.find(table_name))
throw Exception("Table " + db + "." + table_name + " already exists.", ErrorCodes::TABLE_ALREADY_EXISTS);
}
void Context::assertDatabaseExists(const String & database_name) const
{
Poco::ScopedLock<Poco::Mutex> lock(shared->mutex);
String db = database_name.empty() ? current_database : database_name;
if (shared->databases.end() == shared->databases.find(db))
throw Exception("Database " + db + " doesn't exist", ErrorCodes::UNKNOWN_DATABASE);
}
void Context::assertDatabaseDoesntExist(const String & database_name) const
{
Poco::ScopedLock<Poco::Mutex> lock(shared->mutex);
String db = database_name.empty() ? current_database : database_name;
if (shared->databases.end() != shared->databases.find(db))
throw Exception("Database " + db + " already exists.", ErrorCodes::DATABASE_ALREADY_EXISTS);
}
Tables Context::getExternalTables() const
{
Poco::ScopedLock<Poco::Mutex> lock(shared->mutex);
Tables res = external_tables;
if (session_context && session_context != this)
{
Tables buf = session_context->getExternalTables();
res.insert(buf.begin(), buf.end());
}
else if (global_context && global_context != this)
{
Tables buf = global_context->getExternalTables();
res.insert(buf.begin(), buf.end());
}
return res;
}
StoragePtr Context::tryGetExternalTable(const String & table_name) const
{
Poco::ScopedLock<Poco::Mutex> lock(shared->mutex);
Tables::const_iterator jt = external_tables.find(table_name);
if (external_tables.end() == jt)
return StoragePtr();
return jt->second;
}
2012-08-02 17:33:31 +00:00
StoragePtr Context::getTable(const String & database_name, const String & table_name) const
{
Poco::ScopedLock<Poco::Mutex> lock(shared->mutex);
if (database_name.empty())
{
StoragePtr res;
if ((res = tryGetExternalTable(table_name)))
return res;
if (session_context && (res = session_context->tryGetExternalTable(table_name)))
return res;
if (global_context && (res = global_context->tryGetExternalTable(table_name)))
return res;
}
2012-08-02 17:33:31 +00:00
String db = database_name.empty() ? current_database : database_name;
Databases::const_iterator it = shared->databases.find(db);
if (shared->databases.end() == it)
2012-08-02 17:33:31 +00:00
throw Exception("Database " + db + " doesn't exist", ErrorCodes::UNKNOWN_DATABASE);
Tables::const_iterator jt = it->second.find(table_name);
if (it->second.end() == jt)
2012-08-02 17:33:31 +00:00
throw Exception("Table " + db + "." + table_name + " doesn't exist.", ErrorCodes::UNKNOWN_TABLE);
return jt->second;
}
StoragePtr Context::tryGetTable(const String & database_name, const String & table_name) const
{
Poco::ScopedLock<Poco::Mutex> lock(shared->mutex);
if (database_name.empty())
{
StoragePtr res;
if ((res = tryGetExternalTable(table_name)))
return res;
if (session_context && (res = session_context->tryGetExternalTable(table_name)))
return res;
if (global_context && (res = global_context->tryGetExternalTable(table_name)))
return res;
}
String db = database_name.empty() ? current_database : database_name;
Databases::const_iterator it = shared->databases.find(db);
if (shared->databases.end() == it)
return StoragePtr();
Tables::const_iterator jt = it->second.find(table_name);
if (it->second.end() == jt)
return StoragePtr();
return jt->second;
}
2012-08-02 17:33:31 +00:00
void Context::addExternalTable(const String & table_name, StoragePtr storage)
{
if (external_tables.end() != external_tables.find(table_name))
throw Exception("Temporary table " + table_name + " already exists.", ErrorCodes::TABLE_ALREADY_EXISTS);
external_tables[table_name] = storage;
}
2012-08-02 17:33:31 +00:00
void Context::addTable(const String & database_name, const String & table_name, StoragePtr table)
{
Poco::ScopedLock<Poco::Mutex> lock(shared->mutex);
2012-08-02 17:33:31 +00:00
String db = database_name.empty() ? current_database : database_name;
assertDatabaseExists(db);
assertTableDoesntExist(db, table_name);
shared->databases[db][table_name] = table;
}
void Context::addDatabase(const String & database_name)
{
Poco::ScopedLock<Poco::Mutex> lock(shared->mutex);
String db = database_name.empty() ? current_database : database_name;
assertDatabaseDoesntExist(db);
shared->databases[db];
}
2013-09-30 01:29:19 +00:00
StoragePtr Context::detachTable(const String & database_name, const String & table_name)
2012-08-02 17:33:31 +00:00
{
Poco::ScopedLock<Poco::Mutex> lock(shared->mutex);
String db = database_name.empty() ? current_database : database_name;
assertTableExists(db, table_name);
2013-09-30 01:29:19 +00:00
Tables::iterator it = shared->databases[db].find(table_name);
StoragePtr res = it->second;
shared->databases[db].erase(it);
return res;
2012-08-02 17:33:31 +00:00
}
void Context::detachDatabase(const String & database_name)
{
Poco::ScopedLock<Poco::Mutex> lock(shared->mutex);
String db = database_name.empty() ? current_database : database_name;
assertDatabaseExists(db);
shared->databases.erase(db);
2012-08-02 17:33:31 +00:00
}
2012-08-17 19:53:11 +00:00
ASTPtr Context::getCreateQuery(const String & database_name, const String & table_name) const
{
StoragePtr table;
String db;
2012-08-17 19:53:11 +00:00
{
Poco::ScopedLock<Poco::Mutex> lock(shared->mutex);
db = database_name.empty() ? current_database : database_name;
table = getTable(db, table_name);
}
2012-08-17 19:53:11 +00:00
auto table_lock = table->lockStructure(false);
2012-08-17 19:53:11 +00:00
/// Здесь хранится определение таблицы
String metadata_path = shared->path + "metadata/" + escapeForFileName(db) + "/" + escapeForFileName(table_name) + ".sql";
2012-08-17 19:53:11 +00:00
if (!Poco::File(metadata_path).exists())
{
try
{
/// Если файл .sql не предусмотрен (например, для таблиц типа ChunkRef), то движок может сам предоставить запрос CREATE.
return table->getCustomCreateQuery(*this);
}
catch (...)
{
throw Exception("Metadata file " + metadata_path + " for table " + db + "." + table_name + " doesn't exist.",
ErrorCodes::TABLE_METADATA_DOESNT_EXIST);
}
}
2012-08-17 19:53:11 +00:00
2013-02-11 11:22:15 +00:00
StringPtr query = new String();
2012-08-17 19:53:11 +00:00
{
ReadBufferFromFile in(metadata_path);
2013-02-11 11:22:15 +00:00
WriteBufferFromString out(*query);
2012-08-17 19:53:11 +00:00
copyData(in, out);
}
ParserCreateQuery parser;
ASTPtr ast = parseQuery(parser, query->data(), query->data() + query->size(), "in file " + metadata_path);
2012-08-17 19:53:11 +00:00
ASTCreateQuery & ast_create_query = typeid_cast<ASTCreateQuery &>(*ast);
2012-08-17 19:53:11 +00:00
ast_create_query.attach = false;
ast_create_query.database = db;
2013-02-11 11:22:15 +00:00
ast_create_query.query_string = query;
2012-08-17 19:53:11 +00:00
return ast;
}
2012-08-02 17:33:31 +00:00
Settings Context::getSettings() const
{
Poco::ScopedLock<Poco::Mutex> lock(shared->mutex);
return settings;
}
Limits Context::getLimits() const
{
Poco::ScopedLock<Poco::Mutex> lock(shared->mutex);
return settings.limits;
}
2012-08-02 17:33:31 +00:00
void Context::setSettings(const Settings & settings_)
{
Poco::ScopedLock<Poco::Mutex> lock(shared->mutex);
settings = settings_;
}
2012-08-02 19:03:32 +00:00
void Context::setSetting(const String & name, const Field & value)
{
Poco::ScopedLock<Poco::Mutex> lock(shared->mutex);
if (name == "profile")
settings.setProfile(value.safeGet<String>(), *shared->users_config);
else
settings.set(name, value);
}
void Context::setSetting(const String & name, const std::string & value)
{
Poco::ScopedLock<Poco::Mutex> lock(shared->mutex);
if (name == "profile")
settings.setProfile(value, *shared->users_config);
else
settings.set(name, value);
2012-08-02 19:03:32 +00:00
}
2012-08-02 17:33:31 +00:00
String Context::getCurrentDatabase() const
{
Poco::ScopedLock<Poco::Mutex> lock(shared->mutex);
return current_database;
}
String Context::getCurrentQueryId() const
{
Poco::ScopedLock<Poco::Mutex> lock(shared->mutex);
return current_query_id;
}
2012-08-02 17:33:31 +00:00
void Context::setCurrentDatabase(const String & name)
{
Poco::ScopedLock<Poco::Mutex> lock(shared->mutex);
assertDatabaseExists(name);
current_database = name;
}
void Context::setCurrentQueryId(const String & query_id)
{
Poco::ScopedLock<Poco::Mutex> lock(shared->mutex);
current_query_id = query_id;
}
String Context::getDefaultFormat() const
{
Poco::ScopedLock<Poco::Mutex> lock(shared->mutex);
return default_format.empty() ? "TabSeparated" : default_format;
}
void Context::setDefaultFormat(const String & name)
{
Poco::ScopedLock<Poco::Mutex> lock(shared->mutex);
default_format = name;
}
2014-08-11 15:59:01 +00:00
const Macros& Context::getMacros() const
{
return shared->macros;
}
void Context::setMacros(Macros && macros)
{
/// Полагаемся, что это присваивание происходит один раз при старте сервера. Если это не так, нужно использовать мьютекс.
shared->macros = macros;
}
2012-08-02 17:33:31 +00:00
Context & Context::getSessionContext()
{
if (!session_context)
throw Exception("There is no session", ErrorCodes::THERE_IS_NO_SESSION);
return *session_context;
}
Context & Context::getGlobalContext()
{
if (!global_context)
throw Exception("Logical error: there is no global context", ErrorCodes::LOGICAL_ERROR);
return *global_context;
}
2012-12-19 20:15:15 +00:00
const Dictionaries & Context::getDictionaries() const
2012-12-19 20:15:15 +00:00
{
return getDictionariesImpl(false);
}
const ExternalDictionaries & Context::getExternalDictionaries() const
{
return getExternalDictionariesImpl(false);
}
const Dictionaries & Context::getDictionariesImpl(const bool throw_on_error) const
{
Poco::ScopedLock<Poco::Mutex> lock(shared->mutex);
if (!shared->dictionaries)
shared->dictionaries = new Dictionaries{throw_on_error};
return *shared->dictionaries;
}
const ExternalDictionaries & Context::getExternalDictionariesImpl(const bool throw_on_error) const
{
Poco::ScopedLock<Poco::Mutex> lock(shared->external_dictionaries_mutex);
if (!shared->external_dictionaries)
{
if (!this->global_context)
throw Exception("Logical error: there is no global context", ErrorCodes::LOGICAL_ERROR);
shared->external_dictionaries = new ExternalDictionaries{*this->global_context, throw_on_error};
}
return *shared->external_dictionaries;
}
void Context::tryCreateDictionaries() const
{
static_cast<void>(getDictionariesImpl(true));
}
void Context::tryCreateExternalDictionaries() const
{
static_cast<void>(getExternalDictionariesImpl(true));
2012-12-19 20:15:15 +00:00
}
void Context::setProgressCallback(ProgressCallback callback)
{
/// Колбек устанавливается на сессию или на запрос. В сессии одновременно обрабатывается только один запрос. Поэтому блокировка не нужна.
progress_callback = callback;
}
ProgressCallback Context::getProgressCallback() const
{
return progress_callback;
}
void Context::setProcessListElement(ProcessList::Element * elem)
{
/// Устанавливается на сессию или на запрос. В сессии одновременно обрабатывается только один запрос. Поэтому блокировка не нужна.
process_list_elem = elem;
}
ProcessList::Element * Context::getProcessListElement()
{
return process_list_elem;
}
2014-03-28 14:36:24 +00:00
void Context::setUncompressedCache(size_t max_size_in_bytes)
{
Poco::ScopedLock<Poco::Mutex> lock(shared->mutex);
if (shared->uncompressed_cache)
throw Exception("Uncompressed cache has been already created.", ErrorCodes::LOGICAL_ERROR);
2015-04-16 06:12:35 +00:00
shared->uncompressed_cache.reset(new UncompressedCache(max_size_in_bytes));
}
UncompressedCachePtr Context::getUncompressedCache() const
{
2015-04-16 06:12:35 +00:00
Poco::ScopedLock<Poco::Mutex> lock(shared->mutex);
return shared->uncompressed_cache;
}
2014-02-11 13:30:42 +00:00
void Context::setMarkCache(size_t cache_size_in_bytes)
{
Poco::ScopedLock<Poco::Mutex> lock(shared->mutex);
if (shared->mark_cache)
throw Exception("Uncompressed cache has been already created.", ErrorCodes::LOGICAL_ERROR);
2015-05-07 10:31:50 +00:00
shared->mark_cache.reset(new MarkCache(cache_size_in_bytes, std::chrono::seconds(settings.mark_cache_min_lifetime)));
2014-02-11 13:30:42 +00:00
}
MarkCachePtr Context::getMarkCache() const
{
2015-04-16 06:12:35 +00:00
Poco::ScopedLock<Poco::Mutex> lock(shared->mutex);
2014-02-11 13:30:42 +00:00
return shared->mark_cache;
}
2014-07-02 12:30:38 +00:00
BackgroundProcessingPool & Context::getBackgroundPool()
{
Poco::ScopedLock<Poco::Mutex> lock(shared->mutex);
if (!shared->background_pool)
shared->background_pool = new BackgroundProcessingPool(settings.background_pool_size);
return *shared->background_pool;
}
void Context::resetCaches() const
{
2015-04-16 06:12:35 +00:00
Poco::ScopedLock<Poco::Mutex> lock(shared->mutex);
if (shared->uncompressed_cache)
shared->uncompressed_cache->reset();
if (shared->mark_cache)
shared->mark_cache->reset();
}
2014-05-13 10:10:26 +00:00
void Context::setZooKeeper(zkutil::ZooKeeperPtr zookeeper)
2014-03-21 19:17:59 +00:00
{
Poco::ScopedLock<Poco::Mutex> lock(shared->mutex);
if (shared->zookeeper)
throw Exception("ZooKeeper client has already been set.", ErrorCodes::LOGICAL_ERROR);
shared->zookeeper = zookeeper;
}
2014-05-13 10:10:26 +00:00
zkutil::ZooKeeperPtr Context::getZooKeeper() const
2014-03-21 19:17:59 +00:00
{
2014-04-25 13:55:15 +00:00
Poco::ScopedLock<Poco::Mutex> lock(shared->mutex);
2014-05-13 11:24:04 +00:00
if (shared->zookeeper && shared->zookeeper->expired())
2014-04-25 13:55:15 +00:00
shared->zookeeper = shared->zookeeper->startNewSession();
return shared->zookeeper;
2014-03-21 19:17:59 +00:00
}
void Context::setInterserverIOAddress(const String & host, UInt16 port)
{
shared->interserver_io_host = host;
shared->interserver_io_port = port;
}
std::pair<String, UInt16> Context::getInterserverIOAddress() const
{
if (shared->interserver_io_host.empty() || shared->interserver_io_port == 0)
throw Exception("Parameter 'interserver_http_port' required for replication is not specified in configuration file.",
ErrorCodes::NO_ELEMENTS_IN_CONFIG);
return { shared->interserver_io_host, shared->interserver_io_port };
}
void Context::initClusters()
2013-12-07 16:51:29 +00:00
{
Poco::ScopedLock<Poco::Mutex> lock(shared->mutex);
if (!shared->clusters)
shared->clusters = new Clusters(settings);
}
2013-12-07 16:51:29 +00:00
Cluster & Context::getCluster(const std::string & cluster_name)
{
if (!shared->clusters)
throw Poco::Exception("Clusters have not been initialized yet.");
Clusters::Impl::iterator it = shared->clusters->impl.find(cluster_name);
if (it != shared->clusters->impl.end())
2013-12-07 16:51:29 +00:00
return it->second;
else
throw Poco::Exception("Failed to find cluster with name = " + cluster_name);
}
2015-04-30 12:43:16 +00:00
Poco::SharedPtr<Clusters> Context::getClusters() const
{
if (!shared->clusters)
throw Poco::Exception("Clusters have not been initialized yet.");
return shared->clusters;
}
Compiler & Context::getCompiler()
{
Poco::ScopedLock<Poco::Mutex> lock(shared->mutex);
if (!shared->compiler)
shared->compiler.reset(new Compiler{ shared->path + "build/", 1 });
return *shared->compiler;
}
QueryLog & Context::getQueryLog()
{
Poco::ScopedLock<Poco::Mutex> lock(shared->mutex);
if (!shared->query_log)
{
auto & config = Poco::Util::Application::instance().config();
String database = config.getString("query_log.database", "system");
String table = config.getString("query_log.table", "query_log");
size_t flush_interval_milliseconds = parse<size_t>(
config.getString("query_log.flush_interval_milliseconds", DEFAULT_QUERY_LOG_FLUSH_INTERVAL_MILLISECONDS_STR));
shared->query_log.reset(new QueryLog{ *this, database, table, flush_interval_milliseconds });
}
return *shared->query_log;
}
CompressionMethod Context::chooseCompressionMethod(size_t part_size, double part_size_ratio) const
{
Poco::ScopedLock<Poco::Mutex> lock(shared->mutex);
if (!shared->compression_method_selector)
{
constexpr auto config_name = "compression";
auto & config = Poco::Util::Application::instance().config();
if (config.has(config_name))
shared->compression_method_selector.reset(new CompressionMethodSelector{config, "compression"});
else
shared->compression_method_selector.reset(new CompressionMethodSelector);
}
return shared->compression_method_selector->choose(part_size, part_size_ratio);
}
2015-04-16 06:12:35 +00:00
void Context::shutdown()
{
shared->shutdown();
}
2012-08-02 17:33:31 +00:00
}