diff --git a/dbms/include/DB/Core/Defines.h b/dbms/include/DB/Core/Defines.h
index a253d6ea740..7599036d248 100644
--- a/dbms/include/DB/Core/Defines.h
+++ b/dbms/include/DB/Core/Defines.h
@@ -64,3 +64,5 @@
 #define DBMS_MIN_REVISION_WITH_TOTALS_EXTREMES 35265
 #define DBMS_MIN_REVISION_WITH_STRING_QUERY_ID 39002
 #define DBMS_MIN_REVISION_WITH_TEMPORARY_TABLES 50264
+
+#define DBMS_DISTRIBUTED_DIRECTORY_MONITOR_SLEEP_TIME_MS 100
diff --git a/dbms/include/DB/Interpreters/Cluster.h b/dbms/include/DB/Interpreters/Cluster.h
index 03eee47de71..ca9650710bc 100644
--- a/dbms/include/DB/Interpreters/Cluster.h
+++ b/dbms/include/DB/Interpreters/Cluster.h
@@ -31,8 +31,9 @@ public:
 	struct ShardInfo
 	{
-		std::string dir_name;
+		std::vector<std::string> dir_names;
 		int weight;
+		bool has_local_node;
 	};
 	std::vector<ShardInfo> shard_info_vec;
 	std::vector<size_t> slot_to_shard;
@@ -66,9 +67,9 @@ public:
 		Address(const String & host_port_, const String & user_, const String & password_);
 	};
 
-	static bool addressIsLocal(const Poco::Net::SocketAddress & address);
+private:
+	static bool isLocal(const Address & address);
 
-// private:
 	/// Array of shards. Each shard is the addresses of one server.
 	typedef std::vector<Address> Addresses;
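To make the new ShardInfo fields concrete: a shard now carries one directory name per remote destination plus a flag saying whether one of its replicas is this very server. A minimal sketch of what the Cluster.cpp changes further below would build for a shard with one local and two remote replicas (host names are made up for illustration):

	Cluster::ShardInfo info;
	info.dir_names = {"default@remote-1:9000", "default@remote-2:9000"};	// one subdirectory per remote replica
	info.weight = 1;							// <weight> from the cluster config
	info.has_local_node = true;					// the INSERT also goes through the local table, no file queue needed

With internal_replication enabled the two names would instead be joined into a single comma-separated entry, so each block is queued on disk only once per shard.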
diff --git a/dbms/include/DB/Interpreters/Settings.h b/dbms/include/DB/Interpreters/Settings.h
index 2057cf720c9..9eb46a7b102 100644
--- a/dbms/include/DB/Interpreters/Settings.h
+++ b/dbms/include/DB/Interpreters/Settings.h
@@ -70,6 +70,9 @@ struct Settings
 	  * TODO: Currently this is applied only at server startup. It could be made changeable dynamically. */ \
 	M(SettingUInt64, background_pool_size, DBMS_DEFAULT_BACKGROUND_POOL_SIZE) \
 	\
+	/** Sleep time for StorageDistributed DirectoryMonitors when there is no work or an exception has been thrown */ \
+	M(SettingMilliseconds, distributed_directory_monitor_sleep_time_ms, DBMS_DISTRIBUTED_DIRECTORY_MONITOR_SLEEP_TIME_MS) \
+	\
 	M(SettingLoadBalancing, load_balancing, LoadBalancing::RANDOM) \
 	\
 	M(SettingTotalsMode, totals_mode, TotalsMode::BEFORE_HAVING) \
diff --git a/dbms/include/DB/Storages/Distributed/DirectoryMonitor.h b/dbms/include/DB/Storages/Distributed/DirectoryMonitor.h
new file mode 100644
index 00000000000..ca4e212bb59
--- /dev/null
+++ b/dbms/include/DB/Storages/Distributed/DirectoryMonitor.h
@@ -0,0 +1,178 @@
+#pragma once
+
+#include
+#include
+#include
+#include
+
+namespace DB
+{
+
+namespace
+{
+	template <typename F>
+	ConnectionPools createPoolsForAddresses(const std::string & name, F && f)
+	{
+		ConnectionPools pools;
+
+		for (auto it = boost::make_split_iterator(name, boost::first_finder(",")); it != decltype(it){}; ++it)
+		{
+			const auto & address = boost::copy_range<std::string>(*it);
+
+			const auto user_pw_end = strchr(address.data(), '@');
+			const auto colon = strchr(address.data(), ':');
+			if (!user_pw_end || !colon)
+				throw Exception{"Shard address '" + address + "' does not match to 'user[:password]@host:port' pattern"};
+
+			const auto has_pw = colon < user_pw_end;
+			const auto host_end = has_pw ? strchr(user_pw_end + 1, ':') : colon;
+			if (!host_end)
+				throw Exception{"Shard address '" + address + "' does not contain port"};
+
+			const auto user = unescapeForFileName({address.data(), has_pw ? colon : user_pw_end});
+			const auto password = has_pw ? unescapeForFileName({colon + 1, user_pw_end}) : std::string{};
+			const auto host = unescapeForFileName({user_pw_end + 1, host_end});
+			const auto port = DB::parse<UInt16>(host_end + 1);
+
+			pools.emplace_back(f(host, port, user, password));
+		}
+
+		/// just to be explicit
+		return std::move(pools);
+	}
+}
+
+class DirectoryMonitor
+{
+public:
+	DirectoryMonitor(StorageDistributed & storage, const std::string & name)
+		: storage(storage), pool{createPool(name)}, path{storage.path + name + '/'}
+		, sleep_time{storage.context.getSettingsRef().distributed_directory_monitor_sleep_time_ms.totalMilliseconds()}
+		, log{&Logger::get(getLoggerName())}
+	{
+	}
+
+	void run()
+	{
+		while (!storage.quit.load(std::memory_order_relaxed))
+		{
+			auto no_work = true;
+			auto exception = false;
+
+			try
+			{
+				no_work = !findFiles();
+			}
+			catch (...)
+			{
+				exception = true;
+				tryLogCurrentException(getLoggerName().data());
+			}
+
+			if (no_work || exception)
+				std::this_thread::sleep_for(sleep_time);
+		}
+	}
+
+private:
+	ConnectionPoolPtr createPool(const std::string & name)
+	{
+		const auto pool_factory = [this, &name] (const std::string & host, const UInt16 port, const std::string & user, const std::string & password)
+		{
+			return new ConnectionPool{
+				1, host, port, "",
+				user, password, storage.context.getDataTypeFactory(),
+				storage.getName() + '_' + name
+			};
+		};
+
+		auto pools = createPoolsForAddresses(name, pool_factory);
+
+		return pools.size() == 1 ? pools.front() : new ConnectionPoolWithFailover(pools, DB::LoadBalancing::RANDOM);
+	}
+
+	bool findFiles()
+	{
+		std::map<UInt64, std::string> files;
+
+		Poco::DirectoryIterator end;
+		for (Poco::DirectoryIterator it{path}; it != end; ++it)
+		{
+			const auto & file_path_str = it->path();
+			Poco::Path file_path{file_path_str};
+
+			if (!it->isDirectory() && 0 == strncmp(file_path.getExtension().data(), "bin", strlen("bin")))
+				files[DB::parse<UInt64>(file_path.getBaseName())] = file_path_str;
+		}
+
+		if (files.empty())
+			return false;
+
+		for (const auto & file : files)
+		{
+			if (storage.quit.load(std::memory_order_relaxed))
+				return true;
+
+			processFile(file.second);
+		}
+
+		return true;
+	}
+
+	void processFile(const std::string & file_path)
+	{
+		LOG_TRACE(log, "Started processing `" << file_path << '`');
+		auto connection = pool->get();
+
+		try
+		{
+			DB::ReadBufferFromFile in{file_path};
+
+			std::string insert_query;
+			DB::readStringBinary(insert_query, in);
+
+			DB::RemoteBlockOutputStream remote{*connection, insert_query};
+
+			remote.writePrefix();
+			remote.writePrepared(in);
+			remote.writeSuffix();
+		}
+		catch (const Exception & e)
+		{
+			const auto code = e.code();
+
+			/// mark file as broken if necessary
+			if (code == ErrorCodes::CHECKSUM_DOESNT_MATCH ||
+				code == ErrorCodes::TOO_LARGE_SIZE_COMPRESSED ||
+				code == ErrorCodes::CANNOT_READ_ALL_DATA)
+			{
+				const auto last_path_separator_pos = file_path.rfind('/');
+				const auto & path = file_path.substr(0, last_path_separator_pos + 1);
+				const auto & file_name = file_path.substr(last_path_separator_pos + 1);
+				const auto & broken_path = path + "broken/";
+
+				Poco::File{broken_path}.createDirectory();
+				Poco::File{file_path}.moveTo(broken_path + file_name);
+
+				LOG_ERROR(log, "Moved `" << file_path << "` to broken/ directory");
+			}
+
+			throw;
+		}
+
+		Poco::File{file_path}.remove();
+
+		LOG_TRACE(log, "Finished processing `" << file_path << '`');
+	}
+
+	std::string getLoggerName() const
+	{
+		return storage.name + '.' + storage.getName() + ".DirectoryMonitor";
+	}
+
+	StorageDistributed & storage;
+	ConnectionPoolPtr pool;
+	std::string path;
+	std::chrono::milliseconds sleep_time;
+
+	Logger * log;
+};
+
+}
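The monitor's contract with the writer is purely file-system based. Assuming a storage path and directory names like the ones produced by DistributedBlockOutputStream below (the exact paths are illustrative), a shard subdirectory being drained looks roughly like this:

	<data_path>/<table>/default@remote-1:9000/
		increment.txt		<- counter used to name the next file
		1.bin, 2.bin, ...	<- one INSERT each: the query text followed by a compressed Native block
		tmp/				<- files are fully written here first (see writeToShard below)
		broken/				<- files that failed with checksum/size/short-read errors are moved here

findFiles() orders pending files by their numeric base name, and processFile() removes a file only after the remote INSERT has succeeded, so a crash leaves either a complete pending file or nothing.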
diff --git a/dbms/include/DB/Storages/Distributed/DistributedBlockOutputStream.h b/dbms/include/DB/Storages/Distributed/DistributedBlockOutputStream.h
index 93c86d99883..8b08c5e5b0e 100644
--- a/dbms/include/DB/Storages/Distributed/DistributedBlockOutputStream.h
+++ b/dbms/include/DB/Storages/Distributed/DistributedBlockOutputStream.h
@@ -1,10 +1,12 @@
 #pragma once
 
 #include
+#include
 #include
 #include
 #include
+#include
 #include
@@ -16,26 +18,26 @@ namespace DB
 class DistributedBlockOutputStream : public IBlockOutputStream
 {
 public:
-	DistributedBlockOutputStream(StorageDistributed & storage, Cluster & cluster, const std::string & query_string)
-		: storage(storage), cluster(cluster), query_string(query_string)
+	DistributedBlockOutputStream(StorageDistributed & storage, const ASTPtr & query_ast)
+		: storage(storage), query_ast(query_ast)
 	{
 	}
 
 	virtual void write(const Block & block) override
 	{
-		if (storage.getShardingKeyExpr() && cluster.shard_info_vec.size() > 1)
-			splitWrite(block, block);
+		if (storage.getShardingKeyExpr() && storage.cluster.shard_info_vec.size() > 1)
+			writeSplit(block, block);
 		else
 			writeImpl(block);
 	}
 
 private:
-	void splitWrite(const Block & block, Block block_with_key)
+	void writeSplit(const Block & block, Block block_with_key)
 	{
 		storage.getShardingKeyExpr()->execute(block_with_key);
 
 		const auto & key_column = block_with_key.getByName(storage.getShardingKeyColumnName()).column;
-		const auto total_weight = cluster.slot_to_shard.size();
+		const auto total_weight = storage.cluster.slot_to_shard.size();
 
 		/// shard => block mapping
 		std::unordered_map<size_t, Block> target_blocks;
@@ -51,7 +53,7 @@ private:
 
 		for (size_t row = 0; row < block.rows(); ++row)
 		{
-			const auto target_block_idx = cluster.slot_to_shard[key_column->get64(row) % total_weight];
+			const auto target_block_idx = storage.cluster.slot_to_shard[key_column->get64(row) % total_weight];
 			auto & target_block = get_target_block(target_block_idx)->second;;
 
 			for (size_t col = 0; col < block.columns(); ++col)
@@ -68,30 +70,76 @@ private:
 
 	void writeImpl(const Block & block, const size_t shard_id = 0)
 	{
-		const auto & dir_name = cluster.shard_info_vec[shard_id].dir_name;
-		const auto & path = storage.getPath() + dir_name + '/';
+		const auto & shard_info = storage.cluster.shard_info_vec[shard_id];
+		if (shard_info.has_local_node)
+			writeToLocal(block);
 
-		/// ensure shard subdirectory creation and notify storage if necessary
-		if (Poco::File(path).createDirectory())
-			storage.createDirectoryMonitor(dir_name);
+		if (!shard_info.dir_names.empty())
+			writeToShard(block, shard_info.dir_names);
+	}
 
-		const auto number = Increment(path + "increment").get(true);
-		const auto block_file_path = path + std::to_string(number);
+	void writeToLocal(const Block & block)
+	{
+		InterpreterInsertQuery interp{query_ast, storage.context};
 
-		DB::WriteBufferFromFile out{block_file_path};
-		DB::CompressedWriteBuffer compress{out};
-		DB::NativeBlockOutputStream stream{compress};
+		auto block_io = interp.execute();
+		block_io.out->writePrefix();
+		block_io.out->write(block);
+		block_io.out->writeSuffix();
+	}
 
-		DB::writeStringBinary(query_string, out);
+	void writeToShard(const Block & block, const std::vector<std::string> & dir_names)
+	{
+		/** the tmp directory is used to ensure atomicity of transactions
+		  * and to keep the monitor thread from reading incomplete data
+		  */
+		std::string first_file_tmp_path{};
+		std::string first_file_path{};
+		auto first = true;
+		const auto & query_string = queryToString(query_ast);
 
-		stream.writePrefix();
-		stream.write(block);
-		stream.writeSuffix();
+		/// write first file, hardlink the others
+		for (const auto & dir_name : dir_names)
+		{
+			const auto & path = storage.getPath() + dir_name + '/';
+
+			/// ensure shard subdirectory creation and notify storage if necessary
+			if (Poco::File(path).createDirectory())
+				storage.createDirectoryMonitor(dir_name);
+
+			const auto & file_name = std::to_string(Increment(path + "increment.txt").get(true)) + ".bin";
+			const auto & block_file_path = path + file_name;
+
+			if (first)
+			{
+				first = false;
+
+				const auto & tmp_path = path + "tmp/";
+				Poco::File(tmp_path).createDirectory();
+				const auto & block_file_tmp_path = tmp_path + file_name;
+
+				first_file_path = block_file_path;
+				first_file_tmp_path = block_file_tmp_path;
+
+				DB::WriteBufferFromFile out{block_file_tmp_path};
+				DB::CompressedWriteBuffer compress{out};
+				DB::NativeBlockOutputStream stream{compress};
+
+				DB::writeStringBinary(query_string, out);
+
+				stream.writePrefix();
+				stream.write(block);
+				stream.writeSuffix();
+			}
+			else if (link(first_file_tmp_path.data(), block_file_path.data()))
+				throw Exception{"Could not link " + block_file_path + " to " + first_file_tmp_path};
+		}
+
+		Poco::File(first_file_tmp_path).renameTo(first_file_path);
 	}
 
 	StorageDistributed & storage;
-	Cluster & cluster;
-	std::string query_string;
+	ASTPtr query_ast;
 };
 
 }
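A note on the ordering inside writeToShard(): the block is written in full under tmp/ of the first directory, hard-linked into every other shard subdirectory, and only then renamed into its final place, so a DirectoryMonitor can never pick up a half-written file. Roughly, for two remote replicas without internal_replication (names made up, numbers come from each directory's own increment.txt):

	write	default@remote-1:9000/tmp/5.bin					<- complete file: query string + compressed block
	link	... tmp/5.bin  ->  default@remote-2:9000/3.bin	<- hard link, no second copy of the data
	rename	... tmp/5.bin  ->  default@remote-1:9000/5.bin

Since hard links share one inode, the monitors of the two directories send the same bytes independently, and each removal only drops its own reference.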
diff --git a/dbms/include/DB/Storages/Distributed/queryToString.h b/dbms/include/DB/Storages/Distributed/queryToString.h
new file mode 100644
index 00000000000..019ac1a3874
--- /dev/null
+++ b/dbms/include/DB/Storages/Distributed/queryToString.h
@@ -0,0 +1,17 @@
+#pragma once
+
+#include
+
+namespace DB
+{
+	template <typename ASTType>
+	inline std::string queryToString(const ASTPtr & query)
+	{
+		const auto & query_ast = typeid_cast<const ASTType &>(*query);
+
+		std::ostringstream s;
+		formatAST(query_ast, s, 0, false, true);
+
+		return s.str();
+	}
+}
diff --git a/dbms/include/DB/Storages/StorageDistributed.h b/dbms/include/DB/Storages/StorageDistributed.h
index 80a244990b1..0b21ffbd4a8 100644
--- a/dbms/include/DB/Storages/StorageDistributed.h
+++ b/dbms/include/DB/Storages/StorageDistributed.h
@@ -18,6 +18,9 @@ namespace DB
   */
 class StorageDistributed : public IStorage
 {
+	friend class DistributedBlockOutputStream;
+	friend class DirectoryMonitor;
+
 public:
 	static StoragePtr create(
 		const std::string & name_,			/// Table name.
@@ -73,8 +76,6 @@ public:
 	const String & getShardingKeyColumnName() const { return sharding_key_column_name; }
 	const String & getPath() const { return path; }
 
-	/// create directory monitor thread by subdirectory name
-	void createDirectoryMonitor(const std::string & name);
 
 private:
 	StorageDistributed(
@@ -83,20 +84,22 @@ private:
 		const String & remote_database_,
 		const String & remote_table_,
 		Cluster & cluster_,
-		const Context & context_,
+		Context & context_,
 		const ASTPtr & sharding_key_ = nullptr,
 		const String & data_path_ = String{});
 
-	void createDirectoryMonitors();
-	void directoryMonitorFunc(const std::string & path);
+	/// create directory monitor thread by subdirectory name
+	void createDirectoryMonitor(const std::string & name);
+	/// create directory monitors for each existing subdirectory
+	void createDirectoryMonitors();
 
 	String name;
 	NamesAndTypesListPtr columns;
 	String remote_database;
 	String remote_table;
 
-	const Context & context;
+	Context & context;
 
 	/// Temporary tables that need to be sent to the server. The variable is cleared after each call to the read method.
 	/// To prepare them for sending, use the storeExternalTables method.
"" : ",") + addressToDirName(replica_addresses.back()); + else + dir_names.emplace_back(addressToDirName(replica_addresses.back())); - if (first) first = false; + if (first) first = false; + } } else throw Exception("Unknown element in config: " + *jt, ErrorCodes::UNKNOWN_ELEMENT_IN_CONFIG); } slot_to_shard.insert(std::end(slot_to_shard), weight, shard_info_vec.size()); - shard_info_vec.push_back({dir_name, weight}); + shard_info_vec.push_back({std::move(dir_names), weight, has_local_node}); } else throw Exception("Unknown element in config: " + *it, ErrorCodes::UNKNOWN_ELEMENT_IN_CONFIG); @@ -138,7 +157,7 @@ Cluster::Cluster(const Settings & settings, const DataTypeFactory & data_type_fa bool has_local_replics = false; for (Addresses::const_iterator jt = it->begin(); jt != it->end(); ++jt) { - if (addressIsLocal(jt->host_port)) + if (isLocal(*jt)) { has_local_replics = true; break; @@ -164,7 +183,7 @@ Cluster::Cluster(const Settings & settings, const DataTypeFactory & data_type_fa { for (Addresses::const_iterator it = addresses.begin(); it != addresses.end(); ++it) { - if (addressIsLocal(it->host_port)) + if (isLocal(*it)) { ++local_nodes_num; } @@ -223,7 +242,7 @@ Poco::Timespan Cluster::saturate(const Poco::Timespan & v, const Poco::Timespan } -bool Cluster::addressIsLocal(const Poco::Net::SocketAddress & address) +bool Cluster::isLocal(const Address & address) { /// Если среди реплик существует такая, что: /// - её порт совпадает с портом, который слушает сервер; @@ -232,11 +251,11 @@ bool Cluster::addressIsLocal(const Poco::Net::SocketAddress & address) const UInt16 clickhouse_port = Poco::Util::Application::instance().config().getInt("tcp_port", 0); static auto interfaces = Poco::Net::NetworkInterface::list(); - if (clickhouse_port == address.port() && + if (clickhouse_port == address.host_port.port() && interfaces.end() != std::find_if(interfaces.begin(), interfaces.end(), - [&](const Poco::Net::NetworkInterface & interface) { return interface.address() == address.host(); })) + [&](const Poco::Net::NetworkInterface & interface) { return interface.address() == address.host_port.host(); })) { - LOG_INFO(&Poco::Util::Application::instance().logger(), "Replica with address " << address.toString() << " will be processed as local."); + LOG_INFO(&Poco::Util::Application::instance().logger(), "Replica with address " << address.host_port.toString() << " will be processed as local."); return true; } return false; diff --git a/dbms/src/Storages/StorageDistributed.cpp b/dbms/src/Storages/StorageDistributed.cpp index 6aa2fe89018..6226885258f 100644 --- a/dbms/src/Storages/StorageDistributed.cpp +++ b/dbms/src/Storages/StorageDistributed.cpp @@ -1,15 +1,11 @@ -#include - #include #include #include #include #include - -#include -#include - +#include +#include #include #include @@ -17,24 +13,10 @@ #include -#include -#include - namespace DB { namespace { - template - inline std::string queryToString(const ASTPtr & query) - { - const auto & query_ast = typeid_cast(*query); - - std::ostringstream s; - formatAST(query_ast, s, 0, false, true); - - return s.str(); - } - /// select and insert query have different types for database and table, hence two specializations template struct rewrite_traits; template <> struct rewrite_traits { using type = ASTPtr; }; @@ -81,7 +63,7 @@ StorageDistributed::StorageDistributed( const String & remote_database_, const String & remote_table_, Cluster & cluster_, - const Context & context_, + Context & context_, const ASTPtr & sharding_key_, const String & data_path_) : 
diff --git a/dbms/src/Storages/StorageDistributed.cpp b/dbms/src/Storages/StorageDistributed.cpp
index 6aa2fe89018..6226885258f 100644
--- a/dbms/src/Storages/StorageDistributed.cpp
+++ b/dbms/src/Storages/StorageDistributed.cpp
@@ -1,15 +1,11 @@
-#include
-
 #include
 #include
 #include
 #include
 #include
-
-#include
-#include
-
+#include
+#include
 #include
 #include
@@ -17,24 +13,10 @@
 #include
 
-#include
-#include
-
 namespace DB
 {
 
 namespace
 {
-	template <typename ASTType>
-	inline std::string queryToString(const ASTPtr & query)
-	{
-		const auto & query_ast = typeid_cast<const ASTType &>(*query);
-
-		std::ostringstream s;
-		formatAST(query_ast, s, 0, false, true);
-
-		return s.str();
-	}
-
 	/// select and insert query have different types for database and table, hence two specializations
 	template struct rewrite_traits;
 	template <> struct rewrite_traits { using type = ASTPtr; };
@@ -81,7 +63,7 @@ StorageDistributed::StorageDistributed(
 	const String & remote_database_,
 	const String & remote_table_,
 	Cluster & cluster_,
-	const Context & context_,
+	Context & context_,
 	const ASTPtr & sharding_key_,
 	const String & data_path_)
 	: name(name_), columns(columns_),
@@ -92,8 +74,6 @@ StorageDistributed::StorageDistributed(
 	write_enabled(cluster.getLocalNodesNum() + cluster.pools.size() < 2 || sharding_key_),
 	path(data_path_ + escapeForFileName(name) + '/')
 {
-	std::cout << "table `" << name << "` in " << path << std::endl;
-
 	createDirectoryMonitors();
 }
@@ -194,12 +174,9 @@ BlockOutputStreamPtr StorageDistributed::write(ASTPtr query)
 			ErrorCodes::NOT_IMPLEMENTED
 		};
 
-	return new DistributedBlockOutputStream{
-		*this, this->cluster,
-		queryToString(rewriteQuery(
-			query, remote_database, remote_table
-		))
-	};
+	const auto & modified_query = rewriteQuery(query, remote_database, remote_table);
+
+	return new DistributedBlockOutputStream{*this, modified_query};
 }
 
 void StorageDistributed::alter(const AlterCommands & params, const String & database_name, const String & table_name, Context & context)
@@ -219,9 +196,8 @@ void StorageDistributed::shutdown()
 
 NameAndTypePair StorageDistributed::getColumn(const String & column_name) const
 {
-	auto type = VirtualColumnFactory::tryGetType(column_name);
-	if (type)
-		return NameAndTypePair(column_name, type);
+	if (const auto & type = VirtualColumnFactory::tryGetType(column_name))
+		return { column_name, type };
 
 	return getRealColumn(column_name);
 }
@@ -231,89 +207,28 @@ bool StorageDistributed::hasColumn(const String & column_name) const
 	return VirtualColumnFactory::hasColumn(column_name) || hasRealColumn(column_name);
 }
 
-void StorageDistributed::createDirectoryMonitors()
-{
-	Poco::File(path).createDirectory();
-
-	Poco::DirectoryIterator end;
-	for (Poco::DirectoryIterator it(path); it != end; ++it)
-		if (it->isDirectory())
-			createDirectoryMonitor(it.name());
-}
-
 void StorageDistributed::createDirectoryMonitor(const std::string & name)
 {
 	if (directory_monitor_threads.count(name))
 		return;
 
 	directory_monitor_threads.emplace(
-		name,
-		std::thread{
-			&StorageDistributed::directoryMonitorFunc, this, name
+		name,
+		std::thread{[this, name] {
+			DirectoryMonitor monitor{*this, name};
+			monitor.run();
 		}
-	);
+	});
 }
 
-void StorageDistributed::directoryMonitorFunc(const std::string & name)
+void StorageDistributed::createDirectoryMonitors()
 {
-	const auto & path = this->path + name + '/';
-	std::cout << "created monitor for directory " << path << std::endl;
+	Poco::File{path}.createDirectory();
 
-	auto is_local = false;
-	ConnectionPools pools;
-
-	for (auto it = boost::make_split_iterator(name, boost::first_finder(",")); it != decltype(it){}; ++it)
-	{
-		const auto & address = boost::copy_range<std::string>(*it);
-
-		const auto user_pw_end = strchr(address.data(), '@');
-		const auto colon = strchr(address.data(), ':');
-		if (!user_pw_end || !colon)
-			throw Exception{"Shard address '" + address + "' does not match to 'user[:password]@host:port' pattern"};
-
-		const auto has_pw = colon < user_pw_end;
-		const auto host_end = has_pw ? strchr(user_pw_end + 1, ':') : colon;
-		if (!host_end)
-			throw Exception{"Shard address '" + address + "' does not contain port"};
-
-		const auto user = unescapeForFileName({address.data(), has_pw ? colon : user_pw_end});
-		const auto password = has_pw ? unescapeForFileName({colon + 1, user_pw_end}) : std::string{};
-		const auto host = unescapeForFileName({user_pw_end + 1, host_end});
-		const auto port = DB::parse<UInt16>(host_end + 1);
-
-		std::cout
-			<< "\taddress " << host
-			<< " port " << port
-			<< " user " << user
-			<< " password " << password
-			<< std::endl;
-
-		if (Cluster::addressIsLocal({host, port}))
-		{
-			is_local = true;
-			break;
-		}
-
-		pools.emplace_back(new ConnectionPool{
-			1, host, port, "",
-			user, password, context.getDataTypeFactory(),
-			getName() + '_' + name
-		});
-	}
-
-	std::cout << "local? " << std::boolalpha << is_local << std::endl;
-	const auto pool = is_local
-		? (pools.size() == 1
-			? pools[0]
-			: new ConnectionPoolWithFailover(pools, DB::LoadBalancing::RANDOM)
-		)
-		: nullptr;
-
-	while (!quit.load(std::memory_order_relaxed))
-	{
-	}
-
-	std::cout << "exiting monitor for directory " << path << std::endl;
+	Poco::DirectoryIterator end;
+	for (Poco::DirectoryIterator it{path}; it != end; ++it)
+		if (it->isDirectory())
+			createDirectoryMonitor(it.name());
 }
 
 }