From acfe3d5028350fc5415f1c8dd39ef58b8ca6e41b Mon Sep 17 00:00:00 2001 From: Alexey Milovidov Date: Wed, 10 May 2017 02:39:37 -0400 Subject: [PATCH] Improved performance of inserting into StorageDistributed with very high number of shards (not tested) [#CLICKHOUSE-2]. --- .../DistributedBlockOutputStream.cpp | 3 +- dbms/src/Storages/StorageDistributed.cpp | 90 ++++++++++++++----- dbms/src/Storages/StorageDistributed.h | 4 + 3 files changed, 73 insertions(+), 24 deletions(-) diff --git a/dbms/src/Storages/Distributed/DistributedBlockOutputStream.cpp b/dbms/src/Storages/Distributed/DistributedBlockOutputStream.cpp index 36359bb8475..99ded795581 100644 --- a/dbms/src/Storages/Distributed/DistributedBlockOutputStream.cpp +++ b/dbms/src/Storages/Distributed/DistributedBlockOutputStream.cpp @@ -11,7 +11,6 @@ #include #include -#include #include #include @@ -142,7 +141,7 @@ void DistributedBlockOutputStream::writeToShard(const Block & block, const std:: if (Poco::File(path).createDirectory()) storage.requireDirectoryMonitor(dir_name); - const auto & file_name = toString(Increment{path + "increment.txt"}.get(true)) + ".bin"; + const auto & file_name = toString(storage.file_names_increment.get()) + ".bin"; const auto & block_file_path = path + file_name; /** on first iteration write block to a temporary directory for subsequent hardlinking to ensure diff --git a/dbms/src/Storages/StorageDistributed.cpp b/dbms/src/Storages/StorageDistributed.cpp index 9f9f34da749..3b2093be68b 100644 --- a/dbms/src/Storages/StorageDistributed.cpp +++ b/dbms/src/Storages/StorageDistributed.cpp @@ -38,6 +38,8 @@ #include +#include + namespace DB { @@ -53,29 +55,70 @@ namespace ErrorCodes namespace { - /// select query has database and table names as AST pointers - /// Creates a copy of query, changes database and table names. - inline ASTPtr rewriteSelectQuery(const ASTPtr & query, const std::string & database, const std::string & table) + +/// select query has database and table names as AST pointers +/// Creates a copy of query, changes database and table names. +ASTPtr rewriteSelectQuery(const ASTPtr & query, const std::string & database, const std::string & table) +{ + auto modified_query_ast = query->clone(); + typeid_cast(*modified_query_ast).replaceDatabaseAndTable(database, table); + return modified_query_ast; +} + +/// insert query has database and table names as bare strings +/// Creates a copy of query, changes the database and table names. +ASTPtr rewriteInsertQuery(const ASTPtr & query, const std::string & database, const std::string & table) +{ + auto modified_query_ast = query->clone(); + + auto & actual_query = typeid_cast(*modified_query_ast); + actual_query.database = database; + actual_query.table = table; + /// make sure query is not INSERT SELECT + actual_query.select = nullptr; + + return modified_query_ast; +} + +/// Calculate maximum number in file names in directory and all subdirectories. +/// To ensure global order of data blocks yet to be sent across server restarts. +UInt64 getMaximumFileNumber(const std::string & path) +{ + UInt64 res = 0; + + boost::filesystem::recursive_directory_iterator begin(path); + boost::filesystem::recursive_directory_iterator end; + for (auto it = begin; it != end; ++it) { - auto modified_query_ast = query->clone(); - typeid_cast(*modified_query_ast).replaceDatabaseAndTable(database, table); - return modified_query_ast; + const auto & path = it->path(); + + if (it->status().type() != boost::filesystem::regular_file || !endsWith(path.filename().string(), ".bin")) + continue; + + UInt64 num = 0; + try + { + num = parse(path.filename().stem().string()); + } + catch (Exception & e) + { + e.addMessage("Unexpected file name " + path.filename().string() + " found at " + path.parent_path().string() + ", should have numeric base name."); + throw; + } + + if (num > res) + res = num; } - /// insert query has database and table names as bare strings - /// Creates a copy of query, changes the database and table names. - inline ASTPtr rewriteInsertQuery(const ASTPtr & query, const std::string & database, const std::string & table) - { - auto modified_query_ast = query->clone(); + return res; +} - auto & actual_query = typeid_cast(*modified_query_ast); - actual_query.database = database; - actual_query.table = table; - /// make sure query is not INSERT SELECT - actual_query.select = nullptr; +void initializeFileNamesIncrement(const std::string & path, SimpleIncrement & increment) +{ + if (!path.empty()) + increment.set(getMaximumFileNumber(path)); +} - return modified_query_ast; - } } @@ -96,6 +139,7 @@ StorageDistributed::StorageDistributed( path(data_path_.empty() ? "" : (data_path_ + escapeForFileName(name) + '/')) { createDirectoryMonitors(); + initializeFileNamesIncrement(path, file_names_increment); } @@ -120,6 +164,7 @@ StorageDistributed::StorageDistributed( path(data_path_.empty() ? "" : (data_path_ + escapeForFileName(name) + '/')) { createDirectoryMonitors(); + initializeFileNamesIncrement(path, file_names_increment); } @@ -443,10 +488,11 @@ void StorageDistributed::createDirectoryMonitors() Poco::File{path}.createDirectory(); - Poco::DirectoryIterator end; - for (Poco::DirectoryIterator it{path}; it != end; ++it) - if (it->isDirectory()) - createDirectoryMonitor(it.name()); + boost::filesystem::directory_iterator begin(path); + boost::filesystem::directory_iterator end; + for (auto it = begin; it != end; ++it) + if (it->status().type() == boost::filesystem::directory_file) + createDirectoryMonitor(it->path().filename().string()); } diff --git a/dbms/src/Storages/StorageDistributed.h b/dbms/src/Storages/StorageDistributed.h index db6c3417ef4..3a008b416b8 100644 --- a/dbms/src/Storages/StorageDistributed.h +++ b/dbms/src/Storages/StorageDistributed.h @@ -3,6 +3,7 @@ #include #include +#include #include #include #include @@ -159,6 +160,9 @@ private: String path; /// Can be empty if data_path_ is empty. In this case, a directory for the data to be sent is not created. std::unordered_map> directory_monitors; + + /// Used for global monotonic ordering of files to send. + SimpleIncrement file_names_increment; }; }