Force table creation on SYSTEM FLUSH LOGS

This commit is contained in:
Azat Khuzhin 2020-06-09 21:11:08 +03:00
parent 42acb627fb
commit 8a78cffe5c
3 changed files with 55 additions and 9 deletions

View File

@ -301,12 +301,12 @@ BlockIO InterpreterSystemQuery::execute()
case Type::FLUSH_LOGS:
context.checkAccess(AccessType::SYSTEM_FLUSH_LOGS);
executeCommandsAndThrowIfError(
[&] () { if (auto query_log = context.getQueryLog()) query_log->flush(); },
[&] () { if (auto part_log = context.getPartLog("")) part_log->flush(); },
[&] () { if (auto query_thread_log = context.getQueryThreadLog()) query_thread_log->flush(); },
[&] () { if (auto trace_log = context.getTraceLog()) trace_log->flush(); },
[&] () { if (auto text_log = context.getTextLog()) text_log->flush(); },
[&] () { if (auto metric_log = context.getMetricLog()) metric_log->flush(); }
[&] () { if (auto query_log = context.getQueryLog()) query_log->flush(true); },
[&] () { if (auto part_log = context.getPartLog("")) part_log->flush(true); },
[&] () { if (auto query_thread_log = context.getQueryThreadLog()) query_thread_log->flush(true); },
[&] () { if (auto trace_log = context.getTraceLog()) trace_log->flush(true); },
[&] () { if (auto text_log = context.getTextLog()) text_log->flush(true); },
[&] () { if (auto metric_log = context.getMetricLog()) metric_log->flush(true); }
);
break;
case Type::STOP_LISTEN_QUERIES:

View File

@ -76,7 +76,8 @@ class ISystemLog
public:
virtual String getName() = 0;
virtual ASTPtr getCreateTableQuery() = 0;
virtual void flush() = 0;
//// force -- force table creation (used for SYSTEM FLUSH LOGS)
virtual void flush(bool force = false) = 0;
virtual void prepareTable() = 0;
virtual void startup() = 0;
virtual void shutdown() = 0;
@ -133,7 +134,7 @@ public:
void stopFlushThread();
/// Flush data in the buffer to disk
void flush() override;
void flush(bool force = false) override;
/// Start the background thread.
void startup() override;
@ -166,6 +167,8 @@ private:
/* Data shared between callers of add()/flush()/shutdown(), and the saving thread */
std::mutex mutex;
/* prepareTable() guard */
std::mutex prepare_mutex;
// Queue is bounded. But its size is quite large to not block in all normal cases.
std::vector<LogElement> queue;
// An always-incrementing index of the first message currently in the queue.
@ -272,13 +275,16 @@ void SystemLog<LogElement>::add(const LogElement & element)
template <typename LogElement>
void SystemLog<LogElement>::flush()
void SystemLog<LogElement>::flush(bool force)
{
std::unique_lock lock(mutex);
if (is_shutdown)
return;
if (force)
prepareTable();
const uint64_t queue_end = queue_front_index + queue.size();
if (requested_flush_before < queue_end)
@ -429,6 +435,8 @@ void SystemLog<LogElement>::flushImpl(const std::vector<LogElement> & to_flush,
template <typename LogElement>
void SystemLog<LogElement>::prepareTable()
{
std::unique_lock prepare_lock(prepare_mutex);
String description = table_id.getNameForLogs();
table = DatabaseCatalog::instance().tryGetTable(table_id, context);

View File

@ -0,0 +1,38 @@
# pylint: disable=line-too-long
# pylint: disable=unused-argument
# pylint: disable=redefined-outer-name
import pytest
from helpers.cluster import ClickHouseCluster
cluster = ClickHouseCluster(__file__)
node = cluster.add_instance('node_default')
system_logs = [
# disabled by default
('system.part_log', 0),
('system.text_log', 0),
# enabled by default
('system.query_log', 1),
('system.query_thread_log', 1),
('system.trace_log', 1),
('system.metric_log', 1),
]
@pytest.fixture(scope='module')
def start_cluster():
try:
cluster.start()
node.query('SYSTEM FLUSH LOGS')
yield cluster
finally:
cluster.shutdown()
@pytest.mark.parametrize('table,exists', system_logs)
def test_system_logs(start_cluster, table, exists):
q = 'SELECT * FROM {}'.format(table)
if exists:
node.query(q)
else:
assert "Table {} doesn't exist".format(table) in node.query_and_get_error(q)