Merge pull request #5685 from CurtizJ/upstream/systemlog-flushing

Fix race condition in flushing system log in another way.
This commit is contained in:
alexey-milovidov 2019-06-29 15:46:22 +03:00 committed by GitHub
commit fe542e26f8
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
5 changed files with 108 additions and 44 deletions

View File

@ -244,15 +244,12 @@ struct ContextShared
return;
shutdown_called = true;
{
std::lock_guard lock(mutex);
/** After system_logs have been shut down it is guaranteed that no system table gets created or written to.
* Note that part changes at shutdown won't be logged to part log.
*/
/** After this point, system logs will shut down their threads and no longer write any data.
* This prevents recreation of system tables at shutdown.
* Note that part changes at shutdown won't be logged to part log.
*/
system_logs.reset();
}
if (system_logs)
system_logs->shutdown();
/** At this point, some tables may have threads that block our mutex.
* To shutdown them correctly, we will copy the current list of tables,

View File

@ -50,6 +50,12 @@ SystemLogs::SystemLogs(Context & global_context, const Poco::Util::AbstractConfi
/// Destructor: delegates to shutdown().
/// NOTE(review): shutdown() may already have been invoked explicitly before destruction
/// (ContextShared calls system_logs->shutdown()); presumably repeating it is harmless — confirm.
SystemLogs::~SystemLogs()
{
shutdown();
}
void SystemLogs::shutdown()
{
if (query_log)
query_log->shutdown();

View File

@ -2,6 +2,7 @@
#include <thread>
#include <atomic>
#include <condition_variable>
#include <boost/noncopyable.hpp>
#include <common/logger_useful.h>
#include <Core/Types.h>
@ -67,6 +68,8 @@ struct SystemLogs
SystemLogs(Context & global_context, const Poco::Util::AbstractConfiguration & config);
~SystemLogs();
void shutdown();
std::shared_ptr<QueryLog> query_log; /// Used to log queries.
std::shared_ptr<QueryThreadLog> query_thread_log; /// Used to log query threads.
std::shared_ptr<PartLog> part_log; /// Used to log operations with parts
@ -101,22 +104,10 @@ public:
/** Append a record into log.
* Writing to table will be done asynchronously and in case of failure, record could be lost.
*/
void add(const LogElement & element)
{
if (is_shutdown)
return;
/// Without try we could block here in case of queue overflow.
if (!queue.tryPush({false, element}))
LOG_ERROR(log, "SystemLog queue is full");
}
void add(const LogElement & element);
/// Flush data in the buffer to disk
void flush()
{
if (!is_shutdown)
flushImpl(false);
}
void flush();
/// Stop the background flush thread before destructor. No more data will be written.
void shutdown();
@ -130,7 +121,15 @@ protected:
const size_t flush_interval_milliseconds;
std::atomic<bool> is_shutdown{false};
using QueueItem = std::pair<bool, LogElement>; /// First element is shutdown flag for thread.
/// Kind of an entry handled by the saving thread. It tags items in the queue
/// and is also passed to flushImpl() as the reason for the flush.
enum class EntryType
{
/// An ordinary log record; pushed to the queue by add().
LOG_ELEMENT = 0,
/// Periodic flush performed by the saving thread once flush_interval_milliseconds has elapsed.
/// Used as a flushImpl() reason; skipped when there is no accumulated data.
AUTO_FLUSH,
/// Explicit flush requested through flush(); the caller waits until the saving thread has completed it.
FORCE_FLUSH,
/// Pushed by shutdown(); the saving thread performs a final flush and leaves its loop.
SHUTDOWN,
};
using QueueItem = std::pair<EntryType, LogElement>;
/// Queue is bounded. But its size is quite large to not block in all normal cases.
ConcurrentBoundedQueue<QueueItem> queue {DBMS_SYSTEM_LOG_QUEUE_SIZE};
@ -140,7 +139,6 @@ protected:
* than accumulation of large amount of log records (for example, for query log - processing of large amount of queries).
*/
std::vector<LogElement> data;
std::mutex data_mutex;
Logger * log;
@ -157,7 +155,13 @@ protected:
bool is_prepared = false;
void prepareTable();
void flushImpl(bool quiet);
std::mutex flush_mutex;
std::mutex condvar_mutex;
std::condition_variable flush_condvar;
bool force_flushing = false;
/// flushImpl can be executed only in saving_thread.
void flushImpl(EntryType reason);
};
@ -178,6 +182,37 @@ SystemLog<LogElement>::SystemLog(Context & context_,
}
/** Enqueue a record for asynchronous writing to the log table.
  * Nothing is enqueued once shutdown has been requested. If the bounded queue
  * is full, the record is dropped and an error is logged.
  */
template <typename LogElement>
void SystemLog<LogElement>::add(const LogElement & element)
{
    if (!is_shutdown)
    {
        /// tryPush instead of push: a blocking push could stall the caller on queue overflow.
        const bool pushed = queue.tryPush({EntryType::LOG_ELEMENT, element});
        if (!pushed)
            LOG_ERROR(log, "SystemLog queue is full");
    }
}
/** Request an extra flush from the saving thread and block until it has been performed.
  * Does nothing if shutdown has already been requested.
  *
  * NOTE(review): if shutdown() completes between the is_shutdown check and the push below,
  * nothing visible here would reset force_flushing — confirm that callers cannot race
  * flush() with shutdown().
  */
template <typename LogElement>
void SystemLog<LogElement>::flush()
{
    if (is_shutdown)
        return;

    /// Serialize concurrent flush() callers: only one forced flush is in flight at a time.
    std::lock_guard flush_lock(flush_mutex);
    force_flushing = true;

    /// Ask the saving thread to perform an extra flush.
    queue.push({EntryType::FORCE_FLUSH, {}});

    /// Sleep until flushImpl resets the flag and notifies us.
    std::unique_lock lock(condvar_mutex);
    flush_condvar.wait(lock, [this] { return !force_flushing; });
}
template <typename LogElement>
void SystemLog<LogElement>::shutdown()
{
@ -186,7 +221,7 @@ void SystemLog<LogElement>::shutdown()
return;
/// Tell thread to shutdown.
queue.push({true, {}});
queue.push({EntryType::SHUTDOWN, {}});
saving_thread.join();
}
@ -219,16 +254,10 @@ void SystemLog<LogElement>::threadFunction()
QueueItem element;
bool has_element = false;
bool is_empty;
{
std::unique_lock lock(data_mutex);
is_empty = data.empty();
}
/// data.size() is increased only in this function
/// TODO: get rid of data and queue duality
if (is_empty)
if (data.empty())
{
queue.pop(element);
has_element = true;
@ -242,25 +271,27 @@ void SystemLog<LogElement>::threadFunction()
if (has_element)
{
if (element.first)
if (element.first == EntryType::SHUTDOWN)
{
/// Shutdown.
/// NOTE: MergeTree engine can write data even it is already in shutdown state.
flush();
flushImpl(element.first);
break;
}
else
else if (element.first == EntryType::FORCE_FLUSH)
{
std::unique_lock lock(data_mutex);
data.push_back(element.second);
flushImpl(element.first);
time_after_last_write.restart();
continue;
}
else
data.push_back(element.second);
}
size_t milliseconds_elapsed = time_after_last_write.elapsed() / 1000000;
if (milliseconds_elapsed >= flush_interval_milliseconds)
{
/// Write data to a table.
flushImpl(true);
flushImpl(EntryType::AUTO_FLUSH);
time_after_last_write.restart();
}
}
@ -275,13 +306,11 @@ void SystemLog<LogElement>::threadFunction()
template <typename LogElement>
void SystemLog<LogElement>::flushImpl(bool quiet)
void SystemLog<LogElement>::flushImpl(EntryType reason)
{
std::unique_lock lock(data_mutex);
try
{
if (quiet && data.empty())
if ((reason == EntryType::AUTO_FLUSH || reason == EntryType::SHUTDOWN) && data.empty())
return;
LOG_TRACE(log, "Flushing system log");
@ -320,6 +349,12 @@ void SystemLog<LogElement>::flushImpl(bool quiet)
/// In case of exception, also clean accumulated data - to avoid locking.
data.clear();
}
if (reason == EntryType::FORCE_FLUSH)
{
std::lock_guard lock(condvar_mutex);
force_flushing = false;
flush_condvar.notify_one();
}
}

View File

@ -0,0 +1,9 @@
<?xml version="1.0"?>
<!-- Settings for the query log: records are written to the table system.query_log. -->
<yandex>
<query_log>
<database>system</database>
<table>query_log</table>
<!-- The log table is partitioned by month of event_date. -->
<partition_by>toYYYYMM(event_date)</partition_by>
<!-- Interval between periodic background flushes, in milliseconds.
     NOTE(review): presumably kept short so that a background flush happens during
     the 0.5 s sleeps of the SYSTEM FLUSH LOGS integration test; confirm. -->
<flush_interval_milliseconds>300</flush_interval_milliseconds>
</query_log>
</yandex>

View File

@ -92,6 +92,23 @@ def test_RELOAD_CONFIG_AND_MACROS(started_cluster):
instance.query("SYSTEM RELOAD CONFIG")
assert TSV(instance.query("select * from system.macros")) == TSV("mac\tro\n")
def test_SYSTEM_FLUSH_LOGS(started_cluster):
instance = cluster.instances['ch1']
for i in range(4):
# Sleep so that the background thread flushes at the first query
# (when flush_interval_milliseconds expires), to test a probable race condition.
time.sleep(0.5)
result = instance.query('''
SET log_queries = 1;
SELECT 1 FORMAT Null;
SET log_queries = 0;
SYSTEM FLUSH LOGS;
SELECT count() FROM system.query_log;''')
instance.query('TRUNCATE TABLE system.query_log')
assert TSV(result) == TSV('4')
if __name__ == '__main__':
with contextmanager(started_cluster)() as cluster:
for name, instance in cluster.instances.items():