From 09265f50f94b3d4b4d4f2e22673c1ed39dbca22a Mon Sep 17 00:00:00 2001 From: Andrey Mironov Date: Fri, 22 Aug 2014 18:05:34 +0400 Subject: [PATCH] StorageDistributed: write to multiple local replicas, post-review corrections. [#METR-12221] --- dbms/include/DB/Interpreters/Cluster.h | 2 +- .../Storages/Distributed/DirectoryMonitor.h | 4 +- .../DistributedBlockOutputStream.h | 103 ++++++++++++++---- .../DB/Storages/Distributed/queryToString.h | 5 +- dbms/src/Interpreters/Cluster.cpp | 6 +- dbms/src/Storages/StorageDistributed.cpp | 41 +++---- 6 files changed, 104 insertions(+), 57 deletions(-) diff --git a/dbms/include/DB/Interpreters/Cluster.h b/dbms/include/DB/Interpreters/Cluster.h index 0f98fc39799..9766cadbc0e 100644 --- a/dbms/include/DB/Interpreters/Cluster.h +++ b/dbms/include/DB/Interpreters/Cluster.h @@ -34,7 +34,7 @@ public: /// contains names of directories for asynchronous write to StorageDistributed std::vector dir_names; int weight; - bool has_local_node; + size_t num_local_nodes; }; std::vector shard_info_vec; std::vector slot_to_shard; diff --git a/dbms/include/DB/Storages/Distributed/DirectoryMonitor.h b/dbms/include/DB/Storages/Distributed/DirectoryMonitor.h index 190d021d2f4..3438dfd9bb2 100644 --- a/dbms/include/DB/Storages/Distributed/DirectoryMonitor.h +++ b/dbms/include/DB/Storages/Distributed/DirectoryMonitor.h @@ -20,7 +20,7 @@ namespace for (auto it = boost::make_split_iterator(name, boost::first_finder(",")); it != decltype(it){}; ++it) { - const auto & address = boost::copy_range(*it); + const auto address = boost::copy_range(*it); const auto user_pw_end = strchr(address.data(), '@'); const auto colon = strchr(address.data(), ':'); @@ -65,8 +65,8 @@ public: ~DirectoryMonitor() { { - std::lock_guard lock{mutex}; quit = true; + std::lock_guard lock{mutex}; } cond.notify_one(); thread.join(); diff --git a/dbms/include/DB/Storages/Distributed/DistributedBlockOutputStream.h b/dbms/include/DB/Storages/Distributed/DistributedBlockOutputStream.h index 6bfcb3fc362..c3a941eefec 100644 --- a/dbms/include/DB/Storages/Distributed/DistributedBlockOutputStream.h +++ b/dbms/include/DB/Storages/Distributed/DistributedBlockOutputStream.h @@ -41,56 +41,111 @@ public: } private: + template + static std::vector createFiltersImpl(const size_t num_rows, const IColumn * column, const Cluster & cluster) + { + const auto total_weight = cluster.slot_to_shard.size(); + const auto num_shards = cluster.shard_info_vec.size(); + std::vector filters(num_shards); + + /// const columns contain only one value, therefore we do not need to read it at every iteration + if (column->isConst()) + { + const auto data = typeid_cast *>(column)->getData(); + const auto shard_num = cluster.slot_to_shard[data % total_weight]; + + for (size_t i = 0; i < num_shards; ++i) + filters[i].assign(num_rows, static_cast(shard_num == i)); + } + else + { + const auto & data = typeid_cast *>(column)->getData(); + + for (size_t i = 0; i < num_shards; ++i) + { + filters[i].resize(num_rows); + for (size_t j = 0; j < num_rows; ++j) + filters[i][j] = cluster.slot_to_shard[data[j] % total_weight] == i; + } + } + + return filters; + } + + std::vector createFilters(Block block) + { + using create_filters_sig = std::vector(size_t, const IColumn *, const Cluster &); + /// hashmap of pointers to functions corresponding to each integral type + static std::unordered_map creators{ + { TypeName::get(), &createFiltersImpl }, + { TypeName::get(), &createFiltersImpl }, + { TypeName::get(), &createFiltersImpl }, + { TypeName::get(), &createFiltersImpl }, + { TypeName::get(), &createFiltersImpl }, + { TypeName::get(), &createFiltersImpl }, + { TypeName::get(), &createFiltersImpl }, + { TypeName::get(), &createFiltersImpl }, + }; + + storage.getShardingKeyExpr()->execute(block); + + const auto & key_column = block.getByName(storage.getShardingKeyColumnName()); + + /// check that key column has valid type + const auto it = creators.find(key_column.type->getName()); + + return it != std::end(creators) + ? (*it->second)(block.rowsInFirstColumn(), key_column.column.get(), storage.cluster) + : throw Exception{ + "Sharding key expression does not evaluate to an integer type", + ErrorCodes::TYPE_MISMATCH + }; + } + void writeSplit(const Block & block) { - auto block_with_key = block; - storage.getShardingKeyExpr()->execute(block_with_key); - - const auto & key_column = block_with_key.getByName(storage.getShardingKeyColumnName()).column; - const auto total_weight = storage.cluster.slot_to_shard.size(); - - /// shard => block mapping - std::vector> target_blocks(storage.cluster.shard_info_vec.size()); - const auto num_cols = block.columns(); + /// cache column pointers for later reuse std::vector columns(num_cols); for (size_t i = 0; i < columns.size(); ++i) columns[i] = block.getByPosition(i).column; - for (size_t num_rows = block.rowsInFirstColumn(), row = 0; row < num_rows; ++row) + auto filters = createFilters(block); + + const auto num_shards = storage.cluster.shard_info_vec.size(); + for (size_t i = 0; i < num_shards; ++i) { - const auto target_block_idx = storage.cluster.slot_to_shard[key_column->get64(row) % total_weight]; - auto & target_block = target_blocks[target_block_idx]; - if (!target_block) - target_block = stdext::make_unique(block.cloneEmpty()); + auto target_block = block.cloneEmpty(); for (size_t col = 0; col < num_cols; ++col) - target_block->getByPosition(col).column->insertFrom(*columns[col], row); - } + target_block.getByPosition(col).column = columns[col]->filter(filters[i]); - for (size_t i = 0; i < target_blocks.size(); ++i) - if (const auto & target_block = target_blocks[i]) - writeImpl(*target_block, i); + if (target_block.rowsInFirstColumn()) + writeImpl(target_block, i); + } } void writeImpl(const Block & block, const size_t shard_id = 0) { const auto & shard_info = storage.cluster.shard_info_vec[shard_id]; - if (shard_info.has_local_node) - writeToLocal(block); + if (shard_info.num_local_nodes) + writeToLocal(block, shard_info.num_local_nodes); /// dir_names is empty if shard has only local addresses if (!shard_info.dir_names.empty()) writeToShard(block, shard_info.dir_names); } - void writeToLocal(const Block & block) + void writeToLocal(const Block & block, const size_t repeats) { InterpreterInsertQuery interp{query_ast, storage.context}; auto block_io = interp.execute(); block_io.out->writePrefix(); - block_io.out->write(block); + + for (size_t i = 0; i < repeats; ++i) + block_io.out->write(block); + block_io.out->writeSuffix(); } @@ -102,7 +157,7 @@ private: std::string first_file_tmp_path{}; auto first = true; - const auto & query_string = queryToString(query_ast); + const auto & query_string = queryToString(query_ast); /// write first file, hardlink the others for (const auto & dir_name : dir_names) diff --git a/dbms/include/DB/Storages/Distributed/queryToString.h b/dbms/include/DB/Storages/Distributed/queryToString.h index 019ac1a3874..9f1b243ed9f 100644 --- a/dbms/include/DB/Storages/Distributed/queryToString.h +++ b/dbms/include/DB/Storages/Distributed/queryToString.h @@ -4,13 +4,10 @@ namespace DB { - template inline std::string queryToString(const ASTPtr & query) { - const auto & query_ast = typeid_cast(*query); - std::ostringstream s; - formatAST(query_ast, s, 0, false, true); + formatAST(*query, s, 0, false, true); return s.str(); } diff --git a/dbms/src/Interpreters/Cluster.cpp b/dbms/src/Interpreters/Cluster.cpp index b4e8f210e62..7c7e04ae7f3 100644 --- a/dbms/src/Interpreters/Cluster.cpp +++ b/dbms/src/Interpreters/Cluster.cpp @@ -105,7 +105,7 @@ Cluster::Cluster(const Settings & settings, const DataTypeFactory & data_type_fa * the first element of vector; otherwise we will just .emplace_back */ std::vector dir_names{}; - auto has_local_node = false; + size_t num_local_nodes = 0; auto first = true; for (auto jt = replica_keys.begin(); jt != replica_keys.end(); ++jt) @@ -120,7 +120,7 @@ Cluster::Cluster(const Settings & settings, const DataTypeFactory & data_type_fa if (isLocal(replica_addresses.back())) { - has_local_node = true; + ++num_local_nodes; } else { @@ -143,7 +143,7 @@ Cluster::Cluster(const Settings & settings, const DataTypeFactory & data_type_fa } slot_to_shard.insert(std::end(slot_to_shard), weight, shard_info_vec.size()); - shard_info_vec.push_back({std::move(dir_names), weight, has_local_node}); + shard_info_vec.push_back({std::move(dir_names), weight, num_local_nodes}); } else throw Exception("Unknown element in config: " + *it, ErrorCodes::UNKNOWN_ELEMENT_IN_CONFIG); diff --git a/dbms/src/Storages/StorageDistributed.cpp b/dbms/src/Storages/StorageDistributed.cpp index 8a471c1847d..5d7074c7512 100644 --- a/dbms/src/Storages/StorageDistributed.cpp +++ b/dbms/src/Storages/StorageDistributed.cpp @@ -20,35 +20,30 @@ namespace DB namespace { - template void rewriteImpl(ASTType &, const std::string &, const std::string &) = delete; - /// select query has database and table names as AST pointers - template <> inline void rewriteImpl(ASTSelectQuery & query, - const std::string & database, const std::string & table) + /// Создает копию запроса, меняет имена базы данных и таблицы. + inline ASTPtr rewriteSelectQuery(const ASTPtr & query, const std::string & database, const std::string & table) { - query.database = new ASTIdentifier{{}, database, ASTIdentifier::Database}; - query.table = new ASTIdentifier{{}, table, ASTIdentifier::Table}; + auto modified_query_ast = query->clone(); + + auto & actual_query = typeid_cast(*modified_query_ast); + actual_query.database = new ASTIdentifier{{}, database, ASTIdentifier::Database}; + actual_query.table = new ASTIdentifier{{}, table, ASTIdentifier::Table}; + + return modified_query_ast; } /// insert query has database and table names as bare strings - template <> inline void rewriteImpl(ASTInsertQuery & query, - const std::string & database, const std::string & table) - { - query.database = database; - query.table = table; - /// make sure query is not INSERT SELECT - query.select = nullptr; - } - /// Создает копию запроса, меняет имена базы данных и таблицы. - template - inline ASTPtr rewriteQuery(const ASTPtr & query, const std::string & database, const std::string & table) + inline ASTPtr rewriteInsertQuery(const ASTPtr & query, const std::string & database, const std::string & table) { - /// Создаем копию запроса. auto modified_query_ast = query->clone(); - /// Меняем имена таблицы и базы данных - rewriteImpl(typeid_cast(*modified_query_ast), database, table); + auto & actual_query = typeid_cast(*modified_query_ast); + actual_query.database = database; + actual_query.table = table; + /// make sure query is not INSERT SELECT + actual_query.select = nullptr; return modified_query_ast; } @@ -131,9 +126,9 @@ BlockInputStreams StorageDistributed::read( : QueryProcessingStage::WithMergeableState; BlockInputStreams res; - const auto & modified_query_ast = rewriteQuery( + const auto & modified_query_ast = rewriteSelectQuery( query, remote_database, remote_table); - const auto & modified_query = queryToString(modified_query_ast); + const auto & modified_query = queryToString(modified_query_ast); /// Цикл по шардам. for (auto & conn_pool : cluster.pools) @@ -172,7 +167,7 @@ BlockOutputStreamPtr StorageDistributed::write(ASTPtr query) return new DistributedBlockOutputStream{ *this, - rewriteQuery(query, remote_database, remote_table) + rewriteInsertQuery(query, remote_database, remote_table) }; }