Named collections in keeper

This commit is contained in:
kssenii 2024-05-29 12:53:28 +02:00
parent 13fa946454
commit 92e004394d
3 changed files with 361 additions and 103 deletions

View File

@ -14,6 +14,9 @@
#include <Parsers/parseQuery.h>
#include <Parsers/ParserCreateQuery.h>
#include <Interpreters/Context.h>
#include <Common/ZooKeeper/IKeeper.h>
#include <Common/ZooKeeper/KeeperException.h>
#include <Common/ZooKeeper/ZooKeeper.h>
#include <Common/NamedCollections/NamedCollections.h>
#include <Common/NamedCollections/NamedCollectionConfiguration.h>
@ -29,6 +32,7 @@ namespace ErrorCodes
extern const int NAMED_COLLECTION_ALREADY_EXISTS;
extern const int NAMED_COLLECTION_DOESNT_EXIST;
extern const int BAD_ARGUMENTS;
extern const int INVALID_CONFIG_PARAMETER;
}
namespace NamedCollectionUtils
@ -104,44 +108,45 @@ private:
}
};
/// Abstract backend that persists serialized named collection definitions.
/// `path` arguments are entry names relative to the storage root; the implementations
/// visible in this file (local filesystem, ZooKeeper) resolve them against their own root.
class INamedCollectionsStorage
{
public:
virtual ~INamedCollectionsStorage() = default;
class LoadFromSQL : private WithContext
/// Returns whether an entry named `path` is present in the storage.
virtual bool exists(const std::string & path) const = 0;
/// Returns the stored entries. The local implementation returns filesystem paths of `.sql` files,
/// the ZooKeeper one returns child node names; callers take the stem to get the collection name.
virtual std::vector<std::string> list() const = 0;
/// Returns the full content stored under `path`.
virtual std::string read(const std::string & path) const = 0;
/// Stores `data` under `path`. With `replace == false` an already existing entry is an error
/// (the ZooKeeper implementation throws NAMED_COLLECTION_ALREADY_EXISTS).
virtual void write(const std::string & path, const std::string & data, bool replace) = 0;
/// Removes the entry `path`; the local implementation throws NAMED_COLLECTION_DOESNT_EXIST if it is missing.
virtual void remove(const std::string & path) = 0;
/// Removes the entry `path` if present; returns true if something was removed, false if it did not exist.
virtual bool removeIfExists(const std::string & path) = 0;
};
using NamedCollectionsStoragePtr = std::unique_ptr<INamedCollectionsStorage>;
class NamedCollectionsMetadata : private WithContext
{
private:
const std::string metadata_path;
NamedCollectionsStoragePtr storage;
public:
explicit LoadFromSQL(ContextPtr context_)
NamedCollectionsMetadata(NamedCollectionsStoragePtr storage_, ContextPtr context_)
: WithContext(context_)
, metadata_path(fs::weakly_canonical(context_->getPath()) / NAMED_COLLECTIONS_METADATA_DIRECTORY)
{
if (fs::exists(metadata_path))
cleanup();
}
, storage(std::move(storage_)) {}
std::vector<std::string> listCollections() const
{
if (!fs::exists(metadata_path))
return {};
std::vector<std::string> collection_names;
fs::directory_iterator it{metadata_path};
for (; it != fs::directory_iterator{}; ++it)
{
const auto & current_path = it->path();
if (current_path.extension() == ".sql")
{
collection_names.push_back(it->path().stem());
}
else
{
LOG_WARNING(
getLogger("NamedCollectionsLoadFromSQL"),
"Unexpected file {} in named collections directory",
current_path.filename().string());
}
}
return collection_names;
auto paths = storage->list();
std::vector<std::string> collections;
collections.reserve(paths.size());
for (const auto & path : paths)
collections.push_back(fs::path(path).stem());
return collections;
}
NamedCollectionsMap getAll() const
@ -163,26 +168,19 @@ public:
MutableNamedCollectionPtr get(const std::string & collection_name) const
{
const auto query = readCreateQueryFromMetadata(
getMetadataPath(collection_name),
getContext()->getSettingsRef());
const auto query = readCreateQuery(collection_name);
return createNamedCollectionFromAST(query);
}
MutableNamedCollectionPtr create(const ASTCreateNamedCollectionQuery & query)
{
writeCreateQueryToMetadata(
query,
getMetadataPath(query.collection_name),
getContext()->getSettingsRef());
writeCreateQuery(query);
return createNamedCollectionFromAST(query);
}
void update(const ASTAlterNamedCollectionQuery & query)
{
const auto path = getMetadataPath(query.collection_name);
auto create_query = readCreateQueryFromMetadata(path, getContext()->getSettings());
auto create_query = readCreateQuery(query.collection_name);
std::unordered_map<std::string, Field> result_changes_map;
for (const auto & [name, value] : query.changes)
@ -236,31 +234,27 @@ public:
"Named collection cannot be empty (collection name: {})",
query.collection_name);
writeCreateQueryToMetadata(
create_query,
getMetadataPath(query.collection_name),
getContext()->getSettingsRef(),
true);
chassert(create_query.collection_name == query.collection_name);
writeCreateQuery(create_query, true);
}
void remove(const std::string & collection_name)
{
auto collection_path = getMetadataPath(collection_name);
if (!fs::exists(collection_path))
{
throw Exception(
ErrorCodes::NAMED_COLLECTION_DOESNT_EXIST,
"Cannot remove collection `{}`, because it doesn't exist",
collection_name);
}
(void)fs::remove(collection_path);
storage->remove(getFileName(collection_name));
}
bool removeIfExists(const std::string & collection_name)
{
return storage->removeIfExists(getFileName(collection_name));
}
private:
static constexpr auto NAMED_COLLECTIONS_METADATA_DIRECTORY = "named_collections";
std::string getFileName(const std::string & collection_name) const
{
return escapeForFileName(collection_name) + ".sql";
}
static MutableNamedCollectionPtr createNamedCollectionFromAST(
const ASTCreateNamedCollectionQuery & query)
static MutableNamedCollectionPtr createNamedCollectionFromAST(const ASTCreateNamedCollectionQuery & query)
{
const auto & collection_name = query.collection_name;
const auto config = NamedCollectionConfiguration::createConfiguration(collection_name, query.changes, query.overridability);
@ -273,46 +267,83 @@ private:
*config, collection_name, "", keys, SourceId::SQL, /* is_mutable */true);
}
std::string getMetadataPath(const std::string & collection_name) const
ASTCreateNamedCollectionQuery readCreateQuery(const std::string & collection_name) const
{
return fs::path(metadata_path) / (escapeForFileName(collection_name) + ".sql");
}
/// Delete .tmp files. They could be left undeleted in case of
/// some exception or abrupt server restart.
void cleanup()
{
fs::directory_iterator it{metadata_path};
std::vector<std::string> files_to_remove;
for (; it != fs::directory_iterator{}; ++it)
{
const auto & current_path = it->path();
if (current_path.extension() == ".tmp")
files_to_remove.push_back(current_path);
}
for (const auto & file : files_to_remove)
(void)fs::remove(file);
}
static ASTCreateNamedCollectionQuery readCreateQueryFromMetadata(
const std::string & path,
const Settings & settings)
{
ReadBufferFromFile in(path);
std::string query;
readStringUntilEOF(query, in);
const auto path = getFileName(collection_name);
auto query = storage->read(path);
ParserCreateNamedCollectionQuery parser;
auto ast = parseQuery(parser, query, "in file " + path, 0, settings.max_parser_depth, settings.max_parser_backtracks);
auto ast = parseQuery(parser, query, "in file " + path, 0, getContext()->getSettingsRef().max_parser_depth, getContext()->getSettingsRef().max_parser_backtracks);
const auto & create_query = ast->as<const ASTCreateNamedCollectionQuery &>();
return create_query;
}
void writeCreateQueryToMetadata(
const ASTCreateNamedCollectionQuery & query,
const std::string & path,
const Settings & settings,
bool replace = false) const
void writeCreateQuery(const ASTCreateNamedCollectionQuery & query, bool replace = false)
{
auto normalized_query = query.clone();
auto & changes = typeid_cast<ASTCreateNamedCollectionQuery *>(normalized_query.get())->changes;
::sort(
changes.begin(), changes.end(),
[](const SettingChange & lhs, const SettingChange & rhs) { return lhs.name < rhs.name; });
storage->write(getFileName(query.collection_name), serializeAST(*normalized_query), replace);
}
};
class NamedCollectionsLocalStorage : public INamedCollectionsStorage, private WithContext
{
private:
std::string root_path;
public:
NamedCollectionsLocalStorage(ContextPtr context_, const std::string & path_)
: WithContext(context_)
, root_path(path_)
{
if (fs::exists(root_path))
cleanup();
}
~NamedCollectionsLocalStorage() override = default;
std::vector<std::string> list() const override
{
if (!fs::exists(root_path))
return {};
std::vector<std::string> elements;
for (fs::directory_iterator it{root_path}; it != fs::directory_iterator{}; ++it)
{
const auto & current_path = it->path();
if (current_path.extension() == ".sql")
{
elements.push_back(it->path());
}
else
{
LOG_WARNING(
getLogger("NamedCollectionsLocalStorage"),
"Unexpected file {} in named collections directory",
current_path.filename().string());
}
}
return elements;
}
bool exists(const std::string & path) const override
{
return fs::exists(getPath(path));
}
std::string read(const std::string & path) const override
{
ReadBufferFromFile in(getPath(path));
std::string data;
readStringUntilEOF(data, in);
return data;
}
/// Stores `data` as the entry `path`: the content is written to a `.tmp` file first
/// and then renamed onto the final name.
/// With `replace == false` an already existing entry is rejected.
void write(const std::string & path, const std::string & data, bool replace) override
{
/// NOTE(review): this check tests the raw `path`, while the rest of the new code in this
/// method goes through getPath(path) — it looks like the check is not resolved against
/// root_path; confirm and switch to fs::exists(getPath(path)) if so.
if (!replace && fs::exists(path))
{
@ -322,22 +353,155 @@ private:
path);
}
fs::create_directories(metadata_path);
fs::create_directories(root_path);
auto tmp_path = path + ".tmp";
String formatted_query = serializeAST(query);
WriteBufferFromFile out(tmp_path, formatted_query.size(), O_WRONLY | O_CREAT | O_EXCL);
writeString(formatted_query, out);
/// O_EXCL makes the open fail if a `.tmp` file with this name is already present.
auto tmp_path = getPath(path + ".tmp");
WriteBufferFromFile out(tmp_path, data.size(), O_WRONLY | O_CREAT | O_EXCL);
writeString(data, out);
out.next();
if (settings.fsync_metadata)
if (getContext()->getSettingsRef().fsync_metadata)
out.sync();
out.close();
fs::rename(tmp_path, path);
fs::rename(tmp_path, getPath(path));
}
/// Removes the entry `path` (a name relative to root_path).
/// Throws NAMED_COLLECTION_DOESNT_EXIST if there was nothing to remove.
void remove(const std::string & path) override
{
    /// removeIfExists() resolves `path` against root_path itself, so the name must be
    /// passed through unresolved; resolving it here as well would apply root_path twice
    /// (which breaks for a relative root_path).
    if (!removeIfExists(path))
    {
        throw Exception(
            ErrorCodes::NAMED_COLLECTION_DOESNT_EXIST,
            "Cannot remove `{}`, because it doesn't exist", path);
    }
}
bool removeIfExists(const std::string & path) override
{
return fs::remove(getPath(path));
}
private:
std::string getPath(const std::string & path) const
{
return fs::path(root_path) / path;
}
/// Delete .tmp files. They could be left undeleted in case of
/// some exception or abrupt server restart.
void cleanup()
{
std::vector<std::string> files_to_remove;
for (fs::directory_iterator it{root_path}; it != fs::directory_iterator{}; ++it)
{
const auto & current_path = it->path();
if (current_path.extension() == ".tmp")
files_to_remove.push_back(current_path);
}
for (const auto & file : files_to_remove)
fs::remove(file);
}
};
/// Named collections storage that keeps every entry as a child node of `root_path` in ZooKeeper.
class NamedCollectionsZooKeeperStorage : public INamedCollectionsStorage, private WithContext
{
private:
    /// Normalized in the constructor: leading '/', no trailing '/' (unless it is just "/").
    std::string root_path;
    /// Cached client; it is (re)acquired lazily from const methods, hence mutable.
    mutable zkutil::ZooKeeperPtr zookeeper_client{nullptr};

public:
    /// Normalizes `path_` and creates the root node together with its ancestors if it is missing.
    /// Throws INVALID_CONFIG_PARAMETER if `path_` is empty.
    NamedCollectionsZooKeeperStorage(ContextPtr context_, const std::string & path_)
        : WithContext(context_)
        , root_path(path_)
    {
        if (root_path.empty())
            throw Exception(ErrorCodes::INVALID_CONFIG_PARAMETER, "Collections path cannot be empty");

        if (root_path != "/" && root_path.back() == '/')
            root_path.resize(root_path.size() - 1);
        if (root_path.front() != '/')
            root_path = "/" + root_path;

        auto client = getClient();
        if (root_path != "/" && !client->exists(root_path))
        {
            client->createAncestors(root_path);
            client->createIfNotExists(root_path, "");
        }
    }

    ~NamedCollectionsZooKeeperStorage() override = default;

    /// Returns the names of all child nodes of the root node.
    std::vector<std::string> list() const override
    {
        return getClient()->getChildren(root_path);
    }

    /// Returns whether the node for entry `path` exists.
    bool exists(const std::string & path) const override
    {
        return getClient()->exists(getPath(path));
    }

    /// Returns the data stored in the node for entry `path`.
    std::string read(const std::string & path) const override
    {
        return getClient()->get(getPath(path));
    }

    /// Stores `data` in a persistent node for entry `path`.
    /// With `replace == true` the node is created or overwritten.
    /// With `replace == false` an existing node raises NAMED_COLLECTION_ALREADY_EXISTS,
    /// and any other non-OK result of the create attempt is raised as a Keeper exception.
    void write(const std::string & path, const std::string & data, bool replace) override
    {
        const auto node_path = getPath(path);

        if (replace)
        {
            getClient()->createOrUpdate(node_path, data, zkutil::CreateMode::Persistent);
            return;
        }

        const auto code = getClient()->tryCreate(node_path, data, zkutil::CreateMode::Persistent);
        if (code == Coordination::Error::ZOK)
            return;

        if (code == Coordination::Error::ZNODEEXISTS)
        {
            throw Exception(
                ErrorCodes::NAMED_COLLECTION_ALREADY_EXISTS,
                "Metadata file {} for named collection already exists",
                path);
        }

        /// tryCreate reported a failure through its return value: the node was not created,
        /// so the write must not look successful to the caller.
        throw Coordination::Exception::fromPath(code, node_path);
    }

    /// Removes the node for entry `path`.
    void remove(const std::string & path) override
    {
        getClient()->remove(getPath(path));
    }

    /// Removes the node for entry `path` if it exists.
    /// Returns true if it was removed, false if there was no such node;
    /// any other result code is raised as a Keeper exception.
    bool removeIfExists(const std::string & path) override
    {
        auto code = getClient()->tryRemove(getPath(path));
        if (code == Coordination::Error::ZOK)
            return true;
        if (code == Coordination::Error::ZNONODE)
            return false;
        throw Coordination::Exception::fromPath(code, getPath(path));
    }

private:
    /// Returns the cached client, fetching a new one from the context (and syncing on
    /// root_path) when there is none yet or the cached session has expired.
    zkutil::ZooKeeperPtr getClient() const
    {
        if (!zookeeper_client || zookeeper_client->expired())
        {
            zookeeper_client = getContext()->getZooKeeper();
            zookeeper_client->sync(root_path);
        }
        return zookeeper_client;
    }

    /// Resolves an entry name to its full node path under root_path.
    std::string getPath(const std::string & path) const
    {
        return fs::path(root_path) / path;
    }
};
std::unique_lock<std::mutex> lockNamedCollectionsTransaction()
{
static std::mutex transaction_lock;
@ -371,9 +535,35 @@ void reloadFromConfig(const Poco::Util::AbstractConfiguration & config)
is_loaded_from_config = true;
}
/// Builds the metadata accessor for SQL-defined named collections.
/// The backend is selected by `named_collections_storage.type` in the server config:
/// "local" (the default when the key is absent) or "zookeeper".
/// For "local", `named_collections_storage.path` defaults to `<server path>/named_collections`;
/// for "zookeeper" the path is read without a default.
/// Throws INVALID_CONFIG_PARAMETER for any other storage type.
auto getNamedCollectionsStorage(ContextPtr context)
{
    static const std::string storage_config_path = "named_collections_storage";

    const auto & config = context->getConfigRef();
    const auto storage_type = config.getString(storage_config_path + ".type", "local");

    NamedCollectionsStoragePtr backend;
    if (storage_type == "local")
    {
        const auto local_path = config.getString(
            storage_config_path + ".path", fs::path(context->getPath()) / "named_collections");
        backend = std::make_unique<NamedCollectionsLocalStorage>(context, local_path);
    }
    else if (storage_type == "zookeeper")
    {
        backend = std::make_unique<NamedCollectionsZooKeeperStorage>(
            context, config.getString(storage_config_path + ".path"));
    }
    else
    {
        throw Exception(
            ErrorCodes::INVALID_CONFIG_PARAMETER,
            "Unknown storage for named collections: {}", storage_type);
    }

    return NamedCollectionsMetadata(std::move(backend), context);
}
void loadFromSQLUnlocked(ContextPtr context, std::unique_lock<std::mutex> &)
{
auto named_collections = LoadFromSQL(context).getAll();
auto named_collections = getNamedCollectionsStorage(context).getAll();
LOG_TRACE(
getLogger("NamedCollectionsUtils"),
"Loaded {} collections from SQL", named_collections.size());
@ -421,7 +611,7 @@ void removeFromSQL(const ASTDropNamedCollectionQuery & query, ContextPtr context
}
return;
}
LoadFromSQL(context).remove(query.collection_name);
getNamedCollectionsStorage(context).remove(query.collection_name);
instance.remove(query.collection_name);
}
@ -441,7 +631,7 @@ void createFromSQL(const ASTCreateNamedCollectionQuery & query, ContextPtr conte
}
return;
}
instance.add(query.collection_name, LoadFromSQL(context).create(query));
instance.add(query.collection_name, getNamedCollectionsStorage(context).create(query));
}
void updateFromSQL(const ASTAlterNamedCollectionQuery & query, ContextPtr context)
@ -460,7 +650,7 @@ void updateFromSQL(const ASTAlterNamedCollectionQuery & query, ContextPtr contex
}
return;
}
LoadFromSQL(context).update(query);
getNamedCollectionsStorage(context).update(query);
auto collection = instance.getMutable(query.collection_name);
auto collection_lock = collection->lock();

View File

@ -0,0 +1,11 @@
<clickhouse>
<!-- Keep SQL-defined named collections in ZooKeeper; without this section the storage type defaults to "local". -->
<named_collections_storage>
<type>zookeeper</type>
<!-- Root node for the collections. The trailing slash is stripped by the storage on startup. -->
<path>/named_collections_path/</path>
</named_collections_storage>
<!-- A collection defined in the config itself, independent of the SQL storage above. -->
<named_collections>
<collection1>
<key1>value1</key1>
</collection1>
</named_collections>
</clickhouse>

View File

@ -9,6 +9,7 @@ NAMED_COLLECTIONS_CONFIG = os.path.join(
SCRIPT_DIR, "./configs/config.d/named_collections.xml"
)
ZK_PATH = "/named_collections_path"
@pytest.fixture(scope="module")
def cluster():
@ -24,6 +25,17 @@ def cluster():
],
stay_alive=True,
)
cluster.add_instance(
"node_with_keeper",
main_configs=[
"configs/config.d/named_collections_with_zookeeper.xml",
],
user_configs=[
"configs/users.d/users.xml",
],
stay_alive=True,
with_zookeeper=True,
)
cluster.add_instance(
"node_only_named_collection_control",
main_configs=[
@ -53,7 +65,6 @@ def cluster():
finally:
cluster.shutdown()
def replace_in_server_config(node, old, new):
node.replace_in_config(
"/etc/clickhouse-server/config.d/named_collections.xml",
@ -447,8 +458,16 @@ def test_config_reload(cluster):
)
def test_sql_commands(cluster):
node = cluster.instances["node"]
@pytest.mark.parametrize("with_keeper", [False, True])
def test_sql_commands(cluster, with_keeper):
zk = None
node = None
if with_keeper:
node = cluster.instances["node_with_keeper"]
zk = cluster.get_kazoo_client("zoo1")
else:
node = cluster.instances["node"]
assert "1" == node.query("select count() from system.named_collections").strip()
node.query("CREATE NAMED COLLECTION collection2 AS key1=1, key2='value2'")
@ -479,6 +498,14 @@ def test_sql_commands(cluster):
"select collection['key2'] from system.named_collections where name = 'collection2'"
).strip()
)
if zk is not None:
children = zk.get_children(ZK_PATH)
assert 1 == len(children)
assert "collection2.sql" in children
assert (
b"CREATE NAMED COLLECTION collection2 AS key1 = 1, key2 = 'value2'"
in zk.get(ZK_PATH + "/collection2.sql")[0]
)
check_created()
node.restart_clickhouse()
@ -508,6 +535,15 @@ def test_sql_commands(cluster):
).strip()
)
if zk is not None:
children = zk.get_children(ZK_PATH)
assert 1 == len(children)
assert "collection2.sql" in children
assert (
b"CREATE NAMED COLLECTION collection2 AS key1 = 4, key2 = 'value2', key3 = 'value3'"
in zk.get(ZK_PATH + "/collection2.sql")[0]
)
check_altered()
node.restart_clickhouse()
check_altered()
@ -522,6 +558,15 @@ def test_sql_commands(cluster):
).strip()
)
if zk is not None:
children = zk.get_children(ZK_PATH)
assert 1 == len(children)
assert "collection2.sql" in children
assert (
b"CREATE NAMED COLLECTION collection2 AS key1 = 4, key3 = 'value3'"
in zk.get(ZK_PATH + "/collection2.sql")[0]
)
check_deleted()
node.restart_clickhouse()
check_deleted()
@ -552,6 +597,15 @@ def test_sql_commands(cluster):
).strip()
)
if zk is not None:
children = zk.get_children(ZK_PATH)
assert 1 == len(children)
assert "collection2.sql" in children
assert (
b"CREATE NAMED COLLECTION collection2 AS key3 = 3, key4 = 'value4'"
in zk.get(ZK_PATH + "/collection2.sql")[0]
)
check_altered_and_deleted()
node.restart_clickhouse()
check_altered_and_deleted()
@ -564,6 +618,9 @@ def test_sql_commands(cluster):
"collection1"
== node.query("select name from system.named_collections").strip()
)
if zk is not None:
children = zk.get_children(ZK_PATH)
assert 0 == len(children)
check_dropped()
node.restart_clickhouse()