From e5d4149ca45ff85f903ec97c5d0fdf2d57f6fac9 Mon Sep 17 00:00:00 2001
From: Alexey Milovidov
Date: Thu, 28 Jan 2016 04:00:42 +0300
Subject: [PATCH] Merge

---
 dbms/include/DB/Common/BlockFilterCreator.h   |  64 +++++
 dbms/include/DB/Interpreters/Context.h        |   1 +
 .../DB/Interpreters/InterpreterAlterQuery.h   |   6 +-
 dbms/include/DB/Parsers/ASTAlterQuery.h       |   2 +-
 .../DistributedBlockOutputStream.h            | 218 +-------------
 dbms/include/DB/Storages/IStorage.h           |   2 +-
 .../DB/Storages/MergeTree/MergeTreeSharder.h  |   2 +
 .../MergeTree/RemoteDiskSpaceMonitor.h        |   8 +-
 .../DB/Storages/MergeTree/ReshardingJob.h     |   4 +-
 .../DB/Storages/MergeTree/ReshardingWorker.h  |  20 +-
 .../MergeTree/ShardedPartitionSender.h        |   5 +-
 dbms/include/DB/Storages/StorageDistributed.h |   2 +-
 .../DB/Storages/StorageReplicatedMergeTree.h  |   4 +-
 dbms/src/Core/ErrorCodes.cpp                  |  13 +-
 dbms/src/Interpreters/Context.cpp             |  15 +-
 .../Interpreters/InterpreterAlterQuery.cpp    |   7 +-
 dbms/src/Parsers/ASTAlterQuery.cpp            |   8 +-
 dbms/src/Parsers/ParserAlterQuery.cpp         |   7 +-
 dbms/src/Server/Server.cpp                    |  11 +-
 .../DistributedBlockOutputStream.cpp          | 180 ++++++++++++
 .../Storages/MergeTree/MergeTreeSharder.cpp   |  67 +----
 .../MergeTree/RemoteDiskSpaceMonitor.cpp      |   9 +-
 dbms/src/Storages/MergeTree/ReshardingJob.cpp |  28 +-
 .../Storages/MergeTree/ReshardingWorker.cpp   | 271 ++++++++----------
 .../MergeTree/ShardedPartitionSender.cpp      |  21 +-
 dbms/src/Storages/StorageDistributed.cpp      |   4 +-
 .../Storages/StorageReplicatedMergeTree.cpp   |  76 +----
 libs/libzkutil/include/zkutil/ZooKeeper.h     |   5 +-
 libs/libzkutil/src/ZooKeeper.cpp              |  15 +-
 29 files changed, 517 insertions(+), 558 deletions(-)
 create mode 100644 dbms/include/DB/Common/BlockFilterCreator.h
 create mode 100644 dbms/src/Storages/Distributed/DistributedBlockOutputStream.cpp

diff --git a/dbms/include/DB/Common/BlockFilterCreator.h b/dbms/include/DB/Common/BlockFilterCreator.h
new file mode 100644
index 00000000000..aa17ec04953
--- /dev/null
+++ b/dbms/include/DB/Common/BlockFilterCreator.h
@@ -0,0 +1,64 @@
+#pragma once
+
+#include
+#include
+
+#include
+
+#if defined(__x86_64__)
+    #define LIBDIVIDE_USE_SSE2 1
+#endif
+
+#include
+
+namespace DB
+{
+
+template <typename T>
+struct BlockFilterCreator
+{
+    static std::vector<IColumn::Filter> perform(const size_t num_rows, const IColumn * column,
+        size_t num_shards, const std::vector<size_t> & slots)
+    {
+        const auto total_weight = slots.size();
+        std::vector<IColumn::Filter> filters(num_shards);
+
+        /** In C++, division with remainder of a negative number by a positive one yields a negative remainder.
+          * That does not suit this task, so we treat signed types as unsigned.
+          * This gives something quite unlike division with remainder, but it is suitable for this task.
+          */
+        using UnsignedT = typename std::make_unsigned<T>::type;
+
+        /// const columns contain only one value, therefore we do not need to read it at every iteration
+        if (column->isConst())
+        {
+            const auto data = typeid_cast<const ColumnConst<T> *>(column)->getData();
+            const auto shard_num = slots[static_cast<UnsignedT>(data) % total_weight];
+
+            for (size_t i = 0; i < num_shards; ++i)
+                filters[i].assign(num_rows, static_cast<UInt8>(shard_num == i));
+        }
+        else
+        {
+            /// libdivide only supports UInt32 and UInt64.
+            using TUInt32Or64 = typename std::conditional<sizeof(UnsignedT) <= 4, UInt32, UInt64>::type;
+
+            libdivide::divider<TUInt32Or64> divider(total_weight);
+
+            const auto & data = typeid_cast<const ColumnVector<T> *>(column)->getData();
+
+            /// NOTE Perhaps the loops should be swapped.
+            for (size_t i = 0; i < num_shards; ++i)
+            {
+                filters[i].resize(num_rows);
+                for (size_t j = 0; j < num_rows; ++j)
+                    filters[i][j] = slots[
+                        static_cast<TUInt32Or64>(data[j]) - (static_cast<TUInt32Or64>(data[j]) / divider) * total_weight] == i;
+            }
+        }
+
+        return filters;
+    }
+};
+
+}
diff --git a/dbms/include/DB/Interpreters/Context.h b/dbms/include/DB/Interpreters/Context.h
index bd036ca9cc8..f298d5faeba 100644
--- a/dbms/include/DB/Interpreters/Context.h
+++ b/dbms/include/DB/Interpreters/Context.h
@@ -251,6 +251,7 @@ public:
 
 	BackgroundProcessingPool & getBackgroundPool();
 
+	void setReshardingWorker(std::shared_ptr<ReshardingWorker> resharding_worker);
 	ReshardingWorker & getReshardingWorker();
 
 	/** Clear the caches of decompressed blocks and marks.
diff --git a/dbms/include/DB/Interpreters/InterpreterAlterQuery.h b/dbms/include/DB/Interpreters/InterpreterAlterQuery.h
index 15814622164..9e6417e5074 100644
--- a/dbms/include/DB/Interpreters/InterpreterAlterQuery.h
+++ b/dbms/include/DB/Interpreters/InterpreterAlterQuery.h
@@ -55,7 +55,7 @@ private:
 		/// For RESHARD PARTITION.
 		Field last_partition;
 		WeightedZooKeeperPaths weighted_zookeeper_paths;
-		String sharding_key;
+		ASTPtr sharding_key_expr;
 
 		static PartitionCommand dropPartition(const Field & partition, bool detach, bool unreplicated)
 		{
@@ -78,9 +78,9 @@ private:
 		}
 
 		static PartitionCommand reshardPartitions(const Field & first_partition_, const Field & last_partition_,
-			const WeightedZooKeeperPaths & weighted_zookeeper_paths_, const String & sharding_key_)
+			const WeightedZooKeeperPaths & weighted_zookeeper_paths_, const ASTPtr & sharding_key_expr)
 		{
-			return {RESHARD_PARTITION, first_partition_, false, false, false, {}, last_partition_, weighted_zookeeper_paths_, sharding_key_};
+			return {RESHARD_PARTITION, first_partition_, false, false, false, {}, last_partition_, weighted_zookeeper_paths_, sharding_key_expr};
 		}
 	};
diff --git a/dbms/include/DB/Parsers/ASTAlterQuery.h b/dbms/include/DB/Parsers/ASTAlterQuery.h
index 4d6d9af30bf..eea05c73e12 100644
--- a/dbms/include/DB/Parsers/ASTAlterQuery.h
+++ b/dbms/include/DB/Parsers/ASTAlterQuery.h
@@ -70,7 +70,7 @@ public:
 		 */
 		ASTPtr last_partition;
 		ASTPtr weighted_zookeeper_paths;
-		String sharding_key;
+		ASTPtr sharding_key_expr;
 
 		/// deep copy
 		void clone(Parameters & p) const;
diff --git a/dbms/include/DB/Storages/Distributed/DistributedBlockOutputStream.h b/dbms/include/DB/Storages/Distributed/DistributedBlockOutputStream.h
index 2d6f20678f1..081b51eed4a 100644
--- a/dbms/include/DB/Storages/Distributed/DistributedBlockOutputStream.h
+++ b/dbms/include/DB/Storages/Distributed/DistributedBlockOutputStream.h
@@ -1,32 +1,14 @@
 #pragma once
 
-#include
-
 #include
-
-#include
-#include
-#include
-#include
-#include
-
-#include
-#include
-#include
-
-#include
-#include
-
-#if defined(__x86_64__)
-    #define LIBDIVIDE_USE_SSE2 1
-#endif
-
-#include
-
+#include
+#include
 
 namespace DB
 {
 
+class StorageDistributed;
+
 /** Writing is asynchronous: data is first written to the local file system, and then sent to the remote servers.
  * If the Distributed table uses more than one shard, then in order for writes to be supported,
  * an additional ENGINE parameter must be specified when creating the table: the sharding key.
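The filter logic above routes each row through the slots table: a shard with weight N occupies N consecutive slots, and the row's key, reinterpreted as unsigned, is taken modulo the total weight. Below is a minimal standalone sketch of that selection; buildSlotToShard and the weights are illustrative stand-ins for Cluster::slot_to_shard, not the actual Cluster API.

    #include <cstdint>
    #include <iostream>
    #include <vector>

    // Build a slot table from per-shard weights: shard i occupies weights[i] slots.
    static std::vector<size_t> buildSlotToShard(const std::vector<size_t> & weights)
    {
        std::vector<size_t> slot_to_shard;
        for (size_t shard = 0; shard < weights.size(); ++shard)
            for (size_t w = 0; w < weights[shard]; ++w)
                slot_to_shard.push_back(shard);
        return slot_to_shard;
    }

    int main()
    {
        // Two shards with weights 1 and 3: shard 1 receives roughly 3x the rows of shard 0.
        const auto slot_to_shard = buildSlotToShard({1, 3});
        const size_t total_weight = slot_to_shard.size();

        // Signed keys are reinterpreted as unsigned before the modulo,
        // mirroring the make_unsigned trick in BlockFilterCreator.
        for (std::int64_t key : {-2, -1, 0, 1, 2, 42})
        {
            const auto slot = static_cast<std::uint64_t>(key) % total_weight;
            std::cout << "key " << key << " -> shard " << slot_to_shard[slot] << '\n';
        }
    }
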
@@ -38,198 +20,22 @@ namespace DB class DistributedBlockOutputStream : public IBlockOutputStream { public: - DistributedBlockOutputStream(StorageDistributed & storage, const ASTPtr & query_ast) - : storage(storage), query_ast(query_ast) - { - } + DistributedBlockOutputStream(StorageDistributed & storage, const ASTPtr & query_ast); - void write(const Block & block) override - { - if (storage.getShardingKeyExpr() && (storage.cluster.getShardsInfo().size() > 1)) - return writeSplit(block); - - writeImpl(block); - } + void write(const Block & block) override; private: - template - static std::vector createFiltersImpl(const size_t num_rows, const IColumn * column, const Cluster & cluster) - { - const auto total_weight = cluster.slot_to_shard.size(); - const auto num_shards = cluster.getShardsInfo().size(); - std::vector filters(num_shards); + std::vector createFilters(Block block); - /** Деление отрицательного числа с остатком на положительное, в C++ даёт отрицательный остаток. - * Для данной задачи это не подходит. Поэтому, будем обрабатывать знаковые типы как беззнаковые. - * Это даёт уже что-то совсем не похожее на деление с остатком, но подходящее для данной задачи. - */ - using UnsignedT = typename std::make_unsigned::type; + void writeSplit(const Block & block); - /// const columns contain only one value, therefore we do not need to read it at every iteration - if (column->isConst()) - { - const auto data = typeid_cast *>(column)->getData(); - const auto shard_num = cluster.slot_to_shard[static_cast(data) % total_weight]; + void writeImpl(const Block & block, const size_t shard_id = 0); - for (size_t i = 0; i < num_shards; ++i) - filters[i].assign(num_rows, static_cast(shard_num == i)); - } - else - { - /// libdivide поддерживает только UInt32 или UInt64. - using TUInt32Or64 = typename std::conditional::type; + void writeToLocal(const Block & block, const size_t repeats); - libdivide::divider divider(total_weight); - - const auto & data = typeid_cast *>(column)->getData(); - - /// NOTE Может быть, стоит поменять местами циклы. - for (size_t i = 0; i < num_shards; ++i) - { - filters[i].resize(num_rows); - for (size_t j = 0; j < num_rows; ++j) - filters[i][j] = cluster.slot_to_shard[ - static_cast(data[j]) - (static_cast(data[j]) / divider) * total_weight] == i; - } - } - - return filters; - } - - std::vector createFilters(Block block) - { - using create_filters_sig = std::vector(size_t, const IColumn *, const Cluster &); - /// hashmap of pointers to functions corresponding to each integral type - static std::unordered_map creators{ - { TypeName::get(), &createFiltersImpl }, - { TypeName::get(), &createFiltersImpl }, - { TypeName::get(), &createFiltersImpl }, - { TypeName::get(), &createFiltersImpl }, - { TypeName::get(), &createFiltersImpl }, - { TypeName::get(), &createFiltersImpl }, - { TypeName::get(), &createFiltersImpl }, - { TypeName::get(), &createFiltersImpl }, - }; - - storage.getShardingKeyExpr()->execute(block); - - const auto & key_column = block.getByName(storage.getShardingKeyColumnName()); - - /// check that key column has valid type - const auto it = creators.find(key_column.type->getName()); - - return it != std::end(creators) - ? 
(*it->second)(block.rowsInFirstColumn(), key_column.column.get(), storage.cluster) - : throw Exception{ - "Sharding key expression does not evaluate to an integer type", - ErrorCodes::TYPE_MISMATCH - }; - } - - void writeSplit(const Block & block) - { - const auto num_cols = block.columns(); - /// cache column pointers for later reuse - std::vector columns(num_cols); - for (size_t i = 0; i < columns.size(); ++i) - columns[i] = block.getByPosition(i).column; - - auto filters = createFilters(block); - - const auto num_shards = storage.cluster.getShardsInfo().size(); - - ssize_t size_hint = ((block.rowsInFirstColumn() + num_shards - 1) / num_shards) * 1.1; /// Число 1.1 выбрано наугад. - - for (size_t i = 0; i < num_shards; ++i) - { - auto target_block = block.cloneEmpty(); - - for (size_t col = 0; col < num_cols; ++col) - target_block.getByPosition(col).column = columns[col]->filter(filters[i], size_hint); - - if (target_block.rowsInFirstColumn()) - writeImpl(target_block, i); - } - } - - void writeImpl(const Block & block, const size_t shard_id = 0) - { - const auto & shard_info = storage.cluster.getShardsInfo()[shard_id]; - if (shard_info.getLocalNodeCount() > 0) - writeToLocal(block, shard_info.getLocalNodeCount()); - - /// dir_names is empty if shard has only local addresses - if (!shard_info.dir_names.empty()) - writeToShard(block, shard_info.dir_names); - } - - void writeToLocal(const Block & block, const size_t repeats) - { - InterpreterInsertQuery interp{query_ast, storage.context}; - - auto block_io = interp.execute(); - block_io.out->writePrefix(); - - for (size_t i = 0; i < repeats; ++i) - block_io.out->write(block); - - block_io.out->writeSuffix(); - } - - void writeToShard(const Block & block, const std::vector & dir_names) - { - /** tmp directory is used to ensure atomicity of transactions - * and keep monitor thread out from reading incomplete data - */ - std::string first_file_tmp_path{}; - - auto first = true; - const auto & query_string = queryToString(query_ast); - - /// write first file, hardlink the others - for (const auto & dir_name : dir_names) - { - const auto & path = storage.getPath() + dir_name + '/'; - - /// ensure shard subdirectory creation and notify storage - if (Poco::File(path).createDirectory()) - storage.requireDirectoryMonitor(dir_name); - - const auto & file_name = toString(Increment{path + "increment.txt"}.get(true)) + ".bin"; - const auto & block_file_path = path + file_name; - - /** on first iteration write block to a temporary directory for subsequent hardlinking to ensure - * the inode is not freed until we're done */ - if (first) - { - first = false; - - const auto & tmp_path = path + "tmp/"; - Poco::File(tmp_path).createDirectory(); - const auto & block_file_tmp_path = tmp_path + file_name; - - first_file_tmp_path = block_file_tmp_path; - - WriteBufferFromFile out{block_file_tmp_path}; - CompressedWriteBuffer compress{out}; - NativeBlockOutputStream stream{compress, Revision::get()}; - - writeStringBinary(query_string, out); - - stream.writePrefix(); - stream.write(block); - stream.writeSuffix(); - } - - if (link(first_file_tmp_path.data(), block_file_path.data())) - throwFromErrno("Could not link " + block_file_path + " to " + first_file_tmp_path); - } - - /** remove the temporary file, enabling the OS to reclaim inode after all threads - * have removed their corresponding files */ - Poco::File(first_file_tmp_path).remove(); - } + void writeToShard(const Block & block, const std::vector & dir_names); +private: StorageDistributed & storage; 
 	ASTPtr query_ast;
 };
 
diff --git a/dbms/include/DB/Storages/IStorage.h b/dbms/include/DB/Storages/IStorage.h
index 4505ba552a5..1f0c4eb40a9 100644
--- a/dbms/include/DB/Storages/IStorage.h
+++ b/dbms/include/DB/Storages/IStorage.h
@@ -242,7 +242,7 @@ public:
 	/** Execute the RESHARD PARTITION query.
 	  */
 	virtual void reshardPartitions(const String & database_name, const Field & first_partition, const Field & last_partition,
-		const WeightedZooKeeperPaths & weighted_zookeeper_paths, const String & sharding_key,
+		const WeightedZooKeeperPaths & weighted_zookeeper_paths, const ASTPtr & sharding_key_expr,
 		const Settings & settings)
 	{
 		throw Exception("Method reshardPartition is not supported by storage " + getName(), ErrorCodes::NOT_IMPLEMENTED);
diff --git a/dbms/include/DB/Storages/MergeTree/MergeTreeSharder.h b/dbms/include/DB/Storages/MergeTree/MergeTreeSharder.h
index f41a0492d74..7e422b5c4a8 100644
--- a/dbms/include/DB/Storages/MergeTree/MergeTreeSharder.h
+++ b/dbms/include/DB/Storages/MergeTree/MergeTreeSharder.h
@@ -56,6 +56,8 @@ private:
 	const ReshardingJob & job;
 	Logger * log;
 	std::vector<size_t> slots;
+	ExpressionActionsPtr sharding_key_expr;
+	std::string sharding_key_column_name;
 };
 
 }
diff --git a/dbms/include/DB/Storages/MergeTree/RemoteDiskSpaceMonitor.h b/dbms/include/DB/Storages/MergeTree/RemoteDiskSpaceMonitor.h
index 45689a9f608..c6a2bed6189 100644
--- a/dbms/include/DB/Storages/MergeTree/RemoteDiskSpaceMonitor.h
+++ b/dbms/include/DB/Storages/MergeTree/RemoteDiskSpaceMonitor.h
@@ -7,6 +7,8 @@
 namespace DB
 {
 
+class Context;
+
 namespace RemoteDiskSpaceMonitor
 {
 
@@ -15,14 +17,14 @@ namespace RemoteDiskSpaceMonitor
 class Service final : public InterserverIOEndpoint
 {
 public:
-	Service(const std::string & path_);
+	Service(const Context & context_);
 	Service(const Service &) = delete;
 	Service & operator=(const Service &) = delete;
 	std::string getId(const std::string & node_id) const override;
 	void processQuery(const Poco::Net::HTMLForm & params, WriteBuffer & out) override;
 
 private:
-	const std::string path;
+	const Context & context;
 };
 
 /** Client for obtaining information about free space on a remote disk.
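Service::processQuery above replies with a single binary-encoded value that Client::getFreeSpace reads back. The sketch below approximates that round trip over an in-memory buffer; the real implementation uses DB::writeBinary over a WriteBuffer and an HTTP read on the client side, so the helper names here are stand-ins.

    #include <cstdint>
    #include <cstring>
    #include <iostream>
    #include <string>

    // Encode a 64-bit free-space value the way a binary writer would: raw bytes.
    static std::string writeFreeSpace(std::uint64_t free_space)
    {
        std::string out(sizeof(free_space), '\0');
        std::memcpy(&out[0], &free_space, sizeof(free_space));
        return out;
    }

    static std::uint64_t readFreeSpace(const std::string & in)
    {
        std::uint64_t value = 0;
        std::memcpy(&value, in.data(), sizeof(value));
        return value;
    }

    int main()
    {
        const std::string wire = writeFreeSpace(123456789);               // server side
        std::cout << "free space: " << readFreeSpace(wire) << " bytes\n"; // client side
    }
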
@@ -33,7 +35,7 @@ public:
 	Client() = default;
 	Client(const Client &) = delete;
 	Client & operator=(const Client &) = delete;
-	size_t getFreeDiskSpace(const InterserverIOEndpointLocation & location) const;
+	size_t getFreeSpace(const InterserverIOEndpointLocation & location) const;
 	void cancel() { is_cancelled = true; }
 
 private:
diff --git a/dbms/include/DB/Storages/MergeTree/ReshardingJob.h b/dbms/include/DB/Storages/MergeTree/ReshardingJob.h
index 927a27abef3..e22157620fc 100644
--- a/dbms/include/DB/Storages/MergeTree/ReshardingJob.h
+++ b/dbms/include/DB/Storages/MergeTree/ReshardingJob.h
@@ -16,7 +16,7 @@ public:
 	ReshardingJob(const std::string & database_name_, const std::string & table_name_,
 		const std::string & partition_, const WeightedZooKeeperPaths & paths_,
-		const std::string & sharding_key_);
+		const ASTPtr & sharding_key_expr_);
 
 	ReshardingJob(const ReshardingJob &) = delete;
 	ReshardingJob & operator=(const ReshardingJob &) = delete;
@@ -29,7 +29,7 @@ public:
 	std::string table_name;
 	std::string partition;
 	WeightedZooKeeperPaths paths;
-	std::string sharding_key;
+	ASTPtr sharding_key_expr;
 };
 
 }
diff --git a/dbms/include/DB/Storages/MergeTree/ReshardingWorker.h b/dbms/include/DB/Storages/MergeTree/ReshardingWorker.h
index 35874ae5a2f..984d116fed0 100644
--- a/dbms/include/DB/Storages/MergeTree/ReshardingWorker.h
+++ b/dbms/include/DB/Storages/MergeTree/ReshardingWorker.h
@@ -1,7 +1,9 @@
 #pragma once
 
+#include
 #include
 #include
+#include
 #include
 #include
 #include
@@ -22,7 +24,8 @@ class ReshardingJob;
 class ReshardingWorker final
 {
 public:
-	ReshardingWorker(Context & context_);
+	ReshardingWorker(const Poco::Util::AbstractConfiguration & config,
+		const std::string & config_name, Context & context_);
 
 	ReshardingWorker(const ReshardingWorker &) = delete;
 	ReshardingWorker & operator=(const ReshardingWorker &) = delete;
@@ -37,18 +40,12 @@ public:
 		const std::string & table_name,
 		const std::string & partition,
 		const WeightedZooKeeperPaths & weighted_zookeeper_paths,
-		const std::string & sharding_key);
-
-	/// Submit a resharding request.
-	void submitJob(const ReshardingJob & job);
+		const ASTPtr & sharding_key_expr);
 
 	/// Has the thread been started?
 	bool isStarted() const;
 
 private:
-	/// Submit a resharding request (internal version).
-	void submitJobImpl(const std::string & serialized_job);
-
 	/// Watch for new tasks to appear. Perform them sequentially.
 	void pollAndExecute();
@@ -81,18 +78,17 @@ private:
 	/// Forcibly terminate the thread.
 	void abortIfRequested() const;
 
-	/// Has the thread been terminated?
-	bool hasAborted(const Exception & ex) const;
-
 private:
 	Context & context;
 	Logger * log;
+	std::unique_ptr<MergeTreeDataMerger> merger;
 	std::thread polling_thread;
+	mutable std::mutex cancel_mutex;
 	std::string host_task_queue_path;
 	std::atomic<bool> is_started{false};
 	std::atomic<bool> must_stop{false};
 };
 
-using ReshardingWorkerPtr = Poco::SharedPtr<ReshardingWorker>;
+using ReshardingWorkerPtr = std::shared_ptr<ReshardingWorker>;
 
 }
diff --git a/dbms/include/DB/Storages/MergeTree/ShardedPartitionSender.h b/dbms/include/DB/Storages/MergeTree/ShardedPartitionSender.h
index 1f345b9a721..bff2d5a20c0 100644
--- a/dbms/include/DB/Storages/MergeTree/ShardedPartitionSender.h
+++ b/dbms/include/DB/Storages/MergeTree/ShardedPartitionSender.h
@@ -2,6 +2,7 @@
 
 #include
 #include
+#include
 
 namespace DB
 {
@@ -24,6 +25,7 @@ public:
 
 private:
 	StorageReplicatedMergeTree & storage;
+	Logger * log;
 };
 
 /** Client for sending parts from a partition of a *MergeTree table.
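ReshardingWorker above keeps its per-host task queue in ZooKeeper: submitJob creates a child named task- with CreateMode::PersistentSequential, and the polling thread replays children in sequence order. The standalone sketch below shows why the zero-padded sequence suffixes make plain lexicographic sorting equal to submission order; a std::vector stands in for the ZooKeeper children list.

    #include <algorithm>
    #include <iostream>
    #include <string>
    #include <vector>

    int main()
    {
        // Children of the host task queue node as ZooKeeper might return them (unordered).
        std::vector<std::string> children{"task-0000000002", "task-0000000000", "task-0000000001"};

        // Zero-padded sequence suffixes make lexicographic order equal submission order.
        std::sort(children.begin(), children.end());

        for (const auto & child : children)
            std::cout << "processing " << child << '\n';  // oldest job first
    }
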
@@ -31,7 +33,7 @@ private: class Client final { public: - Client() = default; + Client(); Client(const Client &) = delete; Client & operator=(const Client &) = delete; bool send(const InterserverIOEndpointLocation & to_location, const InterserverIOEndpointLocation & from_location, @@ -40,6 +42,7 @@ public: private: std::atomic is_cancelled{false}; + Logger * log; }; } diff --git a/dbms/include/DB/Storages/StorageDistributed.h b/dbms/include/DB/Storages/StorageDistributed.h index 8a031688f33..d7d9676dad1 100644 --- a/dbms/include/DB/Storages/StorageDistributed.h +++ b/dbms/include/DB/Storages/StorageDistributed.h @@ -77,7 +77,7 @@ public: void shutdown() override; void reshardPartitions(const String & database_name, const Field & first_partition, const Field & last_partition, - const WeightedZooKeeperPaths & weighted_zookeeper_paths, const String & sharding_key, + const WeightedZooKeeperPaths & weighted_zookeeper_paths, const ASTPtr & sharding_key_expr, const Settings & settings) override; /// От каждой реплики получить описание соответствующей локальной таблицы. diff --git a/dbms/include/DB/Storages/StorageReplicatedMergeTree.h b/dbms/include/DB/Storages/StorageReplicatedMergeTree.h index 5b37efadf19..b69da7d0f45 100644 --- a/dbms/include/DB/Storages/StorageReplicatedMergeTree.h +++ b/dbms/include/DB/Storages/StorageReplicatedMergeTree.h @@ -136,7 +136,7 @@ public: void fetchPartition(const Field & partition, const String & from, const Settings & settings) override; void freezePartition(const Field & partition, const Settings & settings) override; void reshardPartitions(const String & database_name, const Field & first_partition, const Field & last_partition, - const WeightedZooKeeperPaths & weighted_zookeeper_paths, const String & sharding_key, + const WeightedZooKeeperPaths & weighted_zookeeper_paths, const ASTPtr & sharding_key_expr, const Settings & settings) override; /** Удаляет реплику из ZooKeeper. Если других реплик нет, удаляет всю таблицу из ZooKeeper. 
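Since sharding_key is now an ASTPtr rather than a String, reshardPartitions cannot ship it to the worker as a bare identifier; as ReshardingJob.cpp later in this patch shows, the expression is rendered to text with queryToString and re-parsed on the receiving side. The sketch below illustrates such a text round trip with a simple length-prefixed codec; it is an approximation, not the readBinary/writeBinary format the patch actually uses.

    #include <iostream>
    #include <sstream>
    #include <string>

    // Length-prefixed string codec, a stand-in for writeBinary/readBinary on std::string.
    static void writeString(std::ostream & out, const std::string & s)
    {
        out << s.size() << ' ' << s;
    }

    static std::string readString(std::istream & in)
    {
        size_t size = 0;
        char separator = 0;
        in >> size;
        in.get(separator);
        std::string s(size, '\0');
        in.read(&s[0], size);
        return s;
    }

    int main()
    {
        std::ostringstream buf;
        writeString(buf, "hits");                     // table name
        writeString(buf, "intHash64(UserID) % 100");  // formatted sharding expression

        std::istringstream in(buf.str());
        std::cout << readString(in) << " / " << readString(in) << '\n';
    }
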
@@ -257,7 +257,7 @@ private: MergeTreeDataMerger merger; DataPartsExchange::Fetcher fetcher; - RemoteDiskSpaceMonitor::Client free_disk_space_checker; + RemoteDiskSpaceMonitor::Client disk_space_monitor_client; ShardedPartitionSender::Client sharded_partition_sender_client; RemoteQueryExecutor::Client remote_query_executor_client; diff --git a/dbms/src/Core/ErrorCodes.cpp b/dbms/src/Core/ErrorCodes.cpp index d261b500282..6bf7a8fb712 100644 --- a/dbms/src/Core/ErrorCodes.cpp +++ b/dbms/src/Core/ErrorCodes.cpp @@ -317,13 +317,12 @@ namespace ErrorCodes extern const int INSUFFICIENT_SPACE_FOR_RESHARDING = 311; extern const int PARTITION_COPY_FAILED = 312; extern const int PARTITION_ATTACH_FAILED = 313; - extern const int RESHARDING_CLEANUP_FAILED = 314; - extern const int RESHARDING_NO_WORKER = 315; - extern const int INVALID_PARTITIONS_INTERVAL = 316; - extern const int RESHARDING_INVALID_PARAMETERS = 317; - extern const int INVALID_SHARD_WEIGHT = 318; - extern const int SHARD_DOESNT_REFERENCE_TABLE = 319; - extern const int UNKNOWN_STATUS_OF_INSERT = 320; + extern const int RESHARDING_NO_WORKER = 314; + extern const int INVALID_PARTITIONS_INTERVAL = 315; + extern const int RESHARDING_INVALID_PARAMETERS = 316; + extern const int INVALID_SHARD_WEIGHT = 317; + extern const int INVALID_CONFIG_PARAMETER = 318; + extern const int UNKNOWN_STATUS_OF_INSERT = 319; extern const int KEEPER_EXCEPTION = 999; extern const int POCO_EXCEPTION = 1000; diff --git a/dbms/src/Interpreters/Context.cpp b/dbms/src/Interpreters/Context.cpp index cbcb2cc5fd3..c66d21b5ef8 100644 --- a/dbms/src/Interpreters/Context.cpp +++ b/dbms/src/Interpreters/Context.cpp @@ -822,16 +822,19 @@ BackgroundProcessingPool & Context::getBackgroundPool() return *shared->background_pool; } +void Context::setReshardingWorker(std::shared_ptr resharding_worker) +{ + Poco::ScopedLock lock(shared->mutex); + if (shared->resharding_worker) + throw Exception("Resharding background thread has already been set.", ErrorCodes::LOGICAL_ERROR); + shared->resharding_worker = resharding_worker; +} + ReshardingWorker & Context::getReshardingWorker() { Poco::ScopedLock lock(shared->mutex); - - if (!shared->zookeeper) - throw Exception("Resharding background processing requires ZooKeeper", ErrorCodes::LOGICAL_ERROR); - if (!shared->resharding_worker) - shared->resharding_worker = new ReshardingWorker(*this); - + throw Exception("Resharding background thread not set.", ErrorCodes::LOGICAL_ERROR); return *shared->resharding_worker; } diff --git a/dbms/src/Interpreters/InterpreterAlterQuery.cpp b/dbms/src/Interpreters/InterpreterAlterQuery.cpp index c91f74d97f0..8602495420a 100644 --- a/dbms/src/Interpreters/InterpreterAlterQuery.cpp +++ b/dbms/src/Interpreters/InterpreterAlterQuery.cpp @@ -67,7 +67,7 @@ BlockIO InterpreterAlterQuery::execute() break; case PartitionCommand::RESHARD_PARTITION: - table->reshardPartitions(database_name, command.partition, command.last_partition, command.weighted_zookeeper_paths, command.sharding_key, context.getSettingsRef()); + table->reshardPartitions(database_name, command.partition, command.last_partition, command.weighted_zookeeper_paths, command.sharding_key_expr, context.getSettingsRef()); break; default: @@ -190,9 +190,8 @@ void InterpreterAlterQuery::parseAlter( weighted_zookeeper_paths.emplace_back(weighted_zookeeper_path.path, weighted_zookeeper_path.weight); } - const auto & sharding_key = params.sharding_key; - - out_partition_commands.push_back(PartitionCommand::reshardPartitions(first_partition, last_partition, 
weighted_zookeeper_paths, sharding_key)); + out_partition_commands.push_back(PartitionCommand::reshardPartitions( + first_partition, last_partition, weighted_zookeeper_paths, params.sharding_key_expr)); } else throw Exception("Wrong parameter type in ALTER query", ErrorCodes::LOGICAL_ERROR); diff --git a/dbms/src/Parsers/ASTAlterQuery.cpp b/dbms/src/Parsers/ASTAlterQuery.cpp index 54df2d547af..c490fcec367 100644 --- a/dbms/src/Parsers/ASTAlterQuery.cpp +++ b/dbms/src/Parsers/ASTAlterQuery.cpp @@ -19,6 +19,7 @@ void ASTAlterQuery::Parameters::clone(Parameters & p) const if (partition) p.partition = partition->clone(); if (last_partition) p.last_partition = last_partition->clone(); if (weighted_zookeeper_paths) p.weighted_zookeeper_paths = weighted_zookeeper_paths->clone(); + if (sharding_key_expr) p.sharding_key_expr = sharding_key_expr->clone(); } void ASTAlterQuery::addParameters(const Parameters & params) @@ -34,6 +35,8 @@ void ASTAlterQuery::addParameters(const Parameters & params) children.push_back(params.last_partition); if (params.weighted_zookeeper_paths) children.push_back(params.weighted_zookeeper_paths); + if (params.sharding_key_expr) + children.push_back(params.sharding_key_expr); } ASTAlterQuery::ASTAlterQuery(StringRange range_) : IAST(range_) @@ -153,8 +156,9 @@ void ASTAlterQuery::formatImpl(const FormatSettings & settings, FormatState & st settings.ostr << settings.nl_or_ws; settings.ostr << (settings.hilite ? hilite_keyword : "") << indent_str - << "USING " << (settings.hilite ? hilite_none : "") - << p.sharding_key; + << "USING " << (settings.hilite ? hilite_none : ""); + + p.sharding_key_expr->formatImpl(settings, state, frame); } else throw Exception("Unexpected type of ALTER", ErrorCodes::UNEXPECTED_AST_STRUCTURE); diff --git a/dbms/src/Parsers/ParserAlterQuery.cpp b/dbms/src/Parsers/ParserAlterQuery.cpp index 7db11eb8a3d..d0910e418c5 100644 --- a/dbms/src/Parsers/ParserAlterQuery.cpp +++ b/dbms/src/Parsers/ParserAlterQuery.cpp @@ -255,7 +255,7 @@ bool ParserAlterQuery::parseImpl(Pos & pos, Pos end, ASTPtr & node, Pos & max_pa else if (s_reshard.ignore(pos, end, max_parsed_pos, expected)) { ParserList weighted_zookeeper_paths_p(ParserPtr(new ParserWeightedZooKeeperPath), ParserPtr(new ParserString(",")), false); - ParserIdentifier sharding_key_parser; + ParserExpressionWithOptionalAlias parser_sharding_key_expr(false); ws.ignore(pos, end); @@ -294,12 +294,9 @@ bool ParserAlterQuery::parseImpl(Pos & pos, Pos end, ASTPtr & node, Pos & max_pa ws.ignore(pos, end); - ASTPtr ast_sharding_key; - if (!sharding_key_parser.parse(pos, end, ast_sharding_key, max_parsed_pos, expected)) + if (!parser_sharding_key_expr.parse(pos, end, params.sharding_key_expr, max_parsed_pos, expected)) return false; - params.sharding_key = typeid_cast(*ast_sharding_key).name; - ws.ignore(pos, end); params.type = ASTAlterQuery::RESHARD_PARTITION; diff --git a/dbms/src/Server/Server.cpp b/dbms/src/Server/Server.cpp index d9d107cd600..fabf16a0d84 100644 --- a/dbms/src/Server/Server.cpp +++ b/dbms/src/Server/Server.cpp @@ -320,14 +320,11 @@ int Server::main(const std::vector & args) global_context->setCurrentDatabase(config().getString("default_database", "default")); - if (has_zookeeper) + if (has_zookeeper && config().has("resharding")) { - zkutil::ZooKeeperPtr zookeeper = global_context->getZooKeeper(); - if (!zookeeper->getTaskQueuePath().empty()) - { - auto & resharding_worker = global_context->getReshardingWorker(); - resharding_worker.start(); - } + auto resharding_worker = 
std::make_shared<ReshardingWorker>(config(), "resharding", *global_context);
+		global_context->setReshardingWorker(resharding_worker);
+		resharding_worker->start();
 	}
 
 	SCOPE_EXIT(
diff --git a/dbms/src/Storages/Distributed/DistributedBlockOutputStream.cpp b/dbms/src/Storages/Distributed/DistributedBlockOutputStream.cpp
new file mode 100644
index 00000000000..cfc984b280a
--- /dev/null
+++ b/dbms/src/Storages/Distributed/DistributedBlockOutputStream.cpp
@@ -0,0 +1,180 @@
+#include
+#include
+
+#include
+
+#include
+#include
+#include
+#include
+#include
+#include
+
+#include
+#include
+#include
+
+#include
+
+namespace DB
+{
+
+namespace
+{
+
+template <typename T>
+std::vector<IColumn::Filter> createFiltersImpl(const size_t num_rows, const IColumn * column, const Cluster & cluster)
+{
+	return BlockFilterCreator<T>::perform(num_rows, column, cluster.getShardsInfo().size(), cluster.slot_to_shard);
+}
+
+}
+
+DistributedBlockOutputStream::DistributedBlockOutputStream(StorageDistributed & storage, const ASTPtr & query_ast)
+	: storage(storage), query_ast(query_ast)
+{
+}
+
+void DistributedBlockOutputStream::write(const Block & block)
+{
+	if (storage.getShardingKeyExpr() && (storage.cluster.getShardsInfo().size() > 1))
+		return writeSplit(block);
+
+	writeImpl(block);
+}
+
+std::vector<IColumn::Filter> DistributedBlockOutputStream::createFilters(Block block)
+{
+	using create_filters_sig = std::vector<IColumn::Filter>(size_t, const IColumn *, const Cluster &);
+	/// hashmap of pointers to functions corresponding to each integral type
+	static std::unordered_map<std::string, create_filters_sig *> creators{
+		{ TypeName<UInt8>::get(), &createFiltersImpl<UInt8> },
+		{ TypeName<UInt16>::get(), &createFiltersImpl<UInt16> },
+		{ TypeName<UInt32>::get(), &createFiltersImpl<UInt32> },
+		{ TypeName<UInt64>::get(), &createFiltersImpl<UInt64> },
+		{ TypeName<Int8>::get(), &createFiltersImpl<Int8> },
+		{ TypeName<Int16>::get(), &createFiltersImpl<Int16> },
+		{ TypeName<Int32>::get(), &createFiltersImpl<Int32> },
+		{ TypeName<Int64>::get(), &createFiltersImpl<Int64> },
+	};
+
+	storage.getShardingKeyExpr()->execute(block);
+
+	const auto & key_column = block.getByName(storage.getShardingKeyColumnName());
+
+	/// check that key column has valid type
+	const auto it = creators.find(key_column.type->getName());
+
+	return it != std::end(creators)
+		? (*it->second)(block.rowsInFirstColumn(), key_column.column.get(), storage.cluster)
+		: throw Exception{
+			"Sharding key expression does not evaluate to an integer type",
+			ErrorCodes::TYPE_MISMATCH
+		};
+}
+
+void DistributedBlockOutputStream::writeSplit(const Block & block)
+{
+	const auto num_cols = block.columns();
+	/// cache column pointers for later reuse
+	std::vector<ColumnPtr> columns(num_cols);
+	for (size_t i = 0; i < columns.size(); ++i)
+		columns[i] = block.getByPosition(i).column;
+
+	auto filters = createFilters(block);
+
+	const auto num_shards = storage.cluster.getShardsInfo().size();
+
+	ssize_t size_hint = ((block.rowsInFirstColumn() + num_shards - 1) / num_shards) * 1.1;	/// The factor 1.1 is chosen at random.
+ + for (size_t i = 0; i < num_shards; ++i) + { + auto target_block = block.cloneEmpty(); + + for (size_t col = 0; col < num_cols; ++col) + target_block.getByPosition(col).column = columns[col]->filter(filters[i], size_hint); + + if (target_block.rowsInFirstColumn()) + writeImpl(target_block, i); + } +} + +void DistributedBlockOutputStream::writeImpl(const Block & block, const size_t shard_id) +{ + const auto & shard_info = storage.cluster.getShardsInfo()[shard_id]; + if (shard_info.getLocalNodeCount() > 0) + writeToLocal(block, shard_info.getLocalNodeCount()); + + /// dir_names is empty if shard has only local addresses + if (!shard_info.dir_names.empty()) + writeToShard(block, shard_info.dir_names); +} + +void DistributedBlockOutputStream::writeToLocal(const Block & block, const size_t repeats) +{ + InterpreterInsertQuery interp{query_ast, storage.context}; + + auto block_io = interp.execute(); + block_io.out->writePrefix(); + + for (size_t i = 0; i < repeats; ++i) + block_io.out->write(block); + + block_io.out->writeSuffix(); +} + +void DistributedBlockOutputStream::writeToShard(const Block & block, const std::vector & dir_names) +{ + /** tmp directory is used to ensure atomicity of transactions + * and keep monitor thread out from reading incomplete data + */ + std::string first_file_tmp_path{}; + + auto first = true; + const auto & query_string = queryToString(query_ast); + + /// write first file, hardlink the others + for (const auto & dir_name : dir_names) + { + const auto & path = storage.getPath() + dir_name + '/'; + + /// ensure shard subdirectory creation and notify storage + if (Poco::File(path).createDirectory()) + storage.requireDirectoryMonitor(dir_name); + + const auto & file_name = toString(Increment{path + "increment.txt"}.get(true)) + ".bin"; + const auto & block_file_path = path + file_name; + + /** on first iteration write block to a temporary directory for subsequent hardlinking to ensure + * the inode is not freed until we're done */ + if (first) + { + first = false; + + const auto & tmp_path = path + "tmp/"; + Poco::File(tmp_path).createDirectory(); + const auto & block_file_tmp_path = tmp_path + file_name; + + first_file_tmp_path = block_file_tmp_path; + + WriteBufferFromFile out{block_file_tmp_path}; + CompressedWriteBuffer compress{out}; + NativeBlockOutputStream stream{compress, Revision::get()}; + + writeStringBinary(query_string, out); + + stream.writePrefix(); + stream.write(block); + stream.writeSuffix(); + } + + if (link(first_file_tmp_path.data(), block_file_path.data())) + throwFromErrno("Could not link " + block_file_path + " to " + first_file_tmp_path); + } + + /** remove the temporary file, enabling the OS to reclaim inode after all threads + * have removed their corresponding files */ + Poco::File(first_file_tmp_path).remove(); +} + +} diff --git a/dbms/src/Storages/MergeTree/MergeTreeSharder.cpp b/dbms/src/Storages/MergeTree/MergeTreeSharder.cpp index 2b42baa7057..8483c8336f8 100644 --- a/dbms/src/Storages/MergeTree/MergeTreeSharder.cpp +++ b/dbms/src/Storages/MergeTree/MergeTreeSharder.cpp @@ -4,6 +4,8 @@ #include #include #include +#include +#include #include @@ -16,47 +18,6 @@ namespace ErrorCodes extern const int TYPE_MISMATCH; } -namespace -{ - -template -std::vector createFiltersImpl(const size_t num_rows, const IColumn * column, size_t num_shards, const std::vector & slots) -{ - const auto total_weight = slots.size(); - std::vector filters(num_shards); - - /** Деление отрицательного числа с остатком на положительное, в C++ даёт отрицательный 
остаток. - * Для данной задачи это не подходит. Поэтому, будем обрабатывать знаковые типы как беззнаковые. - * Это даёт уже что-то совсем не похожее на деление с остатком, но подходящее для данной задачи. - */ - using UnsignedT = typename std::make_unsigned::type; - - /// const columns contain only one value, therefore we do not need to read it at every iteration - if (column->isConst()) - { - const auto data = typeid_cast *>(column)->getData(); - const auto shard_num = slots[static_cast(data) % total_weight]; - - for (size_t i = 0; i < num_shards; ++i) - filters[i].assign(num_rows, static_cast(shard_num == i)); - } - else - { - const auto & data = typeid_cast *>(column)->getData(); - - for (size_t i = 0; i < num_shards; ++i) - { - filters[i].resize(num_rows); - for (size_t j = 0; j < num_rows; ++j) - filters[i][j] = slots[static_cast(data[j]) % total_weight] == i; - } - } - - return filters; -} - -} - ShardedBlockWithDateInterval::ShardedBlockWithDateInterval(const Block & block_, size_t shard_no_, UInt16 min_date_, UInt16 max_date_) : block(block_), shard_no(shard_no_), min_date(min_date_), max_date(max_date_) @@ -64,7 +25,9 @@ ShardedBlockWithDateInterval::ShardedBlockWithDateInterval(const Block & block_, } MergeTreeSharder::MergeTreeSharder(MergeTreeData & data_, const ReshardingJob & job_) - : data(data_), job(job_), log(&Logger::get(data.getLogName() + " (Sharder)")) + : data(data_), job(job_), log(&Logger::get(data.getLogName() + " (Sharder)")), + sharding_key_expr(ExpressionAnalyzer(job.sharding_key_expr, data.context, nullptr, data.getColumnsList()).getActions(false)), + sharding_key_column_name(job.sharding_key_expr->getColumnName()) { for (size_t shard_no = 0; shard_no < job.paths.size(); ++shard_no) { @@ -201,19 +164,19 @@ std::vector MergeTreeSharder::createFilters(Block block) using create_filters_sig = std::vector(size_t, const IColumn *, size_t num_shards, const std::vector & slots); /// hashmap of pointers to functions corresponding to each integral type static std::unordered_map creators{ - { TypeName::get(), &createFiltersImpl }, - { TypeName::get(), &createFiltersImpl }, - { TypeName::get(), &createFiltersImpl }, - { TypeName::get(), &createFiltersImpl }, - { TypeName::get(), &createFiltersImpl }, - { TypeName::get(), &createFiltersImpl }, - { TypeName::get(), &createFiltersImpl }, - { TypeName::get(), &createFiltersImpl }, + { TypeName::get(), &BlockFilterCreator::perform }, + { TypeName::get(), &BlockFilterCreator::perform }, + { TypeName::get(), &BlockFilterCreator::perform }, + { TypeName::get(), &BlockFilterCreator::perform }, + { TypeName::get(), &BlockFilterCreator::perform }, + { TypeName::get(), &BlockFilterCreator::perform }, + { TypeName::get(), &BlockFilterCreator::perform }, + { TypeName::get(), &BlockFilterCreator::perform }, }; - data.getPrimaryExpression()->execute(block); + sharding_key_expr->execute(block); - const auto & key_column = block.getByName(job.sharding_key); + const auto & key_column = block.getByName(sharding_key_column_name); /// check that key column has valid type const auto it = creators.find(key_column.type->getName()); diff --git a/dbms/src/Storages/MergeTree/RemoteDiskSpaceMonitor.cpp b/dbms/src/Storages/MergeTree/RemoteDiskSpaceMonitor.cpp index 8a3392a6c81..382261d9cbd 100644 --- a/dbms/src/Storages/MergeTree/RemoteDiskSpaceMonitor.cpp +++ b/dbms/src/Storages/MergeTree/RemoteDiskSpaceMonitor.cpp @@ -1,5 +1,6 @@ #include #include +#include #include #include #include @@ -25,8 +26,8 @@ std::string getEndpointId(const std::string & 
node_id) } -Service::Service(const String & path_) - : path(path_) +Service::Service(const Context & context_) + : context(context_) { } @@ -40,12 +41,12 @@ void Service::processQuery(const Poco::Net::HTMLForm & params, WriteBuffer & out if (is_cancelled) throw Exception("RemoteDiskSpaceMonitor service terminated", ErrorCodes::ABORTED); - size_t free_space = DiskSpaceMonitor::getUnreservedFreeSpace(path); + size_t free_space = DiskSpaceMonitor::getUnreservedFreeSpace(context.getPath()); writeBinary(free_space, out); out.next(); } -size_t Client::getFreeDiskSpace(const InterserverIOEndpointLocation & location) const +size_t Client::getFreeSpace(const InterserverIOEndpointLocation & location) const { ReadBufferFromHTTP::Params params = { diff --git a/dbms/src/Storages/MergeTree/ReshardingJob.cpp b/dbms/src/Storages/MergeTree/ReshardingJob.cpp index 9effb2e233e..fa2a23fb5ae 100644 --- a/dbms/src/Storages/MergeTree/ReshardingJob.cpp +++ b/dbms/src/Storages/MergeTree/ReshardingJob.cpp @@ -3,10 +3,17 @@ #include #include #include +#include +#include namespace DB { +namespace ErrorCodes +{ + extern const int LOGICAL_ERROR; +} + ReshardingJob::ReshardingJob(const std::string & serialized_job) { ReadBufferFromString buf(serialized_job); @@ -14,7 +21,19 @@ ReshardingJob::ReshardingJob(const std::string & serialized_job) readBinary(database_name, buf); readBinary(table_name, buf); readBinary(partition, buf); - readBinary(sharding_key, buf); + + std::string expr; + readBinary(expr, buf); + + IParser::Pos pos = expr.data(); + IParser::Pos max_parsed_pos = pos; + const char * end = pos + expr.size(); + + ParserExpressionWithOptionalAlias parser(false); + Expected expected = ""; + if (!parser.parse(pos, end, sharding_key_expr, max_parsed_pos, expected)) + throw Exception("ReshardingJob: Internal error", ErrorCodes::LOGICAL_ERROR); + while (!buf.eof()) { std::string path; @@ -29,12 +48,12 @@ ReshardingJob::ReshardingJob(const std::string & serialized_job) ReshardingJob::ReshardingJob(const std::string & database_name_, const std::string & table_name_, const std::string & partition_, const WeightedZooKeeperPaths & paths_, - const std::string & sharding_key_) + const ASTPtr & sharding_key_expr_) : database_name(database_name_), table_name(table_name_), partition(partition_), paths(paths_), - sharding_key(sharding_key_) + sharding_key_expr(sharding_key_expr_) { } @@ -46,7 +65,8 @@ std::string ReshardingJob::toString() const writeBinary(database_name, buf); writeBinary(table_name, buf); writeBinary(partition, buf); - writeBinary(sharding_key, buf); + writeBinary(queryToString(sharding_key_expr), buf); + for (const auto & path : paths) { writeBinary(path.first, buf); diff --git a/dbms/src/Storages/MergeTree/ReshardingWorker.cpp b/dbms/src/Storages/MergeTree/ReshardingWorker.cpp index 02ef39f6035..4d1b1c8b3e5 100644 --- a/dbms/src/Storages/MergeTree/ReshardingWorker.cpp +++ b/dbms/src/Storages/MergeTree/ReshardingWorker.cpp @@ -11,6 +11,7 @@ #include #include #include +#include #include #include #include @@ -30,7 +31,8 @@ namespace ErrorCodes extern const int UNEXPECTED_ZOOKEEPER_ERROR; extern const int PARTITION_COPY_FAILED; extern const int PARTITION_ATTACH_FAILED; - extern const int RESHARDING_CLEANUP_FAILED; + extern const int UNKNOWN_ELEMENT_IN_CONFIG; + extern const int INVALID_CONFIG_PARAMETER; } namespace @@ -52,17 +54,52 @@ std::string createMergedPartName(const MergeTreeData::DataPartsVector & parts) return ActiveDataPartSet::getPartName(left_date, right_date, parts.front()->left, parts.back()->right, 
level + 1); } +class Arguments final +{ +public: + Arguments(const Poco::Util::AbstractConfiguration & config, const std::string & config_name) + { + Poco::Util::AbstractConfiguration::Keys keys; + config.keys(config_name, keys); + for (const auto & key : keys) + { + if (key == "task_queue_path") + { + task_queue_path = config.getString(config_name + "." + key); + if (task_queue_path.empty()) + throw Exception("Invalid parameter in resharding configuration", ErrorCodes::INVALID_CONFIG_PARAMETER); + } + else + throw Exception("Unknown parameter in resharding configuration", ErrorCodes::UNKNOWN_ELEMENT_IN_CONFIG); + } + } + + Arguments(const Arguments &) = delete; + Arguments & operator=(const Arguments &) = delete; + + std::string getTaskQueuePath() const + { + return task_queue_path; + } + +private: + std::string task_queue_path; +}; + } -ReshardingWorker::ReshardingWorker(Context & context_) +ReshardingWorker::ReshardingWorker(const Poco::Util::AbstractConfiguration & config, + const std::string & config_name, Context & context_) : context(context_), log(&Logger::get("ReshardingWorker")) { + Arguments arguments(config, config_name); + auto zookeeper = context.getZooKeeper(); host_task_queue_path = "/clickhouse"; zookeeper->createIfNotExists(host_task_queue_path, ""); - host_task_queue_path += "/" + zookeeper->getTaskQueuePath(); + host_task_queue_path += "/" + arguments.getTaskQueuePath(); zookeeper->createIfNotExists(host_task_queue_path, ""); host_task_queue_path += "/resharding"; @@ -75,6 +112,11 @@ ReshardingWorker::ReshardingWorker(Context & context_) ReshardingWorker::~ReshardingWorker() { must_stop = true; + { + std::lock_guard guard(cancel_mutex); + if (merger) + merger->cancel(); + } if (polling_thread.joinable()) polling_thread.join(); } @@ -88,16 +130,12 @@ void ReshardingWorker::submitJob(const std::string & database_name, const std::string & table_name, const std::string & partition, const WeightedZooKeeperPaths & weighted_zookeeper_paths, - const std::string & sharding_key) + const ASTPtr & sharding_key_expr) { - auto str = ReshardingJob(database_name, table_name, partition, weighted_zookeeper_paths, sharding_key).toString(); - submitJobImpl(str); -} - -void ReshardingWorker::submitJob(const ReshardingJob & job) -{ - auto str = job.toString(); - submitJobImpl(str); + auto serialized_job = ReshardingJob(database_name, table_name, partition, weighted_zookeeper_paths, sharding_key_expr).toString(); + auto zookeeper = context.getZooKeeper(); + (void) zookeeper->create(host_task_queue_path + "/task-", serialized_job, + zkutil::CreateMode::PersistentSequential); } bool ReshardingWorker::isStarted() const @@ -105,23 +143,18 @@ bool ReshardingWorker::isStarted() const return is_started; } -void ReshardingWorker::submitJobImpl(const std::string & serialized_job) -{ - auto zookeeper = context.getZooKeeper(); - (void) zookeeper->create(host_task_queue_path + "/task-", serialized_job, - zkutil::CreateMode::PersistentSequential); -} - void ReshardingWorker::pollAndExecute() { + bool error = false; + try { bool old_val = false; if (!is_started.compare_exchange_strong(old_val, true, std::memory_order_seq_cst, std::memory_order_relaxed)) - throw Exception("Resharding worker thread already started", ErrorCodes::LOGICAL_ERROR); + throw Exception("Resharding background thread already started", ErrorCodes::LOGICAL_ERROR); - LOG_DEBUG(log, "Started resharding thread."); + LOG_DEBUG(log, "Started resharding background thread."); try { @@ -129,10 +162,10 @@ void ReshardingWorker::pollAndExecute() 
} catch (const Exception & ex) { - if ((ex.code() == ErrorCodes::RESHARDING_CLEANUP_FAILED) || hasAborted(ex)) + if (ex.code() == ErrorCodes::ABORTED) throw; else - LOG_INFO(log, ex.message()); + LOG_ERROR(log, ex.message()); } catch (...) { @@ -166,10 +199,10 @@ void ReshardingWorker::pollAndExecute() } catch (const Exception & ex) { - if ((ex.code() == ErrorCodes::RESHARDING_CLEANUP_FAILED) || hasAborted(ex)) + if (ex.code() == ErrorCodes::ABORTED) throw; else - LOG_INFO(log, ex.message()); + LOG_ERROR(log, ex.message()); } catch (...) { @@ -179,11 +212,21 @@ void ReshardingWorker::pollAndExecute() } catch (const Exception & ex) { - if (!hasAborted(ex)) - throw; + if (ex.code() != ErrorCodes::ABORTED) + error = true; + } + catch (...) + { + error = true; } - LOG_DEBUG(log, "Resharding thread terminated."); + if (error) + { + /// Если мы попали сюда, это значит, что где-то кроется баг. + LOG_ERROR(log, "Resharding background thread terminated with critical error."); + } + else + LOG_DEBUG(log, "Resharding background thread terminated."); } void ReshardingWorker::performPendingJobs() @@ -204,8 +247,24 @@ void ReshardingWorker::perform(const Strings & job_nodes) std::string child_full_path = host_task_queue_path + "/" + child; auto job_descriptor = zookeeper->get(child_full_path); ReshardingJob job(job_descriptor); + + try + { + perform(job); + } + catch (const Exception & ex) + { + if (ex.code() != ErrorCodes::ABORTED) + zookeeper->remove(child_full_path); + throw; + } + catch (...) + { + zookeeper->remove(child_full_path); + throw; + } + zookeeper->remove(child_full_path); - perform(job); } } @@ -229,12 +288,8 @@ void ReshardingWorker::perform(const ReshardingJob & job) { cleanup(storage, job); - if (hasAborted(ex)) - { - /// Поток завершается. Сохраняем сведения о прерванной задаче. - submitJob(job); - LOG_DEBUG(log, "Resharding job cancelled then re-submitted for later processing."); - } + if (ex.code() == ErrorCodes::ABORTED) + LOG_DEBUG(log, "Resharding job cancelled."); throw; } @@ -276,15 +331,24 @@ void ReshardingWorker::createShardedPartitions(StorageReplicatedMergeTree & stor /// Для каждого шарда, куски, которые должны быть слиты. std::unordered_map to_merge; + /// Для нумерации блоков. + SimpleIncrement increment(storage.data.getMaxDataPartIndex()); + MergeTreeData::PerShardDataParts & per_shard_data_parts = storage.data.per_shard_data_parts; auto zookeeper = storage.getZooKeeper(); const auto & settings = context.getSettingsRef(); - (void) settings; DayNum_t month = MergeTreeData::getMonthFromName(job.partition); - auto parts_from_partition = storage.merger.selectAllPartsFromPartition(month); + { + std::lock_guard guard(cancel_mutex); + merger = std::make_unique(storage.data); + } + + auto parts_from_partition = merger->selectAllPartsFromPartition(month); + + MergeTreeSharder sharder(storage.data, job); for (const auto & part : parts_from_partition) { @@ -305,8 +369,6 @@ void ReshardingWorker::createShardedPartitions(StorageReplicatedMergeTree & stor DBMS_DEFAULT_BUFFER_SIZE, true); - MergeTreeSharder sharder(storage.data, job); - Block block; while (block = source.read()) { @@ -318,57 +380,8 @@ void ReshardingWorker::createShardedPartitions(StorageReplicatedMergeTree & stor abortIfRequested(); /// Создать новый кусок соответствующий новому блоку. 
- std::string month_name = toString(DateLUT::instance().toNumYYYYMMDD(DayNum_t(block_with_dates.min_date)) / 100); - AbandonableLockInZooKeeper block_number_lock = storage.allocateBlockNumber(month_name); - Int64 part_number = block_number_lock.getNumber(); - MergeTreeData::MutableDataPartPtr block_part = sharder.writeTempPart(block_with_dates, part_number); - - /// Добавить в БД ZooKeeper информацию о новом блоке. - SipHash hash; - block_part->checksums.summaryDataChecksum(hash); - union - { - char bytes[16]; - UInt64 lo; - UInt64 hi; - } hash_value; - hash.get128(hash_value.bytes); - - std::string checksum(hash_value.bytes, 16); - - std::string block_id = toString(hash_value.lo) + "_" + toString(hash_value.hi); - - zkutil::Ops ops; - auto acl = zookeeper->getDefaultACL(); - - std::string to_path = job.paths[block_with_dates.shard_no].first; - - ops.push_back( - new zkutil::Op::Create( - to_path + "/detached_sharded_blocks/" + block_id, - "", - acl, - zkutil::CreateMode::Persistent)); - ops.push_back( - new zkutil::Op::Create( - to_path + "/detached_sharded_blocks/" + block_id + "/checksum", - checksum, - acl, - zkutil::CreateMode::Persistent)); - ops.push_back( - new zkutil::Op::Create( - to_path + "/detached_sharded_blocks/" + block_id + "/number", - toString(part_number), - acl, - zkutil::CreateMode::Persistent)); - - block_number_lock.getUnlockOps(ops); - - auto code = zookeeper->tryMulti(ops); - if (code != ZOK) - throw Exception("Unexpected error while adding block " + toString(part_number) - + " with ID " + block_id + ": " + zkutil::ZooKeeper::error2string(code), - ErrorCodes::UNEXPECTED_ZOOKEEPER_ERROR); + Int64 temp_index = increment.get(); + MergeTreeData::MutableDataPartPtr block_part = sharder.writeTempPart(block_with_dates, temp_index); abortIfRequested(); @@ -390,7 +403,7 @@ void ReshardingWorker::createShardedPartitions(StorageReplicatedMergeTree & stor const auto & merge_entry = storage.data.context.getMergeList().insert(job.database_name, job.table_name, merged_name); - MergeTreeData::MutableDataPartPtr new_part = storage.merger.mergeParts(parts, merged_name, *merge_entry, + MergeTreeData::MutableDataPartPtr new_part = merger->mergeParts(parts, merged_name, *merge_entry, storage.data.context.getSettings().min_bytes_to_use_direct_io); sharded_parts.insert(new_part); @@ -424,7 +437,7 @@ void ReshardingWorker::createShardedPartitions(StorageReplicatedMergeTree & stor const auto & merge_entry = storage.data.context.getMergeList().insert(job.database_name, job.table_name, merged_name); - MergeTreeData::MutableDataPartPtr new_part = storage.merger.mergeParts(parts, merged_name, *merge_entry, + MergeTreeData::MutableDataPartPtr new_part = merger->mergeParts(parts, merged_name, *merge_entry, storage.data.context.getSettings().min_bytes_to_use_direct_io); sharded_parts.insert(new_part); @@ -466,8 +479,6 @@ void ReshardingWorker::publishShardedPartitions(StorageReplicatedMergeTree & sto auto zookeeper = storage.getZooKeeper(); - MergeTreeData::PerShardDataParts & per_shard_data_parts = storage.data.per_shard_data_parts; - struct TaskInfo { TaskInfo(const std::string & replica_path_, @@ -493,16 +504,20 @@ void ReshardingWorker::publishShardedPartitions(StorageReplicatedMergeTree & sto /// Количество участвующих локальных реплик. Должно быть <= 1. 
size_t local_count = 0; - for (size_t shard_no = 0; shard_no < job.paths.size(); ++shard_no) + for (const auto & entry : storage.data.per_shard_data_parts) { - const WeightedZooKeeperPath & weighted_path = job.paths[shard_no]; - const std::string & zookeeper_path = weighted_path.first; + size_t shard_no = entry.first; + const MergeTreeData::MutableDataParts & sharded_parts = entry.second; + if (sharded_parts.empty()) + continue; std::vector part_names; - const MergeTreeData::MutableDataParts & sharded_parts = per_shard_data_parts.at(shard_no); for (const MergeTreeData::DataPartPtr & sharded_part : sharded_parts) part_names.push_back(sharded_part->name); + const WeightedZooKeeperPath & weighted_path = job.paths[shard_no]; + const std::string & zookeeper_path = weighted_path.first; + auto children = zookeeper->getChildren(zookeeper_path + "/replicas"); for (const auto & child : children) { @@ -610,9 +625,14 @@ void ReshardingWorker::applyChanges(StorageReplicatedMergeTree & storage, const using TaskInfoList = std::vector; TaskInfoList task_info_list; - for (size_t i = 0; i < job.paths.size(); ++i) + for (const auto & entry : storage.data.per_shard_data_parts) { - const WeightedZooKeeperPath & weighted_path = job.paths[i]; + size_t shard_no = entry.first; + const MergeTreeData::MutableDataParts & sharded_parts = entry.second; + if (sharded_parts.empty()) + continue; + + const WeightedZooKeeperPath & weighted_path = job.paths[shard_no]; const std::string & zookeeper_path = weighted_path.first; auto children = zookeeper->getChildren(zookeeper_path + "/replicas"); @@ -668,47 +688,13 @@ void ReshardingWorker::cleanup(StorageReplicatedMergeTree & storage, const Resha { LOG_DEBUG(log, "Performing cleanup."); - try + storage.data.per_shard_data_parts.clear(); + + Poco::DirectoryIterator end; + for (Poco::DirectoryIterator it(storage.full_path + "/reshard"); it != end; ++it) { - storage.data.per_shard_data_parts.clear(); - - Poco::DirectoryIterator end; - for (Poco::DirectoryIterator it(storage.full_path + "/reshard"); it != end; ++it) - { - auto absolute_path = it.path().absolute().toString(); - Poco::File(absolute_path).remove(true); - } - - auto zookeeper = storage.getZooKeeper(); - zkutil::Ops ops; - for (size_t i = 0; i < job.paths.size(); ++i) - { - const WeightedZooKeeperPath & weighted_path = job.paths[i]; - const std::string & zookeeper_path = weighted_path.first; - - auto children = zookeeper->getChildren(zookeeper_path + "/detached_sharded_blocks"); - if (!children.empty()) - { - for (const auto & child : children) - { - ops.push_back( - new zkutil::Op::Remove( - zookeeper_path + "/detached_sharded_blocks/" + child + "/number", -1)); - ops.push_back( - new zkutil::Op::Remove( - zookeeper_path + "/detached_sharded_blocks/" + child + "/checksum", -1)); - ops.push_back( - new zkutil::Op::Remove( - zookeeper_path + "/detached_sharded_blocks/" + child, -1)); - } - } - } - zookeeper->multi(ops); - } - catch (...) 
- { - throw Exception("Failed to perform cleanup during resharding operation", - ErrorCodes::RESHARDING_CLEANUP_FAILED); + auto absolute_path = it.path().absolute().toString(); + Poco::File(absolute_path).remove(true); } } @@ -718,9 +704,4 @@ void ReshardingWorker::abortIfRequested() const throw Exception("Cancelled resharding", ErrorCodes::ABORTED); } -bool ReshardingWorker::hasAborted(const Exception & ex) const -{ - return must_stop && (ex.code() == ErrorCodes::ABORTED); -} - } diff --git a/dbms/src/Storages/MergeTree/ShardedPartitionSender.cpp b/dbms/src/Storages/MergeTree/ShardedPartitionSender.cpp index 5dbcf664633..9ce5a36366c 100644 --- a/dbms/src/Storages/MergeTree/ShardedPartitionSender.cpp +++ b/dbms/src/Storages/MergeTree/ShardedPartitionSender.cpp @@ -51,7 +51,7 @@ std::string getEndpointId(const std::string & node_id) } Service::Service(StorageReplicatedMergeTree & storage_) - : storage(storage_) + : storage(storage_), log(&Logger::get("ShardedPartitionSender::Service")) { } @@ -79,8 +79,18 @@ void Service::processQuery(const Poco::Net::HTMLForm & params, WriteBuffer & out MergeTreeData::MutableDataPartPtr part = storage.fetcher.fetchShardedPart(from_location, part_name, shard_no); part->is_temp = false; - const std::string new_name = "detached/" + part_name; - Poco::File(storage.full_path + part->name).renameTo(storage.full_path + new_name); + + const std::string old_part_path = storage.full_path + part->name; + const std::string new_part_path = storage.full_path + "detached/" + part_name; + + Poco::File new_part_dir(new_part_path); + if (new_part_dir.exists()) + { + LOG_WARNING(log, "Directory " + new_part_path + " already exists. Removing."); + new_part_dir.remove(true); + } + + Poco::File(old_part_path).renameTo(new_part_path); } bool flag = true; @@ -88,6 +98,11 @@ void Service::processQuery(const Poco::Net::HTMLForm & params, WriteBuffer & out out.next(); } +Client::Client() + : log(&Logger::get("ShardedPartitionSender::Client")) +{ +} + bool Client::send(const InterserverIOEndpointLocation & to_location, const InterserverIOEndpointLocation & from_location, const std::vector & parts, size_t shard_no) { diff --git a/dbms/src/Storages/StorageDistributed.cpp b/dbms/src/Storages/StorageDistributed.cpp index 96cba49df43..622a8b6eadb 100644 --- a/dbms/src/Storages/StorageDistributed.cpp +++ b/dbms/src/Storages/StorageDistributed.cpp @@ -228,7 +228,7 @@ void StorageDistributed::shutdown() void StorageDistributed::reshardPartitions(const String & database_name, const Field & first_partition, const Field & last_partition, const WeightedZooKeeperPaths & weighted_zookeeper_paths, - const String & sharding_key, const Settings & settings) + const ASTPtr & sharding_key_expr, const Settings & settings) { /// Создать запрос ALTER TABLE xxx.yyy RESHARD PARTITION zzz TO ttt USING uuu. @@ -258,7 +258,7 @@ void StorageDistributed::reshardPartitions(const String & database_name, const F } parameters.weighted_zookeeper_paths = expr_list; - parameters.sharding_key = sharding_key; + parameters.sharding_key_expr = sharding_key_expr; /** Функциональность shard_multiplexing не доделана - выключаем её. * (Потому что установка соединений с разными шардами в рамках одного потока выполняется не параллельно.) 
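StorageDistributed::reshardPartitions builds an ALTER TABLE xxx.yyy RESHARD PARTITION zzz TO ttt USING uuu statement as an AST and forwards it to every shard. The sketch below renders an equivalent statement as flat text purely for illustration; the database, table, and path names are made up, the exact WEIGHT keyword placement is an assumption based on ParserWeightedZooKeeperPath, and the real code assembles ASTAlterQuery nodes rather than strings.

    #include <cstdint>
    #include <iostream>
    #include <string>
    #include <utility>
    #include <vector>

    int main()
    {
        // Hypothetical names; the real values come from the Distributed table's metadata.
        const std::string database = "merge";
        const std::string table = "hits";
        const std::string partition = "201601";
        const std::string sharding_expr = "intHash64(UserID)";
        const std::vector<std::pair<std::string, std::uint64_t>> paths{
            {"/clickhouse/tables/01/hits", 1},
            {"/clickhouse/tables/02/hits", 2},
        };

        std::string query = "ALTER TABLE " + database + "." + table
            + " RESHARD PARTITION " + partition + " TO ";
        for (size_t i = 0; i < paths.size(); ++i)
            query += (i ? ", " : "") + ("'" + paths[i].first + "' WEIGHT " + std::to_string(paths[i].second));
        query += " USING " + sharding_expr;

        std::cout << query << '\n';
    }
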
diff --git a/dbms/src/Storages/StorageReplicatedMergeTree.cpp b/dbms/src/Storages/StorageReplicatedMergeTree.cpp
index fd7a1e2ec73..59c9b7cf27c 100644
--- a/dbms/src/Storages/StorageReplicatedMergeTree.cpp
+++ b/dbms/src/Storages/StorageReplicatedMergeTree.cpp
@@ -75,7 +75,6 @@ namespace ErrorCodes
 	extern const int INVALID_PARTITIONS_INTERVAL;
 	extern const int RESHARDING_INVALID_PARAMETERS;
 	extern const int INVALID_SHARD_WEIGHT;
-	extern const int SHARD_DOESNT_REFERENCE_TABLE;
 }
 
@@ -335,7 +334,7 @@ StoragePtr StorageReplicatedMergeTree::create(
 	/// Services for resharding.
 	{
-		InterserverIOEndpointPtr endpoint = new RemoteDiskSpaceMonitor::Service(res->full_path);
+		InterserverIOEndpointPtr endpoint = new RemoteDiskSpaceMonitor::Service(res->context);
 		res->disk_space_monitor_endpoint_holder = get_endpoint_holder(endpoint);
 	}
 
@@ -403,8 +402,6 @@ void StorageReplicatedMergeTree::createTableIfNotExists()
 		acl, zkutil::CreateMode::Persistent));
 	ops.push_back(new zkutil::Op::Create(zookeeper_path + "/blocks", "",
 		acl, zkutil::CreateMode::Persistent));
-	ops.push_back(new zkutil::Op::Create(zookeeper_path + "/detached_sharded_blocks", "",
-		acl, zkutil::CreateMode::Persistent));
 	ops.push_back(new zkutil::Op::Create(zookeeper_path + "/block_numbers", "",
 		acl, zkutil::CreateMode::Persistent));
 	ops.push_back(new zkutil::Op::Create(zookeeper_path + "/nonincrement_block_numbers", "",
 		acl, zkutil::CreateMode::Persistent));
@@ -2317,7 +2314,7 @@ void StorageReplicatedMergeTree::shutdown()
 	fetcher.cancel();
 
 	disk_space_monitor_endpoint_holder = nullptr;
-	free_disk_space_checker.cancel();
+	disk_space_monitor_client.cancel();
 
 	sharded_partition_sender_endpoint_holder = nullptr;
 	sharded_partition_sender_client.cancel();
@@ -2891,56 +2888,7 @@ void StorageReplicatedMergeTree::attachPartition(ASTPtr query, const Field & fie
 			zookeeper_path + "/log/log-", entry.toString(), zookeeper->getDefaultACL(), zkutil::CreateMode::PersistentSequential));
 	}
 
-	std::string log_msg = "Adding attaches to log";
-
-	if (is_leader_node)
-	{
-		/// If ATTACH PART is executed as part of a resharding operation, update the information about blocks on the shard.
-		auto children = zookeeper->getChildren(zookeeper_path + "/detached_sharded_blocks");
-		if (!children.empty())
-		{
-			log_msg += ". Updating information about blocks in the context of the resharding operation.";
-
-			auto acl = zookeeper->getDefaultACL();
-
-			for (const auto & child : children)
-			{
-				std::string checksum = zookeeper->get(zookeeper_path + "/detached_sharded_blocks/" + child + "/checksum");
-				std::string number = zookeeper->get(zookeeper_path + "/detached_sharded_blocks/" + child + "/number");
-
-				ops.push_back(
-					new zkutil::Op::Create(
-						zookeeper_path + "/blocks/" + child,
-						"",
-						acl,
-						zkutil::CreateMode::Persistent));
-				ops.push_back(
-					new zkutil::Op::Create(
-						zookeeper_path + "/blocks/" + child + "/checksum",
-						checksum,
-						acl,
-						zkutil::CreateMode::Persistent));
-				ops.push_back(
-					new zkutil::Op::Create(
-						zookeeper_path + "/blocks/" + child + "/number",
-						number,
-						acl,
-						zkutil::CreateMode::Persistent));
-
-				ops.push_back(
-					new zkutil::Op::Remove(
-						zookeeper_path + "/detached_sharded_blocks/" + child + "/number", -1));
-				ops.push_back(
-					new zkutil::Op::Remove(
-						zookeeper_path + "/detached_sharded_blocks/" + child + "/checksum", -1));
-				ops.push_back(
-					new zkutil::Op::Remove(
-						zookeeper_path + "/detached_sharded_blocks/" + child, -1));
-			}
-		}
-	}
-
-	LOG_DEBUG(log, log_msg);
+	LOG_DEBUG(log, "Adding attaches to log");
 
 	zookeeper->multi(ops);
 
@@ -3481,12 +3429,12 @@ void StorageReplicatedMergeTree::freezePartition(const Field & partition, const
 }
 
 void StorageReplicatedMergeTree::reshardPartitions(const String & database_name, const Field & first_partition, const Field & last_partition,
-	const WeightedZooKeeperPaths & weighted_zookeeper_paths, const String & sharding_key,
+	const WeightedZooKeeperPaths & weighted_zookeeper_paths, const ASTPtr & sharding_key_expr,
 	const Settings & settings)
 {
 	auto & resharding_worker = context.getReshardingWorker();
 	if (!resharding_worker.isStarted())
-		throw Exception("Resharding worker is not running.", ErrorCodes::RESHARDING_NO_WORKER);
+		throw Exception("Resharding background thread is not running.", ErrorCodes::RESHARDING_NO_WORKER);
 
 	for (const auto & weighted_path : weighted_zookeeper_paths)
 	{
@@ -3495,14 +3443,6 @@ void StorageReplicatedMergeTree::reshardPartitions(const String & database_name,
 			throw Exception("Shard has invalid weight", ErrorCodes::INVALID_SHARD_WEIGHT);
 	}
 
-	for (const auto & weighted_path : weighted_zookeeper_paths)
-	{
-		const std::string & path = weighted_path.first;
-		if ((path.length() <= getTableName().length()) ||
-			(path.substr(path.length() - getTableName().length()) != getTableName()))
-			throw Exception("Shard does not reference table", ErrorCodes::SHARD_DOESNT_REFERENCE_TABLE);
-	}
-
 	DayNum_t first_partition_num = !first_partition.isNull() ? MergeTreeData::getMonthDayNum(first_partition) : DayNum_t();
 	DayNum_t last_partition_num = !last_partition.isNull() ? MergeTreeData::getMonthDayNum(last_partition) : DayNum_t();
 
@@ -3548,7 +3488,7 @@ void StorageReplicatedMergeTree::reshardPartitions(const String & database_name,
 	/// Register the background resharding jobs.
 	for (const auto & partition : partition_list)
-		resharding_worker.submitJob(database_name, getTableName(), partition, weighted_zookeeper_paths, sharding_key);
+		resharding_worker.submitJob(database_name, getTableName(), partition, weighted_zookeeper_paths, sharding_key_expr);
 }
 
 void StorageReplicatedMergeTree::enforceShardsConsistency(const WeightedZooKeeperPaths & weighted_zookeeper_paths)
@@ -3641,8 +3581,8 @@ StorageReplicatedMergeTree::gatherReplicaSpaceInfo(const WeightedZooKeeperPaths
 
 			InterserverIOEndpointLocation location(replica_path, address.host, address.replication_port);
 
-			tasks[i] = Tasks::value_type(std::bind(&RemoteDiskSpaceMonitor::Client::getFreeDiskSpace,
-				&free_disk_space_checker, location));
+			tasks[i] = Tasks::value_type(std::bind(&RemoteDiskSpaceMonitor::Client::getFreeSpace,
+				&disk_space_monitor_client, location));
 			pool.schedule([i, &tasks]{ tasks[i](); });
 		}
 	}
diff --git a/libs/libzkutil/include/zkutil/ZooKeeper.h b/libs/libzkutil/include/zkutil/ZooKeeper.h
index 36aa867969d..e16946c0520 100644
--- a/libs/libzkutil/include/zkutil/ZooKeeper.h
+++ b/libs/libzkutil/include/zkutil/ZooKeeper.h
@@ -173,8 +173,6 @@ public:
 	  */
 	void waitForDisappear(const std::string & path);
 
-	std::string getTaskQueuePath() const;
-
 	/** Asynchronous interface (a small subset of operations is implemented).
 	  *
 	  * Usage:
@@ -300,7 +298,7 @@ private:
 	friend struct WatchWithEvent;
 	friend class EphemeralNodeHolder;
 
-	void init(const std::string & hosts, int32_t session_timeout_ms, const std::string & task_queue_path_ = "");
+	void init(const std::string & hosts, int32_t session_timeout_ms);
 	void removeChildrenRecursive(const std::string & path);
 	void tryRemoveChildrenRecursive(const std::string & path);
 	void * watchForEvent(EventPtr event);
@@ -343,7 +341,6 @@ private:
 	int32_t existsImpl(const std::string & path, Stat * stat_, EventPtr watch = nullptr);
 
 	std::string hosts;
-	std::string task_queue_path;
 	int32_t session_timeout_ms;
 
 	std::mutex mutex;
diff --git a/libs/libzkutil/src/ZooKeeper.cpp b/libs/libzkutil/src/ZooKeeper.cpp
index 2d4bcb8da6b..1da49abbdf1 100644
--- a/libs/libzkutil/src/ZooKeeper.cpp
+++ b/libs/libzkutil/src/ZooKeeper.cpp
@@ -61,13 +61,12 @@ void ZooKeeper::processEvent(zhandle_t * zh, int type, int state, const char * p
 	}
 }
 
-void ZooKeeper::init(const std::string & hosts_, int32_t session_timeout_ms_, const std::string & task_queue_path_)
+void ZooKeeper::init(const std::string & hosts_, int32_t session_timeout_ms_)
 {
 	log = &Logger::get("ZooKeeper");
 	zoo_set_debug_level(ZOO_LOG_LEVEL_ERROR);
 	hosts = hosts_;
 	session_timeout_ms = session_timeout_ms_;
-	task_queue_path = task_queue_path_;
 
 	impl = zookeeper_init(hosts.c_str(), nullptr, session_timeout_ms, nullptr, nullptr, 0);
 	ProfileEvents::increment(ProfileEvents::ZooKeeperInit);
@@ -105,10 +104,6 @@ struct ZooKeeperArgs
 		{
 			session_timeout_ms = config.getInt(config_name + "." + key);
 		}
-		else if (key == "task_queue_path")
-		{
-			task_queue_path = config.getString(config_name + "." + key);
-		}
 		else throw KeeperException(std::string("Unknown key ") + key + " in config file");
 	}
 
@@ -125,13 +120,12 @@ struct ZooKeeperArgs
 
 	std::string hosts;
 	size_t session_timeout_ms;
-	std::string task_queue_path;
 };
 
 ZooKeeper::ZooKeeper(const Poco::Util::AbstractConfiguration & config, const std::string & config_name)
 {
 	ZooKeeperArgs args(config, config_name);
-	init(args.hosts, args.session_timeout_ms, args.task_queue_path);
+	init(args.hosts, args.session_timeout_ms);
 }
 
 void * ZooKeeper::watchForEvent(EventPtr event)
@@ -584,11 +578,6 @@ void ZooKeeper::waitForDisappear(const std::string & path)
 	}
 }
 
-std::string ZooKeeper::getTaskQueuePath() const
-{
-	return task_queue_path;
-}
-
 ZooKeeper::~ZooKeeper()
 {
 	LOG_INFO(&Logger::get("~ZooKeeper"), "Closing ZooKeeper session");
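
With task_queue_path removed, ZooKeeperArgs accepts only its remaining keys and throws on anything else, so a config that still carries a task_queue_path entry now fails fast at startup with "Unknown key". A minimal sketch of that strict parsing loop, detached from Poco's configuration classes: the std::map interface and the node-prefix host key are assumptions made for illustration; the real code reads keys from Poco::Util::AbstractConfiguration.

#include <cstdint>
#include <map>
#include <stdexcept>
#include <string>

/// Sketch of the strict key handling in ZooKeeperArgs after this patch:
/// unknown keys abort configuration loading instead of being silently ignored.
struct ZooKeeperSettings
{
	std::string hosts;
	int32_t session_timeout_ms = 30000;	/// assumed default, for illustration only
};

ZooKeeperSettings parseZooKeeperSettings(const std::map<std::string, std::string> & keys)
{
	ZooKeeperSettings settings;
	for (const auto & kv : keys)
	{
		if (kv.first.compare(0, 4, "node") == 0)	/// node, node[1], ... (assumed convention)
			settings.hosts += (settings.hosts.empty() ? "" : ",") + kv.second;
		else if (kv.first == "session_timeout_ms")
			settings.session_timeout_ms = std::stoi(kv.second);
		else
			throw std::runtime_error("Unknown key " + kv.first + " in config file");
	}
	return settings;
}

int main()
{
	auto settings = parseZooKeeperSettings({{"node", "zk1:2181"}, {"session_timeout_ms", "30000"}});
	return settings.hosts.empty();	/// 0 on success
}

Rejecting unknown keys rather than ignoring them is what turns a stale config entry into an immediate, visible error instead of a silently dropped setting.
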