From 9fb3135c23e27032d99b64eda2604fb0b342a7c2 Mon Sep 17 00:00:00 2001 From: Vitaly Baranov Date: Fri, 19 Jul 2019 03:39:22 +0300 Subject: [PATCH 1/3] Improve utility zookeeper-adjust-block-numbers-to-parts. Add filter on shards and tables. Add flag --dry-run. The utility now checks cversion before doing anything. --- utils/CMakeLists.txt | 1 + .../main.cpp | 203 ++++++++++++++---- 2 files changed, 158 insertions(+), 46 deletions(-) diff --git a/utils/CMakeLists.txt b/utils/CMakeLists.txt index 3b523822451..b3df25d13e6 100644 --- a/utils/CMakeLists.txt +++ b/utils/CMakeLists.txt @@ -24,6 +24,7 @@ if (NOT DEFINED ENABLE_UTILS OR ENABLE_UTILS) add_subdirectory (zookeeper-copy-tree) add_subdirectory (zookeeper-remove-by-list) add_subdirectory (zookeeper-create-entry-to-download-part) + add_subdirectory (zookeeper-adjust-block-numbers-to-parts) add_subdirectory (wikistat-loader) add_subdirectory (fill-factor) add_subdirectory (check-marks) diff --git a/utils/zookeeper-adjust-block-numbers-to-parts/main.cpp b/utils/zookeeper-adjust-block-numbers-to-parts/main.cpp index dda1677f3a4..099a9e64153 100644 --- a/utils/zookeeper-adjust-block-numbers-to-parts/main.cpp +++ b/utils/zookeeper-adjust-block-numbers-to-parts/main.cpp @@ -1,13 +1,55 @@ #include #include #include +#include #include #include #include #include -size_t getMaxBlockSizeForPartition(zkutil::ZooKeeper & zk, + +std::vector getAllShards(zkutil::ZooKeeper & zk, const std::string & root) +{ + return zk.getChildren(root); +} + + +std::vector removeNotExistingShards(zkutil::ZooKeeper & zk, const std::string & root, const std::vector & shards) +{ + auto existing_shards = getAllShards(zk, root); + std::vector filtered_shards; + filtered_shards.reserve(shards.size()); + for (const auto & shard : shards) + if (std::find(existing_shards.begin(), existing_shards.end(), shard) == existing_shards.end()) + std::cerr << "Shard " << shard << " not found." << std::endl; + else + filtered_shards.emplace_back(shard); + return filtered_shards; +} + + +std::vector getAllTables(zkutil::ZooKeeper & zk, const std::string & root, const std::string & shard) +{ + return zk.getChildren(root + "/" + shard); +} + + +std::vector removeNotExistingTables(zkutil::ZooKeeper & zk, const std::string & root, const std::string & shard, const std::vector & tables) +{ + auto existing_tables = getAllTables(zk, root, shard); + std::vector filtered_tables; + filtered_tables.reserve(tables.size()); + for (const auto & table : tables) + if (std::find(existing_tables.begin(), existing_tables.end(), table) == existing_tables.end()) + std::cerr << "\tTable " << table << " not found on shard " << shard << "." << std::endl; + else + filtered_tables.emplace_back(table); + return filtered_tables; +} + + +size_t getMaxBlockNumberForPartition(zkutil::ZooKeeper & zk, const std::string & replica_path, const std::string & partition_name, const DB::MergeTreeDataFormatVersion & format_version) @@ -28,36 +70,74 @@ size_t getMaxBlockSizeForPartition(zkutil::ZooKeeper & zk, } catch (const DB::Exception & ex) { - std::cerr << "Exception on: " << ex.displayText() << " will skip part: " << part << std::endl; + std::cerr << ex.displayText() << ", Part " << part << "skipped." << std::endl; } } } return max_block_num; } -std::unordered_map getAllTablesBlockPaths(zkutil::ZooKeeper & zk, const std::string & root) + +size_t getCurrentBlockNumberForPartition(zkutil::ZooKeeper & zk, const std::string & part_path) +{ + Coordination::Stat stat; + zk.get(part_path, &stat); + + /// References: + /// https://stackoverflow.com/a/10347910 + /// https://bowenli86.github.io/2016/07/07/distributed%20system/zookeeper/How-does-ZooKeeper-s-persistent-sequential-id-work/ + return (stat.cversion + stat.numChildren) / 2; +} + + +std::unordered_map getPartitionsNeedAdjustingBlockNumbers( + zkutil::ZooKeeper & zk, const std::string & root, const std::vector & shards, const std::vector & tables) { std::unordered_map result; - auto shards = zk.getChildren(root); - for (const auto & shard : shards) + + std::vector use_shards = shards.empty() ? getAllShards(zk, root) : removeNotExistingShards(zk, root, shards); + + for (const auto & shard : use_shards) { - std::string shard_path = root + "/" + shard; - auto tables = zk.getChildren(shard_path); - for (auto table : tables) + std::cout << "Shard: " << shard << std::endl; + std::vector use_tables = tables.empty() ? getAllTables(zk, root, shard) : removeNotExistingTables(zk, root, shard, tables); + + for (auto table : use_tables) { - std::cerr << "Searching for nodes in: " << table << std::endl; - std::string table_path = shard_path + "/" + table; - auto format_version = DB::ReplicatedMergeTreeTableMetadata::parse(zk.get(table_path + "/metadata")).data_format_version; + std::cout << "\tTable: " << table << std::endl; + std::string table_path = root + "/" + shard + "/" + table; std::string blocks_path = table_path + "/block_numbers"; - auto partitions = zk.getChildren(blocks_path); - if (!partitions.empty()) + + std::vector partitions; + DB::MergeTreeDataFormatVersion format_version; + try { - for (auto partition : partitions) + format_version = DB::ReplicatedMergeTreeTableMetadata::parse(zk.get(table_path + "/metadata")).data_format_version; + partitions = zk.getChildren(blocks_path); + } + catch (const DB::Exception & ex) + { + std::cerr << ex.displayText() << ", table " << table << " skipped." << std::endl; + continue; + } + + for (auto partition : partitions) + { + try { std::string part_path = blocks_path + "/" + partition; - size_t partition_max_block = getMaxBlockSizeForPartition(zk, table_path, partition, format_version); - std::cerr << "\tFound max block number: " << partition_max_block << " for part: " << partition << std::endl; - result.emplace(part_path, partition_max_block); + size_t partition_max_block = getMaxBlockNumberForPartition(zk, table_path, partition, format_version); + size_t current_block_number = getCurrentBlockNumberForPartition(zk, part_path); + if (current_block_number <= partition_max_block) + { + std::cout << "\t\tPartition: " << partition << ": current block_number: " << current_block_number + << ", max block number: " << partition_max_block << ". Adjusting is required." << std::endl; + result.emplace(part_path, partition_max_block); + } + } + catch (const DB::Exception & ex) + { + std::cerr << ex.displayText() << ", partition " << partition << " skipped." << std::endl; } } } @@ -66,67 +146,98 @@ std::unordered_map getAllTablesBlockPaths(zkutil::ZooKeeper } -void rotateNodes(zkutil::ZooKeeper & zk, const std::string & path, size_t max_block_num) +void setCurrentBlockNumber(zkutil::ZooKeeper & zk, const std::string & path, size_t new_current_block_number) { - Coordination::Requests requests; std::string block_prefix = path + "/block-"; - std::string current = zk.create(block_prefix, "", zkutil::CreateMode::EphemeralSequential); - size_t current_block_num = DB::parse(current.c_str() + block_prefix.size(), current.size() - block_prefix.size()); - if (current_block_num >= max_block_num) - { - std::cerr << "Nothing to rotate, current block num: " << current_block_num << " max_block_num:" << max_block_num << std::endl; - return; - } + Coordination::Requests requests; - size_t need_to_rotate = max_block_num - current_block_num; - std::cerr << "Will rotate: " << need_to_rotate << " block numbers from " << current_block_num << " to " << max_block_num << std::endl; - - for (size_t i = 0; i < need_to_rotate; ++i) + for (size_t current_block_number = getCurrentBlockNumberForPartition(zk, path); + current_block_number < new_current_block_number; + ++current_block_number) { if (requests.size() == 50) { - std::cerr << "Rotating: " << i << " block numbers" << std::endl; zk.multi(requests); + std::cout << path << ": " << requests.size() << " ephemeral sequential nodes inserted." << std::endl; requests.clear(); } requests.emplace_back(zkutil::makeCreateRequest(path + "/block-", "", zkutil::CreateMode::EphemeralSequential)); } + if (!requests.empty()) { + std::cout << path << ": " << requests.size() << " ephemeral sequential nodes inserted." << std::endl; zk.multi(requests); } } + int main(int argc, char ** argv) try { - boost::program_options::options_description desc("Allowed options"); + /// Parse the command line. + namespace po = boost::program_options; + po::options_description desc("Allowed options"); desc.add_options() - ("help,h", "produce help message") - ("address,a", boost::program_options::value()->required(), "addresses of ZooKeeper instances, comma separated. Example: example01e.yandex.ru:2181") - ("path,p", boost::program_options::value()->required(), "path of replica queue to insert node (without trailing slash)"); + ("help,h", "show help") + ("zookeeper,z", po::value(), "Addresses of ZooKeeper instances, comma-separated. Example: example01e.yandex.ru:2181") + ("path,p", po::value(), "[optional] Path of replica queue to insert node (without trailing slash). By default it's /clickhouse/tables") + ("shard,s", po::value(), "[optional] Shards to process, comma-separated. If not specified then the utility will process all the shards.") + ("table,t", po::value(), "[optional] Tables to process, comma-separated. If not specified then the utility will process all the tables.") + ("dry-run", "[optional] Specify if you want this utility just to analyze block numbers without any changes."); - boost::program_options::variables_map options; - boost::program_options::store(boost::program_options::parse_command_line(argc, argv, desc), options); + po::variables_map options; + po::store(po::parse_command_line(argc, argv, desc), options); - if (options.count("help")) + auto show_usage = [&] { - std::cout << "Util for /block_numbers node adjust with max block number in partition" << std::endl; - std::cout << "Usage: " << argv[0] << " [options]" << std::endl; + std::cout << "Usage: " << std::endl; + std::cout << " " << argv[0] << " [options]" << std::endl; std::cout << desc << std::endl; + }; + + if (options.count("help") || (argc == 1)) + { + std::cout << "This utility adjusts the /block_numbers zookeeper nodes to the correct block number in partition." << std::endl; + std::cout << "It might be useful when incorrect block numbers stored in zookeeper don't allow you to insert data into a table or drop/detach a partition." << std::endl; + show_usage(); + return 0; + } + + if (!options.count("zookeeper")) + { + std::cerr << "Option --zookeeper should be set." << std::endl; + show_usage(); return 1; } - std::string global_path = options.at("path").as(); + std::string root = options.count("path") ? options.at("path").as() : "/clickhouse/tables"; - zkutil::ZooKeeper zookeeper(options.at("address").as()); + std::vector shards, tables; + if (options.count("shard")) + boost::split(shards, options.at("shard").as(), boost::algorithm::is_any_of(",")); + if (options.count("table")) + boost::split(tables, options.at("table").as(), boost::algorithm::is_any_of(",")); - auto all_path = getAllTablesBlockPaths(zookeeper, global_path); - for (const auto & [path, max_block_num] : all_path) + /// Check if the adjusting of the block numbers is required. + std::cout << "Checking if adjusting of the block numbers is required:" << std::endl; + zkutil::ZooKeeper zookeeper(options.at("zookeeper").as()); + auto part_paths_with_max_block_numbers = getPartitionsNeedAdjustingBlockNumbers(zookeeper, root, shards, tables); + + if (part_paths_with_max_block_numbers.empty()) { - std::cerr << "Rotating on: " << path << std::endl; - rotateNodes(zookeeper, path, max_block_num); + std::cout << "No adjusting required." << std::endl; + return 0; } + + /// Adjust the block numbers. + if (options.count("dry-run")) + return 0; + + std::cout << std::endl << "Adjusting the block numbers:" << std::endl; + for (const auto & [part_path, max_block_number] : part_paths_with_max_block_numbers) + setCurrentBlockNumber(zookeeper, part_path, max_block_number + 1); + return 0; } catch (const Poco::Exception & e) From e6aebb5b135674256d6df51dfc9c062a6ca6e96e Mon Sep 17 00:00:00 2001 From: Vitaly Baranov Date: Fri, 19 Jul 2019 12:19:47 +0300 Subject: [PATCH 2/3] Fix after review. --- .../main.cpp | 91 +++++++++++++------ 1 file changed, 65 insertions(+), 26 deletions(-) diff --git a/utils/zookeeper-adjust-block-numbers-to-parts/main.cpp b/utils/zookeeper-adjust-block-numbers-to-parts/main.cpp index 099a9e64153..3e449043adc 100644 --- a/utils/zookeeper-adjust-block-numbers-to-parts/main.cpp +++ b/utils/zookeeper-adjust-block-numbers-to-parts/main.cpp @@ -49,14 +49,14 @@ std::vector removeNotExistingTables(zkutil::ZooKeeper & zk, const s } -size_t getMaxBlockNumberForPartition(zkutil::ZooKeeper & zk, +Int64 getMaxBlockNumberForPartition(zkutil::ZooKeeper & zk, const std::string & replica_path, const std::string & partition_name, const DB::MergeTreeDataFormatVersion & format_version) { auto replicas_path = replica_path + "/replicas"; auto replica_hosts = zk.getChildren(replicas_path); - size_t max_block_num = 0; + Int64 max_block_num = 0; for (const auto & replica_host : replica_hosts) { auto parts = zk.getChildren(replicas_path + "/" + replica_host + "/parts"); @@ -66,7 +66,7 @@ size_t getMaxBlockNumberForPartition(zkutil::ZooKeeper & zk, { auto info = DB::MergeTreePartInfo::fromPartName(part, format_version); if (info.partition_id == partition_name) - max_block_num = std::max(info.max_block, max_block_num); + max_block_num = std::max(info.max_block, max_block_num); } catch (const DB::Exception & ex) { @@ -78,7 +78,7 @@ size_t getMaxBlockNumberForPartition(zkutil::ZooKeeper & zk, } -size_t getCurrentBlockNumberForPartition(zkutil::ZooKeeper & zk, const std::string & part_path) +Int64 getCurrentBlockNumberForPartition(zkutil::ZooKeeper & zk, const std::string & part_path) { Coordination::Stat stat; zk.get(part_path, &stat); @@ -90,10 +90,10 @@ size_t getCurrentBlockNumberForPartition(zkutil::ZooKeeper & zk, const std::stri } -std::unordered_map getPartitionsNeedAdjustingBlockNumbers( +std::unordered_map getPartitionsNeedAdjustingBlockNumbers( zkutil::ZooKeeper & zk, const std::string & root, const std::vector & shards, const std::vector & tables) { - std::unordered_map result; + std::unordered_map result; std::vector use_shards = shards.empty() ? getAllShards(zk, root) : removeNotExistingShards(zk, root, shards); @@ -126,9 +126,9 @@ std::unordered_map getPartitionsNeedAdjustingBlockNumbers( try { std::string part_path = blocks_path + "/" + partition; - size_t partition_max_block = getMaxBlockNumberForPartition(zk, table_path, partition, format_version); - size_t current_block_number = getCurrentBlockNumberForPartition(zk, part_path); - if (current_block_number <= partition_max_block) + Int64 partition_max_block = getMaxBlockNumberForPartition(zk, table_path, partition, format_version); + Int64 current_block_number = getCurrentBlockNumberForPartition(zk, part_path); + if (current_block_number < partition_max_block + 1) { std::cout << "\t\tPartition: " << partition << ": current block_number: " << current_block_number << ", max block number: " << partition_max_block << ". Adjusting is required." << std::endl; @@ -146,29 +146,63 @@ std::unordered_map getPartitionsNeedAdjustingBlockNumbers( } -void setCurrentBlockNumber(zkutil::ZooKeeper & zk, const std::string & path, size_t new_current_block_number) +void setCurrentBlockNumber(zkutil::ZooKeeper & zk, const std::string & path, Int64 new_current_block_number) { - std::string block_prefix = path + "/block-"; - Coordination::Requests requests; + Int64 current_block_number = getCurrentBlockNumberForPartition(zk, path); - for (size_t current_block_number = getCurrentBlockNumberForPartition(zk, path); - current_block_number < new_current_block_number; - ++current_block_number) + auto create_ephemeral_nodes = [&](size_t count) { - if (requests.size() == 50) + std::string block_prefix = path + "/block-"; + Coordination::Requests requests; + requests.reserve(count); + for (size_t i = 0; i != count; ++i) + requests.emplace_back(zkutil::makeCreateRequest(block_prefix, "", zkutil::CreateMode::EphemeralSequential)); + auto responses = zk.multi(requests); + + std::vector paths_created; + paths_created.reserve(responses.size()); + for (const auto & response : responses) { - zk.multi(requests); - std::cout << path << ": " << requests.size() << " ephemeral sequential nodes inserted." << std::endl; - requests.clear(); + const auto * create_response = dynamic_cast(response.get()); + if (!create_response) + { + std::cerr << "\tCould not create ephemeral node " << block_prefix << std::endl; + return false; + } + paths_created.emplace_back(create_response->path_created); } - requests.emplace_back(zkutil::makeCreateRequest(path + "/block-", "", zkutil::CreateMode::EphemeralSequential)); - } - if (!requests.empty()) - { - std::cout << path << ": " << requests.size() << " ephemeral sequential nodes inserted." << std::endl; - zk.multi(requests); - } + std::sort(paths_created.begin(), paths_created.end()); + for (const auto & path_created : paths_created) + { + Int64 number = DB::parse(path_created.c_str() + block_prefix.size(), path_created.size() - block_prefix.size()); + if (number != current_block_number) + { + char suffix[11] = ""; + sprintf(suffix, "%010ld", current_block_number); + std::string expected_path = block_prefix + suffix; + std::cerr << "\t" << path_created << ": Ephemeral node has been created with an unexpected path (expected something like " + << expected_path << ")." << std::endl; + return false; + } + std::cout << "\t" << path_created << std::endl; + ++current_block_number; + } + + return true; + }; + + if (current_block_number >= new_current_block_number) + return; + + std::cout << "Creating ephemeral sequential nodes:" << std::endl; + create_ephemeral_nodes(1); /// Firstly try to create just a single node. + + /// Create other nodes in batches of 50 nodes. + while (current_block_number + 50 <= new_current_block_number) + create_ephemeral_nodes(50); + + create_ephemeral_nodes(new_current_block_number - current_block_number); } @@ -230,9 +264,14 @@ try return 0; } + std::cout << "Required adjusting of " << part_paths_with_max_block_numbers.size() << " block numbers." << std::endl; + /// Adjust the block numbers. if (options.count("dry-run")) + { + std::cout << "This is a dry-run, exiting." << std::endl; return 0; + } std::cout << std::endl << "Adjusting the block numbers:" << std::endl; for (const auto & [part_path, max_block_number] : part_paths_with_max_block_numbers) From 98b0d08bf3407d04bdb047439b8e582555e67cd3 Mon Sep 17 00:00:00 2001 From: Alexey Milovidov Date: Fri, 19 Jul 2019 15:57:23 +0300 Subject: [PATCH 3/3] Added missing header #5981 --- dbms/src/Common/new_delete.cpp | 1 + 1 file changed, 1 insertion(+) diff --git a/dbms/src/Common/new_delete.cpp b/dbms/src/Common/new_delete.cpp index d9140f9459d..aff708135e1 100644 --- a/dbms/src/Common/new_delete.cpp +++ b/dbms/src/Common/new_delete.cpp @@ -1,5 +1,6 @@ #include +#include #include #include