Merge pull request #31824 from azat/system-log-tables-recreate-different-engine

Recreate system.*_log tables in case of different engine/partition_by
This commit is contained in:
Kseniia Sumarokova 2021-11-29 22:56:36 +03:00 committed by GitHub
commit edf1ec776a
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
6 changed files with 129 additions and 34 deletions

View File

@ -96,6 +96,22 @@ std::shared_ptr<TSystemLog> createSystemLog(
}
ASTPtr ISystemLog::getCreateTableQueryClean(const StorageID & table_id, ContextPtr context)
{
DatabasePtr database = DatabaseCatalog::instance().getDatabase(table_id.database_name);
ASTPtr old_ast = database->getCreateTableQuery(table_id.table_name, context);
auto & old_create_query_ast = old_ast->as<ASTCreateQuery &>();
/// Reset UUID
old_create_query_ast.uuid = UUIDHelpers::Nil;
/// Existing table has default settings (i.e. `index_granularity = 8192`), reset them.
if (ASTStorage * storage = old_create_query_ast.storage)
{
storage->reset(storage->settings);
}
return old_ast;
}
SystemLogs::SystemLogs(ContextPtr global_context, const Poco::Util::AbstractConfiguration & config)
{
query_log = createSystemLog<QueryLog>(global_context, "system", "query_log", config, "query_log");

View File

@ -61,6 +61,7 @@ namespace DB
namespace ErrorCodes
{
extern const int TIMEOUT_EXCEEDED;
extern const int LOGICAL_ERROR;
}
#define DBMS_SYSTEM_LOG_QUEUE_SIZE 1048576
@ -83,13 +84,18 @@ class ISystemLog
{
public:
virtual String getName() = 0;
virtual ASTPtr getCreateTableQuery() = 0;
//// force -- force table creation (used for SYSTEM FLUSH LOGS)
virtual void flush(bool force = false) = 0;
virtual void prepareTable() = 0;
virtual void startup() = 0;
virtual void shutdown() = 0;
virtual ~ISystemLog() = default;
/// returns CREATE TABLE query, but with removed:
/// - UUID
/// - SETTINGS (for MergeTree)
/// That way it can be used to compare with the SystemLog::getCreateTableQuery()
static ASTPtr getCreateTableQueryClean(const StorageID & table_id, ContextPtr context);
};
@ -171,7 +177,7 @@ public:
return LogElement::name();
}
ASTPtr getCreateTableQuery() override;
ASTPtr getCreateTableQuery();
protected:
Poco::Logger * log;
@ -181,6 +187,8 @@ private:
const StorageID table_id;
const String storage_def;
StoragePtr table;
String create_query;
String old_create_query;
bool is_prepared = false;
const size_t flush_interval_milliseconds;
ThreadFromGlobalPool saving_thread;
@ -228,6 +236,7 @@ SystemLog<LogElement>::SystemLog(
: WithContext(context_)
, table_id(database_name_, table_name_)
, storage_def(storage_def_)
, create_query(serializeAST(*getCreateTableQuery()))
, flush_interval_milliseconds(flush_interval_milliseconds_)
{
assert(database_name_ == DatabaseCatalog::SYSTEM_DATABASE);
@ -520,14 +529,14 @@ void SystemLog<LogElement>::prepareTable()
if (table)
{
auto metadata_columns = table->getInMemoryMetadataPtr()->getColumns();
auto old_query = InterpreterCreateQuery::formatColumns(metadata_columns);
if (old_create_query.empty())
{
old_create_query = serializeAST(*getCreateTableQueryClean(table_id, getContext()));
if (old_create_query.empty())
throw Exception(ErrorCodes::LOGICAL_ERROR, "Empty CREATE QUERY for {}", backQuoteIfNeed(table_id.table_name));
}
auto ordinary_columns = LogElement::getNamesAndTypes();
auto alias_columns = LogElement::getNamesAndAliases();
auto current_query = InterpreterCreateQuery::formatColumns(ordinary_columns, alias_columns);
if (serializeAST(*old_query) != serializeAST(*current_query))
if (old_create_query != create_query)
{
/// Rename the existing table.
int suffix = 0;
@ -553,9 +562,11 @@ void SystemLog<LogElement>::prepareTable()
LOG_DEBUG(
log,
"Existing table {} for system log has obsolete or different structure. Renaming it to {}",
"Existing table {} for system log has obsolete or different structure. Renaming it to {}.\nOld: {}\nNew: {}\n.",
description,
backQuoteIfNeed(to.table));
backQuoteIfNeed(to.table),
old_create_query,
create_query);
auto query_context = Context::createCopy(context);
query_context->makeQueryContext();
@ -573,17 +584,17 @@ void SystemLog<LogElement>::prepareTable()
/// Create the table.
LOG_DEBUG(log, "Creating new table {} for {}", description, LogElement::name());
auto create = getCreateTableQuery();
auto query_context = Context::createCopy(context);
query_context->makeQueryContext();
InterpreterCreateQuery interpreter(create, query_context);
auto create_query_ast = getCreateTableQuery();
InterpreterCreateQuery interpreter(create_query_ast, query_context);
interpreter.setInternal(true);
interpreter.execute();
table = DatabaseCatalog::instance().getTable(table_id, getContext());
old_create_query.clear();
}
is_prepared = true;

View File

@ -14,7 +14,6 @@ namespace ErrorCodes
extern const int TOO_DEEP_AST;
extern const int BAD_ARGUMENTS;
extern const int UNKNOWN_ELEMENT_IN_AST;
extern const int LOGICAL_ERROR;
}
@ -48,23 +47,6 @@ size_t IAST::checkSize(size_t max_size) const
return res;
}
void IAST::reset(IAST *& field)
{
if (field == nullptr)
return;
const auto child = std::find_if(children.begin(), children.end(), [field](const auto & p)
{
return p.get() == field;
});
if (child == children.end())
throw Exception("AST subtree not found in children", ErrorCodes::LOGICAL_ERROR);
children.erase(child);
field = nullptr;
}
IAST::Hash IAST::getTreeHash() const
{

View File

@ -157,7 +157,23 @@ public:
set(field, child);
}
void reset(IAST *& field);
template <typename T>
void reset(T * & field)
{
if (field == nullptr)
return;
const auto child = std::find_if(children.begin(), children.end(), [field](const auto & p)
{
return p.get() == field;
});
if (child == children.end())
throw Exception("AST subtree not found in children", ErrorCodes::LOGICAL_ERROR);
children.erase(child);
field = nullptr;
}
/// Convert to a string.

View File

@ -0,0 +1,70 @@
# pylint: disable=line-too-long
# pylint: disable=unused-argument
# pylint: disable=redefined-outer-name
import pytest
from helpers.cluster import ClickHouseCluster
cluster = ClickHouseCluster(__file__)
node = cluster.add_instance('node_default', stay_alive=True)
@pytest.fixture(scope='module', autouse=True)
def start_cluster():
try:
cluster.start()
yield cluster
finally:
cluster.shutdown()
def test_system_logs_recreate():
system_logs = [
# enabled by default
'query_log',
'query_thread_log',
'part_log',
'trace_log',
'metric_log',
]
node.query('SYSTEM FLUSH LOGS')
for table in system_logs:
assert 'ENGINE = MergeTree' in node.query(f'SHOW CREATE TABLE system.{table}')
assert 'ENGINE = Null' not in node.query(f'SHOW CREATE TABLE system.{table}')
assert len(node.query(f"SHOW TABLES FROM system LIKE '{table}%'").strip().split('\n')) == 1
# NOTE: we use zzz- prefix to make it the last file,
# so that it will be applied last.
for table in system_logs:
node.exec_in_container(['bash', '-c', f"""echo "
<clickhouse>
<{table}>
<engine>ENGINE = Null</engine>
<partition_by remove='remove'/>
</{table}>
</clickhouse>
" > /etc/clickhouse-server/config.d/zzz-override-{table}.xml
"""])
node.restart_clickhouse()
node.query('SYSTEM FLUSH LOGS')
for table in system_logs:
assert 'ENGINE = MergeTree' not in node.query(f'SHOW CREATE TABLE system.{table}')
assert 'ENGINE = Null' in node.query(f'SHOW CREATE TABLE system.{table}')
assert len(node.query(f"SHOW TABLES FROM system LIKE '{table}%'").strip().split('\n')) == 2
for table in system_logs:
node.exec_in_container(['rm', f'/etc/clickhouse-server/config.d/zzz-override-{table}.xml'])
node.restart_clickhouse()
node.query('SYSTEM FLUSH LOGS')
for table in system_logs:
assert 'ENGINE = MergeTree' in node.query(f'SHOW CREATE TABLE system.{table}')
assert 'ENGINE = Null' not in node.query(f'SHOW CREATE TABLE system.{table}')
assert len(node.query(f"SHOW TABLES FROM system LIKE '{table}%'").strip().split('\n')) == 3
node.query('SYSTEM FLUSH LOGS')
# Ensure that there was no superfluous RENAME's
# IOW that the table created only when the structure is indeed different.
for table in system_logs:
assert len(node.query(f"SHOW TABLES FROM system LIKE '{table}%'").strip().split('\n')) == 3