Merge remote-tracking branch 'upstream/master' into METR-23466

This commit is contained in:
proller 2016-11-24 00:04:38 +03:00
commit c919c02ed4
5 changed files with 98 additions and 82 deletions

View File

@ -201,7 +201,7 @@ public:
Databases getDatabases();
/// Для методов ниже может быть необходимо захватывать блокировку самостоятельно.
/// For methods below you may need to acquire a lock by yourself.
std::unique_lock<Poco::Mutex> getLock() const;
const Context & getSessionContext() const;

View File

@ -88,6 +88,7 @@ private:
const String database_name;
const String table_name;
StoragePtr table;
const String engine;
const size_t flush_interval_milliseconds;
using QueueItem = std::pair<bool, LogElement>; /// First element is shutdown flag for thread.
@ -109,6 +110,13 @@ private:
void threadFunction();
void flush();
/** Creates new table if it does not exist.
* Renames old table if its structure is not suitable.
* This cannot be done in constructor to avoid deadlock while renaming a table under locked Context when SystemLog object is created.
*/
bool is_prepared = false;
void prepareTable();
};
@ -116,84 +124,15 @@ template <typename LogElement>
SystemLog<LogElement>::SystemLog(Context & context_,
const String & database_name_,
const String & table_name_,
const String & engine,
const String & engine_,
size_t flush_interval_milliseconds_)
: context(context_), database_name(database_name_), table_name(table_name_), flush_interval_milliseconds(flush_interval_milliseconds_)
: context(context_),
database_name(database_name_), table_name(table_name_), engine(engine_),
flush_interval_milliseconds(flush_interval_milliseconds_)
{
log = &Logger::get("SystemLog (" + database_name + "." + table_name + ")");
data.reserve(DBMS_SYSTEM_LOG_QUEUE_SIZE);
{
String description = backQuoteIfNeed(database_name) + "." + backQuoteIfNeed(table_name);
auto lock = context.getLock();
table = context.tryGetTable(database_name, table_name);
if (table)
{
const Block expected = LogElement::createBlock();
const Block actual = table->getSampleBlockNonMaterialized();
if (!blocksHaveEqualStructure(actual, expected))
{
/// Переименовываем существующую таблицу.
int suffix = 0;
while (context.isTableExist(database_name, table_name + "_" + toString(suffix)))
++suffix;
auto rename = std::make_shared<ASTRenameQuery>();
ASTRenameQuery::Table from;
from.database = database_name;
from.table = table_name;
ASTRenameQuery::Table to;
to.database = database_name;
to.table = table_name + "_" + toString(suffix);
ASTRenameQuery::Element elem;
elem.from = from;
elem.to = to;
rename->elements.emplace_back(elem);
LOG_DEBUG(log, "Existing table " << description << " for system log has obsolete or different structure."
" Renaming it to " << backQuoteIfNeed(to.table));
InterpreterRenameQuery(rename, context).execute();
/// Нужная таблица будет создана.
table = nullptr;
}
else
LOG_DEBUG(log, "Will use existing table " << description << " for " + LogElement::name());
}
if (!table)
{
/// Создаём таблицу.
LOG_DEBUG(log, "Creating new table " << description << " for " + LogElement::name());
auto create = std::make_shared<ASTCreateQuery>();
create->database = database_name;
create->table = table_name;
Block sample = LogElement::createBlock();
create->columns = InterpreterCreateQuery::formatColumns(sample.getColumnsList());
ParserFunction engine_parser;
create->storage = parseQuery(engine_parser, engine.data(), engine.data() + engine.size(), "ENGINE to create table for" + LogElement::name());
InterpreterCreateQuery(create, context).execute();
table = context.getTable(database_name, table_name);
}
}
saving_thread = std::thread([this] { threadFunction(); });
}
@ -277,6 +216,9 @@ void SystemLog<LogElement>::flush()
{
LOG_TRACE(log, "Flushing query log");
if (!is_prepared) /// BTW, flush method is called from single thread.
prepareTable();
Block block = LogElement::createBlock();
for (const LogElement & elem : data)
elem.appendToBlock(block);
@ -306,4 +248,79 @@ void SystemLog<LogElement>::flush()
}
template <typename LogElement>
void SystemLog<LogElement>::prepareTable()
{
String description = backQuoteIfNeed(database_name) + "." + backQuoteIfNeed(table_name);
auto lock = context.getLock();
table = context.tryGetTable(database_name, table_name);
if (table)
{
const Block expected = LogElement::createBlock();
const Block actual = table->getSampleBlockNonMaterialized();
if (!blocksHaveEqualStructure(actual, expected))
{
/// Переименовываем существующую таблицу.
int suffix = 0;
while (context.isTableExist(database_name, table_name + "_" + toString(suffix)))
++suffix;
auto rename = std::make_shared<ASTRenameQuery>();
ASTRenameQuery::Table from;
from.database = database_name;
from.table = table_name;
ASTRenameQuery::Table to;
to.database = database_name;
to.table = table_name + "_" + toString(suffix);
ASTRenameQuery::Element elem;
elem.from = from;
elem.to = to;
rename->elements.emplace_back(elem);
LOG_DEBUG(log, "Existing table " << description << " for system log has obsolete or different structure."
" Renaming it to " << backQuoteIfNeed(to.table));
InterpreterRenameQuery(rename, context).execute();
/// Нужная таблица будет создана.
table = nullptr;
}
else
LOG_DEBUG(log, "Will use existing table " << description << " for " + LogElement::name());
}
if (!table)
{
/// Создаём таблицу.
LOG_DEBUG(log, "Creating new table " << description << " for " + LogElement::name());
auto create = std::make_shared<ASTCreateQuery>();
create->database = database_name;
create->table = table_name;
Block sample = LogElement::createBlock();
create->columns = InterpreterCreateQuery::formatColumns(sample.getColumnsList());
ParserFunction engine_parser;
create->storage = parseQuery(engine_parser, engine.data(), engine.data() + engine.size(), "ENGINE to create table for" + LogElement::name());
InterpreterCreateQuery(create, context).execute();
table = context.getTable(database_name, table_name);
}
is_prepared = true;
}
}

View File

@ -102,7 +102,7 @@ public:
* Это возьмет дополнительный лок, не позволяющий начать ALTER MODIFY.
*
* WARNING: Вызывать методы из ITableDeclaration нужно под такой блокировкой. Без нее они не thread safe.
* WARNING: Чтобы не было дедлоков, нельзя вызывать это метод при захваченном мьютексе в Context.
* WARNING: To avoid deadlocks, this method must not be called under lock of Context.
*/
TableStructureReadLockPtr lockStructure(bool will_modify_data)
{

View File

@ -62,18 +62,18 @@ namespace ErrorCodes
class TableFunctionFactory;
/** Набор известных объектов, которые могут быть использованы в запросе.
* Разделяемая часть. Порядок членов (порядок их уничтожения) очень важен.
/** Set of known objects (environment), that could be used in query.
* Shared (global) part. Order of members (especially, order of destruction) is very important.
*/
struct ContextShared
{
Logger * log = &Logger::get("Context"); /// Логгер.
Logger * log = &Logger::get("Context");
/// Для доступа и модификации разделяемых объектов. Рекурсивный mutex.
/// For access of most of shared objects. Recursive mutex.
mutable Poco::Mutex mutex;
/// Для доступа к внешним словарям. Отдельный мьютекс, чтобы избежать локов при обращении сервера к самому себе.
/// Separate mutex for access of external dictionaries. Separate mutex to avoid locks when server doing request to itself.
mutable std::mutex external_dictionaries_mutex;
/// Отдельный mutex для переинициализации zookeeper-а. Эта операция может заблокироваться на существенное время и не должна мешать остальным.
/// Separate mutex for re-initialization of zookeer session. This operation could take a long time and must not interfere with another operations.
mutable std::mutex zookeeper_mutex;
mutable zkutil::ZooKeeperPtr zookeeper; /// Клиент для ZooKeeper.

1
debian/daemons vendored
View File

@ -1,4 +1,3 @@
compressor
clickhouse-client
clickhouse-server
config-processor