ClickHouse/dbms/src/Interpreters/loadMetadata.cpp

226 lines
7.0 KiB
C++
Raw Normal View History

#include <iomanip>
#include <thread>
#include <future>
#include <statdaemons/threadpool.hpp>
2011-10-30 11:30:52 +00:00
#include <Poco/DirectoryIterator.h>
#include <Poco/FileStream.h>
#include <DB/Parsers/ParserCreateQuery.h>
#include <DB/Parsers/ASTCreateQuery.h>
#include <DB/Parsers/parseQuery.h>
2011-10-30 11:30:52 +00:00
#include <DB/Interpreters/InterpreterCreateQuery.h>
#include <DB/Interpreters/loadMetadata.h>
#include <DB/IO/ReadBufferFromFile.h>
#include <DB/IO/WriteBufferFromString.h>
#include <DB/IO/copyData.h>
2015-09-24 19:33:16 +00:00
#include <DB/Common/escapeForFileName.h>
#include <statdaemons/Stopwatch.h>
2011-10-30 11:30:52 +00:00
namespace DB
{
static void executeCreateQuery(const String & query, Context & context, const String & database, const String & file_name)
{
ParserCreateQuery parser;
ASTPtr ast = parseQuery(parser, query.data(), query.data() + query.size(), "in file " + file_name);
2011-10-30 11:30:52 +00:00
ASTCreateQuery & ast_create_query = typeid_cast<ASTCreateQuery &>(*ast);
2011-10-30 11:30:52 +00:00
ast_create_query.attach = true;
ast_create_query.database = database;
2012-03-09 03:06:09 +00:00
2015-07-14 14:49:49 +00:00
try
{
InterpreterCreateQuery(ast, context).executeLoadExisting();
}
catch (const Exception & e)
{
/// Исправление для ChunkMerger.
if (e.code() == ErrorCodes::TABLE_ALREADY_EXISTS)
{
2015-07-14 17:11:09 +00:00
if (const auto id = dynamic_cast<const ASTFunction *>(ast_create_query.storage.get()))
2015-07-14 14:49:49 +00:00
{
if (id->name == "TinyLog" || id->name == "StripeLog")
2015-07-14 14:49:49 +00:00
{
tryLogCurrentException(__PRETTY_FUNCTION__);
return;
}
}
}
throw;
}
2011-10-30 11:30:52 +00:00
}
2011-10-30 11:30:52 +00:00
void loadMetadata(Context & context)
{
Logger * log = &Logger::get("loadMetadata");
2011-10-30 11:30:52 +00:00
/// Здесь хранятся определения таблиц
2012-08-02 17:33:31 +00:00
String path = context.getPath() + "metadata";
2011-10-30 11:30:52 +00:00
struct Table
{
String database_name;
String dir_name;
String file_name;
};
using Tables = std::vector<Table>;
Tables tables;
2011-10-30 11:30:52 +00:00
/// Цикл по базам данных
Poco::DirectoryIterator dir_end;
for (Poco::DirectoryIterator it(path); it != dir_end; ++it)
{
if (!it->isDirectory())
continue;
2012-03-09 03:56:12 +00:00
/// Для директории .svn
if (it.name().at(0) == '.')
continue;
2015-09-24 19:33:16 +00:00
String database = unescapeForFileName(it.name());
2013-06-15 04:16:14 +00:00
LOG_INFO(log, "Looking for tables in database " << database);
2015-09-24 19:33:16 +00:00
executeCreateQuery("ATTACH DATABASE " + backQuoteIfNeed(database), context, database, it->path());
2011-11-05 23:31:19 +00:00
2011-10-30 11:30:52 +00:00
/// Цикл по таблицам
typedef std::vector<std::string> FileNames;
FileNames file_names;
2011-10-30 11:30:52 +00:00
for (Poco::DirectoryIterator jt(it->path()); jt != dir_end; ++jt)
{
/// Для директории .svn
if (jt.name().at(0) == '.')
continue;
2011-10-30 11:30:52 +00:00
/// Файлы имеют имена вида table_name.sql
if (jt.name().compare(jt.name().size() - 4, 4, ".sql"))
throw Exception("Incorrect file extension: " + jt.name() + " in metadata directory " + it->path(), ErrorCodes::INCORRECT_FILE_NAME);
file_names.push_back(jt.name());
}
/** Таблицы быстрее грузятся, если их грузить в сортированном (по именам) порядке.
* Иначе (для файловой системы ext4) DirectoryIterator перебирает их в некотором порядке,
* который не соответствует порядку создания таблиц и не соответствует порядку их расположения на диске.
*/
std::sort(file_names.begin(), file_names.end());
for (const auto & name : file_names)
tables.emplace_back(Table{
.database_name = database,
.dir_name = it.name(),
.file_name = name});
LOG_INFO(log, "Found " << file_names.size() << " tables.");
}
size_t total_tables = tables.size();
LOG_INFO(log, "Total " << total_tables << " tables.");
StopwatchWithLock watch;
size_t tables_processed = 0;
static constexpr size_t MIN_TABLES_TO_PARALLEL_LOAD = 1;
static constexpr size_t PRINT_MESSAGE_EACH_N_TABLES = 256;
static constexpr size_t PRINT_MESSAGE_EACH_N_SECONDS = 5;
static constexpr size_t METADATA_FILE_BUFFER_SIZE = 32768;
static constexpr size_t TABLES_PARALLEL_LOAD_BUNCH_SIZE = 100;
size_t num_threads = std::min(total_tables, SettingMaxThreads().getAutoValue());
std::unique_ptr<boost::threadpool::pool> thread_pool;
if (total_tables > MIN_TABLES_TO_PARALLEL_LOAD && num_threads > 1)
thread_pool.reset(new boost::threadpool::pool(num_threads));
size_t bunch_size = TABLES_PARALLEL_LOAD_BUNCH_SIZE;
if (total_tables < bunch_size * num_threads)
bunch_size = total_tables / num_threads;
auto task_function = [&](Tables::const_iterator begin, Tables::const_iterator end)
{
for (Tables::const_iterator it = begin; it != end; ++it)
{
const Table & table = *it;
const String path_to_metadata = path + "/" + table.dir_name + "/" + table.file_name;
/// Сообщения, чтобы было не скучно ждать, когда сервер долго загружается.
if (__sync_add_and_fetch(&tables_processed, 1) % PRINT_MESSAGE_EACH_N_TABLES == 0
|| watch.lockTestAndRestart(PRINT_MESSAGE_EACH_N_SECONDS))
{
LOG_INFO(log, std::fixed << std::setprecision(2) << tables_processed * 100.0 / total_tables << "%");
watch.restart();
}
String s;
{
char in_buf[METADATA_FILE_BUFFER_SIZE];
ReadBufferFromFile in(path_to_metadata, METADATA_FILE_BUFFER_SIZE, -1, in_buf);
WriteBufferFromString out(s);
copyData(in, out);
}
2011-10-30 11:30:52 +00:00
/** Пустые файлы с метаданными образуются после грубого перезапуска сервера.
* Удаляем эти файлы, чтобы чуть-чуть уменьшить работу админов по запуску.
*/
if (s.empty())
{
LOG_ERROR(log, "File " << path_to_metadata << " is empty. Removing.");
Poco::File(path_to_metadata).remove();
continue;
}
2012-09-27 18:27:05 +00:00
try
{
executeCreateQuery(s, context, table.database_name, path_to_metadata);
2012-09-27 18:27:05 +00:00
}
catch (const Exception & e)
2012-09-27 18:27:05 +00:00
{
throw Exception("Cannot create table from metadata file " + path_to_metadata + ", error: " + e.displayText() +
2014-04-03 12:49:01 +00:00
", stack trace:\n" + e.getStackTrace().toString(),
2012-09-27 18:27:05 +00:00
ErrorCodes::CANNOT_CREATE_TABLE_FROM_METADATA);
}
2011-10-30 11:30:52 +00:00
}
};
/** packaged_task используются, чтобы исключения автоматически прокидывались в основной поток.
* Недостаток - исключения попадают в основной поток только после окончания работы всех task-ов.
*/
size_t num_bunches = (total_tables + bunch_size - 1) / bunch_size;
std::vector<std::packaged_task<void()>> tasks(num_bunches);
for (size_t i = 0; i < num_bunches; ++i)
{
auto begin = tables.begin() + i * bunch_size;
auto end = (i + 1 == num_bunches)
? tables.end()
: (tables.begin() + (i + 1) * bunch_size);
tasks[i] = std::packaged_task<void()>(std::bind(task_function, begin, end));
if (thread_pool)
thread_pool->schedule([i, &tasks]{ tasks[i](); });
else
tasks[i]();
2011-10-30 11:30:52 +00:00
}
if (thread_pool)
thread_pool->wait();
for (auto & task : tasks)
task.get_future().get();
2011-10-30 11:30:52 +00:00
}
}