diff --git a/dbms/include/DB/Storages/Distributed/DistributedBlockOutputStream.h b/dbms/include/DB/Storages/Distributed/DistributedBlockOutputStream.h index 6befeb9fe8d..a7ec92935d9 100644 --- a/dbms/include/DB/Storages/Distributed/DistributedBlockOutputStream.h +++ b/dbms/include/DB/Storages/Distributed/DistributedBlockOutputStream.h @@ -44,7 +44,7 @@ private: for (size_t row = 0; row < block.rows(); ++row) { - const auto target_block_idx = key_column->get64(row) % total_weight; + const auto target_block_idx = cluster.slot_to_shard[key_column->get64(row) % total_weight]; auto & target_block = get_target_block(target_block_idx)->second;; for (size_t col = 0; col < block.columns(); ++col) @@ -59,9 +59,15 @@ private: writeImpl(shard_block_pair.second, shard_block_pair.first); } - void writeImpl(const Block & block, const size_t shard_num = 0) + void writeImpl(const Block & block, const size_t shard_id = 0) { - std::cout << "dummy write block of " << block.bytes() << " bytes to shard " << shard_num << std::endl; + const auto & dir_name = cluster.shard_info_vec[shard_id].dir_name; + + /// ensure shard subdirectory creation and notify storage if necessary + if (Poco::File(storage.getPath() + dir_name).createDirectory()) + storage.createDirectoryMonitor(dir_name); + + std::cout << "dummy write block of " << block.bytes() << " bytes to shard " << shard_id << std::endl; } StorageDistributed & storage; diff --git a/dbms/include/DB/Storages/StorageDistributed.h b/dbms/include/DB/Storages/StorageDistributed.h index 7e8fe0cec43..a722dc86111 100644 --- a/dbms/include/DB/Storages/StorageDistributed.h +++ b/dbms/include/DB/Storages/StorageDistributed.h @@ -67,10 +67,15 @@ public: /// структура подтаблиц не проверяется void alter(const AlterCommands & params, const String & database_name, const String & table_name, Context & context); + virtual void shutdown() override; + const ExpressionActionsPtr & getShardingKeyExpr() const { return sharding_key_expr; } const String & getShardingKeyColumnName() const { return sharding_key_column_name; } const String & getPath() const { return path; } + /// create directory monitor thread by subdirectory name + void createDirectoryMonitor(const std::string & name); + private: StorageDistributed( const std::string & name_, @@ -85,6 +90,10 @@ private: /// Создает копию запроса, меняет имена базы данных и таблицы. ASTPtr rewriteQuery(ASTPtr query); + void createDirectoryMonitors(); + + void directoryMonitorFunc(const std::string & path); + String name; NamesAndTypesListPtr columns; String remote_database; @@ -106,6 +115,9 @@ private: String sharding_key_column_name; bool write_enabled; String path; + + std::atomic quit{false}; + std::unordered_map directory_monitor_threads; }; } diff --git a/dbms/src/Storages/StorageDistributed.cpp b/dbms/src/Storages/StorageDistributed.cpp index aea66ae3afe..4b1357e3a1e 100644 --- a/dbms/src/Storages/StorageDistributed.cpp +++ b/dbms/src/Storages/StorageDistributed.cpp @@ -44,6 +44,8 @@ StorageDistributed::StorageDistributed( << " with weight " << shard_info.weight << std::endl; } + + createDirectoryMonitors(); } StoragePtr StorageDistributed::create( @@ -177,6 +179,14 @@ void StorageDistributed::alter(const AlterCommands & params, const String & data InterpreterAlterQuery::updateMetadata(database_name, table_name, *columns, context); } +void StorageDistributed::shutdown() +{ + quit.store(true, std::memory_order_relaxed); + + for (auto & name_thread_pair : directory_monitor_threads) + name_thread_pair.second.join(); +} + NameAndTypePair StorageDistributed::getColumn(const String & column_name) const { auto type = VirtualColumnFactory::tryGetType(column_name); @@ -191,4 +201,38 @@ bool StorageDistributed::hasColumn(const String & column_name) const return VirtualColumnFactory::hasColumn(column_name) || hasRealColumn(column_name); } +void StorageDistributed::createDirectoryMonitors() +{ + Poco::File(path).createDirectory(); + + Poco::DirectoryIterator end; + for (Poco::DirectoryIterator it(path); it != end; ++it) + if (it->isDirectory()) + createDirectoryMonitor(it.name()); +} + +void StorageDistributed::createDirectoryMonitor(const std::string & name) +{ + if (directory_monitor_threads.count(name)) + return; + + directory_monitor_threads.emplace( + name, + std::thread{ + &StorageDistributed::directoryMonitorFunc, this, path + name + } + ); +} + +void StorageDistributed::directoryMonitorFunc(const std::string & path) +{ + std::cout << "created monitor for directory " << path << std::endl; + + while (!quit.load(std::memory_order_relaxed)) + { + } + + std::cout << "exiting monitor for directory " << path << std::endl; +} + }