2022-10-26 11:05:20 +00:00
# include <IO/WriteHelpers.h>
# include <Interpreters/AsynchronousInsertLog.h>
2021-06-16 09:45:43 +00:00
# include <Interpreters/AsynchronousMetricLog.h>
2022-10-26 11:05:20 +00:00
# include <Interpreters/Context.h>
2021-06-16 09:45:43 +00:00
# include <Interpreters/CrashLog.h>
2022-10-26 11:05:20 +00:00
# include <Interpreters/InterpreterCreateQuery.h>
# include <Interpreters/InterpreterInsertQuery.h>
# include <Interpreters/InterpreterRenameQuery.h>
2021-06-16 09:45:43 +00:00
# include <Interpreters/MetricLog.h>
# include <Interpreters/OpenTelemetrySpanLog.h>
# include <Interpreters/PartLog.h>
2022-10-26 11:05:20 +00:00
# include <Interpreters/ProcessorsProfileLog.h>
2017-06-05 13:59:38 +00:00
# include <Interpreters/QueryLog.h>
2018-05-31 15:54:08 +00:00
# include <Interpreters/QueryThreadLog.h>
2021-06-18 13:44:08 +00:00
# include <Interpreters/QueryViewsLog.h>
2021-08-30 17:37:07 +00:00
# include <Interpreters/SessionLog.h>
2019-07-22 13:54:08 +00:00
# include <Interpreters/TextLog.h>
2019-02-03 21:30:45 +00:00
# include <Interpreters/TraceLog.h>
2022-01-14 14:03:00 +00:00
# include <Interpreters/TransactionsInfoLog.h>
2023-02-07 17:50:31 +00:00
# include <Interpreters/FilesystemCacheLog.h>
# include <Interpreters/FilesystemReadPrefetchesLog.h>
2022-10-26 11:05:20 +00:00
# include <Interpreters/ZooKeeperLog.h>
2022-01-10 19:01:41 +00:00
# include <Parsers/ASTCreateQuery.h>
2022-10-26 11:05:20 +00:00
# include <Parsers/ASTFunction.h>
2022-01-10 19:01:41 +00:00
# include <Parsers/ASTIndexDeclaration.h>
# include <Parsers/ASTInsertQuery.h>
2022-10-26 11:05:20 +00:00
# include <Parsers/ASTRenameQuery.h>
# include <Parsers/ParserCreateQuery.h>
# include <Parsers/formatAST.h>
# include <Parsers/parseQuery.h>
# include <Processors/Executors/PushingPipelineExecutor.h>
2022-01-10 19:01:41 +00:00
# include <Storages/IStorage.h>
2022-02-28 15:25:54 +00:00
# include <Storages/MergeTree/MergeTreeSettings.h>
2022-10-26 11:05:20 +00:00
# include <base/scope_guard.h>
2018-07-08 04:54:37 +00:00
# include <Poco/Util/AbstractConfiguration.h>
2022-10-26 11:05:20 +00:00
# include <Common/MemoryTrackerBlockerInThread.h>
2022-04-27 15:05:45 +00:00
# include <Common/logger_useful.h>
2022-10-26 11:05:20 +00:00
# include <Common/setThreadName.h>
2018-07-08 04:54:37 +00:00
2017-06-05 13:59:38 +00:00
namespace DB
{
2020-01-23 19:21:57 +00:00
2020-01-23 18:28:37 +00:00
namespace ErrorCodes
{
extern const int BAD_ARGUMENTS ;
2022-01-10 19:01:41 +00:00
extern const int LOGICAL_ERROR ;
2022-03-23 10:52:37 +00:00
extern const int NOT_IMPLEMENTED ;
2020-01-23 18:28:37 +00:00
}
2017-06-05 13:59:38 +00:00
2022-03-22 21:44:46 +00:00
namespace
{
class StorageWithComment : public IAST
{
public :
ASTPtr storage ;
ASTPtr comment ;
String getID ( char ) const override { return " Storage with comment definition " ; }
2022-03-23 09:46:27 +00:00
ASTPtr clone ( ) const override
{
2022-03-22 21:44:46 +00:00
throw Exception ( ErrorCodes : : NOT_IMPLEMENTED , " Method clone is not supported " ) ;
}
void formatImpl ( const FormatSettings & , FormatState & , FormatStateStacked ) const override
{
throw Exception ( ErrorCodes : : NOT_IMPLEMENTED , " Method formatImpl is not supported " ) ;
}
} ;
class ParserStorageWithComment : public IParserBase
{
protected :
const char * getName ( ) const override { return " storage definition with comment " ; }
bool parseImpl ( Pos & pos , ASTPtr & node , Expected & expected ) override
{
2022-11-11 15:26:04 +00:00
ParserStorage storage_p { ParserStorage : : TABLE_ENGINE } ;
2022-03-22 21:44:46 +00:00
ASTPtr storage ;
if ( ! storage_p . parse ( pos , storage , expected ) )
return false ;
ParserKeyword s_comment ( " COMMENT " ) ;
ParserStringLiteral string_literal_parser ;
ASTPtr comment ;
if ( s_comment . ignore ( pos , expected ) )
string_literal_parser . parse ( pos , comment , expected ) ;
auto storage_with_comment = std : : make_shared < StorageWithComment > ( ) ;
storage_with_comment - > storage = std : : move ( storage ) ;
storage_with_comment - > comment = std : : move ( comment ) ;
node = storage_with_comment ;
return true ;
}
} ;
2020-01-23 18:28:37 +00:00
}
2017-06-05 13:59:38 +00:00
2019-02-22 16:10:14 +00:00
namespace
{
/// Fallback for the `flush_interval_milliseconds` config value of a system log.
constexpr size_t DEFAULT_SYSTEM_LOG_FLUSH_INTERVAL_MILLISECONDS{7500};
2020-11-09 14:49:48 +00:00
/// Fallback for the `collect_interval_milliseconds` config value of the metric log.
constexpr size_t DEFAULT_METRIC_LOG_COLLECT_INTERVAL_MILLISECONDS{1000};
2019-02-22 16:10:14 +00:00
/// Creates a system log with MergeTree engine using parameters from config
template < typename TSystemLog >
2019-03-21 19:22:38 +00:00
std : : shared_ptr < TSystemLog > createSystemLog (
2021-04-10 23:33:54 +00:00
ContextPtr context ,
2019-02-22 16:10:14 +00:00
const String & default_database_name ,
const String & default_table_name ,
const Poco : : Util : : AbstractConfiguration & config ,
2020-04-19 23:11:41 +00:00
const String & config_prefix )
2019-02-22 16:10:14 +00:00
{
if ( ! config . has ( config_prefix ) )
2021-03-05 14:57:16 +00:00
{
LOG_DEBUG ( & Poco : : Logger : : get ( " SystemLog " ) ,
" Not creating {}.{} since corresponding section '{}' is missing from config " ,
default_database_name , default_table_name , config_prefix ) ;
2019-02-22 16:10:14 +00:00
return { } ;
2021-03-05 14:57:16 +00:00
}
2023-02-07 17:50:31 +00:00
LOG_DEBUG ( & Poco : : Logger : : get ( " SystemLog " ) ,
" Creating {}.{} from {} " , default_database_name , default_table_name , config_prefix ) ;
2019-02-22 16:10:14 +00:00
String database = config . getString ( config_prefix + " .database " , default_database_name ) ;
String table = config . getString ( config_prefix + " .table " , default_table_name ) ;
2020-01-23 11:22:06 +00:00
2020-03-02 17:25:36 +00:00
if ( database ! = default_database_name )
{
/// System tables must be loaded before other tables, but loading order is undefined for all databases except `system`
2020-07-09 04:15:45 +00:00
LOG_ERROR ( & Poco : : Logger : : get ( " SystemLog " ) , " Custom database name for a system table specified in config. "
" Table `{}` will be created in `system` database instead of `{}` " , table , database ) ;
2020-03-02 17:25:36 +00:00
database = default_database_name ;
}
2020-01-23 11:22:06 +00:00
String engine ;
2020-01-23 18:28:37 +00:00
if ( config . has ( config_prefix + " .engine " ) )
{
if ( config . has ( config_prefix + " .partition_by " ) )
2023-01-23 21:13:58 +00:00
throw Exception ( ErrorCodes : : BAD_ARGUMENTS ,
" If 'engine' is specified for system table, PARTITION BY parameters should "
" be specified directly inside 'engine' and 'partition_by' setting doesn't make sense " ) ;
2020-11-30 15:32:17 +00:00
if ( config . has ( config_prefix + " .ttl " ) )
2023-01-23 21:13:58 +00:00
throw Exception ( ErrorCodes : : BAD_ARGUMENTS , " If 'engine' is specified for system table, "
" TTL parameters should be specified directly inside 'engine' and 'ttl' setting doesn't make sense " ) ;
2022-10-26 11:05:20 +00:00
if ( config . has ( config_prefix + " .storage_policy " ) )
2023-01-23 21:13:58 +00:00
throw Exception ( ErrorCodes : : BAD_ARGUMENTS , " If 'engine' is specified for system table, SETTINGS storage_policy = '...' "
" should be specified directly inside 'engine' and 'storage_policy' setting doesn't make sense " ) ;
2020-01-23 18:28:37 +00:00
engine = config . getString ( config_prefix + " .engine " ) ;
}
2020-01-23 11:22:06 +00:00
else
{
String partition_by = config . getString ( config_prefix + " .partition_by " , " toYYYYMM(event_date) " ) ;
2020-07-09 04:15:45 +00:00
engine = " ENGINE = MergeTree " ;
if ( ! partition_by . empty ( ) )
engine + = " PARTITION BY ( " + partition_by + " ) " ;
2020-11-30 15:32:17 +00:00
String ttl = config . getString ( config_prefix + " .ttl " , " " ) ;
if ( ! ttl . empty ( ) )
engine + = " TTL " + ttl ;
2022-04-17 19:56:36 +00:00
2022-04-17 19:57:44 +00:00
engine + = " ORDER BY " ;
engine + = TSystemLog : : getDefaultOrderBy ( ) ;
2022-10-26 11:05:20 +00:00
String storage_policy = config . getString ( config_prefix + " .storage_policy " , " " ) ;
if ( ! storage_policy . empty ( ) )
engine + = " SETTINGS storage_policy = " + quoteString ( storage_policy ) ;
2020-01-23 11:22:06 +00:00
}
2022-03-22 21:44:46 +00:00
2022-04-17 21:49:39 +00:00
/// Validate engine definition syntax to prevent some configuration errors.
2022-03-22 21:44:46 +00:00
ParserStorageWithComment storage_parser ;
2020-12-02 16:45:17 +00:00
parseQuery ( storage_parser , engine . data ( ) , engine . data ( ) + engine . size ( ) ,
" Storage to create table for " + config_prefix , 0 , DBMS_DEFAULT_MAX_PARSER_DEPTH ) ;
2019-02-22 16:10:14 +00:00
2020-07-09 04:15:45 +00:00
size_t flush_interval_milliseconds = config . getUInt64 ( config_prefix + " .flush_interval_milliseconds " ,
DEFAULT_SYSTEM_LOG_FLUSH_INTERVAL_MILLISECONDS ) ;
2019-02-22 16:10:14 +00:00
2020-04-19 23:11:41 +00:00
return std : : make_shared < TSystemLog > ( context , database , table , engine , flush_interval_milliseconds ) ;
2019-02-22 16:10:14 +00:00
}
2019-08-13 16:17:18 +00:00
2019-02-22 16:10:14 +00:00
2022-02-28 15:25:54 +00:00
/// returns CREATE TABLE query, but with removed UUID
2022-01-25 09:58:11 +00:00
/// That way it can be used to compare with the SystemLog::getCreateTableQuery()
ASTPtr getCreateTableQueryClean ( const StorageID & table_id , ContextPtr context )
2021-11-25 18:06:04 +00:00
{
DatabasePtr database = DatabaseCatalog : : instance ( ) . getDatabase ( table_id . database_name ) ;
ASTPtr old_ast = database - > getCreateTableQuery ( table_id . table_name , context ) ;
auto & old_create_query_ast = old_ast - > as < ASTCreateQuery & > ( ) ;
/// Reset UUID
old_create_query_ast . uuid = UUIDHelpers : : Nil ;
return old_ast ;
}
2022-01-10 19:01:41 +00:00
}
2022-04-17 18:48:02 +00:00
2021-04-10 23:33:54 +00:00
/// Creates every system log that has a section in `config`, registers the created ones in
/// `logs`, starts them, and then starts the metric collection / crash log hooks.
/// If starting any log throws, all registered logs are shut down and the exception is rethrown.
SystemLogs::SystemLogs(ContextPtr global_context, const Poco::Util::AbstractConfiguration & config)
{
    /// Logs without a config section come back as empty pointers.
    query_log = createSystemLog<QueryLog>(global_context, " system ", " query_log ", config, " query_log ");
    query_thread_log = createSystemLog<QueryThreadLog>(global_context, " system ", " query_thread_log ", config, " query_thread_log ");
    part_log = createSystemLog<PartLog>(global_context, " system ", " part_log ", config, " part_log ");
    trace_log = createSystemLog<TraceLog>(global_context, " system ", " trace_log ", config, " trace_log ");
    crash_log = createSystemLog<CrashLog>(global_context, " system ", " crash_log ", config, " crash_log ");
    text_log = createSystemLog<TextLog>(global_context, " system ", " text_log ", config, " text_log ");
    metric_log = createSystemLog<MetricLog>(global_context, " system ", " metric_log ", config, " metric_log ");
    filesystem_cache_log = createSystemLog<FilesystemCacheLog>(
        global_context, " system ", " filesystem_cache_log ", config, " filesystem_cache_log ");
    filesystem_read_prefetches_log = createSystemLog<FilesystemReadPrefetchesLog>(
        global_context, " system ", " filesystem_read_prefetches_log ", config, " filesystem_read_prefetches_log ");
    asynchronous_metric_log = createSystemLog<AsynchronousMetricLog>(
        global_context, " system ", " asynchronous_metric_log ", config, " asynchronous_metric_log ");
    opentelemetry_span_log = createSystemLog<OpenTelemetrySpanLog>(
        global_context, " system ", " opentelemetry_span_log ", config, " opentelemetry_span_log ");
    query_views_log = createSystemLog<QueryViewsLog>(global_context, " system ", " query_views_log ", config, " query_views_log ");
    zookeeper_log = createSystemLog<ZooKeeperLog>(global_context, " system ", " zookeeper_log ", config, " zookeeper_log ");
    session_log = createSystemLog<SessionLog>(global_context, " system ", " session_log ", config, " session_log ");
    transactions_info_log = createSystemLog<TransactionsInfoLog>(
        global_context, " system ", " transactions_info_log ", config, " transactions_info_log ");
    processors_profile_log = createSystemLog<ProcessorsProfileLog>(
        global_context, " system ", " processors_profile_log ", config, " processors_profile_log ");
    asynchronous_insert_log = createSystemLog<AsynchronousInsertLog>(
        global_context, " system ", " asynchronous_insert_log ", config, " asynchronous_insert_log ");

    /// Adds a log to `logs` only if it was actually created.
    const auto register_log = [this](const auto & system_log)
    {
        if (system_log)
            logs.emplace_back(system_log.get());
    };

    register_log(query_log);
    register_log(query_thread_log);
    register_log(part_log);
    register_log(trace_log);
    register_log(crash_log);
    register_log(text_log);
    register_log(metric_log);
    register_log(asynchronous_metric_log);
    register_log(opentelemetry_span_log);
    register_log(query_views_log);
    register_log(zookeeper_log);

    /// The session log additionally leaves a warning in the context when it is enabled.
    if (session_log)
    {
        logs.emplace_back(session_log.get());
        global_context->addWarningMessage(" Table system.session_log is enabled. It's unreliable and may contain garbage. Do not use it for any kind of security monitoring. ");
    }

    register_log(transactions_info_log);
    register_log(processors_profile_log);
    register_log(filesystem_cache_log);
    register_log(filesystem_read_prefetches_log);
    register_log(asynchronous_insert_log);

    try
    {
        for (auto & system_log : logs)
            system_log->startup();
    }
    catch (...)
    {
        /// join threads
        shutdown();
        throw;
    }

    if (metric_log)
    {
        const size_t collect_interval_milliseconds = config.getUInt64(
            " metric_log.collect_interval_milliseconds ", DEFAULT_METRIC_LOG_COLLECT_INTERVAL_MILLISECONDS);
        metric_log->startCollectMetric(collect_interval_milliseconds);
    }

    if (crash_log)
        CrashLog::initialize(crash_log);
}
2019-03-21 19:22:38 +00:00
/// Shuts down every registered log when the collection is destroyed.
SystemLogs::~SystemLogs()
{
    this->shutdown();
}
void SystemLogs : : shutdown ( )
2019-03-21 19:22:38 +00:00
{
2020-04-13 01:33:05 +00:00
for ( auto & log : logs )
log - > shutdown ( ) ;
2019-08-13 14:31:46 +00:00
}
2022-04-17 18:48:02 +00:00
2022-01-10 19:01:41 +00:00
/// Stores the target table id, the storage definition and the flush interval, and
/// pre-serializes the CREATE TABLE query that prepareTable() later compares against.
/// NOTE(review): `create_query` is computed from `table_id`, `storage_def` and the context,
/// so it relies on those members being declared before it in the class — confirm in the header.
template <typename LogElement>
SystemLog<LogElement>::SystemLog(
    ContextPtr context_,
    const String & database_name_,
    const String & table_name_,
    const String & storage_def_,
    size_t flush_interval_milliseconds_)
    : WithContext(context_)
    , table_id(database_name_, table_name_)
    , storage_def(storage_def_)
    , create_query(serializeAST(*getCreateTableQuery()))
    , flush_interval_milliseconds(flush_interval_milliseconds_)
{
    assert(database_name_ == DatabaseCatalog::SYSTEM_DATABASE);

    const String logger_name = " SystemLog ( " + database_name_ + " . " + table_name_ + " ) ";
    log = &Poco::Logger::get(logger_name);
}
/// Stops the flush thread first, then flushes and shuts down the target table if it exists.
template <typename LogElement>
void SystemLog<LogElement>::shutdown()
{
    stopFlushThread();

    if (auto table = DatabaseCatalog::instance().tryGetTable(table_id, getContext()))
        table->flushAndShutdown();
}
/// Body of the background flush thread.
/// Each iteration waits (under `mutex`) until a flush is requested, shutdown or a forced table
/// preparation is signalled, or `flush_interval_milliseconds` elapses; it then takes over the
/// whole pending queue and either flushes it or, when the queue is empty and a forced
/// preparation was requested, only prepares the table.
/// Exceptions are logged and the loop continues. The loop ends after the iteration in which
/// `is_shutdown` was observed, so entries queued before that point still go through flushImpl.
template < typename LogElement >
void SystemLog < LogElement > : : savingThreadFunction ( )
{
setThreadName ( " SystemLogFlush " ) ;
// Reused across iterations so that its storage can be swapped back into `queue`.
std : : vector < LogElement > to_flush ;
bool exit_this_thread = false ;
while ( ! exit_this_thread )
{
try
{
// The end index (exclusive, like std end()) of the messages we are
// going to flush.
uint64_t to_flush_end = 0 ;
// Should we prepare table even if there are no new messages.
bool should_prepare_tables_anyway = false ;
{
std : : unique_lock lock ( mutex ) ;
// Returns when the predicate holds or when the timeout expires, whichever comes first;
// the result is not inspected, so a plain timeout also triggers a flush attempt.
flush_event . wait_for ( lock ,
std : : chrono : : milliseconds ( flush_interval_milliseconds ) ,
[ & ] ( )
{
return requested_flush_up_to > flushed_up_to | | is_shutdown | | is_force_prepare_tables ;
}
) ;
// Everything currently queued is taken by this iteration, so the front index
// moves past all of it.
queue_front_index + = queue . size ( ) ;
to_flush_end = queue_front_index ;
// Swap with existing array from previous flush, to save memory
// allocations.
to_flush . resize ( 0 ) ;
queue . swap ( to_flush ) ;
// Snapshot the flags while the lock is held; they are acted upon after it is released.
should_prepare_tables_anyway = is_force_prepare_tables ;
exit_this_thread = is_shutdown ;
}
if ( to_flush . empty ( ) )
{
if ( should_prepare_tables_anyway )
{
prepareTable ( ) ;
LOG_TRACE ( log , " Table created (force) " ) ;
// Clear the request and wake up whoever waits on `flush_event`.
std : : lock_guard lock ( mutex ) ;
is_force_prepare_tables = false ;
flush_event . notify_all ( ) ;
}
}
else
{
flushImpl ( to_flush , to_flush_end ) ;
}
}
catch ( . . . )
{
// Log and keep the thread alive; the next iteration retries with whatever is queued by then.
tryLogCurrentException ( __PRETTY_FUNCTION__ ) ;
}
}
LOG_TRACE ( log , " Terminating " ) ;
}
/// Writes the batch `to_flush` into the target table and then publishes `to_flush_end`
/// as the new flushed offset.
/// Any exception raised while preparing the table or inserting is logged and swallowed;
/// the offset is advanced and waiters on `flush_event` are notified in either case.
template <typename LogElement>
void SystemLog<LogElement>::flushImpl(const std::vector<LogElement> & to_flush, uint64_t to_flush_end)
{
    try
    {
        LOG_TRACE(log, " Flushing system log, {} entries to flush up to offset {} ", to_flush.size(), to_flush_end);

        /// We check for existence of the table and create it as needed at every
        /// flush. This is done to allow user to drop the table at any moment
        /// (new empty table will be created automatically). BTW, flush method
        /// is called from single thread.
        prepareTable();

        /// Build an empty block with the log's columns, then fill it element by element.
        ColumnsWithTypeAndName header_columns;
        for (const auto & column : LogElement::getNamesAndTypes())
            header_columns.emplace_back(column.type, column.name);

        Block block(std::move(header_columns));

        MutableColumns filled_columns = block.mutateColumns();
        for (const auto & element : to_flush)
            element.appendToBlock(filled_columns);
        block.setColumns(std::move(filled_columns));

        /// We write to table indirectly, using InterpreterInsertQuery.
        /// This is needed to support DEFAULT-columns in table.
        auto insert = std::make_shared<ASTInsertQuery>();
        insert->table_id = table_id;
        ASTPtr query_ptr = std::move(insert);

        // we need query context to do inserts to target table with MV containing subqueries or joins
        auto insert_context = Context::createCopy(context);
        insert_context->makeQueryContext();
        /// We always want to deliver the data to the original table regardless of the MVs
        insert_context->setSetting(" materialized_views_ignore_errors ", true);

        InterpreterInsertQuery interpreter(query_ptr, insert_context);
        BlockIO io = interpreter.execute();

        PushingPipelineExecutor executor(io.pipeline);
        executor.start();
        executor.push(block);
        executor.finish();
    }
    catch (...)
    {
        tryLogCurrentException(__PRETTY_FUNCTION__);
    }

    {
        std::lock_guard lock(mutex);
        flushed_up_to = to_flush_end;
        is_force_prepare_tables = false;
        flush_event.notify_all();
    }

    LOG_TRACE(log, " Flushed system log up to offset {} ", to_flush_end);
}
/// Makes sure the target table exists and matches the expected CREATE query.
///  - If the table exists with the expected definition, it is used as is.
///  - If it exists with a different definition, it is renamed to `<name>_<N>` (first free N,
///    starting from 0) and a fresh table is created.
///  - If it does not exist, it is created.
/// Throws LOGICAL_ERROR if the stored CREATE query of an existing table serializes to an empty string.
template <typename LogElement>
void SystemLog<LogElement>::prepareTable()
{
    const String description = table_id.getNameForLogs();

    auto table = DatabaseCatalog::instance().tryGetTable(table_id, getContext());
    if (table)
    {
        /// The definition of the existing table is cached until the table is recreated.
        if (old_create_query.empty())
        {
            old_create_query = serializeAST(*getCreateTableQueryClean(table_id, getContext()));
            if (old_create_query.empty())
                throw Exception(ErrorCodes::LOGICAL_ERROR, " Empty CREATE QUERY for {} ", backQuoteIfNeed(table_id.table_name));
        }

        if (old_create_query == create_query)
        {
            if (!is_prepared)
                LOG_DEBUG(log, " Will use existing table {} for {} ", description, LogElement::name());
        }
        else
        {
            /// Rename the existing table.
            int suffix = 0;
            while (DatabaseCatalog::instance().isTableExist(
                {table_id.database_name, table_id.table_name + " _ " + toString(suffix)}, getContext()))
                ++suffix;

            const String renamed_table_name = table_id.table_name + " _ " + toString(suffix);

            /// Database part of a rename target; left null when the table id has no database name.
            const auto make_database_identifier = [this]
            {
                return table_id.database_name.empty() ? nullptr : std::make_shared<ASTIdentifier>(table_id.database_name);
            };

            ASTRenameQuery::Element elem
            {
                ASTRenameQuery::Table
                {
                    make_database_identifier(),
                    std::make_shared<ASTIdentifier>(table_id.table_name)
                },
                ASTRenameQuery::Table
                {
                    make_database_identifier(),
                    std::make_shared<ASTIdentifier>(renamed_table_name)
                }
            };

            LOG_DEBUG(
                log,
                " Existing table {} for system log has obsolete or different structure. Renaming it to {}. \n Old: {} \n New: {} \n . ",
                description,
                backQuoteIfNeed(elem.to.getTable()),
                old_create_query,
                create_query);

            auto rename = std::make_shared<ASTRenameQuery>();
            rename->elements.emplace_back(std::move(elem));

            auto rename_context = Context::createCopy(context);
            rename_context->makeQueryContext();
            InterpreterRenameQuery(rename, rename_context).execute();

            /// The required table will be created.
            table = nullptr;
        }
    }

    if (!table)
    {
        /// Create the table.
        LOG_DEBUG(log, " Creating new table {} for {} ", description, LogElement::name());

        auto create_context = Context::createCopy(context);
        create_context->makeQueryContext();

        InterpreterCreateQuery interpreter(getCreateTableQuery(), create_context);
        interpreter.setInternal(true);
        interpreter.execute();

        table = DatabaseCatalog::instance().getTable(table_id, getContext());

        /// Drop the cached definition so that it is re-read from the new table next time.
        old_create_query.clear();
    }

    is_prepared = true;
}
/// Builds the CREATE TABLE query for this log's target table.
/// Columns come from LogElement::getCustomColumnList() when it returns non-null, otherwise
/// from the element's names/types and aliases. Storage and comment are parsed from `storage_def`.
/// Throws BAD_ARGUMENTS when the storage definition carries no ENGINE clause; parse errors
/// propagate from parseQuery.
template <typename LogElement>
ASTPtr SystemLog<LogElement>::getCreateTableQuery()
{
    auto create = std::make_shared<ASTCreateQuery>();

    create->setDatabase(table_id.database_name);
    create->setTable(table_id.table_name);

    auto new_columns_list = std::make_shared<ASTColumns>();

    if (const char * custom_column_list = LogElement::getCustomColumnList())
    {
        ParserColumnDeclarationList parser;
        const Settings & settings = getContext()->getSettingsRef();

        ASTPtr columns_list_raw = parseQuery(parser, custom_column_list, " columns declaration list ", settings.max_query_size, settings.max_parser_depth);
        new_columns_list->set(new_columns_list->columns, columns_list_raw);
    }
    else
    {
        auto ordinary_columns = LogElement::getNamesAndTypes();
        auto alias_columns = LogElement::getNamesAndAliases();

        new_columns_list->set(new_columns_list->columns, InterpreterCreateQuery::formatColumns(ordinary_columns, alias_columns));
    }

    create->set(create->columns_list, new_columns_list);

    ParserStorageWithComment storage_parser;

    ASTPtr storage_with_comment_ast = parseQuery(
        storage_parser, storage_def.data(), storage_def.data() + storage_def.size(),
        " Storage to create table for " + LogElement::name(), 0, DBMS_DEFAULT_MAX_PARSER_DEPTH);

    StorageWithComment & storage_with_comment = storage_with_comment_ast->as<StorageWithComment &>();

    create->set(create->storage, storage_with_comment.storage);
    create->set(create->comment, storage_with_comment.comment);

    /// The storage definition originates from user configuration. Fail with a clear message
    /// instead of dereferencing a null engine below if it has no ENGINE clause.
    if (!create->storage->engine)
        throw Exception(ErrorCodes::BAD_ARGUMENTS, "Storage definition for {} does not specify ENGINE", LogElement::name());

    /// Write additional (default) settings for MergeTree engine to make it possible to compare ASTs
    /// and recreate tables on settings changes.
    const auto & engine = create->storage->engine->as<ASTFunction &>();
    if (endsWith(engine.name, " MergeTree "))
    {
        auto storage_settings = std::make_unique<MergeTreeSettings>(getContext()->getMergeTreeSettings());
        storage_settings->loadFromQuery(*create->storage, getContext());
    }

    return create;
}
2022-01-28 18:18:36 +00:00
2022-01-25 09:58:11 +00:00
/// Explicit instantiation of the SystemLog template, whose member functions are defined in this file.
/// INSTANTIATE_SYSTEM_LOG expands to one `template class SystemLog<ELEMENT>;` statement.
/// SYSTEM_LOG_ELEMENTS is defined outside this file; presumably it applies the given macro
/// to every log element type — confirm against its definition.
# define INSTANTIATE_SYSTEM_LOG(ELEMENT) template class SystemLog<ELEMENT>;
SYSTEM_LOG_ELEMENTS ( INSTANTIATE_SYSTEM_LOG )
2022-01-10 19:01:41 +00:00
2017-06-05 13:59:38 +00:00
}