Improved performance of inserting into StorageDistributed with very high number of shards (not tested) [#CLICKHOUSE-2].

This commit is contained in:
Alexey Milovidov 2017-05-10 02:39:37 -04:00
parent e2f8ec8f2d
commit acfe3d5028
3 changed files with 73 additions and 24 deletions

View File

@ -11,7 +11,6 @@
#include <Interpreters/Cluster.h>
#include <Interpreters/createBlockSelector.h>
#include <Common/Increment.h>
#include <DataTypes/DataTypesNumber.h>
#include <Common/ClickHouseRevision.h>
@ -142,7 +141,7 @@ void DistributedBlockOutputStream::writeToShard(const Block & block, const std::
if (Poco::File(path).createDirectory())
storage.requireDirectoryMonitor(dir_name);
const auto & file_name = toString(Increment{path + "increment.txt"}.get(true)) + ".bin";
const auto & file_name = toString(storage.file_names_increment.get()) + ".bin";
const auto & block_file_path = path + file_name;
/** on first iteration write block to a temporary directory for subsequent hardlinking to ensure

View File

@ -38,6 +38,8 @@
#include <memory>
#include <boost/filesystem.hpp>
namespace DB
{
@ -53,29 +55,70 @@ namespace ErrorCodes
namespace
{
/// select query has database and table names as AST pointers
/// Creates a copy of query, changes database and table names.
inline ASTPtr rewriteSelectQuery(const ASTPtr & query, const std::string & database, const std::string & table)
/// select query has database and table names as AST pointers
/// Creates a copy of query, changes database and table names.
ASTPtr rewriteSelectQuery(const ASTPtr & query, const std::string & database, const std::string & table)
{
auto modified_query_ast = query->clone();
typeid_cast<ASTSelectQuery &>(*modified_query_ast).replaceDatabaseAndTable(database, table);
return modified_query_ast;
}
/// insert query has database and table names as bare strings
/// Creates a copy of query, changes the database and table names.
ASTPtr rewriteInsertQuery(const ASTPtr & query, const std::string & database, const std::string & table)
{
auto modified_query_ast = query->clone();
auto & actual_query = typeid_cast<ASTInsertQuery &>(*modified_query_ast);
actual_query.database = database;
actual_query.table = table;
/// make sure query is not INSERT SELECT
actual_query.select = nullptr;
return modified_query_ast;
}
/// Calculate maximum number in file names in directory and all subdirectories.
/// To ensure global order of data blocks yet to be sent across server restarts.
UInt64 getMaximumFileNumber(const std::string & path)
{
UInt64 res = 0;
boost::filesystem::recursive_directory_iterator begin(path);
boost::filesystem::recursive_directory_iterator end;
for (auto it = begin; it != end; ++it)
{
auto modified_query_ast = query->clone();
typeid_cast<ASTSelectQuery &>(*modified_query_ast).replaceDatabaseAndTable(database, table);
return modified_query_ast;
const auto & path = it->path();
if (it->status().type() != boost::filesystem::regular_file || !endsWith(path.filename().string(), ".bin"))
continue;
UInt64 num = 0;
try
{
num = parse<UInt64>(path.filename().stem().string());
}
catch (Exception & e)
{
e.addMessage("Unexpected file name " + path.filename().string() + " found at " + path.parent_path().string() + ", should have numeric base name.");
throw;
}
if (num > res)
res = num;
}
/// insert query has database and table names as bare strings
/// Creates a copy of query, changes the database and table names.
inline ASTPtr rewriteInsertQuery(const ASTPtr & query, const std::string & database, const std::string & table)
{
auto modified_query_ast = query->clone();
return res;
}
auto & actual_query = typeid_cast<ASTInsertQuery &>(*modified_query_ast);
actual_query.database = database;
actual_query.table = table;
/// make sure query is not INSERT SELECT
actual_query.select = nullptr;
void initializeFileNamesIncrement(const std::string & path, SimpleIncrement & increment)
{
if (!path.empty())
increment.set(getMaximumFileNumber(path));
}
return modified_query_ast;
}
}
@ -96,6 +139,7 @@ StorageDistributed::StorageDistributed(
path(data_path_.empty() ? "" : (data_path_ + escapeForFileName(name) + '/'))
{
createDirectoryMonitors();
initializeFileNamesIncrement(path, file_names_increment);
}
@ -120,6 +164,7 @@ StorageDistributed::StorageDistributed(
path(data_path_.empty() ? "" : (data_path_ + escapeForFileName(name) + '/'))
{
createDirectoryMonitors();
initializeFileNamesIncrement(path, file_names_increment);
}
@ -443,10 +488,11 @@ void StorageDistributed::createDirectoryMonitors()
Poco::File{path}.createDirectory();
Poco::DirectoryIterator end;
for (Poco::DirectoryIterator it{path}; it != end; ++it)
if (it->isDirectory())
createDirectoryMonitor(it.name());
boost::filesystem::directory_iterator begin(path);
boost::filesystem::directory_iterator end;
for (auto it = begin; it != end; ++it)
if (it->status().type() == boost::filesystem::directory_file)
createDirectoryMonitor(it->path().filename().string());
}

View File

@ -3,6 +3,7 @@
#include <ext/shared_ptr_helper.hpp>
#include <Storages/IStorage.h>
#include <Common/Increment.h>
#include <Client/ConnectionPool.h>
#include <Client/ConnectionPoolWithFailover.h>
#include <Interpreters/Settings.h>
@ -159,6 +160,9 @@ private:
String path; /// Can be empty if data_path_ is empty. In this case, a directory for the data to be sent is not created.
std::unordered_map<std::string, std::unique_ptr<StorageDistributedDirectoryMonitor>> directory_monitors;
/// Used for global monotonic ordering of files to send.
SimpleIncrement file_names_increment;
};
}