2015-04-16 06:12:35 +00:00
# include <map>
# include <set>
2015-05-07 10:31:50 +00:00
# include <chrono>
2015-04-16 06:12:35 +00:00
# include <Poco/SharedPtr.h>
# include <Poco/Mutex.h>
2012-08-17 19:53:11 +00:00
# include <Poco/File.h>
2015-07-25 09:49:09 +00:00
# include <Poco/UUIDGenerator.h>
2012-08-17 19:53:11 +00:00
2015-04-16 06:12:35 +00:00
# include <Yandex/logger_useful.h>
# include <DB/Common/Macros.h>
2012-08-17 19:53:11 +00:00
# include <DB/Common/escapeForFileName.h>
2015-04-16 06:12:35 +00:00
# include <DB/DataStreams/FormatFactory.h>
# include <DB/AggregateFunctions/AggregateFunctionFactory.h>
# include <DB/TableFunctions/TableFunctionFactory.h>
# include <DB/Storages/IStorage.h>
# include <DB/Storages/MarkCache.h>
# include <DB/Storages/MergeTree/BackgroundProcessingPool.h>
# include <DB/Storages/MergeTree/MergeList.h>
2015-07-16 21:32:51 +00:00
# include <DB/Storages/MergeTree/MergeTreeSettings.h>
2015-04-17 05:35:53 +00:00
# include <DB/Storages/CompressionMethodSelector.h>
2015-04-16 06:12:35 +00:00
# include <DB/Interpreters/Settings.h>
# include <DB/Interpreters/Users.h>
# include <DB/Interpreters/Quota.h>
# include <DB/Interpreters/Dictionaries.h>
# include <DB/Interpreters/ExternalDictionaries.h>
# include <DB/Interpreters/ProcessList.h>
# include <DB/Interpreters/Cluster.h>
# include <DB/Interpreters/InterserverIOHandler.h>
# include <DB/Interpreters/Compiler.h>
2015-06-26 20:48:10 +00:00
# include <DB/Interpreters/QueryLog.h>
2015-04-16 06:12:35 +00:00
# include <DB/Interpreters/Context.h>
2012-08-17 19:53:11 +00:00
# include <DB/IO/ReadBufferFromFile.h>
# include <DB/IO/WriteBufferFromString.h>
# include <DB/IO/copyData.h>
2015-04-16 06:12:35 +00:00
# include <DB/IO/UncompressedCache.h>
2012-08-17 19:53:11 +00:00
# include <DB/Parsers/ASTCreateQuery.h>
# include <DB/Parsers/ParserCreateQuery.h>
2015-04-11 03:10:23 +00:00
# include <DB/Parsers/parseQuery.h>
2015-04-16 06:12:35 +00:00
# include <DB/Client/ConnectionPool.h>
2013-12-07 16:51:29 +00:00
# include <DB/Client/ConnectionPoolWithFailover.h>
2012-08-02 17:33:31 +00:00
2015-04-16 06:12:35 +00:00
# include <statdaemons/ConfigProcessor.h>
# include <zkutil/ZooKeeper.h>
2014-08-22 01:01:28 +00:00
2012-08-02 17:33:31 +00:00
namespace DB
{
2015-04-16 06:12:35 +00:00
class TableFunctionFactory ;
using Poco : : SharedPtr ;
/** Н а б о р известных объектов, которые могут быть использованы в запросе.
* Р а з д е л я е м а я ч а с т ь . П о р я д о к ч л е н о в ( п о р я д о к и х у н и ч т о ж е н и я ) о ч е н ь в а ж е н .
*/
struct ContextShared
{
Logger * log = & Logger : : get ( " Context " ) ; /// Логгер.
mutable Poco : : Mutex mutex ; /// Для доступа и модификации разделяемых объектов.
2015-04-22 13:53:32 +00:00
mutable Poco : : Mutex external_dictionaries_mutex ; /// Для доступа к внешним словарям. Отдельный мьютекс, чтобы избежать локов при обращении сервера к самому с е б е .
2015-04-16 06:12:35 +00:00
mutable zkutil : : ZooKeeperPtr zookeeper ; /// Клиент для ZooKeeper.
String interserver_io_host ; /// Имя хоста по которым это сервер доступен для других серверов.
int interserver_io_port ; /// и порт,
String path ; /// Путь к директории с данными, с о слешем на конце.
String tmp_path ; /// Путь ко временным файлам, возникающим при обработке запроса.
Databases databases ; /// Список БД и таблиц в них.
TableFunctionFactory table_function_factory ; /// Табличные функции.
AggregateFunctionFactory aggregate_function_factory ; /// Агрегатные функции.
FormatFactory format_factory ; /// Форматы.
mutable SharedPtr < Dictionaries > dictionaries ; /// Словари Метрики. Инициализируются лениво.
mutable SharedPtr < ExternalDictionaries > external_dictionaries ;
Users users ; /// Известные пользователи.
Quotas quotas ; /// Известные квоты на использование ресурсов.
mutable UncompressedCachePtr uncompressed_cache ; /// Кэш разжатых блоков.
mutable MarkCachePtr mark_cache ; /// Кэш засечек в сжатых файлах.
ProcessList process_list ; /// Исполняющиеся в данный момент запросы.
MergeList merge_list ; /// Список выполняемых мерджей (для (Replicated)?MergeTree)
ViewDependencies view_dependencies ; /// Текущие зависимости
ConfigurationPtr users_config ; /// Конфиг с секциями users, profiles и quotas.
InterserverIOHandler interserver_io_handler ; /// Обработчик для межсерверной передачи данных.
BackgroundProcessingPoolPtr background_pool ; /// Пул потоков для фоновой работы, выполняемой таблицами.
Macros macros ; /// Подстановки из конфига.
std : : unique_ptr < Compiler > compiler ; /// Для динамической компиляции частей запроса, при необходимости.
2015-06-26 20:48:10 +00:00
std : : unique_ptr < QueryLog > query_log ; /// Для логгирования запросов.
2015-04-17 05:35:53 +00:00
mutable std : : unique_ptr < CompressionMethodSelector > compression_method_selector ; /// Правила для выбора метода сжатия в зависимости от размера куска.
2015-07-16 21:32:51 +00:00
std : : unique_ptr < MergeTreeSettings > merge_tree_settings ; /// Настройки для движка MergeTree.
2015-04-16 06:12:35 +00:00
2015-08-19 21:15:27 +00:00
/** Позволяет обращаться к временным таблицам конкретного запроса.
* И с п о л ь з у е т с я д л я р е а л и з а ц и и GLOBAL - п о д з а п р о с о в п о pull - с х е м е ,
* в к о т о р о й з а п р о с о т п р а в л я е т с я н а у д а л ё н н ы й с е р в е р , а у д а л ё н н ы й с е р в е р ,
* д л я п о л у ч е н и я д а н н ы х п о д з а п р о с а , о т п р а в л я е т з а п р о с з а в р е м е н н о й т а б л и ц е й н а и с х о д н ы й с е р в е р .
*/
std : : map < String , Tables > temporary_tables_by_query_id ;
2015-04-16 06:12:35 +00:00
/// Кластеры для distributed таблиц
/// Создаются при создании Distributed таблиц, так как нужно дождаться пока будут выставлены Settings
Poco : : SharedPtr < Clusters > clusters ;
2015-07-25 09:49:09 +00:00
Poco : : UUIDGenerator uuid_generator ;
2015-04-16 06:12:35 +00:00
bool shutdown_called = false ;
~ ContextShared ( )
{
try
{
shutdown ( ) ;
}
catch ( . . . )
{
tryLogCurrentException ( __PRETTY_FUNCTION__ ) ;
}
}
/** Выполнить сложную работу по уничтожению объектов заранее.
*/
void shutdown ( )
{
if ( shutdown_called )
return ;
shutdown_called = true ;
/** В этот момент, некоторые таблицы могут иметь потоки,
* к о т о р ы е м о д и ф и ц и р у ю т с п и с о к т а б л и ц , и б л о к и р у ю т н а ш mutex ( с м . StorageChunkMerger ) .
* Ч т о б ы к о р р е к т н о и х з а в е р ш и т ь , с к о п и р у е м т е к у щ и й с п и с о к т а б л и ц ,
* и п о п р о с и м и х в с е х з а к о н ч и т ь с в о ю р а б о т у .
* П о т о м у д а л и м в с е о б ъ е к т ы с т а б л и ц а м и .
*/
Databases current_databases ;
{
Poco : : ScopedLock < Poco : : Mutex > lock ( mutex ) ;
current_databases = databases ;
}
for ( Databases : : iterator it = current_databases . begin ( ) ; it ! = current_databases . end ( ) ; + + it )
for ( Tables : : iterator jt = it - > second . begin ( ) ; jt ! = it - > second . end ( ) ; + + jt )
jt - > second - > shutdown ( ) ;
{
Poco : : ScopedLock < Poco : : Mutex > lock ( mutex ) ;
databases . clear ( ) ;
}
}
} ;
Context : : Context ( )
: shared ( new ContextShared ) ,
quota ( new QuotaForIntervals )
{
}
2015-08-19 21:15:27 +00:00
Context : : ~ Context ( )
{
/// Удаляем запись из словаря query_id -> временные таблицы.
if ( ! current_query_id . empty ( ) & & ! external_tables . empty ( ) )
{
Poco : : ScopedLock < Poco : : Mutex > lock ( shared - > mutex ) ;
auto it = shared - > temporary_tables_by_query_id . find ( current_query_id ) ;
if ( shared - > temporary_tables_by_query_id . end ( ) = = it )
{
LOG_ERROR ( shared - > log , " Logical error: cannot find entry for query_id in shared->temporary_tables_by_query_id map " ) ;
return ;
}
shared - > temporary_tables_by_query_id . erase ( it ) ;
}
}
2015-04-16 06:12:35 +00:00
const TableFunctionFactory & Context : : getTableFunctionFactory ( ) const { return shared - > table_function_factory ; }
const AggregateFunctionFactory & Context : : getAggregateFunctionFactory ( ) const { return shared - > aggregate_function_factory ; }
const FormatFactory & Context : : getFormatFactory ( ) const { return shared - > format_factory ; }
InterserverIOHandler & Context : : getInterserverIOHandler ( ) { return shared - > interserver_io_handler ; }
Poco : : Mutex & Context : : getMutex ( ) const { return shared - > mutex ; }
const Databases & Context : : getDatabases ( ) const { return shared - > databases ; }
Databases & Context : : getDatabases ( ) { return shared - > databases ; }
ProcessList & Context : : getProcessList ( ) { return shared - > process_list ; }
const ProcessList & Context : : getProcessList ( ) const { return shared - > process_list ; }
MergeList & Context : : getMergeList ( ) { return shared - > merge_list ; }
const MergeList & Context : : getMergeList ( ) const { return shared - > merge_list ; }
2012-08-02 17:33:31 +00:00
String Context : : getPath ( ) const
{
Poco : : ScopedLock < Poco : : Mutex > lock ( shared - > mutex ) ;
return shared - > path ;
}
2015-01-07 17:19:23 +00:00
String Context : : getTemporaryPath ( ) const
{
Poco : : ScopedLock < Poco : : Mutex > lock ( shared - > mutex ) ;
return shared - > tmp_path ;
}
2012-08-02 17:33:31 +00:00
void Context : : setPath ( const String & path )
{
Poco : : ScopedLock < Poco : : Mutex > lock ( shared - > mutex ) ;
shared - > path = path ;
}
2015-01-07 17:19:23 +00:00
void Context : : setTemporaryPath ( const String & path )
{
Poco : : ScopedLock < Poco : : Mutex > lock ( shared - > mutex ) ;
shared - > tmp_path = path ;
}
2012-08-02 17:33:31 +00:00
2014-02-13 07:17:22 +00:00
void Context : : setUsersConfig ( ConfigurationPtr config )
2013-08-10 07:46:45 +00:00
{
Poco : : ScopedLock < Poco : : Mutex > lock ( shared - > mutex ) ;
2014-02-13 07:17:22 +00:00
shared - > users_config = config ;
shared - > users . loadFromConfig ( * shared - > users_config ) ;
shared - > quotas . loadFromConfig ( * shared - > users_config ) ;
}
ConfigurationPtr Context : : getUsersConfig ( )
{
Poco : : ScopedLock < Poco : : Mutex > lock ( shared - > mutex ) ;
return shared - > users_config ;
2013-08-10 07:46:45 +00:00
}
2013-08-12 00:36:18 +00:00
void Context : : setUser ( const String & name , const String & password , const Poco : : Net : : IPAddress & address , const String & quota_key )
2013-08-10 07:46:45 +00:00
{
Poco : : ScopedLock < Poco : : Mutex > lock ( shared - > mutex ) ;
const User & user_props = shared - > users . get ( name , password , address ) ;
setSetting ( " profile " , user_props . profile ) ;
2013-11-03 00:24:46 +00:00
setQuota ( user_props . quota , quota_key , name , address ) ;
2013-08-10 07:46:45 +00:00
user = name ;
2013-11-03 05:32:42 +00:00
ip_address = address ;
2013-08-10 07:46:45 +00:00
}
2013-11-03 00:24:46 +00:00
void Context : : setQuota ( const String & name , const String & quota_key , const String & user_name , const Poco : : Net : : IPAddress & address )
2013-08-12 00:36:18 +00:00
{
Poco : : ScopedLock < Poco : : Mutex > lock ( shared - > mutex ) ;
2014-02-13 07:17:22 +00:00
quota = shared - > quotas . get ( name , quota_key , user_name , address ) ;
2013-08-12 00:36:18 +00:00
}
QuotaForIntervals & Context : : getQuota ( )
{
Poco : : ScopedLock < Poco : : Mutex > lock ( shared - > mutex ) ;
return * quota ;
}
2013-11-13 14:39:48 +00:00
void Context : : addDependency ( const DatabaseAndTableName & from , const DatabaseAndTableName & where )
2013-11-08 17:43:03 +00:00
{
Poco : : ScopedLock < Poco : : Mutex > lock ( shared - > mutex ) ;
shared - > view_dependencies [ from ] . insert ( where ) ;
}
2013-11-13 14:39:48 +00:00
void Context : : removeDependency ( const DatabaseAndTableName & from , const DatabaseAndTableName & where )
2013-11-08 17:43:03 +00:00
{
Poco : : ScopedLock < Poco : : Mutex > lock ( shared - > mutex ) ;
shared - > view_dependencies [ from ] . erase ( where ) ;
}
2014-12-23 20:32:00 +00:00
Dependencies Context : : getDependencies ( const String & database_name , const String & table_name ) const
2013-11-08 17:43:03 +00:00
{
Poco : : ScopedLock < Poco : : Mutex > lock ( shared - > mutex ) ;
2014-12-23 20:32:00 +00:00
String db = database_name . empty ( ) ? current_database : database_name ;
ViewDependencies : : const_iterator iter = shared - > view_dependencies . find ( DatabaseAndTableName ( db , table_name ) ) ;
2013-11-08 17:43:03 +00:00
if ( iter = = shared - > view_dependencies . end ( ) )
2014-12-23 20:32:00 +00:00
return { } ;
return Dependencies ( iter - > second . begin ( ) , iter - > second . end ( ) ) ;
2013-11-08 17:43:03 +00:00
}
2013-08-12 00:36:18 +00:00
2012-08-02 17:33:31 +00:00
bool Context : : isTableExist ( const String & database_name , const String & table_name ) const
{
Poco : : ScopedLock < Poco : : Mutex > lock ( shared - > mutex ) ;
String db = database_name . empty ( ) ? current_database : database_name ;
Databases : : const_iterator it ;
return shared - > databases . end ( ) ! = ( it = shared - > databases . find ( db ) )
& & it - > second . end ( ) ! = it - > second . find ( table_name ) ;
}
bool Context : : isDatabaseExist ( const String & database_name ) const
{
Poco : : ScopedLock < Poco : : Mutex > lock ( shared - > mutex ) ;
String db = database_name . empty ( ) ? current_database : database_name ;
return shared - > databases . end ( ) ! = shared - > databases . find ( db ) ;
}
void Context : : assertTableExists ( const String & database_name , const String & table_name ) const
{
Poco : : ScopedLock < Poco : : Mutex > lock ( shared - > mutex ) ;
String db = database_name . empty ( ) ? current_database : database_name ;
2014-06-26 00:58:14 +00:00
2015-01-28 02:37:05 +00:00
Databases : : const_iterator it = shared - > databases . find ( db ) ;
if ( shared - > databases . end ( ) = = it )
2012-08-02 17:33:31 +00:00
throw Exception ( " Database " + db + " doesn't exist " , ErrorCodes : : UNKNOWN_DATABASE ) ;
if ( it - > second . end ( ) = = it - > second . find ( table_name ) )
throw Exception ( " Table " + db + " . " + table_name + " doesn't exist. " , ErrorCodes : : UNKNOWN_TABLE ) ;
}
void Context : : assertTableDoesntExist ( const String & database_name , const String & table_name ) const
{
Poco : : ScopedLock < Poco : : Mutex > lock ( shared - > mutex ) ;
String db = database_name . empty ( ) ? current_database : database_name ;
Databases : : const_iterator it ;
if ( shared - > databases . end ( ) ! = ( it = shared - > databases . find ( db ) )
& & it - > second . end ( ) ! = it - > second . find ( table_name ) )
throw Exception ( " Table " + db + " . " + table_name + " already exists. " , ErrorCodes : : TABLE_ALREADY_EXISTS ) ;
}
void Context : : assertDatabaseExists ( const String & database_name ) const
{
Poco : : ScopedLock < Poco : : Mutex > lock ( shared - > mutex ) ;
String db = database_name . empty ( ) ? current_database : database_name ;
if ( shared - > databases . end ( ) = = shared - > databases . find ( db ) )
throw Exception ( " Database " + db + " doesn't exist " , ErrorCodes : : UNKNOWN_DATABASE ) ;
}
void Context : : assertDatabaseDoesntExist ( const String & database_name ) const
{
Poco : : ScopedLock < Poco : : Mutex > lock ( shared - > mutex ) ;
String db = database_name . empty ( ) ? current_database : database_name ;
if ( shared - > databases . end ( ) ! = shared - > databases . find ( db ) )
throw Exception ( " Database " + db + " already exists. " , ErrorCodes : : DATABASE_ALREADY_EXISTS ) ;
}
2014-03-13 15:00:06 +00:00
Tables Context : : getExternalTables ( ) const
{
2014-03-14 15:42:30 +00:00
Poco : : ScopedLock < Poco : : Mutex > lock ( shared - > mutex ) ;
2014-03-13 15:00:06 +00:00
Tables res = external_tables ;
if ( session_context & & session_context ! = this )
{
Tables buf = session_context - > getExternalTables ( ) ;
res . insert ( buf . begin ( ) , buf . end ( ) ) ;
}
else if ( global_context & & global_context ! = this )
{
Tables buf = global_context - > getExternalTables ( ) ;
res . insert ( buf . begin ( ) , buf . end ( ) ) ;
}
return res ;
}
StoragePtr Context : : tryGetExternalTable ( const String & table_name ) const
2014-03-04 15:31:56 +00:00
{
Poco : : ScopedLock < Poco : : Mutex > lock ( shared - > mutex ) ;
2015-01-28 02:37:05 +00:00
Tables : : const_iterator jt = external_tables . find ( table_name ) ;
if ( external_tables . end ( ) = = jt )
2014-03-04 15:31:56 +00:00
return StoragePtr ( ) ;
return jt - > second ;
}
2012-08-02 17:33:31 +00:00
StoragePtr Context : : getTable ( const String & database_name , const String & table_name ) const
2015-08-19 21:15:27 +00:00
{
Exception exc ;
auto res = getTableImpl ( database_name , table_name , & exc ) ;
if ( ! res )
throw exc ;
return res ;
}
StoragePtr Context : : tryGetTable ( const String & database_name , const String & table_name ) const
{
return getTableImpl ( database_name , table_name , nullptr ) ;
}
StoragePtr Context : : getTableImpl ( const String & database_name , const String & table_name , Exception * exception ) const
2012-08-02 17:33:31 +00:00
{
Poco : : ScopedLock < Poco : : Mutex > lock ( shared - > mutex ) ;
2015-08-19 21:15:27 +00:00
/** Возможность обратиться к временным таблицам другого запроса в виде _query_QUERY_ID.table
* NOTE В д а л ь н е й ш е м м о ж е т п о т р е б о в а т ь с я п о д у м а т ь о б и з о л я ц и и .
*/
if ( database_name . size ( ) > strlen ( " _query_ " )
& & database_name . compare ( 0 , strlen ( " _query_ " ) , " _query_ " ) = = 0 )
2014-03-04 15:31:56 +00:00
{
2015-08-19 21:15:27 +00:00
String requested_query_id = database_name . substr ( strlen ( " _query_ " ) ) ;
auto it = shared - > temporary_tables_by_query_id . find ( requested_query_id ) ;
2012-08-02 17:33:31 +00:00
2015-08-19 21:15:27 +00:00
for ( auto & kv : shared - > temporary_tables_by_query_id )
std : : cerr < < kv . first < < " \n " ;
2012-08-02 17:33:31 +00:00
2015-08-19 21:15:27 +00:00
if ( shared - > temporary_tables_by_query_id . end ( ) = = it )
{
if ( exception )
* exception = Exception (
" Cannot find any temporary tables for query with id " + requested_query_id , ErrorCodes : : UNKNOWN_TABLE ) ;
return { } ;
}
2012-08-02 17:33:31 +00:00
2015-08-19 21:15:27 +00:00
auto jt = it - > second . find ( table_name ) ;
2012-08-02 17:33:31 +00:00
2015-08-19 21:15:27 +00:00
if ( it - > second . end ( ) = = jt )
{
if ( exception )
* exception = Exception (
" Cannot find temporary table with name " + table_name + " for query with id " + requested_query_id , ErrorCodes : : UNKNOWN_TABLE ) ;
return { } ;
}
2014-03-04 15:31:56 +00:00
2015-08-19 21:15:27 +00:00
return jt - > second ;
}
2014-03-12 13:14:16 +00:00
2014-03-06 14:02:20 +00:00
if ( database_name . empty ( ) )
{
2015-08-19 21:15:27 +00:00
StoragePtr res = tryGetExternalTable ( table_name ) ;
if ( res )
2014-03-06 14:02:20 +00:00
return res ;
}
2015-08-19 21:15:27 +00:00
2013-05-06 10:31:35 +00:00
String db = database_name . empty ( ) ? current_database : database_name ;
2014-06-26 00:58:14 +00:00
2015-01-28 02:37:05 +00:00
Databases : : const_iterator it = shared - > databases . find ( db ) ;
if ( shared - > databases . end ( ) = = it )
2015-08-19 21:15:27 +00:00
{
if ( exception )
* exception = Exception ( " Database " + db + " doesn't exist " , ErrorCodes : : UNKNOWN_DATABASE ) ;
return { } ;
}
2014-06-26 00:58:14 +00:00
2015-01-28 02:37:05 +00:00
Tables : : const_iterator jt = it - > second . find ( table_name ) ;
if ( it - > second . end ( ) = = jt )
2015-08-19 21:15:27 +00:00
{
if ( exception )
* exception = Exception ( " Table " + db + " . " + table_name + " doesn't exist. " , ErrorCodes : : UNKNOWN_TABLE ) ;
return { } ;
}
if ( ! jt - > second )
throw Exception ( " Logical error: entry for table " + db + " . " + table_name + " exists in Context but it is nullptr. " , ErrorCodes : : LOGICAL_ERROR ) ;
2014-06-26 00:58:14 +00:00
2013-05-06 10:31:35 +00:00
return jt - > second ;
}
2012-08-02 17:33:31 +00:00
2014-03-13 15:00:06 +00:00
void Context : : addExternalTable ( const String & table_name , StoragePtr storage )
2014-03-04 15:31:56 +00:00
{
2014-03-13 15:00:06 +00:00
if ( external_tables . end ( ) ! = external_tables . find ( table_name ) )
2014-03-04 15:31:56 +00:00
throw Exception ( " Temporary table " + table_name + " already exists. " , ErrorCodes : : TABLE_ALREADY_EXISTS ) ;
2015-08-19 21:15:27 +00:00
2014-03-13 15:00:06 +00:00
external_tables [ table_name ] = storage ;
2015-08-19 21:15:27 +00:00
if ( ! current_query_id . empty ( ) )
{
Poco : : ScopedLock < Poco : : Mutex > lock ( shared - > mutex ) ;
std : : cerr < < " adding to temporary_tables_by_query_id with " < < current_query_id < < " , " < < table_name < < " \n " ;
/// NOTE Проблема при совпадении query_id у разных запросов.
shared - > temporary_tables_by_query_id [ current_query_id ] . emplace ( table_name , storage ) ;
}
2014-03-04 15:31:56 +00:00
}
2012-08-02 17:33:31 +00:00
void Context : : addTable ( const String & database_name , const String & table_name , StoragePtr table )
{
Poco : : ScopedLock < Poco : : Mutex > lock ( shared - > mutex ) ;
2014-03-20 10:59:45 +00:00
2012-08-02 17:33:31 +00:00
String db = database_name . empty ( ) ? current_database : database_name ;
assertDatabaseExists ( db ) ;
assertTableDoesntExist ( db , table_name ) ;
shared - > databases [ db ] [ table_name ] = table ;
}
void Context : : addDatabase ( const String & database_name )
{
Poco : : ScopedLock < Poco : : Mutex > lock ( shared - > mutex ) ;
String db = database_name . empty ( ) ? current_database : database_name ;
assertDatabaseDoesntExist ( db ) ;
shared - > databases [ db ] ;
}
2013-09-30 01:29:19 +00:00
StoragePtr Context : : detachTable ( const String & database_name , const String & table_name )
2012-08-02 17:33:31 +00:00
{
Poco : : ScopedLock < Poco : : Mutex > lock ( shared - > mutex ) ;
String db = database_name . empty ( ) ? current_database : database_name ;
assertTableExists ( db , table_name ) ;
2013-09-30 01:29:19 +00:00
Tables : : iterator it = shared - > databases [ db ] . find ( table_name ) ;
StoragePtr res = it - > second ;
shared - > databases [ db ] . erase ( it ) ;
return res ;
2012-08-02 17:33:31 +00:00
}
void Context : : detachDatabase ( const String & database_name )
{
Poco : : ScopedLock < Poco : : Mutex > lock ( shared - > mutex ) ;
String db = database_name . empty ( ) ? current_database : database_name ;
assertDatabaseExists ( db ) ;
2014-03-20 10:59:45 +00:00
shared - > databases . erase ( db ) ;
2012-08-02 17:33:31 +00:00
}
2012-08-17 19:53:11 +00:00
ASTPtr Context : : getCreateQuery ( const String & database_name , const String & table_name ) const
{
2014-03-20 10:59:45 +00:00
StoragePtr table ;
String db ;
2012-08-17 19:53:11 +00:00
2014-03-20 10:59:45 +00:00
{
Poco : : ScopedLock < Poco : : Mutex > lock ( shared - > mutex ) ;
db = database_name . empty ( ) ? current_database : database_name ;
table = getTable ( db , table_name ) ;
}
2012-08-17 19:53:11 +00:00
2014-03-20 10:59:45 +00:00
auto table_lock = table - > lockStructure ( false ) ;
2012-08-17 19:53:11 +00:00
/// Здесь хранится определение таблицы
String metadata_path = shared - > path + " metadata/ " + escapeForFileName ( db ) + " / " + escapeForFileName ( table_name ) + " .sql " ;
2014-06-26 00:58:14 +00:00
2012-08-17 19:53:11 +00:00
if ( ! Poco : : File ( metadata_path ) . exists ( ) )
2013-06-17 07:01:31 +00:00
{
try
{
/// Если файл .sql не предусмотрен (например, для таблиц типа ChunkRef), то движок может сам предоставить запрос CREATE.
2014-03-20 10:59:45 +00:00
return table - > getCustomCreateQuery ( * this ) ;
2013-06-17 07:01:31 +00:00
}
catch ( . . . )
{
throw Exception ( " Metadata file " + metadata_path + " for table " + db + " . " + table_name + " doesn't exist. " ,
ErrorCodes : : TABLE_METADATA_DOESNT_EXIST ) ;
}
}
2012-08-17 19:53:11 +00:00
2013-02-11 11:22:15 +00:00
StringPtr query = new String ( ) ;
2012-08-17 19:53:11 +00:00
{
ReadBufferFromFile in ( metadata_path ) ;
2013-02-11 11:22:15 +00:00
WriteBufferFromString out ( * query ) ;
2012-08-17 19:53:11 +00:00
copyData ( in , out ) ;
}
ParserCreateQuery parser ;
2015-04-11 03:10:23 +00:00
ASTPtr ast = parseQuery ( parser , query - > data ( ) , query - > data ( ) + query - > size ( ) , " in file " + metadata_path ) ;
2012-08-17 19:53:11 +00:00
2014-06-26 00:58:14 +00:00
ASTCreateQuery & ast_create_query = typeid_cast < ASTCreateQuery & > ( * ast ) ;
2012-08-17 19:53:11 +00:00
ast_create_query . attach = false ;
ast_create_query . database = db ;
2013-02-11 11:22:15 +00:00
ast_create_query . query_string = query ;
2012-08-17 19:53:11 +00:00
return ast ;
}
2012-08-02 17:33:31 +00:00
Settings Context : : getSettings ( ) const
{
Poco : : ScopedLock < Poco : : Mutex > lock ( shared - > mutex ) ;
return settings ;
}
2013-05-05 20:07:11 +00:00
Limits Context : : getLimits ( ) const
{
Poco : : ScopedLock < Poco : : Mutex > lock ( shared - > mutex ) ;
return settings . limits ;
}
2012-08-02 17:33:31 +00:00
void Context : : setSettings ( const Settings & settings_ )
{
Poco : : ScopedLock < Poco : : Mutex > lock ( shared - > mutex ) ;
settings = settings_ ;
}
2012-08-02 19:03:32 +00:00
void Context : : setSetting ( const String & name , const Field & value )
{
Poco : : ScopedLock < Poco : : Mutex > lock ( shared - > mutex ) ;
2014-02-13 07:17:22 +00:00
if ( name = = " profile " )
settings . setProfile ( value . safeGet < String > ( ) , * shared - > users_config ) ;
else
settings . set ( name , value ) ;
}
void Context : : setSetting ( const String & name , const std : : string & value )
{
Poco : : ScopedLock < Poco : : Mutex > lock ( shared - > mutex ) ;
if ( name = = " profile " )
settings . setProfile ( value , * shared - > users_config ) ;
else
settings . set ( name , value ) ;
2012-08-02 19:03:32 +00:00
}
2012-08-02 17:33:31 +00:00
String Context : : getCurrentDatabase ( ) const
{
Poco : : ScopedLock < Poco : : Mutex > lock ( shared - > mutex ) ;
return current_database ;
}
2014-02-12 17:31:02 +00:00
String Context : : getCurrentQueryId ( ) const
{
Poco : : ScopedLock < Poco : : Mutex > lock ( shared - > mutex ) ;
return current_query_id ;
}
2012-08-02 17:33:31 +00:00
void Context : : setCurrentDatabase ( const String & name )
{
Poco : : ScopedLock < Poco : : Mutex > lock ( shared - > mutex ) ;
assertDatabaseExists ( name ) ;
current_database = name ;
}
2014-02-12 17:31:02 +00:00
void Context : : setCurrentQueryId ( const String & query_id )
{
2015-08-19 21:15:27 +00:00
if ( ! current_query_id . empty ( ) )
throw Exception ( " Logical error: attempt to set query_id twice " , ErrorCodes : : LOGICAL_ERROR ) ;
2015-07-25 09:49:09 +00:00
String query_id_to_set = query_id ;
if ( query_id_to_set . empty ( ) ) /// Если пользователь не передал свой query_id, то генерируем е г о самостоятельно.
query_id_to_set = shared - > uuid_generator . createRandom ( ) . toString ( ) ;
2014-02-12 17:31:02 +00:00
Poco : : ScopedLock < Poco : : Mutex > lock ( shared - > mutex ) ;
2015-07-25 09:49:09 +00:00
current_query_id = query_id_to_set ;
2015-08-19 21:15:27 +00:00
if ( ! external_tables . empty ( ) )
{
Poco : : ScopedLock < Poco : : Mutex > lock ( shared - > mutex ) ;
/// NOTE Проблема при совпадении query_id у разных запросов.
shared - > temporary_tables_by_query_id [ current_query_id ] . insert ( external_tables . begin ( ) , external_tables . end ( ) ) ;
}
2014-02-12 17:31:02 +00:00
}
2013-06-29 18:03:57 +00:00
String Context : : getDefaultFormat ( ) const
{
Poco : : ScopedLock < Poco : : Mutex > lock ( shared - > mutex ) ;
return default_format . empty ( ) ? " TabSeparated " : default_format ;
}
void Context : : setDefaultFormat ( const String & name )
{
Poco : : ScopedLock < Poco : : Mutex > lock ( shared - > mutex ) ;
default_format = name ;
}
2014-08-11 15:59:01 +00:00
const Macros & Context : : getMacros ( ) const
{
return shared - > macros ;
}
void Context : : setMacros ( Macros & & macros )
{
/// Полагаемся, что это присваивание происходит один раз при старте сервера. Если это не так, нужно использовать мьютекс.
shared - > macros = macros ;
}
2013-06-29 18:03:57 +00:00
2012-08-02 17:33:31 +00:00
Context & Context : : getSessionContext ( )
{
if ( ! session_context )
throw Exception ( " There is no session " , ErrorCodes : : THERE_IS_NO_SESSION ) ;
return * session_context ;
}
Context & Context : : getGlobalContext ( )
{
if ( ! global_context )
throw Exception ( " Logical error: there is no global context " , ErrorCodes : : LOGICAL_ERROR ) ;
return * global_context ;
}
2012-12-19 20:15:15 +00:00
2012-12-21 19:48:47 +00:00
const Dictionaries & Context : : getDictionaries ( ) const
2012-12-19 20:15:15 +00:00
{
2015-04-02 16:30:18 +00:00
return getDictionariesImpl ( false ) ;
2015-02-10 17:40:40 +00:00
}
const ExternalDictionaries & Context : : getExternalDictionaries ( ) const
{
2015-04-02 16:30:18 +00:00
return getExternalDictionariesImpl ( false ) ;
2015-03-27 13:11:22 +00:00
}
2015-04-02 16:30:18 +00:00
const Dictionaries & Context : : getDictionariesImpl ( const bool throw_on_error ) const
2015-03-27 13:11:22 +00:00
{
2015-04-02 16:30:18 +00:00
Poco : : ScopedLock < Poco : : Mutex > lock ( shared - > mutex ) ;
2015-03-27 13:11:22 +00:00
if ( ! shared - > dictionaries )
shared - > dictionaries = new Dictionaries { throw_on_error } ;
2015-04-02 16:30:18 +00:00
return * shared - > dictionaries ;
2015-03-27 13:11:22 +00:00
}
2015-04-02 16:30:18 +00:00
const ExternalDictionaries & Context : : getExternalDictionariesImpl ( const bool throw_on_error ) const
2015-03-27 13:11:22 +00:00
{
2015-04-22 13:53:32 +00:00
Poco : : ScopedLock < Poco : : Mutex > lock ( shared - > external_dictionaries_mutex ) ;
2015-04-02 16:30:18 +00:00
2015-02-10 17:40:40 +00:00
if ( ! shared - > external_dictionaries )
2015-02-03 17:14:12 +00:00
{
if ( ! this - > global_context )
throw Exception ( " Logical error: there is no global context " , ErrorCodes : : LOGICAL_ERROR ) ;
2015-03-27 13:11:22 +00:00
shared - > external_dictionaries = new ExternalDictionaries { * this - > global_context , throw_on_error } ;
2015-02-03 17:14:12 +00:00
}
2015-04-02 16:30:18 +00:00
return * shared - > external_dictionaries ;
}
void Context : : tryCreateDictionaries ( ) const
{
static_cast < void > ( getDictionariesImpl ( true ) ) ;
}
void Context : : tryCreateExternalDictionaries ( ) const
{
static_cast < void > ( getExternalDictionariesImpl ( true ) ) ;
2012-12-19 20:15:15 +00:00
}
2013-02-16 14:55:14 +00:00
void Context : : setProgressCallback ( ProgressCallback callback )
{
/// Колбек устанавливается на сессию или на запрос. В сессии одновременно обрабатывается только один запрос. Поэтому блокировка не нужна.
progress_callback = callback ;
}
ProgressCallback Context : : getProgressCallback ( ) const
{
return progress_callback ;
}
2013-09-03 20:21:28 +00:00
2013-11-03 05:32:42 +00:00
void Context : : setProcessListElement ( ProcessList : : Element * elem )
{
/// Устанавливается на сессию или на запрос. В сессии одновременно обрабатывается только один запрос. Поэтому блокировка не нужна.
process_list_elem = elem ;
}
ProcessList : : Element * Context : : getProcessListElement ( )
{
return process_list_elem ;
}
2014-03-28 14:36:24 +00:00
void Context : : setUncompressedCache ( size_t max_size_in_bytes )
2013-09-08 05:53:10 +00:00
{
Poco : : ScopedLock < Poco : : Mutex > lock ( shared - > mutex ) ;
if ( shared - > uncompressed_cache )
throw Exception ( " Uncompressed cache has been already created. " , ErrorCodes : : LOGICAL_ERROR ) ;
2015-04-16 06:12:35 +00:00
shared - > uncompressed_cache . reset ( new UncompressedCache ( max_size_in_bytes ) ) ;
2013-09-08 05:53:10 +00:00
}
UncompressedCachePtr Context : : getUncompressedCache ( ) const
{
2015-04-16 06:12:35 +00:00
Poco : : ScopedLock < Poco : : Mutex > lock ( shared - > mutex ) ;
2013-09-08 05:53:10 +00:00
return shared - > uncompressed_cache ;
}
2014-02-11 13:30:42 +00:00
void Context : : setMarkCache ( size_t cache_size_in_bytes )
{
Poco : : ScopedLock < Poco : : Mutex > lock ( shared - > mutex ) ;
if ( shared - > mark_cache )
throw Exception ( " Uncompressed cache has been already created. " , ErrorCodes : : LOGICAL_ERROR ) ;
2015-05-07 10:31:50 +00:00
shared - > mark_cache . reset ( new MarkCache ( cache_size_in_bytes , std : : chrono : : seconds ( settings . mark_cache_min_lifetime ) ) ) ;
2014-02-11 13:30:42 +00:00
}
MarkCachePtr Context : : getMarkCache ( ) const
{
2015-04-16 06:12:35 +00:00
Poco : : ScopedLock < Poco : : Mutex > lock ( shared - > mutex ) ;
2014-02-11 13:30:42 +00:00
return shared - > mark_cache ;
}
2014-07-02 12:30:38 +00:00
BackgroundProcessingPool & Context : : getBackgroundPool ( )
{
Poco : : ScopedLock < Poco : : Mutex > lock ( shared - > mutex ) ;
if ( ! shared - > background_pool )
shared - > background_pool = new BackgroundProcessingPool ( settings . background_pool_size ) ;
return * shared - > background_pool ;
}
2014-04-05 19:54:00 +00:00
void Context : : resetCaches ( ) const
{
2015-04-16 06:12:35 +00:00
Poco : : ScopedLock < Poco : : Mutex > lock ( shared - > mutex ) ;
2014-04-05 19:54:00 +00:00
if ( shared - > uncompressed_cache )
shared - > uncompressed_cache - > reset ( ) ;
if ( shared - > mark_cache )
shared - > mark_cache - > reset ( ) ;
}
2014-05-13 10:10:26 +00:00
void Context : : setZooKeeper ( zkutil : : ZooKeeperPtr zookeeper )
2014-03-21 19:17:59 +00:00
{
Poco : : ScopedLock < Poco : : Mutex > lock ( shared - > mutex ) ;
if ( shared - > zookeeper )
throw Exception ( " ZooKeeper client has already been set. " , ErrorCodes : : LOGICAL_ERROR ) ;
shared - > zookeeper = zookeeper ;
}
2014-05-13 10:10:26 +00:00
zkutil : : ZooKeeperPtr Context : : getZooKeeper ( ) const
2014-03-21 19:17:59 +00:00
{
2014-04-25 13:55:15 +00:00
Poco : : ScopedLock < Poco : : Mutex > lock ( shared - > mutex ) ;
2014-05-13 11:24:04 +00:00
if ( shared - > zookeeper & & shared - > zookeeper - > expired ( ) )
2014-04-25 13:55:15 +00:00
shared - > zookeeper = shared - > zookeeper - > startNewSession ( ) ;
return shared - > zookeeper ;
2014-03-21 19:17:59 +00:00
}
2014-11-19 20:40:51 +00:00
void Context : : setInterserverIOAddress ( const String & host , UInt16 port )
2014-03-21 19:49:27 +00:00
{
shared - > interserver_io_host = host ;
shared - > interserver_io_port = port ;
}
2014-11-19 20:40:51 +00:00
std : : pair < String , UInt16 > Context : : getInterserverIOAddress ( ) const
2014-03-21 19:49:27 +00:00
{
2014-11-19 20:40:51 +00:00
if ( shared - > interserver_io_host . empty ( ) | | shared - > interserver_io_port = = 0 )
throw Exception ( " Parameter 'interserver_http_port' required for replication is not specified in configuration file. " ,
ErrorCodes : : NO_ELEMENTS_IN_CONFIG ) ;
return { shared - > interserver_io_host , shared - > interserver_io_port } ;
2014-03-21 19:49:27 +00:00
}
2013-12-10 17:06:57 +00:00
void Context : : initClusters ( )
2013-12-07 16:51:29 +00:00
{
2013-12-10 17:06:57 +00:00
Poco : : ScopedLock < Poco : : Mutex > lock ( shared - > mutex ) ;
if ( ! shared - > clusters )
2015-05-28 03:49:28 +00:00
shared - > clusters = new Clusters ( settings ) ;
2013-12-10 17:06:57 +00:00
}
2013-12-07 16:51:29 +00:00
2013-12-10 17:06:57 +00:00
Cluster & Context : : getCluster ( const std : : string & cluster_name )
{
if ( ! shared - > clusters )
throw Poco : : Exception ( " Clusters have not been initialized yet. " ) ;
2014-02-22 21:50:27 +00:00
Clusters : : Impl : : iterator it = shared - > clusters - > impl . find ( cluster_name ) ;
if ( it ! = shared - > clusters - > impl . end ( ) )
2013-12-07 16:51:29 +00:00
return it - > second ;
else
throw Poco : : Exception ( " Failed to find cluster with name = " + cluster_name ) ;
}
2015-01-10 02:30:03 +00:00
2015-04-30 12:43:16 +00:00
Poco : : SharedPtr < Clusters > Context : : getClusters ( ) const
{
if ( ! shared - > clusters )
throw Poco : : Exception ( " Clusters have not been initialized yet. " ) ;
return shared - > clusters ;
}
2015-01-10 02:30:03 +00:00
Compiler & Context : : getCompiler ( )
{
2015-01-11 00:35:30 +00:00
Poco : : ScopedLock < Poco : : Mutex > lock ( shared - > mutex ) ;
if ( ! shared - > compiler )
shared - > compiler . reset ( new Compiler { shared - > path + " build/ " , 1 } ) ;
return * shared - > compiler ;
2015-01-10 02:30:03 +00:00
}
2015-06-26 20:48:10 +00:00
QueryLog & Context : : getQueryLog ( )
{
Poco : : ScopedLock < Poco : : Mutex > lock ( shared - > mutex ) ;
if ( ! shared - > query_log )
{
auto & config = Poco : : Util : : Application : : instance ( ) . config ( ) ;
String database = config . getString ( " query_log.database " , " system " ) ;
String table = config . getString ( " query_log.table " , " query_log " ) ;
size_t flush_interval_milliseconds = parse < size_t > (
config . getString ( " query_log.flush_interval_milliseconds " , DEFAULT_QUERY_LOG_FLUSH_INTERVAL_MILLISECONDS_STR ) ) ;
shared - > query_log . reset ( new QueryLog { * this , database , table , flush_interval_milliseconds } ) ;
}
return * shared - > query_log ;
}
2015-04-17 05:35:53 +00:00
CompressionMethod Context : : chooseCompressionMethod ( size_t part_size , double part_size_ratio ) const
{
Poco : : ScopedLock < Poco : : Mutex > lock ( shared - > mutex ) ;
if ( ! shared - > compression_method_selector )
{
constexpr auto config_name = " compression " ;
auto & config = Poco : : Util : : Application : : instance ( ) . config ( ) ;
if ( config . has ( config_name ) )
shared - > compression_method_selector . reset ( new CompressionMethodSelector { config , " compression " } ) ;
else
shared - > compression_method_selector . reset ( new CompressionMethodSelector ) ;
}
return shared - > compression_method_selector - > choose ( part_size , part_size_ratio ) ;
}
2015-07-16 21:32:51 +00:00
const MergeTreeSettings & Context : : getMergeTreeSettings ( )
{
Poco : : ScopedLock < Poco : : Mutex > lock ( shared - > mutex ) ;
if ( ! shared - > merge_tree_settings )
{
auto & config = Poco : : Util : : Application : : instance ( ) . config ( ) ;
shared - > merge_tree_settings . reset ( new MergeTreeSettings ( ) ) ;
shared - > merge_tree_settings - > loadFromConfig ( " merge_tree " , config ) ;
}
return * shared - > merge_tree_settings ;
}
2015-04-16 06:12:35 +00:00
void Context : : shutdown ( )
{
shared - > shutdown ( ) ;
}
2012-08-02 17:33:31 +00:00
}