Add basic support for attach

This commit is contained in:
Antonio Andelic 2022-08-10 08:52:36 +00:00
parent 4023d4a37a
commit e19ecd95d6
2 changed files with 50 additions and 39 deletions

View File

@ -310,81 +310,90 @@ StorageKeeperMap::StorageKeeperMap(
const StorageInMemoryMetadata & metadata,
bool attach,
std::string_view primary_key_,
std::string_view keeper_path_,
std::string_view root_path_,
const std::string & hosts,
bool create_missing_root_path,
size_t keys_limit_,
bool remove_existing_data)
: IKeyValueStorage(table_id), keeper_path(keeper_path_), primary_key(primary_key_), zookeeper_client(getZooKeeperClient(hosts, context))
: IKeyValueStorage(table_id), root_path(root_path_), primary_key(primary_key_), zookeeper_client(getZooKeeperClient(hosts, context)), log(&Poco::Logger::get("StorageKeeperMap"))
{
setInMemoryMetadata(metadata);
if (keeper_path.empty())
throw Exception("keeper_path should not be empty", ErrorCodes::BAD_ARGUMENTS);
if (!keeper_path.starts_with('/'))
throw Exception("keeper_path should start with '/'", ErrorCodes::BAD_ARGUMENTS);
if (root_path.empty())
throw Exception("root_path should not be empty", ErrorCodes::BAD_ARGUMENTS);
if (!root_path.starts_with('/'))
throw Exception("root_path should start with '/'", ErrorCodes::BAD_ARGUMENTS);
auto client = getClient();
std::filesystem::path root_path_fs{root_path};
auto metadata_path_fs = root_path_fs / "__ch_metadata";
metadata_path = metadata_path_fs;
lock_path = metadata_path_fs / "lock";
auto keys_limit_path = metadata_path_fs / "keys_limit";
if (attach)
{
// validate all metadata nodes are present
Coordination::Requests requests;
requests.push_back(zkutil::makeCheckRequest(root_path, -1));
requests.push_back(zkutil::makeCheckRequest(metadata_path, -1));
requests.push_back(zkutil::makeCheckRequest(lock_path, -1));
requests.push_back(zkutil::makeCheckRequest(keys_limit_path, -1));
client->multi(requests);
return;
}
if (keeper_path != "/" && !client->exists(keeper_path))
if (root_path != "/" && !client->exists(root_path))
{
if (!create_missing_root_path)
{
throw Exception(
ErrorCodes::BAD_ARGUMENTS,
"Path '{}' doesn't exist. Please create it or set 'create_missing_root_path' to true'",
keeper_path_);
root_path_);
}
else
{
LOG_TRACE(&Poco::Logger::get("StorageKeeperMap"), "Creating root path {}", keeper_path);
LOG_TRACE(log, "Creating root path {}", root_path);
size_t cur_pos = 0;
do
{
size_t search_start = cur_pos + 1;
cur_pos = keeper_path.find('/', search_start);
cur_pos = root_path.find('/', search_start);
if (search_start == cur_pos)
throw Exception("keeper_path is invalid, contains subsequent '/'", ErrorCodes::BAD_ARGUMENTS);
throw Exception("root_path is invalid, contains subsequent '/'", ErrorCodes::BAD_ARGUMENTS);
auto path = keeper_path.substr(0, cur_pos);
auto path = root_path.substr(0, cur_pos);
client->createIfNotExists(path, "");
} while (cur_pos != std::string_view::npos);
}
}
Coordination::Stat stats;
auto exists = client->exists(keeper_path, &stats);
auto exists = client->exists(root_path, &stats);
if (!exists)
throw Exception(DB::ErrorCodes::LOGICAL_ERROR, "Path '{}' should exist, but was deleted from another source", keeper_path);
throw Exception(DB::ErrorCodes::LOGICAL_ERROR, "Path '{}' should exist, but was deleted from another source", root_path);
if (stats.numChildren != 0)
{
if (!remove_existing_data)
throw Exception(DB::ErrorCodes::BAD_ARGUMENTS, "Cannot create StorageKeeperMap using '{}' as path because it contains children nodes", keeper_path);
throw Exception(DB::ErrorCodes::BAD_ARGUMENTS, "Cannot create StorageKeeperMap using '{}' as path because it contains children nodes. Set remove_existing_data to 1 to clear children nodes", root_path);
LOG_INFO(&Poco::Logger::get("StorageKeepermap"), "Removing children for {} because remove_existing_data was set to true.", keeper_path);
client->removeChildrenRecursive(keeper_path);
LOG_INFO(log, "Removing children for {} because remove_existing_data was set to true.", root_path);
client->removeChildrenRecursive(root_path);
}
// create metadata nodes
std::filesystem::path root_path{keeper_path};
Coordination::Requests create_requests;
create_requests.push_back(zkutil::makeCreateRequest(metadata_path, "", zkutil::CreateMode::Persistent));
create_requests.push_back(zkutil::makeCreateRequest(lock_path, "", zkutil::CreateMode::Persistent));
create_requests.push_back(zkutil::makeCreateRequest(keys_limit_path, toString(keys_limit_), zkutil::CreateMode::Persistent));
auto metadata_path_fs = root_path / "__ch_metadata";
metadata_path = metadata_path_fs;
client->create(metadata_path, "", zkutil::CreateMode::Persistent);
lock_path = metadata_path_fs / "lock";
client->create(lock_path, "", zkutil::CreateMode::Persistent);
auto keys_limit_path = metadata_path_fs / "keys_limit";
client->create(keys_limit_path, toString(keys_limit_), zkutil::CreateMode::Persistent);
client->multi(create_requests);
}
@ -436,7 +445,7 @@ Pipe StorageKeeperMap::read(
auto & client = getClient();
if (all_scan)
return process_keys(std::make_shared<std::vector<std::string>>(client->getChildren(keeper_path)));
return process_keys(std::make_shared<std::vector<std::string>>(client->getChildren(root_path)));
return process_keys(std::move(filtered_keys));
}
@ -451,13 +460,13 @@ void StorageKeeperMap::truncate(const ASTPtr &, const StorageMetadataPtr & , Con
auto client = getClient();
ZooKeeperLock keeper_lock(lockPath(), client);
client->tryRemoveChildrenRecursive(keeper_path, /*probably_flat*/ true, getBaseName(metadata_path));
client->tryRemoveChildrenRecursive(root_path, /*probably_flat*/ true, getBaseName(metadata_path));
}
void StorageKeeperMap::drop()
{
auto client = getClient();
client->tryRemoveChildrenRecursive(keeper_path, /*probably_flat*/ false);
client->tryRemoveChildrenRecursive(root_path, /*probably_flat*/ false);
}
zkutil::ZooKeeperPtr & StorageKeeperMap::getClient() const
@ -473,12 +482,12 @@ zkutil::ZooKeeperPtr & StorageKeeperMap::getClient() const
const std::string & StorageKeeperMap::rootKeeperPath() const
{
return keeper_path;
return root_path;
}
std::string StorageKeeperMap::fullPathForKey(const std::string_view key) const
{
return fmt::format("{}/{}", keeper_path, key);
return fmt::format("{}/{}", root_path, key);
}
const std::string & StorageKeeperMap::lockPath() const
@ -579,14 +588,14 @@ StoragePtr create(const StorageFactory::Arguments & args)
throw Exception(
ErrorCodes::NUMBER_OF_ARGUMENTS_DOESNT_MATCH,
"Storage KeeperMap requires 1-5 arguments:\n"
"keeper_path: path in the Keeper where the values will be stored (required)\n"
"root_path: path in the Keeper where the values will be stored (required)\n"
"hosts: comma separated Keeper hosts, set to '{0}' to use the same Keeper as ClickHouse (default: '{0}')\n"
"keys_limit: number of keys allowed, set to 0 for no limit (default: 0)\n"
"create_missing_root_path: 1 if the root path should be created if it's missing, otherwise throw exception (default: 1)\n",
"remove_existing_data: true if children inside 'keeper_path' should be deleted, otherwise throw exception (default: 0)",
"remove_existing_data: true if children inside 'root_path' should be deleted, otherwise throw exception (default: 0)",
default_host);
auto keeper_path = checkAndGetLiteralArgument<std::string>(engine_args[0], "keeper_path");
auto root_path = checkAndGetLiteralArgument<std::string>(engine_args[0], "root_path");
std::string hosts = "default";
if (engine_args.size() > 1)
@ -602,7 +611,7 @@ StoragePtr create(const StorageFactory::Arguments & args)
bool remove_existing_data = false;
if (engine_args.size() > 4)
create_missing_root_path = checkAndGetLiteralArgument<UInt64>(engine_args[4], "create_missing_root_path");
remove_existing_data = checkAndGetLiteralArgument<UInt64>(engine_args[4], "remove_existing_data");
StorageInMemoryMetadata metadata;
metadata.setColumns(args.columns);
@ -617,7 +626,7 @@ StoragePtr create(const StorageFactory::Arguments & args)
throw Exception("StorageKeeperMap requires one column in primary key", ErrorCodes::BAD_ARGUMENTS);
return std::make_shared<StorageKeeperMap>(
args.getContext(), args.table_id, metadata, args.query.attach, primary_key_names[0], keeper_path, hosts, create_missing_root_path, keys_limit, remove_existing_data);
args.getContext(), args.table_id, metadata, args.query.attach, primary_key_names[0], root_path, hosts, create_missing_root_path, keys_limit, remove_existing_data);
}
}

View File

@ -23,7 +23,7 @@ public:
const StorageInMemoryMetadata & metadata,
bool attach,
std::string_view primary_key_,
std::string_view keeper_path_,
std::string_view root_path_,
const std::string & hosts,
bool create_missing_root_path,
size_t keys_limit,
@ -65,13 +65,15 @@ public:
UInt64 keysLimit() const;
private:
std::string keeper_path;
std::string root_path;
std::string primary_key;
std::string metadata_path;
std::string lock_path;
UInt64 keys_limit{0};
mutable zkutil::ZooKeeperPtr zookeeper_client;
Poco::Logger * log;
};
}