Revert "Force table creation on SYSTEM FLUSH LOGS"

This commit is contained in:
alesapin 2020-06-11 13:22:24 +03:00 committed by GitHub
parent 1777bd0273
commit 5a9a63e9a1
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 13 additions and 59 deletions

View File

@ -301,12 +301,12 @@ BlockIO InterpreterSystemQuery::execute()
case Type::FLUSH_LOGS:
context.checkAccess(AccessType::SYSTEM_FLUSH_LOGS);
executeCommandsAndThrowIfError(
[&] () { if (auto query_log = context.getQueryLog()) query_log->flush(true); },
[&] () { if (auto part_log = context.getPartLog("")) part_log->flush(true); },
[&] () { if (auto query_thread_log = context.getQueryThreadLog()) query_thread_log->flush(true); },
[&] () { if (auto trace_log = context.getTraceLog()) trace_log->flush(true); },
[&] () { if (auto text_log = context.getTextLog()) text_log->flush(true); },
[&] () { if (auto metric_log = context.getMetricLog()) metric_log->flush(true); }
[&] () { if (auto query_log = context.getQueryLog()) query_log->flush(); },
[&] () { if (auto part_log = context.getPartLog("")) part_log->flush(); },
[&] () { if (auto query_thread_log = context.getQueryThreadLog()) query_thread_log->flush(); },
[&] () { if (auto trace_log = context.getTraceLog()) trace_log->flush(); },
[&] () { if (auto text_log = context.getTextLog()) text_log->flush(); },
[&] () { if (auto metric_log = context.getMetricLog()) metric_log->flush(); }
);
break;
case Type::STOP_LISTEN_QUERIES:

View File

@ -76,8 +76,7 @@ class ISystemLog
public:
virtual String getName() = 0;
virtual ASTPtr getCreateTableQuery() = 0;
//// force -- force table creation (used for SYSTEM FLUSH LOGS)
virtual void flush(bool force = false) = 0;
virtual void flush() = 0;
virtual void prepareTable() = 0;
virtual void startup() = 0;
virtual void shutdown() = 0;
@ -134,7 +133,7 @@ public:
void stopFlushThread();
/// Flush data in the buffer to disk
void flush(bool force = false) override;
void flush() override;
/// Start the background thread.
void startup() override;
@ -167,8 +166,6 @@ private:
/* Data shared between callers of add()/flush()/shutdown(), and the saving thread */
std::mutex mutex;
/* prepareTable() guard */
std::mutex prepare_mutex;
// Queue is bounded. But its size is quite large to not block in all normal cases.
std::vector<LogElement> queue;
// An always-incrementing index of the first message currently in the queue.
@ -217,7 +214,7 @@ SystemLog<LogElement>::SystemLog(Context & context_,
template <typename LogElement>
void SystemLog<LogElement>::startup()
{
std::lock_guard lock(mutex);
std::unique_lock lock(mutex);
saving_thread = ThreadFromGlobalPool([this] { savingThreadFunction(); });
}
@ -231,7 +228,7 @@ void SystemLog<LogElement>::add(const LogElement & element)
/// Otherwise the tests like 01017_uniqCombined_memory_usage.sql will be flacky.
auto temporarily_disable_memory_tracker = getCurrentMemoryTrackerActionLock();
std::lock_guard lock(mutex);
std::unique_lock lock(mutex);
if (is_shutdown)
return;
@ -275,16 +272,13 @@ void SystemLog<LogElement>::add(const LogElement & element)
template <typename LogElement>
void SystemLog<LogElement>::flush(bool force)
void SystemLog<LogElement>::flush()
{
std::unique_lock lock(mutex);
if (is_shutdown)
return;
if (force)
prepareTable();
const uint64_t queue_end = queue_front_index + queue.size();
if (requested_flush_before < queue_end)
@ -310,7 +304,7 @@ template <typename LogElement>
void SystemLog<LogElement>::stopFlushThread()
{
{
std::lock_guard lock(mutex);
std::unique_lock lock(mutex);
if (!saving_thread.joinable())
{
@ -423,7 +417,7 @@ void SystemLog<LogElement>::flushImpl(const std::vector<LogElement> & to_flush,
}
{
std::lock_guard lock(mutex);
std::unique_lock lock(mutex);
flushed_before = to_flush_end;
flush_event.notify_all();
}
@ -435,8 +429,6 @@ void SystemLog<LogElement>::flushImpl(const std::vector<LogElement> & to_flush,
template <typename LogElement>
void SystemLog<LogElement>::prepareTable()
{
std::lock_guard prepare_lock(prepare_mutex);
String description = table_id.getNameForLogs();
table = DatabaseCatalog::instance().tryGetTable(table_id, context);

View File

@ -1,38 +0,0 @@
# pylint: disable=line-too-long
# pylint: disable=unused-argument
# pylint: disable=redefined-outer-name
import pytest
from helpers.cluster import ClickHouseCluster
cluster = ClickHouseCluster(__file__)
node = cluster.add_instance('node_default')
system_logs = [
# disabled by default
('system.part_log', 0),
('system.text_log', 0),
# enabled by default
('system.query_log', 1),
('system.query_thread_log', 1),
('system.trace_log', 1),
('system.metric_log', 1),
]
@pytest.fixture(scope='module')
def start_cluster():
try:
cluster.start()
node.query('SYSTEM FLUSH LOGS')
yield cluster
finally:
cluster.shutdown()
@pytest.mark.parametrize('table,exists', system_logs)
def test_system_logs(start_cluster, table, exists):
q = 'SELECT * FROM {}'.format(table)
if exists:
node.query(q)
else:
assert "Table {} doesn't exist".format(table) in node.query_and_get_error(q)