ClickHouse/dbms/src/Interpreters/SystemLog.h

438 lines
13 KiB
C++
Raw Normal View History

2016-10-27 17:48:12 +00:00
#pragma once
#include <thread>
2019-03-21 19:22:38 +00:00
#include <atomic>
#include <condition_variable>
2016-10-27 17:48:12 +00:00
#include <boost/noncopyable.hpp>
#include <common/logger_useful.h>
#include <Core/Types.h>
#include <Common/ConcurrentBoundedQueue.h>
#include <Storages/IStorage.h>
#include <Interpreters/Context.h>
#include <Common/Stopwatch.h>
#include <Parsers/ASTCreateQuery.h>
#include <Parsers/parseQuery.h>
#include <Parsers/ParserCreateQuery.h>
#include <Parsers/ASTRenameQuery.h>
#include <Parsers/formatAST.h>
#include <Parsers/ASTInsertQuery.h>
#include <Interpreters/InterpreterCreateQuery.h>
#include <Interpreters/InterpreterRenameQuery.h>
#include <Interpreters/InterpreterInsertQuery.h>
#include <Common/setThreadName.h>
#include <Common/ThreadPool.h>
2018-06-05 19:46:49 +00:00
#include <IO/WriteHelpers.h>
#include <Poco/Util/AbstractConfiguration.h>
2016-10-27 17:48:12 +00:00
namespace DB
{
/** Allows storing a structured log in a system table.
  *
  * Logging is asynchronous. Data is put into a queue, from which it is read by a separate thread.
  * That thread inserts the log into a table no more often than with the specified periodicity.
  */
/** Structure of the log is a template parameter.
  * The structure could change on server version update.
  * If, on first write, the existing table has a different structure,
  * then it gets renamed (put aside) and a new table is created.
  */
/* Example:
struct LogElement
{
    /// default constructor must be available
    /// fields

    static std::string name();
    static Block createBlock();
    void appendToBlock(Block & block) const;
};
*/
2016-10-27 17:48:12 +00:00
/// Capacity (in elements) of the queue between producers and the saving thread of SystemLog;
/// the same value is used to reserve space in the accumulation buffer.
#define DBMS_SYSTEM_LOG_QUEUE_SIZE 1048576
2016-10-27 17:48:12 +00:00
/// Forward declarations: these types are only referenced through references and shared pointers below.
class Context;
class QueryLog;
class QueryThreadLog;
class PartLog;
/// System logs should be destroyed in destructor of the last Context and before tables,
/// because SystemLog destruction makes insert query while flushing data into underlying tables
struct SystemLogs
{
SystemLogs(Context & global_context, const Poco::Util::AbstractConfiguration & config);
~SystemLogs();
2019-03-21 19:22:38 +00:00
std::shared_ptr<QueryLog> query_log; /// Used to log queries.
std::shared_ptr<QueryThreadLog> query_thread_log; /// Used to log query threads.
std::shared_ptr<PartLog> part_log; /// Used to log operations with parts
String part_log_database;
};
2016-10-27 17:48:12 +00:00
/** Asynchronous writer of records of type LogElement into a table.
  *
  * Producers hand records over through a bounded queue (see add);
  * a dedicated thread (see threadFunction) accumulates them and inserts them into the table.
  */
template <typename LogElement>
class SystemLog : private boost::noncopyable
{
public:
    using Self = SystemLog;

    /** Parameter: table name where to write log.
      * If the table does not exist, it is created with the specified engine.
      * If it already exists, its structure is checked to be compatible with the structure of a log record.
      * If it is compatible, the existing table is used.
      * If not, the existing table is renamed to the same name with suffix '_N' at the end,
      *  where N is the smallest number, counting from 0, for which a table with that name doesn't exist yet;
      *  and a new table is created - as if the previous table did not exist.
      */
    SystemLog(
        Context & context_,
        const String & database_name_,
        const String & table_name_,
        const String & storage_def_,
        size_t flush_interval_milliseconds_);

    ~SystemLog();

    /** Append a record into log.
      * Writing to table will be done asynchronously and in case of failure, record could be lost.
      */
    void add(const LogElement & element);

    /// Flush data in the buffer to disk
    void flush();

    /// Stop the background flush thread before destructor. No more data will be written.
    void shutdown();

protected:
    Context & context;
    const String database_name;
    const String table_name;
    /// Text of the storage definition, parsed when the table has to be created.
    const String storage_def;
    /// Destination table; (re)resolved by prepareTable().
    StoragePtr table;
    const size_t flush_interval_milliseconds;

    /// Set once by shutdown(); add() and flush() return immediately when it is set.
    std::atomic<bool> is_shutdown{false};

    /// Kind of an item travelling through the queue.
    enum class ElementType
    {
        REGULAR = 0,    /// An ordinary log record to be accumulated.
        SHUTDOWN,       /// Flush what is accumulated and stop the saving thread.
        FORCE_FLUSH     /// Flush right now and wake up the caller of flush().
    };

    using QueueItem = std::pair<ElementType, LogElement>;

    /// Queue is bounded. But its size is quite large to not block in all normal cases.
    ConcurrentBoundedQueue<QueueItem> queue {DBMS_SYSTEM_LOG_QUEUE_SIZE};

    /** Data that was pulled from queue. Data is accumulated here before enough time passed.
      * It's possible to implement double-buffering, but we assume that insertion into table is faster
      *  than accumulation of large amount of log records (for example, for query log - processing of large amount of queries).
      */
    std::vector<LogElement> data;

    Logger * log;

    /** In this thread, data is pulled from 'queue' and stored in 'data', and then written into table.
      */
    ThreadFromGlobalPool saving_thread;

    void threadFunction();

    /** Creates new table if it does not exist.
      * Renames old table if its structure is not suitable.
      * This cannot be done in constructor to avoid deadlock while renaming a table under locked Context when SystemLog object is created.
      */
    bool is_prepared = false;
    void prepareTable();

    /// Serializes concurrent callers of flush().
    std::mutex flush_mutex;
    /// Used together with flush_condvar to wait for 'force_flushing' to be reset.
    std::mutex condvar_mutex;
    std::condition_variable flush_condvar;
    /// Set by flush(), reset by flushImpl(false) once the forced flush has been attempted.
    bool force_flushing = false;

    /// flushImpl can be executed only in saving_thread.
    /// With quiet = true nothing is done if there is no accumulated data and nobody is notified;
    /// with quiet = false the flush is attempted anyway and the waiter in flush() is woken up.
    void flushImpl(bool quiet);
};
/// Remembers where and how often to write, then starts the saving thread.
/// The destination table is not touched here; prepareTable() is called from flushImpl().
template <typename LogElement>
SystemLog<LogElement>::SystemLog(Context & context_,
    const String & database_name_,
    const String & table_name_,
    const String & storage_def_,
    size_t flush_interval_milliseconds_)
    : context(context_),
    database_name(database_name_), table_name(table_name_), storage_def(storage_def_),
    flush_interval_milliseconds(flush_interval_milliseconds_)
{
    log = &Logger::get("SystemLog (" + database_name + "." + table_name + ")");

    /// Reserve room for as many records as the queue can hold.
    data.reserve(DBMS_SYSTEM_LOG_QUEUE_SIZE);

    /// Started in the constructor body, so that every member (including those declared
    /// after saving_thread) is already initialized when threadFunction begins to run.
    saving_thread = ThreadFromGlobalPool([this] { threadFunction(); });
}
/// Enqueue one record for asynchronous writing.
/// The record is dropped if the log is already shut down or if the queue is full
/// (the latter is reported to the logger).
template <typename LogElement>
void SystemLog<LogElement>::add(const LogElement & element)
{
    if (!is_shutdown)
    {
        /// Non-blocking push: a blocking one could stall the caller in case of queue overflow.
        const bool pushed = queue.tryPush({ElementType::REGULAR, element});
        if (!pushed)
            LOG_ERROR(log, "SystemLog queue is full");
    }
}
/// Ask the saving thread to write accumulated data right now and block until it has tried to.
/// Does nothing if the log is already shut down.
///
/// NOTE(review): if shutdown() runs between the is_shutdown check and the push below,
/// the saving thread may exit on SHUTDOWN without ever handling FORCE_FLUSH,
/// in which case this call would wait forever - confirm whether callers can race like that.
template <typename LogElement>
void SystemLog<LogElement>::flush()
{
    if (is_shutdown)
        return;

    /// Only one forced flush may be in flight at a time.
    std::lock_guard single_flusher(flush_mutex);

    force_flushing = true;

    /// Tell thread to execute extra flush.
    queue.push({ElementType::FORCE_FLUSH, {}});

    /// Wait for flush being finished: flushImpl(false) resets the flag and notifies.
    std::unique_lock waiter(condvar_mutex);
    flush_condvar.wait(waiter, [this] { return !force_flushing; });
}
2016-10-27 17:48:12 +00:00
template <typename LogElement>
2019-03-21 19:22:38 +00:00
void SystemLog<LogElement>::shutdown()
2016-10-27 17:48:12 +00:00
{
2019-03-21 19:22:38 +00:00
bool old_val = false;
if (!is_shutdown.compare_exchange_strong(old_val, true))
return;
/// Tell thread to shutdown.
queue.push({ElementType::SHUTDOWN, {}});
saving_thread.join();
2016-10-27 17:48:12 +00:00
}
2019-03-21 19:22:38 +00:00
/// Makes sure the saving thread is stopped and joined before members are destroyed.
/// shutdown() is a no-op if it has already been called explicitly.
template <typename LogElement>
SystemLog<LogElement>::~SystemLog()
{
    this->shutdown();
}
2016-10-27 17:48:12 +00:00
/// Body of the saving thread.
/// Pulls items from 'queue', accumulates regular records in 'data' and writes them to the table
/// once flush_interval_milliseconds has passed since the last write.
/// FORCE_FLUSH writes immediately; SHUTDOWN writes what is accumulated and ends the thread.
template <typename LogElement>
void SystemLog<LogElement>::threadFunction()
{
    setThreadName("SystemLogFlush");

    Stopwatch time_after_last_write;
    bool first = true;

    while (true)
    {
        try
        {
            if (first)
            {
                time_after_last_write.restart();
                first = false;
            }

            QueueItem element;
            bool has_element = false;

            /// data.size() is increased only in this function
            /// TODO: get rid of data and queue duality
            if (data.empty())
            {
                /// Nothing is waiting to be written, so it is fine to block until something arrives.
                queue.pop(element);
                has_element = true;
            }
            else
            {
                /// There is pending data: wait for the next item only until the flush deadline.
                /// NOTE(review): the division by 1000000 presumably converts nanoseconds to milliseconds - confirm against Stopwatch.
                size_t milliseconds_elapsed = time_after_last_write.elapsed() / 1000000;
                if (milliseconds_elapsed < flush_interval_milliseconds)
                    has_element = queue.tryPop(element, flush_interval_milliseconds - milliseconds_elapsed);
            }

            if (has_element)
            {
                if (element.first == ElementType::SHUTDOWN)
                {
                    /// NOTE: MergeTree engine can write data even it is already in shutdown state.
                    flushImpl(true);
                    break;
                }
                else if (element.first == ElementType::FORCE_FLUSH)
                {
                    flushImpl(false);
                    time_after_last_write.restart();
                    continue;
                }
                else
                {
                    /// 'element' is a local that is not used after this point, so move instead of copying.
                    data.push_back(std::move(element.second));
                }
            }

            size_t milliseconds_elapsed = time_after_last_write.elapsed() / 1000000;
            if (milliseconds_elapsed >= flush_interval_milliseconds)
            {
                /// Write data to a table.
                flushImpl(true);
                time_after_last_write.restart();
            }
        }
        catch (...)
        {
            /// In case of exception we lost accumulated data - to avoid locking.
            data.clear();
            tryLogCurrentException(__PRETTY_FUNCTION__);
        }
    }
}
template <typename LogElement>
2019-03-21 19:22:38 +00:00
void SystemLog<LogElement>::flushImpl(bool quiet)
2016-10-27 17:48:12 +00:00
{
try
{
if (quiet && data.empty())
return;
2017-04-27 05:58:16 +00:00
LOG_TRACE(log, "Flushing system log");
2018-03-29 20:54:57 +00:00
/// We check for existence of the table and create it as needed at every flush.
/// This is done to allow user to drop the table at any moment (new empty table will be created automatically).
/// BTW, flush method is called from single thread.
prepareTable();
Block block = LogElement::createBlock();
for (const LogElement & elem : data)
elem.appendToBlock(block);
2017-04-27 05:58:16 +00:00
/// Clear queue early, because insertion to the table could lead to generation of more log entrites
/// and pushing them to already full queue will lead to deadlock.
data.clear();
/// We write to table indirectly, using InterpreterInsertQuery.
/// This is needed to support DEFAULT-columns in table.
std::unique_ptr<ASTInsertQuery> insert = std::make_unique<ASTInsertQuery>();
insert->database = database_name;
insert->table = table_name;
ASTPtr query_ptr(insert.release());
InterpreterInsertQuery interpreter(query_ptr, context);
BlockIO io = interpreter.execute();
io.out->writePrefix();
io.out->write(block);
io.out->writeSuffix();
}
catch (...)
{
tryLogCurrentException(__PRETTY_FUNCTION__);
/// In case of exception, also clean accumulated data - to avoid locking.
data.clear();
}
if (!quiet)
{
std::lock_guard lock(condvar_mutex);
force_flushing = false;
flush_condvar.notify_one();
}
2016-10-27 17:48:12 +00:00
}
/// Make sure 'table' points to a table whose structure matches LogElement::createBlock().
///  - If the table exists with a matching structure, it is used as is.
///  - If it exists with a different structure, it is renamed to '<table_name>_N',
///    where N is the smallest number, counting from 0, for which no such table exists,
///    and a new table is created.
///  - If it does not exist, it is created using 'storage_def'.
template <typename LogElement>
void SystemLog<LogElement>::prepareTable()
{
    String description = backQuoteIfNeed(database_name) + "." + backQuoteIfNeed(table_name);

    table = context.tryGetTable(database_name, table_name);

    if (table)
    {
        const Block expected = LogElement::createBlock();
        const Block actual = table->getSampleBlockNonMaterialized();

        if (!blocksHaveEqualStructure(actual, expected))
        {
            /// Rename the existing table.
            int suffix = 0;
            while (context.isTableExist(database_name, table_name + "_" + toString(suffix)))
                ++suffix;

            auto rename = std::make_shared<ASTRenameQuery>();

            ASTRenameQuery::Table from;
            from.database = database_name;
            from.table = table_name;

            ASTRenameQuery::Table to;
            to.database = database_name;
            to.table = table_name + "_" + toString(suffix);

            ASTRenameQuery::Element elem;
            elem.from = from;
            elem.to = to;

            rename->elements.emplace_back(elem);

            LOG_DEBUG(log, "Existing table " << description << " for system log has obsolete or different structure."
                " Renaming it to " << backQuoteIfNeed(to.table));

            InterpreterRenameQuery(rename, context).execute();

            /// The required table will be created.
            table = nullptr;
        }
        else if (!is_prepared)
            LOG_DEBUG(log, "Will use existing table " << description << " for " + LogElement::name());
    }

    if (!table)
    {
        /// Create the table.
        LOG_DEBUG(log, "Creating new table " << description << " for " + LogElement::name());

        auto create = std::make_shared<ASTCreateQuery>();

        create->database = database_name;
        create->table = table_name;

        Block sample = LogElement::createBlock();

        /// Column list is taken from the sample block of the log element.
        auto new_columns_list = std::make_shared<ASTColumns>();
        new_columns_list->set(new_columns_list->columns, InterpreterCreateQuery::formatColumns(sample.getNamesAndTypesList()));
        create->set(create->columns_list, new_columns_list);

        /// Storage clause is parsed from the text given to the constructor.
        ParserStorage storage_parser;
        ASTPtr storage_ast = parseQuery(
            storage_parser, storage_def.data(), storage_def.data() + storage_def.size(),
            "Storage to create table for " + LogElement::name(), 0);
        create->set(create->storage, storage_ast);

        InterpreterCreateQuery interpreter(create, context);
        interpreter.setInternal(true);
        interpreter.execute();

        table = context.getTable(database_name, table_name);
    }

    /// From now on the "Will use existing table" message is not repeated on every flush.
    is_prepared = true;
}
2016-10-27 17:48:12 +00:00
}