diff --git a/src/Common/ZooKeeper/ZooKeeper.cpp b/src/Common/ZooKeeper/ZooKeeper.cpp
index 96abf3b543a..74945e3b50c 100644
--- a/src/Common/ZooKeeper/ZooKeeper.cpp
+++ b/src/Common/ZooKeeper/ZooKeeper.cpp
@@ -737,7 +737,7 @@ void ZooKeeper::removeChildrenRecursive(const std::string & path, const String &
     }
 }
 
-bool ZooKeeper::tryRemoveChildrenRecursive(const std::string & path, bool probably_flat, const String & keep_child_node)
+bool ZooKeeper::tryRemoveChildrenRecursive(const std::string & path, bool probably_flat, const std::string_view keep_child_node)
 {
     Strings children;
     if (tryGetChildren(path, children) != Coordination::Error::ZOK)
@@ -754,13 +754,13 @@ bool ZooKeeper::tryRemoveChildrenRecursive(const std::string & path, bool probab
         {
             String child_path = fs::path(path) / children.back();
 
-            /// Will try to avoid recursive getChildren calls if child_path probably has no children.
-            /// It may be extremely slow when path contain a lot of leaf children.
-            if (!probably_flat)
-                tryRemoveChildrenRecursive(child_path);
-
             if (likely(keep_child_node.empty() || keep_child_node != children.back()))
             {
+                /// Will try to avoid recursive getChildren calls if child_path probably has no children.
+                /// It may be extremely slow when path contain a lot of leaf children.
+                if (!probably_flat)
+                    tryRemoveChildrenRecursive(child_path);
+
                 batch.push_back(child_path);
                 ops.emplace_back(zkutil::makeRemoveRequest(child_path, -1));
             }
diff --git a/src/Common/ZooKeeper/ZooKeeper.h b/src/Common/ZooKeeper/ZooKeeper.h
index c9b5dc69499..416fc78d814 100644
--- a/src/Common/ZooKeeper/ZooKeeper.h
+++ b/src/Common/ZooKeeper/ZooKeeper.h
@@ -238,7 +238,7 @@ public:
     /// If probably_flat is true, this method will optimistically try to remove children non-recursive
     /// and will fall back to recursive removal if it gets ZNOTEMPTY for some child.
     /// Returns true if no kind of fallback happened.
-    bool tryRemoveChildrenRecursive(const std::string & path, bool probably_flat = false, const String & keep_child_node = {});
+    bool tryRemoveChildrenRecursive(const std::string & path, bool probably_flat = false, std::string_view keep_child_node = {});
 
     /// Remove all children nodes (non recursive).
     void removeChildren(const std::string & path);
diff --git a/src/Storages/KVStorageUtils.h b/src/Storages/KVStorageUtils.h
index 3807e5f084b..e3216164869 100644
--- a/src/Storages/KVStorageUtils.h
+++ b/src/Storages/KVStorageUtils.h
@@ -30,7 +30,6 @@ void fillColumns(const K & key, const V & value, size_t key_pos, const Block & h
     for (size_t i = 0; i < header.columns(); ++i)
     {
         const auto & serialization = header.getByPosition(i).type->getDefaultSerialization();
-        LOG_INFO(&Poco::Logger::get("LOGGER"), "Reading coluimn {} of type {}", i, columns[i]->getDataType());
         serialization->deserializeBinary(*columns[i], i == key_pos ? key_buffer : value_buffer);
     }
 }
diff --git a/src/Storages/StorageKeeperMap.cpp b/src/Storages/StorageKeeperMap.cpp
index 081861653ae..3096ea0ba72 100644
--- a/src/Storages/StorageKeeperMap.cpp
+++ b/src/Storages/StorageKeeperMap.cpp
@@ -82,10 +82,11 @@ std::string_view getBaseName(const std::string_view path)
 
 struct ZooKeeperLock
 {
-    explicit ZooKeeperLock(std::string lock_path_, zkutil::ZooKeeperPtr client_)
+    explicit ZooKeeperLock(std::string lock_path_, zkutil::ZooKeeperPtr client_, bool defer_lock = false)
         : lock_path(std::move(lock_path_)), client(std::move(client_))
     {
-        lock();
+        if (!defer_lock)
+            lock();
     }
 
     ~ZooKeeperLock()
@@ -120,7 +121,7 @@ struct ZooKeeperLock
     void unlock()
     {
         assert(locked);
-        client->remove(sequence_path);
+        client->tryRemove(sequence_path);
     }
 
 private:
@@ -307,11 +308,13 @@ StorageKeeperMap::StorageKeeperMap(
     ContextPtr context,
     const StorageID & table_id,
     const StorageInMemoryMetadata & metadata,
+    bool attach,
     std::string_view primary_key_,
     std::string_view keeper_path_,
     const std::string & hosts,
     bool create_missing_root_path,
-    size_t keys_limit_)
+    size_t keys_limit_,
+    bool remove_existing_data)
     : IKeyValueStorage(table_id), keeper_path(keeper_path_), primary_key(primary_key_), zookeeper_client(getZooKeeperClient(hosts, context))
 {
     setInMemoryMetadata(metadata);
@@ -323,6 +326,12 @@ StorageKeeperMap::StorageKeeperMap(
 
     auto client = getClient();
 
+    if (attach)
+    {
+        // TODO: validate that the metadata nodes exist; for now ATTACH skips all Keeper setup below
+        return;
+    }
+
     if (keeper_path != "/" && !client->exists(keeper_path))
     {
         if (!create_missing_root_path)
@@ -350,40 +359,32 @@ StorageKeeperMap::StorageKeeperMap(
         }
     }
 
+    Coordination::Stat stats;
+    auto exists = client->exists(keeper_path, &stats);
+    if (!exists)
+        throw Exception(DB::ErrorCodes::LOGICAL_ERROR, "Path '{}' should exist, but was deleted from another source", keeper_path);
+
+    if (stats.numChildren != 0)
+    {
+        if (!remove_existing_data)
+            throw Exception(DB::ErrorCodes::BAD_ARGUMENTS, "Cannot create StorageKeeperMap using '{}' as path because it contains children nodes", keeper_path);
+
+        LOG_INFO(&Poco::Logger::get("StorageKeeperMap"), "Removing children for {} because remove_existing_data was set to true.", keeper_path);
+        client->removeChildrenRecursive(keeper_path);
+    }
+
     // create metadata nodes
     std::filesystem::path root_path{keeper_path};
 
     auto metadata_path_fs = root_path / "__ch_metadata";
     metadata_path = metadata_path_fs;
-    client->createIfNotExists(metadata_path, "");
+    client->create(metadata_path, "", zkutil::CreateMode::Persistent);
 
     lock_path = metadata_path_fs / "lock";
-    client->createIfNotExists(lock_path, "");
+    client->create(lock_path, "", zkutil::CreateMode::Persistent);
 
     auto keys_limit_path = metadata_path_fs / "keys_limit";
-    auto status = client->tryCreate(keys_limit_path, toString(keys_limit_), zkutil::CreateMode::Persistent);
-    if (status == Coordination::Error::ZNODEEXISTS)
-    {
-        auto data = client->get(keys_limit_path, nullptr, nullptr);
-        UInt64 stored_keys_limit = parse(data);
-        if (stored_keys_limit != keys_limit_)
-        {
-            keys_limit = stored_keys_limit;
-            LOG_WARNING(
-                &Poco::Logger::get("StorageKeeperMap"),
-                "Keys limit is already set for {} to {}. Going to use already set value",
-                keeper_path,
-                stored_keys_limit);
-        }
-    }
-    else if (status == Coordination::Error::ZOK)
-    {
-        keys_limit = keys_limit_;
-    }
-    else
-    {
-        throw zkutil::KeeperException(status, keys_limit_path);
-    }
+    client->create(keys_limit_path, toString(keys_limit_), zkutil::CreateMode::Persistent);
 }
 
 
@@ -445,6 +446,22 @@ SinkToStoragePtr StorageKeeperMap::write(const ASTPtr & /*query*/, const Storage
     return std::make_shared(*this, metadata_snapshot);
 }
 
+void StorageKeeperMap::truncate(const ASTPtr &, const StorageMetadataPtr & , ContextPtr, TableExclusiveLockHolder &)
+{
+    auto client = getClient();
+
+    /// Take the Keeper lock, then remove every child of keeper_path except the metadata node.
+    ZooKeeperLock keeper_lock(lockPath(), client);
+    client->tryRemoveChildrenRecursive(keeper_path, /*probably_flat*/ true, getBaseName(metadata_path));
+}
+
+void StorageKeeperMap::drop()
+{
+    auto client = getClient();
+    /// No child is kept here, so the metadata nodes are removed together with the data.
+    client->tryRemoveChildrenRecursive(keeper_path, /*probably_flat*/ false);
+}
+
 zkutil::ZooKeeperPtr & StorageKeeperMap::getClient() const
 {
     if (zookeeper_client->expired())
@@ -563,11 +580,12 @@ StoragePtr create(const StorageFactory::Arguments & args)
-    if (engine_args.empty() || engine_args.size() > 4)
+    if (engine_args.empty() || engine_args.size() > 5)
         throw Exception(
             ErrorCodes::NUMBER_OF_ARGUMENTS_DOESNT_MATCH,
-            "Storage KeeperMap requires 1-4 arguments:\n"
+            "Storage KeeperMap requires 1-5 arguments:\n"
             "keeper_path: path in the Keeper where the values will be stored (required)\n"
-            "keys_limit: number of keys allowed, set to 0 for no limit (default: 0)\n"
             "hosts: comma separated Keeper hosts, set to '{0}' to use the same Keeper as ClickHouse (default: '{0}')\n"
-            "create_missing_root_path: true if the root path should be created if it's missing (default: 1)",
+            "keys_limit: number of keys allowed, set to 0 for no limit (default: 0)\n"
+            "create_missing_root_path: 1 if the root path should be created if it's missing, otherwise throw exception (default: 1)\n"
+            "remove_existing_data: true if children inside 'keeper_path' should be deleted, otherwise throw exception (default: 0)",
             default_host);
 
     auto keeper_path = checkAndGetLiteralArgument(engine_args[0], "keeper_path");
@@ -584,6 +602,10 @@ StoragePtr create(const StorageFactory::Arguments & args)
     if (engine_args.size() > 3)
         create_missing_root_path = checkAndGetLiteralArgument(engine_args[3], "create_missing_root_path");
 
+    bool remove_existing_data = false;
+    if (engine_args.size() > 4)
+        remove_existing_data = checkAndGetLiteralArgument(engine_args[4], "remove_existing_data");
+
     StorageInMemoryMetadata metadata;
     metadata.setColumns(args.columns);
     metadata.setConstraints(args.constraints);
@@ -597,7 +619,7 @@ StoragePtr create(const StorageFactory::Arguments & args)
         throw Exception("StorageKeeperMap requires one column in primary key", ErrorCodes::BAD_ARGUMENTS);
 
     return std::make_shared(
-        args.getContext(), args.table_id, metadata, primary_key_names[0], keeper_path, hosts, create_missing_root_path, keys_limit);
+        args.getContext(), args.table_id, metadata, args.query.attach, primary_key_names[0], keeper_path, hosts, create_missing_root_path, keys_limit, remove_existing_data);
 }
 
 }
diff --git a/src/Storages/StorageKeeperMap.h b/src/Storages/StorageKeeperMap.h
index fdbda1cde34..e1ddc304c68 100644
--- a/src/Storages/StorageKeeperMap.h
+++ b/src/Storages/StorageKeeperMap.h
@@ -17,16 +17,17 @@ namespace DB
 class StorageKeeperMap final : public IKeyValueStorage
 {
 public:
-    // TODO(antonio2368): add setting to control creating if keeper_path doesn't exist
     StorageKeeperMap(
         ContextPtr context,
         const StorageID & table_id,
         const StorageInMemoryMetadata & metadata,
+        bool attach,
         std::string_view primary_key_,
         std::string_view keeper_path_,
         const std::string & hosts,
         bool create_missing_root_path,
-        size_t keys_limit);
+        size_t keys_limit,
+        bool remove_existing_data);
 
     Pipe read(
         const Names & column_names,
@@ -39,6 +40,9 @@ public:
 
     SinkToStoragePtr write(const ASTPtr & query, const StorageMetadataPtr & metadata_snapshot, ContextPtr context) override;
 
+    void truncate(const ASTPtr &, const StorageMetadataPtr & , ContextPtr, TableExclusiveLockHolder &) override;
+    void drop() override;
+
     std::string getName() const override { return "KeeperMap"; }
     Names getPrimaryKey() const override { return {primary_key}; }
 
diff --git a/tests/queries/0_stateless/02381_keeper_map.reference b/tests/queries/0_stateless/02381_keeper_map.reference
new file mode 100644
index 00000000000..eea8dd975e8
--- /dev/null
+++ b/tests/queries/0_stateless/02381_keeper_map.reference
@@ -0,0 +1,6 @@
+1
+1
+1
+1
+1	1	1	1	1
+1
diff --git a/tests/queries/0_stateless/02381_keeper_map.sql b/tests/queries/0_stateless/02381_keeper_map.sql
new file mode 100644
index 00000000000..24048a67cfd
--- /dev/null
+++ b/tests/queries/0_stateless/02381_keeper_map.sql
@@ -0,0 +1,42 @@
+-- Tags: no-ordinary-database, no-fasttest
+-- Tag no-ordinary-database: Sometimes cannot lock file most likely due to concurrent or adjacent tests, but we don't care how it works in Ordinary database
+-- Tag no-fasttest: In fasttest, ENABLE_LIBRARIES=0, so rocksdb engine is not enabled by default
+
+SET database_atomic_wait_for_drop_and_detach_synchronously = 1;
+
+DROP TABLE IF EXISTS 02381_test;
+
+CREATE TABLE 02381_test (key String, value UInt32) Engine=KeeperMap('/test2381'); -- { serverError 36 }
+CREATE TABLE 02381_test (key String, value UInt32) Engine=KeeperMap('/test2381') PRIMARY KEY(key2); -- { serverError 47 }
+CREATE TABLE 02381_test (key String, value UInt32) Engine=KeeperMap('/test2381') PRIMARY KEY(key, value); -- { serverError 36 }
+CREATE TABLE 02381_test (key Tuple(String, UInt32), value UInt64) Engine=KeeperMap('/test2381') PRIMARY KEY(key);
+
+DROP TABLE IF EXISTS 02381_test;
+CREATE TABLE 02381_test (key String, value UInt32) Engine=KeeperMap('/test2381') PRIMARY KEY(key);
+
+INSERT INTO 02381_test SELECT '1_1', number FROM numbers(10000);
+SELECT COUNT(1) == 1 FROM 02381_test;
+
+INSERT INTO 02381_test SELECT concat(toString(number), '_1'), number FROM numbers(10000);
+SELECT COUNT(1) == 10000 FROM 02381_test;
+SELECT uniqExact(key) == 32 FROM (SELECT * FROM 02381_test LIMIT 32 SETTINGS max_block_size = 1);
+SELECT SUM(value) == 1 + 99 + 900 FROM 02381_test WHERE key IN ('1_1', '99_1', '900_1');
+
+DROP TABLE IF EXISTS 02381_test;
+DROP TABLE IF EXISTS 02381_test_memory;
+
+CREATE TABLE 02381_test (k UInt32, value UInt64, dummy Tuple(UInt32, Float64), bm AggregateFunction(groupBitmap, UInt64)) Engine=KeeperMap('/test2381') PRIMARY KEY(k);
+CREATE TABLE 02381_test_memory AS 02381_test Engine = Memory;
+
+INSERT INTO 02381_test SELECT number % 77 AS k, SUM(number) AS value, (1, 1.2), bitmapBuild(groupArray(number)) FROM numbers(10000000) group by k;
+
+INSERT INTO 02381_test_memory SELECT number % 77 AS k, SUM(number) AS value, (1, 1.2), bitmapBuild(groupArray(number)) FROM numbers(10000000) group by k;
+
+
+SELECT A.a = B.a, A.b = B.b, A.c = B.c, A.d = B.d, A.e = B.e FROM ( SELECT 0 AS a, groupBitmapMerge(bm) AS b , SUM(k) AS c, SUM(value) AS d, SUM(dummy.1) AS e FROM 02381_test) A ANY LEFT JOIN (SELECT 0 AS a, groupBitmapMerge(bm) AS b , SUM(k) AS c, SUM(value) AS d, SUM(dummy.1) AS e FROM 02381_test_memory) B USING a ORDER BY a;
+
+TRUNCATE TABLE 02381_test;
+SELECT 0 == COUNT(1) FROM 02381_test;
+
+DROP TABLE IF EXISTS 02381_test;
+DROP TABLE IF EXISTS 02381_test_memory;