mirror of
https://github.com/ClickHouse/ClickHouse.git
synced 2024-11-21 15:12:02 +00:00
Merge pull request #39976 from ClickHouse/keeper-storage
KeeperMap storage engine
This commit is contained in:
commit
baf7255cff
33
src/Common/Base64.cpp
Normal file
33
src/Common/Base64.cpp
Normal file
@ -0,0 +1,33 @@
|
||||
#include <Common/Base64.h>
|
||||
|
||||
#include <Poco/Base64Decoder.h>
|
||||
#include <Poco/Base64Encoder.h>
|
||||
#include <Poco/MemoryStream.h>
|
||||
#include <Poco/StreamCopier.h>
|
||||
|
||||
#include <sstream>
|
||||
|
||||
namespace DB
|
||||
{
|
||||
|
||||
std::string base64Encode(const std::string & decoded, bool url_encoding)
|
||||
{
|
||||
std::ostringstream ostr; // STYLE_CHECK_ALLOW_STD_STRING_STREAM
|
||||
ostr.exceptions(std::ios::failbit);
|
||||
Poco::Base64Encoder encoder(ostr, url_encoding ? Poco::BASE64_URL_ENCODING : 0);
|
||||
encoder.rdbuf()->setLineLength(0);
|
||||
encoder << decoded;
|
||||
encoder.close();
|
||||
return ostr.str();
|
||||
}
|
||||
|
||||
std::string base64Decode(const std::string & encoded, bool url_encoding)
|
||||
{
|
||||
std::string decoded;
|
||||
Poco::MemoryInputStream istr(encoded.data(), encoded.size());
|
||||
Poco::Base64Decoder decoder(istr, url_encoding ? Poco::BASE64_URL_ENCODING : 0);
|
||||
Poco::StreamCopier::copyToString(decoder, decoded);
|
||||
return decoded;
|
||||
}
|
||||
|
||||
}
|
12
src/Common/Base64.h
Normal file
12
src/Common/Base64.h
Normal file
@ -0,0 +1,12 @@
|
||||
#pragma once
|
||||
|
||||
#include <string>
|
||||
|
||||
namespace DB
|
||||
{
|
||||
|
||||
std::string base64Encode(const std::string & decoded, bool url_encoding = false);
|
||||
|
||||
std::string base64Decode(const std::string & encoded, bool url_encoding = false);
|
||||
|
||||
}
|
@ -636,6 +636,7 @@
|
||||
M(665, CANNOT_CONNECT_NATS) \
|
||||
M(666, CANNOT_USE_CACHE) \
|
||||
M(667, NOT_INITIALIZED) \
|
||||
M(668, INVALID_STATE) \
|
||||
\
|
||||
M(999, KEEPER_EXCEPTION) \
|
||||
M(1000, POCO_EXCEPTION) \
|
||||
|
@ -605,24 +605,31 @@ void ZooKeeper::removeChildren(const std::string & path)
|
||||
}
|
||||
|
||||
|
||||
void ZooKeeper::removeChildrenRecursive(const std::string & path, const String & keep_child_node)
|
||||
void ZooKeeper::removeChildrenRecursive(const std::string & path, RemoveException keep_child)
|
||||
{
|
||||
Strings children = getChildren(path);
|
||||
while (!children.empty())
|
||||
{
|
||||
Coordination::Requests ops;
|
||||
for (size_t i = 0; i < MULTI_BATCH_SIZE && !children.empty(); ++i)
|
||||
{
|
||||
if (keep_child.path.empty() || keep_child.path != children.back()) [[likely]]
|
||||
{
|
||||
removeChildrenRecursive(fs::path(path) / children.back());
|
||||
if (likely(keep_child_node.empty() || keep_child_node != children.back()))
|
||||
ops.emplace_back(makeRemoveRequest(fs::path(path) / children.back(), -1));
|
||||
}
|
||||
else if (keep_child.remove_subtree)
|
||||
{
|
||||
removeChildrenRecursive(fs::path(path) / children.back());
|
||||
}
|
||||
|
||||
children.pop_back();
|
||||
}
|
||||
multi(ops);
|
||||
}
|
||||
}
|
||||
|
||||
bool ZooKeeper::tryRemoveChildrenRecursive(const std::string & path, bool probably_flat, const String & keep_child_node)
|
||||
bool ZooKeeper::tryRemoveChildrenRecursive(const std::string & path, bool probably_flat, RemoveException keep_child)
|
||||
{
|
||||
Strings children;
|
||||
if (tryGetChildren(path, children) != Coordination::Error::ZOK)
|
||||
@ -639,16 +646,20 @@ bool ZooKeeper::tryRemoveChildrenRecursive(const std::string & path, bool probab
|
||||
{
|
||||
String child_path = fs::path(path) / children.back();
|
||||
|
||||
if (keep_child.path.empty() || keep_child.path != children.back()) [[likely]]
|
||||
{
|
||||
/// Will try to avoid recursive getChildren calls if child_path probably has no children.
|
||||
/// It may be extremely slow when path contain a lot of leaf children.
|
||||
if (!probably_flat)
|
||||
tryRemoveChildrenRecursive(child_path);
|
||||
|
||||
if (likely(keep_child_node.empty() || keep_child_node != children.back()))
|
||||
{
|
||||
batch.push_back(child_path);
|
||||
ops.emplace_back(zkutil::makeRemoveRequest(child_path, -1));
|
||||
}
|
||||
else if (keep_child.remove_subtree && !probably_flat)
|
||||
{
|
||||
tryRemoveChildrenRecursive(child_path);
|
||||
}
|
||||
|
||||
children.pop_back();
|
||||
}
|
||||
|
@ -58,6 +58,18 @@ struct ShuffleHost
|
||||
}
|
||||
};
|
||||
|
||||
struct RemoveException
|
||||
{
|
||||
explicit RemoveException(std::string_view path_ = "", bool remove_subtree_ = true)
|
||||
: path(path_)
|
||||
, remove_subtree(remove_subtree_)
|
||||
{}
|
||||
|
||||
std::string_view path;
|
||||
// whether we should keep the child node and its subtree or just the child node
|
||||
bool remove_subtree;
|
||||
};
|
||||
|
||||
using GetPriorityForLoadBalancing = DB::GetPriorityForLoadBalancing;
|
||||
|
||||
/// ZooKeeper session. The interface is substantially different from the usual libzookeeper API.
|
||||
@ -219,13 +231,13 @@ public:
|
||||
void tryRemoveRecursive(const std::string & path);
|
||||
|
||||
/// Similar to removeRecursive(...) and tryRemoveRecursive(...), but does not remove path itself.
|
||||
/// If keep_child_node is not empty, this method will not remove path/keep_child_node (but will remove its subtree).
|
||||
/// It can be useful to keep some child node as a flag which indicates that path is currently removing.
|
||||
void removeChildrenRecursive(const std::string & path, const String & keep_child_node = {});
|
||||
/// Node defined as RemoveException will not be deleted.
|
||||
void removeChildrenRecursive(const std::string & path, RemoveException keep_child = RemoveException{});
|
||||
/// If probably_flat is true, this method will optimistically try to remove children non-recursive
|
||||
/// and will fall back to recursive removal if it gets ZNOTEMPTY for some child.
|
||||
/// Returns true if no kind of fallback happened.
|
||||
bool tryRemoveChildrenRecursive(const std::string & path, bool probably_flat = false, const String & keep_child_node = {});
|
||||
/// Node defined as RemoveException will not be deleted.
|
||||
bool tryRemoveChildrenRecursive(const std::string & path, bool probably_flat = false, RemoveException keep_child= RemoveException{});
|
||||
|
||||
/// Remove all children nodes (non recursive).
|
||||
void removeChildren(const std::string & path);
|
||||
|
@ -1,11 +1,11 @@
|
||||
#include <iterator>
|
||||
#include <variant>
|
||||
#include <Coordination/KeeperStorage.h>
|
||||
#include <IO/Operators.h>
|
||||
#include <IO/WriteHelpers.h>
|
||||
#include <boost/algorithm/string.hpp>
|
||||
#include <Poco/Base64Encoder.h>
|
||||
#include <Poco/SHA1Engine.h>
|
||||
|
||||
#include <Common/Base64.h>
|
||||
#include <Common/ZooKeeper/ZooKeeperCommon.h>
|
||||
#include <Common/SipHash.h>
|
||||
#include <Common/ZooKeeper/ZooKeeperConstants.h>
|
||||
@ -15,8 +15,11 @@
|
||||
#include <Common/logger_useful.h>
|
||||
#include <Common/setThreadName.h>
|
||||
#include <Common/LockMemoryExceptionInThread.h>
|
||||
|
||||
#include <Coordination/pathUtils.h>
|
||||
#include <Coordination/KeeperConstants.h>
|
||||
#include <Coordination/KeeperStorage.h>
|
||||
|
||||
#include <sstream>
|
||||
#include <iomanip>
|
||||
#include <mutex>
|
||||
@ -36,17 +39,6 @@ namespace ErrorCodes
|
||||
namespace
|
||||
{
|
||||
|
||||
String base64Encode(const String & decoded)
|
||||
{
|
||||
std::ostringstream ostr; // STYLE_CHECK_ALLOW_STD_STRING_STREAM
|
||||
ostr.exceptions(std::ios::failbit);
|
||||
Poco::Base64Encoder encoder(ostr);
|
||||
encoder.rdbuf()->setLineLength(0);
|
||||
encoder << decoded;
|
||||
encoder.close();
|
||||
return ostr.str();
|
||||
}
|
||||
|
||||
String getSHA1(const String & userdata)
|
||||
{
|
||||
Poco::SHA1Engine engine;
|
||||
|
@ -890,7 +890,7 @@ void DDLWorker::cleanupQueue(Int64, const ZooKeeperPtr & zookeeper)
|
||||
|
||||
/// We recursively delete all nodes except node_path/finished to prevent staled hosts from
|
||||
/// creating node_path/active node (see createStatusDirs(...))
|
||||
zookeeper->tryRemoveChildrenRecursive(node_path, /* probably_flat */ false, "finished");
|
||||
zookeeper->tryRemoveChildrenRecursive(node_path, /* probably_flat */ false, zkutil::RemoveException{"finished"});
|
||||
|
||||
/// And then we remove node_path and node_path/finished in a single transaction
|
||||
Coordination::Requests ops;
|
||||
|
771
src/Storages/StorageKeeperMap.cpp
Normal file
771
src/Storages/StorageKeeperMap.cpp
Normal file
@ -0,0 +1,771 @@
|
||||
#include <Storages/StorageKeeperMap.h>
|
||||
|
||||
#include <Columns/ColumnString.h>
|
||||
|
||||
#include <Databases/DatabaseReplicated.h>
|
||||
|
||||
#include <Core/NamesAndTypes.h>
|
||||
#include <Core/UUID.h>
|
||||
#include <Core/ServerUUID.h>
|
||||
|
||||
#include <DataTypes/DataTypeString.h>
|
||||
|
||||
#include <Interpreters/evaluateConstantExpression.h>
|
||||
|
||||
#include <Parsers/ASTCreateQuery.h>
|
||||
#include <Parsers/ASTExpressionList.h>
|
||||
#include <Parsers/ASTFunction.h>
|
||||
#include <Parsers/ASTIdentifier.h>
|
||||
#include <Parsers/ASTSelectQuery.h>
|
||||
#include <Parsers/formatAST.h>
|
||||
|
||||
#include <Processors/ISource.h>
|
||||
#include <Processors/Sinks/SinkToStorage.h>
|
||||
|
||||
#include <Storages/ColumnsDescription.h>
|
||||
#include <Storages/KVStorageUtils.h>
|
||||
#include <Storages/StorageFactory.h>
|
||||
#include <Storages/StorageInMemoryMetadata.h>
|
||||
#include <Storages/checkAndGetLiteralArgument.h>
|
||||
|
||||
#include <Common/Base64.h>
|
||||
#include <Common/Exception.h>
|
||||
#include <Common/ZooKeeper/IKeeper.h>
|
||||
#include <Common/ZooKeeper/KeeperException.h>
|
||||
#include <Common/ZooKeeper/Types.h>
|
||||
#include <Common/ZooKeeper/ZooKeeper.h>
|
||||
#include <Common/ZooKeeper/ZooKeeperConstants.h>
|
||||
|
||||
#include <base/types.h>
|
||||
|
||||
#include <boost/algorithm/string/classification.hpp>
|
||||
|
||||
namespace DB
|
||||
{
|
||||
|
||||
namespace ErrorCodes
|
||||
{
|
||||
extern const int NUMBER_OF_ARGUMENTS_DOESNT_MATCH;
|
||||
extern const int BAD_ARGUMENTS;
|
||||
extern const int KEEPER_EXCEPTION;
|
||||
extern const int LOGICAL_ERROR;
|
||||
extern const int LIMIT_EXCEEDED;
|
||||
}
|
||||
|
||||
namespace
|
||||
{
|
||||
|
||||
std::string formattedAST(const ASTPtr & ast)
|
||||
{
|
||||
if (!ast)
|
||||
return "";
|
||||
return serializeAST(*ast);
|
||||
}
|
||||
|
||||
void verifyTableId(const StorageID & table_id)
|
||||
{
|
||||
if (!table_id.hasUUID())
|
||||
{
|
||||
auto database = DatabaseCatalog::instance().getDatabase(table_id.database_name);
|
||||
throw Exception(
|
||||
ErrorCodes::BAD_ARGUMENTS,
|
||||
"KeeperMap cannot be used with '{}' database because it uses {} engine. Please use Atomic or Replicated database",
|
||||
table_id.getDatabaseName(),
|
||||
database->getEngineName());
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
class StorageKeeperMapSink : public SinkToStorage
|
||||
{
|
||||
StorageKeeperMap & storage;
|
||||
std::unordered_map<std::string, std::string> new_values;
|
||||
size_t primary_key_pos;
|
||||
|
||||
public:
|
||||
StorageKeeperMapSink(StorageKeeperMap & storage_, const StorageMetadataPtr & metadata_snapshot)
|
||||
: SinkToStorage(metadata_snapshot->getSampleBlock()), storage(storage_)
|
||||
{
|
||||
auto primary_key = storage.getPrimaryKey();
|
||||
assert(primary_key.size() == 1);
|
||||
primary_key_pos = getHeader().getPositionByName(primary_key[0]);
|
||||
}
|
||||
|
||||
std::string getName() const override { return "StorageKeeperMapSink"; }
|
||||
|
||||
void consume(Chunk chunk) override
|
||||
{
|
||||
auto rows = chunk.getNumRows();
|
||||
auto block = getHeader().cloneWithColumns(chunk.detachColumns());
|
||||
|
||||
WriteBufferFromOwnString wb_key;
|
||||
WriteBufferFromOwnString wb_value;
|
||||
|
||||
for (size_t i = 0; i < rows; ++i)
|
||||
{
|
||||
wb_key.restart();
|
||||
wb_value.restart();
|
||||
|
||||
size_t idx = 0;
|
||||
for (const auto & elem : block)
|
||||
{
|
||||
elem.type->getDefaultSerialization()->serializeBinary(*elem.column, i, idx == primary_key_pos ? wb_key : wb_value);
|
||||
++idx;
|
||||
}
|
||||
|
||||
auto key = base64Encode(wb_key.str(), /* url_encoding */ true);
|
||||
new_values[std::move(key)] = std::move(wb_value.str());
|
||||
}
|
||||
}
|
||||
|
||||
void onFinish() override
|
||||
{
|
||||
auto zookeeper = storage.getClient();
|
||||
|
||||
Coordination::Requests requests;
|
||||
|
||||
auto keys_limit = storage.keysLimit();
|
||||
|
||||
size_t current_keys_num = 0;
|
||||
size_t new_keys_num = 0;
|
||||
|
||||
// We use keys limit as a soft limit so we ignore some cases when it can be still exceeded
|
||||
// (e.g if parallel insert queries are being run)
|
||||
if (keys_limit != 0)
|
||||
{
|
||||
Coordination::Stat data_stat;
|
||||
zookeeper->get(storage.dataPath(), &data_stat);
|
||||
current_keys_num = data_stat.numChildren;
|
||||
}
|
||||
|
||||
std::vector<std::pair<const std::string *, std::future<Coordination::ExistsResponse>>> exist_responses;
|
||||
for (const auto & [key, value] : new_values)
|
||||
{
|
||||
auto path = storage.fullPathForKey(key);
|
||||
|
||||
exist_responses.push_back({&key, zookeeper->asyncExists(path)});
|
||||
}
|
||||
|
||||
for (auto & [key, response] : exist_responses)
|
||||
{
|
||||
if (response.get().error == Coordination::Error::ZOK)
|
||||
{
|
||||
requests.push_back(zkutil::makeSetRequest(storage.fullPathForKey(*key), new_values[*key], -1));
|
||||
}
|
||||
else
|
||||
{
|
||||
requests.push_back(zkutil::makeCreateRequest(storage.fullPathForKey(*key), new_values[*key], zkutil::CreateMode::Persistent));
|
||||
++new_keys_num;
|
||||
}
|
||||
}
|
||||
|
||||
if (new_keys_num != 0)
|
||||
{
|
||||
auto will_be = current_keys_num + new_keys_num;
|
||||
if (keys_limit != 0 && will_be > keys_limit)
|
||||
throw Exception(
|
||||
ErrorCodes::LIMIT_EXCEEDED,
|
||||
"Limit would be exceeded by inserting {} new key(s). Limit is {}, while the number of keys would be {}",
|
||||
new_keys_num,
|
||||
keys_limit,
|
||||
will_be);
|
||||
}
|
||||
|
||||
zookeeper->multi(requests);
|
||||
}
|
||||
};
|
||||
|
||||
template <typename KeyContainer>
|
||||
class StorageKeeperMapSource : public ISource
|
||||
{
|
||||
const StorageKeeperMap & storage;
|
||||
size_t max_block_size;
|
||||
|
||||
using KeyContainerPtr = std::shared_ptr<KeyContainer>;
|
||||
KeyContainerPtr container;
|
||||
using KeyContainerIter = typename KeyContainer::const_iterator;
|
||||
KeyContainerIter it;
|
||||
KeyContainerIter end;
|
||||
|
||||
public:
|
||||
StorageKeeperMapSource(
|
||||
const StorageKeeperMap & storage_,
|
||||
const Block & header,
|
||||
size_t max_block_size_,
|
||||
KeyContainerPtr container_,
|
||||
KeyContainerIter begin_,
|
||||
KeyContainerIter end_)
|
||||
: ISource(header), storage(storage_), max_block_size(max_block_size_), container(std::move(container_)), it(begin_), end(end_)
|
||||
{
|
||||
}
|
||||
|
||||
std::string getName() const override { return "StorageKeeperMapSource"; }
|
||||
|
||||
Chunk generate() override
|
||||
{
|
||||
if (it >= end)
|
||||
{
|
||||
it = {};
|
||||
return {};
|
||||
}
|
||||
|
||||
using KeyType = typename KeyContainer::value_type;
|
||||
if constexpr (std::same_as<KeyType, Field>)
|
||||
{
|
||||
const auto & sample_block = getPort().getHeader();
|
||||
const auto & key_column_type = sample_block.getByName(storage.getPrimaryKey().at(0)).type;
|
||||
auto raw_keys = serializeKeysToRawString(it, end, key_column_type, max_block_size);
|
||||
|
||||
for (auto & raw_key : raw_keys)
|
||||
raw_key = base64Encode(raw_key, /* url_encoding */ true);
|
||||
|
||||
return storage.getBySerializedKeys(raw_keys, nullptr);
|
||||
}
|
||||
else
|
||||
{
|
||||
size_t elem_num = std::min(max_block_size, static_cast<size_t>(end - it));
|
||||
auto chunk = storage.getBySerializedKeys(std::span{it, it + elem_num}, nullptr);
|
||||
it += elem_num;
|
||||
return chunk;
|
||||
}
|
||||
}
|
||||
};
|
||||
|
||||
StorageKeeperMap::StorageKeeperMap(
|
||||
ContextPtr context_,
|
||||
const StorageID & table_id,
|
||||
const StorageInMemoryMetadata & metadata,
|
||||
bool attach,
|
||||
std::string_view primary_key_,
|
||||
const std::string & root_path_,
|
||||
UInt64 keys_limit_)
|
||||
: IStorage(table_id)
|
||||
, WithContext(context_->getGlobalContext())
|
||||
, root_path(zkutil::extractZooKeeperPath(root_path_, false))
|
||||
, primary_key(primary_key_)
|
||||
, zookeeper_name(zkutil::extractZooKeeperName(root_path_))
|
||||
, keys_limit(keys_limit_)
|
||||
, log(&Poco::Logger::get(fmt::format("StorageKeeperMap ({})", table_id.getNameForLogs())))
|
||||
{
|
||||
std::string path_prefix = context_->getConfigRef().getString("keeper_map_path_prefix", "");
|
||||
if (path_prefix.empty())
|
||||
throw Exception(ErrorCodes::BAD_ARGUMENTS, "KeeperMap is disabled because 'keeper_map_path_prefix' config is not defined");
|
||||
|
||||
verifyTableId(table_id);
|
||||
|
||||
setInMemoryMetadata(metadata);
|
||||
|
||||
WriteBufferFromOwnString out;
|
||||
out << "KeeperMap metadata format version: 1\n"
|
||||
<< "columns: " << metadata.columns.toString()
|
||||
<< "primary key: " << formattedAST(metadata.getPrimaryKey().expression_list_ast) << "\n";
|
||||
metadata_string = out.str();
|
||||
|
||||
if (root_path.empty())
|
||||
throw Exception("root_path should not be empty", ErrorCodes::BAD_ARGUMENTS);
|
||||
if (!root_path.starts_with('/'))
|
||||
throw Exception("root_path should start with '/'", ErrorCodes::BAD_ARGUMENTS);
|
||||
|
||||
auto config_keys_limit = context_->getConfigRef().getUInt64("keeper_map_keys_limit", 0);
|
||||
if (config_keys_limit != 0 && (keys_limit == 0 || keys_limit > config_keys_limit))
|
||||
{
|
||||
LOG_WARNING(
|
||||
log,
|
||||
"Keys limit defined by argument ({}) is larger than the one defined by 'keeper_map_keys_limit' config ({}). Will use "
|
||||
"config defined value",
|
||||
keys_limit,
|
||||
config_keys_limit);
|
||||
keys_limit = config_keys_limit;
|
||||
}
|
||||
else if (keys_limit > 0)
|
||||
{
|
||||
LOG_INFO(log, "Keys limit will be set to {}", keys_limit);
|
||||
}
|
||||
|
||||
auto root_path_fs = fs::path(path_prefix) / std::string_view{root_path}.substr(1);
|
||||
root_path = root_path_fs.generic_string();
|
||||
|
||||
data_path = root_path_fs / "data";
|
||||
|
||||
auto metadata_path_fs = root_path_fs / "metadata";
|
||||
metadata_path = metadata_path_fs;
|
||||
tables_path = metadata_path_fs / "tables";
|
||||
|
||||
auto table_unique_id = toString(table_id.uuid) + toString(ServerUUID::get());
|
||||
table_path = fs::path(tables_path) / table_unique_id;
|
||||
|
||||
dropped_path = metadata_path_fs / "dropped";
|
||||
dropped_lock_path = fs::path(dropped_path) / "lock";
|
||||
|
||||
if (attach)
|
||||
{
|
||||
checkTable<false>();
|
||||
return;
|
||||
}
|
||||
|
||||
auto client = getClient();
|
||||
|
||||
if (root_path != "/" && !client->exists(root_path))
|
||||
{
|
||||
LOG_TRACE(log, "Creating root path {}", root_path);
|
||||
client->createAncestors(root_path);
|
||||
client->createIfNotExists(root_path, "");
|
||||
}
|
||||
|
||||
for (size_t i = 0; i < 1000; ++i)
|
||||
{
|
||||
if (client->exists(dropped_path))
|
||||
{
|
||||
LOG_INFO(log, "Removing leftover nodes");
|
||||
auto code = client->tryCreate(dropped_lock_path, "", zkutil::CreateMode::Ephemeral);
|
||||
|
||||
if (code == Coordination::Error::ZNONODE)
|
||||
{
|
||||
LOG_INFO(log, "Someone else removed leftover nodes");
|
||||
}
|
||||
else if (code == Coordination::Error::ZNODEEXISTS)
|
||||
{
|
||||
LOG_INFO(log, "Someone else is removing leftover nodes");
|
||||
continue;
|
||||
}
|
||||
else if (code != Coordination::Error::ZOK)
|
||||
{
|
||||
throw Coordination::Exception(code, dropped_lock_path);
|
||||
}
|
||||
else
|
||||
{
|
||||
auto metadata_drop_lock = zkutil::EphemeralNodeHolder::existing(dropped_lock_path, *client);
|
||||
if (!dropTable(client, metadata_drop_lock))
|
||||
continue;
|
||||
}
|
||||
}
|
||||
|
||||
std::string stored_metadata_string;
|
||||
auto exists = client->tryGet(metadata_path, stored_metadata_string);
|
||||
|
||||
if (exists)
|
||||
{
|
||||
// this requires same name for columns
|
||||
// maybe we can do a smarter comparison for columns and primary key expression
|
||||
if (stored_metadata_string != metadata_string)
|
||||
throw Exception(
|
||||
ErrorCodes::BAD_ARGUMENTS,
|
||||
"Path {} is already used but the stored table definition doesn't match. Stored metadata: {}",
|
||||
root_path,
|
||||
stored_metadata_string);
|
||||
}
|
||||
else
|
||||
{
|
||||
auto code = client->tryCreate(metadata_path, metadata_string, zkutil::CreateMode::Persistent);
|
||||
if (code == Coordination::Error::ZNODEEXISTS)
|
||||
continue;
|
||||
else if (code != Coordination::Error::ZOK)
|
||||
throw Coordination::Exception(code, metadata_path);
|
||||
}
|
||||
|
||||
client->createIfNotExists(tables_path, "");
|
||||
|
||||
auto code = client->tryCreate(table_path, "", zkutil::CreateMode::Persistent);
|
||||
|
||||
if (code == Coordination::Error::ZOK)
|
||||
{
|
||||
// metadata now should be guaranteed to exist because we added our UUID to the tables_path
|
||||
client->createIfNotExists(data_path, "");
|
||||
table_is_valid = true;
|
||||
return;
|
||||
}
|
||||
|
||||
if (code == Coordination::Error::ZNONODE)
|
||||
LOG_INFO(log, "Metadata nodes were deleted in background, will retry");
|
||||
else
|
||||
throw Coordination::Exception(code, table_path);
|
||||
}
|
||||
|
||||
throw Exception(ErrorCodes::BAD_ARGUMENTS, "Cannot create metadata for table, because it is removed concurrently or because of wrong root_path ({})", root_path);
|
||||
}
|
||||
|
||||
|
||||
Pipe StorageKeeperMap::read(
|
||||
const Names & column_names,
|
||||
const StorageSnapshotPtr & storage_snapshot,
|
||||
SelectQueryInfo & query_info,
|
||||
ContextPtr context_,
|
||||
QueryProcessingStage::Enum /*processed_stage*/,
|
||||
size_t max_block_size,
|
||||
unsigned num_streams)
|
||||
{
|
||||
checkTable<true>();
|
||||
storage_snapshot->check(column_names);
|
||||
|
||||
FieldVectorPtr filtered_keys;
|
||||
bool all_scan;
|
||||
|
||||
Block sample_block = storage_snapshot->metadata->getSampleBlock();
|
||||
auto primary_key_type = sample_block.getByName(primary_key).type;
|
||||
std::tie(filtered_keys, all_scan) = getFilterKeys(primary_key, primary_key_type, query_info, context_);
|
||||
|
||||
const auto process_keys = [&]<typename KeyContainerPtr>(KeyContainerPtr keys) -> Pipe
|
||||
{
|
||||
if (keys->empty())
|
||||
return {};
|
||||
|
||||
::sort(keys->begin(), keys->end());
|
||||
keys->erase(std::unique(keys->begin(), keys->end()), keys->end());
|
||||
|
||||
Pipes pipes;
|
||||
|
||||
size_t num_keys = keys->size();
|
||||
size_t num_threads = std::min<size_t>(num_streams, keys->size());
|
||||
|
||||
assert(num_keys <= std::numeric_limits<uint32_t>::max());
|
||||
assert(num_threads <= std::numeric_limits<uint32_t>::max());
|
||||
|
||||
for (size_t thread_idx = 0; thread_idx < num_threads; ++thread_idx)
|
||||
{
|
||||
size_t begin = num_keys * thread_idx / num_threads;
|
||||
size_t end = num_keys * (thread_idx + 1) / num_threads;
|
||||
|
||||
using KeyContainer = typename KeyContainerPtr::element_type;
|
||||
pipes.emplace_back(std::make_shared<StorageKeeperMapSource<KeyContainer>>(
|
||||
*this, sample_block, max_block_size, keys, keys->begin() + begin, keys->begin() + end));
|
||||
}
|
||||
return Pipe::unitePipes(std::move(pipes));
|
||||
};
|
||||
|
||||
auto client = getClient();
|
||||
if (all_scan)
|
||||
return process_keys(std::make_shared<std::vector<std::string>>(client->getChildren(data_path)));
|
||||
|
||||
return process_keys(std::move(filtered_keys));
|
||||
}
|
||||
|
||||
SinkToStoragePtr StorageKeeperMap::write(const ASTPtr & /*query*/, const StorageMetadataPtr & metadata_snapshot, ContextPtr /*context*/)
|
||||
{
|
||||
checkTable<true>();
|
||||
return std::make_shared<StorageKeeperMapSink>(*this, metadata_snapshot);
|
||||
}
|
||||
|
||||
void StorageKeeperMap::truncate(const ASTPtr &, const StorageMetadataPtr &, ContextPtr, TableExclusiveLockHolder &)
|
||||
{
|
||||
checkTable<true>();
|
||||
auto client = getClient();
|
||||
client->tryRemoveChildrenRecursive(data_path, true);
|
||||
}
|
||||
|
||||
bool StorageKeeperMap::dropTable(zkutil::ZooKeeperPtr zookeeper, const zkutil::EphemeralNodeHolder::Ptr & metadata_drop_lock)
|
||||
{
|
||||
zookeeper->removeChildrenRecursive(data_path);
|
||||
|
||||
bool completely_removed = false;
|
||||
Coordination::Requests ops;
|
||||
ops.emplace_back(zkutil::makeRemoveRequest(metadata_drop_lock->getPath(), -1));
|
||||
ops.emplace_back(zkutil::makeRemoveRequest(dropped_path, -1));
|
||||
ops.emplace_back(zkutil::makeRemoveRequest(data_path, -1));
|
||||
ops.emplace_back(zkutil::makeRemoveRequest(metadata_path, -1));
|
||||
|
||||
Coordination::Responses responses;
|
||||
auto code = zookeeper->tryMulti(ops, responses);
|
||||
using enum Coordination::Error;
|
||||
switch (code)
|
||||
{
|
||||
case ZOK:
|
||||
{
|
||||
metadata_drop_lock->setAlreadyRemoved();
|
||||
completely_removed = true;
|
||||
LOG_INFO(log, "Metadata ({}) and data ({}) was successfully removed from ZooKeeper", metadata_path, data_path);
|
||||
break;
|
||||
}
|
||||
case ZNONODE:
|
||||
throw Exception(ErrorCodes::LOGICAL_ERROR, "There is a race condition between creation and removal of metadata. It's a bug");
|
||||
case ZNOTEMPTY:
|
||||
LOG_ERROR(log, "Metadata was not completely removed from ZooKeeper");
|
||||
break;
|
||||
default:
|
||||
zkutil::KeeperMultiException::check(code, ops, responses);
|
||||
break;
|
||||
}
|
||||
return completely_removed;
|
||||
}
|
||||
|
||||
void StorageKeeperMap::drop()
|
||||
{
|
||||
checkTable<true>();
|
||||
auto client = getClient();
|
||||
|
||||
client->remove(table_path);
|
||||
|
||||
if (!client->getChildren(tables_path).empty())
|
||||
return;
|
||||
|
||||
Coordination::Requests ops;
|
||||
Coordination::Responses responses;
|
||||
|
||||
ops.emplace_back(zkutil::makeRemoveRequest(tables_path, -1));
|
||||
ops.emplace_back(zkutil::makeCreateRequest(dropped_path, "", zkutil::CreateMode::Persistent));
|
||||
ops.emplace_back(zkutil::makeCreateRequest(dropped_lock_path, "", zkutil::CreateMode::Ephemeral));
|
||||
|
||||
auto code = client->tryMulti(ops, responses);
|
||||
|
||||
if (code == Coordination::Error::ZNONODE || code == Coordination::Error::ZNODEEXISTS)
|
||||
{
|
||||
LOG_INFO(log, "Metadata is being removed by another table");
|
||||
return;
|
||||
}
|
||||
else if (code == Coordination::Error::ZNOTEMPTY)
|
||||
{
|
||||
LOG_WARNING(log, "Another table is using the same path, metadata will not be deleted");
|
||||
return;
|
||||
}
|
||||
else if (code != Coordination::Error::ZOK)
|
||||
zkutil::KeeperMultiException::check(code, ops, responses);
|
||||
|
||||
auto metadata_drop_lock = zkutil::EphemeralNodeHolder::existing(dropped_lock_path, *client);
|
||||
dropTable(client, metadata_drop_lock);
|
||||
}
|
||||
|
||||
zkutil::ZooKeeperPtr StorageKeeperMap::getClient() const
|
||||
{
|
||||
std::lock_guard lock{zookeeper_mutex};
|
||||
if (!zookeeper_client || zookeeper_client->expired())
|
||||
{
|
||||
zookeeper_client = nullptr;
|
||||
if (zookeeper_name == "default")
|
||||
zookeeper_client = getContext()->getZooKeeper();
|
||||
else
|
||||
zookeeper_client = getContext()->getAuxiliaryZooKeeper(zookeeper_name);
|
||||
|
||||
zookeeper_client->sync(root_path);
|
||||
}
|
||||
|
||||
return zookeeper_client;
|
||||
}
|
||||
|
||||
const std::string & StorageKeeperMap::dataPath() const
|
||||
{
|
||||
return data_path;
|
||||
}
|
||||
|
||||
std::string StorageKeeperMap::fullPathForKey(const std::string_view key) const
|
||||
{
|
||||
return fs::path(data_path) / key;
|
||||
}
|
||||
|
||||
UInt64 StorageKeeperMap::keysLimit() const
|
||||
{
|
||||
return keys_limit;
|
||||
}
|
||||
|
||||
std::optional<bool> StorageKeeperMap::isTableValid() const
|
||||
{
|
||||
std::lock_guard lock{init_mutex};
|
||||
if (table_is_valid.has_value())
|
||||
return *table_is_valid;
|
||||
|
||||
[&]
|
||||
{
|
||||
try
|
||||
{
|
||||
auto client = getClient();
|
||||
|
||||
std::string stored_metadata_string;
|
||||
Coordination::Stat metadata_stat;
|
||||
client->tryGet(metadata_path, stored_metadata_string, &metadata_stat);
|
||||
|
||||
if (metadata_stat.numChildren == 0)
|
||||
{
|
||||
table_is_valid = false;
|
||||
return;
|
||||
}
|
||||
|
||||
if (metadata_string != stored_metadata_string)
|
||||
{
|
||||
LOG_ERROR(
|
||||
log,
|
||||
"Table definition does not match to the one stored in the path {}. Stored definition: {}",
|
||||
root_path,
|
||||
stored_metadata_string);
|
||||
table_is_valid = false;
|
||||
return;
|
||||
}
|
||||
|
||||
// validate all metadata and data nodes are present
|
||||
Coordination::Requests requests;
|
||||
requests.push_back(zkutil::makeCheckRequest(table_path, -1));
|
||||
requests.push_back(zkutil::makeCheckRequest(data_path, -1));
|
||||
requests.push_back(zkutil::makeCheckRequest(dropped_path, -1));
|
||||
|
||||
Coordination::Responses responses;
|
||||
client->tryMulti(requests, responses);
|
||||
|
||||
table_is_valid = false;
|
||||
if (responses[0]->error != Coordination::Error::ZOK)
|
||||
{
|
||||
LOG_ERROR(log, "Table node ({}) is missing", table_path);
|
||||
return;
|
||||
}
|
||||
|
||||
if (responses[1]->error != Coordination::Error::ZOK)
|
||||
{
|
||||
LOG_ERROR(log, "Data node ({}) is missing", data_path);
|
||||
return;
|
||||
}
|
||||
|
||||
if (responses[2]->error == Coordination::Error::ZOK)
|
||||
{
|
||||
LOG_ERROR(log, "Tables with root node {} are being dropped", root_path);
|
||||
return;
|
||||
}
|
||||
|
||||
table_is_valid = true;
|
||||
}
|
||||
catch (const Coordination::Exception & e)
|
||||
{
|
||||
tryLogCurrentException(log);
|
||||
|
||||
if (!Coordination::isHardwareError(e.code))
|
||||
table_is_valid = false;
|
||||
}
|
||||
}();
|
||||
|
||||
return table_is_valid;
|
||||
}
|
||||
|
||||
Chunk StorageKeeperMap::getByKeys(const ColumnsWithTypeAndName & keys, PaddedPODArray<UInt8> & null_map, const Names &) const
|
||||
{
|
||||
if (keys.size() != 1)
|
||||
throw Exception(ErrorCodes::LOGICAL_ERROR, "StorageKeeperMap supports only one key, got: {}", keys.size());
|
||||
|
||||
auto raw_keys = serializeKeysToRawString(keys[0]);
|
||||
|
||||
if (raw_keys.size() != keys[0].column->size())
|
||||
throw Exception(ErrorCodes::LOGICAL_ERROR, "Assertion failed: {} != {}", raw_keys.size(), keys[0].column->size());
|
||||
|
||||
return getBySerializedKeys(raw_keys, &null_map);
|
||||
}
|
||||
|
||||
Chunk StorageKeeperMap::getBySerializedKeys(const std::span<const std::string> keys, PaddedPODArray<UInt8> * null_map) const
|
||||
{
|
||||
Block sample_block = getInMemoryMetadataPtr()->getSampleBlock();
|
||||
MutableColumns columns = sample_block.cloneEmptyColumns();
|
||||
size_t primary_key_pos = getPrimaryKeyPos(sample_block, getPrimaryKey());
|
||||
|
||||
if (null_map)
|
||||
{
|
||||
null_map->clear();
|
||||
null_map->resize_fill(keys.size(), 1);
|
||||
}
|
||||
|
||||
auto client = getClient();
|
||||
|
||||
std::vector<std::future<Coordination::GetResponse>> values;
|
||||
values.reserve(keys.size());
|
||||
|
||||
for (const auto & key : keys)
|
||||
{
|
||||
const auto full_path = fullPathForKey(key);
|
||||
values.emplace_back(client->asyncTryGet(full_path));
|
||||
}
|
||||
|
||||
auto wait_until = std::chrono::system_clock::now() + std::chrono::milliseconds(Coordination::DEFAULT_OPERATION_TIMEOUT_MS);
|
||||
|
||||
for (size_t i = 0; i < keys.size(); ++i)
|
||||
{
|
||||
auto & value = values[i];
|
||||
if (value.wait_until(wait_until) != std::future_status::ready)
|
||||
throw DB::Exception(ErrorCodes::KEEPER_EXCEPTION, "Failed to fetch values: timeout");
|
||||
|
||||
auto response = value.get();
|
||||
Coordination::Error code = response.error;
|
||||
|
||||
if (code == Coordination::Error::ZOK)
|
||||
{
|
||||
fillColumns(base64Decode(keys[i], true), response.data, primary_key_pos, sample_block, columns);
|
||||
}
|
||||
else if (code == Coordination::Error::ZNONODE)
|
||||
{
|
||||
if (null_map)
|
||||
{
|
||||
(*null_map)[i] = 0;
|
||||
for (size_t col_idx = 0; col_idx < sample_block.columns(); ++col_idx)
|
||||
columns[col_idx]->insert(sample_block.getByPosition(col_idx).type->getDefault());
|
||||
}
|
||||
}
|
||||
else
|
||||
{
|
||||
throw DB::Exception(ErrorCodes::KEEPER_EXCEPTION, "Failed to fetch value: {}", code);
|
||||
}
|
||||
}
|
||||
|
||||
size_t num_rows = columns.at(0)->size();
|
||||
return Chunk(std::move(columns), num_rows);
|
||||
}
|
||||
|
||||
Block StorageKeeperMap::getSampleBlock(const Names &) const
|
||||
{
|
||||
auto metadata = getInMemoryMetadataPtr();
|
||||
return metadata->getSampleBlock();
|
||||
}
|
||||
|
||||
void StorageKeeperMap::checkTableCanBeRenamed(const StorageID & new_name) const
|
||||
{
|
||||
verifyTableId(new_name);
|
||||
}
|
||||
|
||||
void StorageKeeperMap::rename(const String & /*new_path_to_table_data*/, const StorageID & new_table_id)
|
||||
{
|
||||
checkTableCanBeRenamed(new_table_id);
|
||||
renameInMemory(new_table_id);
|
||||
}
|
||||
|
||||
namespace
|
||||
{
|
||||
|
||||
StoragePtr create(const StorageFactory::Arguments & args)
|
||||
{
|
||||
ASTs & engine_args = args.engine_args;
|
||||
if (engine_args.empty() || engine_args.size() > 2)
|
||||
throw Exception(
|
||||
ErrorCodes::NUMBER_OF_ARGUMENTS_DOESNT_MATCH,
|
||||
"Storage KeeperMap requires 1-3 arguments:\n"
|
||||
"root_path: path in the Keeper where the values will be stored (required)\n"
|
||||
"keys_limit: number of keys allowed to be stored, 0 is no limit (default: 0)");
|
||||
|
||||
const auto root_path_node = evaluateConstantExpressionAsLiteral(engine_args[0], args.getLocalContext());
|
||||
auto root_path = checkAndGetLiteralArgument<std::string>(root_path_node, "root_path");
|
||||
|
||||
UInt64 keys_limit = 0;
|
||||
if (engine_args.size() > 1)
|
||||
keys_limit = checkAndGetLiteralArgument<UInt64>(engine_args[1], "keys_limit");
|
||||
|
||||
StorageInMemoryMetadata metadata;
|
||||
metadata.setColumns(args.columns);
|
||||
metadata.setConstraints(args.constraints);
|
||||
|
||||
if (!args.storage_def->primary_key)
|
||||
throw Exception("StorageKeeperMap requires one column in primary key", ErrorCodes::BAD_ARGUMENTS);
|
||||
|
||||
metadata.primary_key = KeyDescription::getKeyFromAST(args.storage_def->primary_key->ptr(), metadata.columns, args.getContext());
|
||||
auto primary_key_names = metadata.getColumnsRequiredForPrimaryKey();
|
||||
if (primary_key_names.size() != 1)
|
||||
throw Exception("StorageKeeperMap requires one column in primary key", ErrorCodes::BAD_ARGUMENTS);
|
||||
|
||||
return std::make_shared<StorageKeeperMap>(
|
||||
args.getContext(), args.table_id, metadata, args.query.attach, primary_key_names[0], root_path, keys_limit);
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
void registerStorageKeeperMap(StorageFactory & factory)
|
||||
{
|
||||
factory.registerStorage(
|
||||
"KeeperMap",
|
||||
create,
|
||||
{
|
||||
.supports_sort_order = true,
|
||||
.supports_parallel_insert = true,
|
||||
});
|
||||
}
|
||||
|
||||
}
|
138
src/Storages/StorageKeeperMap.h
Normal file
138
src/Storages/StorageKeeperMap.h
Normal file
@ -0,0 +1,138 @@
|
||||
#pragma once
|
||||
|
||||
#include <Interpreters/Context.h>
|
||||
#include <Interpreters/IKeyValueEntity.h>
|
||||
|
||||
#include <QueryPipeline/Pipe.h>
|
||||
#include <Storages/IStorage.h>
|
||||
#include <Storages/StorageInMemoryMetadata.h>
|
||||
#include <Common/PODArray_fwd.h>
|
||||
#include <Common/ZooKeeper/ZooKeeper.h>
|
||||
|
||||
#include <span>
|
||||
|
||||
namespace DB
|
||||
{
|
||||
|
||||
namespace ErrorCodes
|
||||
{
|
||||
extern const int INVALID_STATE;
|
||||
}
|
||||
|
||||
// KV store using (Zoo|CH)Keeper
|
||||
class StorageKeeperMap final : public IStorage, public IKeyValueEntity, WithContext
|
||||
{
|
||||
public:
|
||||
StorageKeeperMap(
|
||||
ContextPtr context_,
|
||||
const StorageID & table_id,
|
||||
const StorageInMemoryMetadata & metadata,
|
||||
bool attach,
|
||||
std::string_view primary_key_,
|
||||
const std::string & root_path_,
|
||||
UInt64 keys_limit_);
|
||||
|
||||
Pipe read(
|
||||
const Names & column_names,
|
||||
const StorageSnapshotPtr & storage_snapshot,
|
||||
SelectQueryInfo & query_info,
|
||||
ContextPtr context,
|
||||
QueryProcessingStage::Enum processed_stage,
|
||||
size_t max_block_size,
|
||||
unsigned num_streams) override;
|
||||
|
||||
SinkToStoragePtr write(const ASTPtr & query, const StorageMetadataPtr & metadata_snapshot, ContextPtr context) override;
|
||||
|
||||
void truncate(const ASTPtr &, const StorageMetadataPtr &, ContextPtr, TableExclusiveLockHolder &) override;
|
||||
void drop() override;
|
||||
|
||||
std::string getName() const override { return "KeeperMap"; }
|
||||
Names getPrimaryKey() const override { return {primary_key}; }
|
||||
|
||||
Chunk getByKeys(const ColumnsWithTypeAndName & keys, PaddedPODArray<UInt8> & null_map, const Names &) const override;
|
||||
Chunk getBySerializedKeys(std::span<const std::string> keys, PaddedPODArray<UInt8> * null_map) const;
|
||||
|
||||
Block getSampleBlock(const Names &) const override;
|
||||
|
||||
void checkTableCanBeRenamed(const StorageID & new_name) const override;
|
||||
void rename(const String & new_path_to_table_data, const StorageID & new_table_id) override;
|
||||
|
||||
bool supportsParallelInsert() const override { return true; }
|
||||
bool supportsIndexForIn() const override { return true; }
|
||||
bool mayBenefitFromIndexForIn(
|
||||
const ASTPtr & node, ContextPtr /*query_context*/, const StorageMetadataPtr & /*metadata_snapshot*/) const override
|
||||
{
|
||||
return node->getColumnName() == primary_key;
|
||||
}
|
||||
|
||||
zkutil::ZooKeeperPtr getClient() const;
|
||||
const std::string & dataPath() const;
|
||||
std::string fullPathForKey(std::string_view key) const;
|
||||
|
||||
UInt64 keysLimit() const;
|
||||
|
||||
template <bool throw_on_error>
|
||||
void checkTable() const
|
||||
{
|
||||
auto is_table_valid = isTableValid();
|
||||
if (!is_table_valid.has_value())
|
||||
{
|
||||
static constexpr std::string_view error_msg = "Failed to activate table because of connection issues. It will be activated "
|
||||
"once a connection is established and metadata is verified";
|
||||
if constexpr (throw_on_error)
|
||||
throw Exception(ErrorCodes::INVALID_STATE, error_msg);
|
||||
else
|
||||
{
|
||||
LOG_ERROR(log, fmt::runtime(error_msg));
|
||||
return;
|
||||
}
|
||||
}
|
||||
|
||||
if (!*is_table_valid)
|
||||
{
|
||||
static constexpr std::string_view error_msg
|
||||
= "Failed to activate table because of invalid metadata in ZooKeeper. Please DETACH table";
|
||||
if constexpr (throw_on_error)
|
||||
throw Exception(ErrorCodes::INVALID_STATE, error_msg);
|
||||
else
|
||||
{
|
||||
LOG_ERROR(log, fmt::runtime(error_msg));
|
||||
return;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
private:
|
||||
bool dropTable(zkutil::ZooKeeperPtr zookeeper, const zkutil::EphemeralNodeHolder::Ptr & metadata_drop_lock);
|
||||
|
||||
std::optional<bool> isTableValid() const;
|
||||
|
||||
std::string root_path;
|
||||
std::string primary_key;
|
||||
|
||||
std::string data_path;
|
||||
|
||||
std::string metadata_path;
|
||||
|
||||
std::string tables_path;
|
||||
std::string table_path;
|
||||
|
||||
std::string dropped_path;
|
||||
std::string dropped_lock_path;
|
||||
|
||||
std::string zookeeper_name;
|
||||
|
||||
std::string metadata_string;
|
||||
|
||||
uint64_t keys_limit{0};
|
||||
|
||||
mutable std::mutex zookeeper_mutex;
|
||||
mutable zkutil::ZooKeeperPtr zookeeper_client{nullptr};
|
||||
|
||||
mutable std::mutex init_mutex;
|
||||
mutable std::optional<bool> table_is_valid;
|
||||
|
||||
Poco::Logger * log;
|
||||
};
|
||||
|
||||
}
|
@ -88,6 +88,7 @@ void registerStorageFileLog(StorageFactory & factory);
|
||||
void registerStorageSQLite(StorageFactory & factory);
|
||||
#endif
|
||||
|
||||
void registerStorageKeeperMap(StorageFactory & factory);
|
||||
|
||||
void registerStorages()
|
||||
{
|
||||
@ -171,6 +172,8 @@ void registerStorages()
|
||||
#if USE_SQLITE
|
||||
registerStorageSQLite(factory);
|
||||
#endif
|
||||
|
||||
registerStorageKeeperMap(factory);
|
||||
}
|
||||
|
||||
}
|
||||
|
3
tests/config/config.d/enable_keeper_map.xml
Normal file
3
tests/config/config.d/enable_keeper_map.xml
Normal file
@ -0,0 +1,3 @@
|
||||
<clickhouse>
|
||||
<keeper_map_path_prefix>/test_keeper_map</keeper_map_path_prefix>
|
||||
</clickhouse>
|
@ -50,6 +50,7 @@ ln -sf $SRC_PATH/config.d/session_log.xml $DEST_SERVER_PATH/config.d/
|
||||
ln -sf $SRC_PATH/config.d/system_unfreeze.xml $DEST_SERVER_PATH/config.d/
|
||||
ln -sf $SRC_PATH/config.d/enable_zero_copy_replication.xml $DEST_SERVER_PATH/config.d/
|
||||
ln -sf $SRC_PATH/config.d/nlp.xml $DEST_SERVER_PATH/config.d/
|
||||
ln -sf $SRC_PATH/config.d/enable_keeper_map.xml $DEST_SERVER_PATH/config.d/
|
||||
|
||||
ln -sf $SRC_PATH/users.d/log_queries.xml $DEST_SERVER_PATH/users.d/
|
||||
ln -sf $SRC_PATH/users.d/readonly.xml $DEST_SERVER_PATH/users.d/
|
||||
|
1
tests/integration/test_keeper_map/__init__.py
Normal file
1
tests/integration/test_keeper_map/__init__.py
Normal file
@ -0,0 +1 @@
|
||||
#!/usr/bin/env python3
|
@ -0,0 +1,3 @@
|
||||
<clickhouse>
|
||||
<keeper_map_path_prefix>/test_keeper_map</keeper_map_path_prefix>
|
||||
</clickhouse>
|
179
tests/integration/test_keeper_map/test.py
Normal file
179
tests/integration/test_keeper_map/test.py
Normal file
@ -0,0 +1,179 @@
|
||||
import multiprocessing
|
||||
import pytest
|
||||
from time import sleep
|
||||
import random
|
||||
from itertools import count
|
||||
from sys import stdout
|
||||
|
||||
from multiprocessing import Pool
|
||||
|
||||
from helpers.cluster import ClickHouseCluster
|
||||
from helpers.test_tools import assert_eq_with_retry, assert_logs_contain
|
||||
from helpers.network import PartitionManager
|
||||
|
||||
test_recover_staled_replica_run = 1
|
||||
|
||||
cluster = ClickHouseCluster(__file__)
|
||||
|
||||
node = cluster.add_instance(
|
||||
"node",
|
||||
main_configs=["configs/enable_keeper_map.xml"],
|
||||
with_zookeeper=True,
|
||||
stay_alive=True,
|
||||
)
|
||||
|
||||
|
||||
@pytest.fixture(scope="module")
|
||||
def started_cluster():
|
||||
try:
|
||||
cluster.start()
|
||||
yield cluster
|
||||
|
||||
finally:
|
||||
cluster.shutdown()
|
||||
|
||||
|
||||
def get_genuine_zk():
|
||||
return cluster.get_kazoo_client("zoo1")
|
||||
|
||||
|
||||
def remove_children(client, path):
|
||||
children = client.get_children(path)
|
||||
|
||||
for child in children:
|
||||
child_path = f"{path}/{child}"
|
||||
remove_children(client, child_path)
|
||||
client.delete(child_path)
|
||||
|
||||
|
||||
def test_create_keeper_map(started_cluster):
|
||||
node.query(
|
||||
"CREATE TABLE test_keeper_map (key UInt64, value UInt64) ENGINE = KeeperMap('/test1') PRIMARY KEY(key);"
|
||||
)
|
||||
zk_client = get_genuine_zk()
|
||||
|
||||
def assert_children_size(path, expected_size):
|
||||
assert len(zk_client.get_children(path)) == expected_size
|
||||
|
||||
def assert_root_children_size(expected_size):
|
||||
assert_children_size("/test_keeper_map/test1", expected_size)
|
||||
|
||||
def assert_data_children_size(expected_size):
|
||||
assert_children_size("/test_keeper_map/test1/data", expected_size)
|
||||
|
||||
assert_root_children_size(2)
|
||||
assert_data_children_size(0)
|
||||
|
||||
node.query("INSERT INTO test_keeper_map VALUES (1, 11)")
|
||||
assert_data_children_size(1)
|
||||
|
||||
node.query(
|
||||
"CREATE TABLE test_keeper_map_another (key UInt64, value UInt64) ENGINE = KeeperMap('/test1') PRIMARY KEY(key);"
|
||||
)
|
||||
assert_root_children_size(2)
|
||||
assert_data_children_size(1)
|
||||
|
||||
node.query("INSERT INTO test_keeper_map_another VALUES (1, 11)")
|
||||
assert_root_children_size(2)
|
||||
assert_data_children_size(1)
|
||||
|
||||
node.query("INSERT INTO test_keeper_map_another VALUES (2, 22)")
|
||||
assert_root_children_size(2)
|
||||
assert_data_children_size(2)
|
||||
|
||||
node.query("DROP TABLE test_keeper_map SYNC")
|
||||
assert_root_children_size(2)
|
||||
assert_data_children_size(2)
|
||||
|
||||
node.query("DROP TABLE test_keeper_map_another SYNC")
|
||||
assert_root_children_size(0)
|
||||
|
||||
zk_client.stop()
|
||||
|
||||
|
||||
def create_drop_loop(index, stop_event):
|
||||
table_name = f"test_keeper_map_{index}"
|
||||
|
||||
for i in count(0, 1):
|
||||
if stop_event.is_set():
|
||||
return
|
||||
|
||||
node.query(
|
||||
f"CREATE TABLE {table_name} (key UInt64, value UInt64) ENGINE = KeeperMap('/test') PRIMARY KEY(key);"
|
||||
)
|
||||
node.query(f"INSERT INTO {table_name} VALUES ({index}, {i})")
|
||||
result = node.query(f"SELECT value FROM {table_name} WHERE key = {index}")
|
||||
assert result.strip() == str(i)
|
||||
node.query(f"DROP TABLE {table_name} SYNC")
|
||||
|
||||
|
||||
def test_create_drop_keeper_map_concurrent(started_cluster):
|
||||
pool = Pool()
|
||||
manager = multiprocessing.Manager()
|
||||
stop_event = manager.Event()
|
||||
results = []
|
||||
for i in range(multiprocessing.cpu_count()):
|
||||
sleep(0.2)
|
||||
results.append(
|
||||
pool.apply_async(
|
||||
create_drop_loop,
|
||||
args=(
|
||||
i,
|
||||
stop_event,
|
||||
),
|
||||
)
|
||||
)
|
||||
|
||||
sleep(60)
|
||||
stop_event.set()
|
||||
|
||||
for result in results:
|
||||
result.get()
|
||||
|
||||
pool.close()
|
||||
|
||||
client = get_genuine_zk()
|
||||
assert len(client.get_children("/test_keeper_map/test")) == 0
|
||||
client.stop()
|
||||
|
||||
|
||||
def test_keeper_map_without_zk(started_cluster):
|
||||
def assert_keeper_exception_after_partition(query):
|
||||
with PartitionManager() as pm:
|
||||
pm.drop_instance_zk_connections(node)
|
||||
error = node.query_and_get_error(query)
|
||||
assert "Coordination::Exception" in error
|
||||
|
||||
assert_keeper_exception_after_partition(
|
||||
"CREATE TABLE test_keeper_map (key UInt64, value UInt64) ENGINE = KeeperMap('/test1') PRIMARY KEY(key);"
|
||||
)
|
||||
|
||||
node.query(
|
||||
"CREATE TABLE test_keeper_map (key UInt64, value UInt64) ENGINE = KeeperMap('/test1') PRIMARY KEY(key);"
|
||||
)
|
||||
|
||||
assert_keeper_exception_after_partition(
|
||||
"INSERT INTO test_keeper_map VALUES (1, 11)"
|
||||
)
|
||||
node.query("INSERT INTO test_keeper_map VALUES (1, 11)")
|
||||
|
||||
assert_keeper_exception_after_partition("SELECT * FROM test_keeper_map")
|
||||
node.query("SELECT * FROM test_keeper_map")
|
||||
|
||||
with PartitionManager() as pm:
|
||||
pm.drop_instance_zk_connections(node)
|
||||
node.restart_clickhouse(60)
|
||||
error = node.query_and_get_error("SELECT * FROM test_keeper_map")
|
||||
assert "Failed to activate table because of connection issues" in error
|
||||
|
||||
node.query("SELECT * FROM test_keeper_map")
|
||||
|
||||
client = get_genuine_zk()
|
||||
remove_children(client, "/test_keeper_map/test1")
|
||||
node.restart_clickhouse(60)
|
||||
error = node.query_and_get_error("SELECT * FROM test_keeper_map")
|
||||
assert "Failed to activate table because of invalid metadata in ZooKeeper" in error
|
||||
|
||||
node.query("DETACH TABLE test_keeper_map")
|
||||
|
||||
client.stop()
|
6
tests/queries/0_stateless/02416_keeper_map.reference
Normal file
6
tests/queries/0_stateless/02416_keeper_map.reference
Normal file
@ -0,0 +1,6 @@
|
||||
1
|
||||
1
|
||||
1
|
||||
1
|
||||
1 1 1 1 1
|
||||
1
|
38
tests/queries/0_stateless/02416_keeper_map.sql
Normal file
38
tests/queries/0_stateless/02416_keeper_map.sql
Normal file
@ -0,0 +1,38 @@
|
||||
-- Tags: no-ordinary-database, no-fasttest, long
|
||||
|
||||
DROP TABLE IF EXISTS 02416_test SYNC;
|
||||
|
||||
CREATE TABLE 02416_test (key String, value UInt32) Engine=KeeperMap('/' || currentDatabase() || '/test2416'); -- { serverError 36 }
|
||||
CREATE TABLE 02416_test (key String, value UInt32) Engine=KeeperMap('/' || currentDatabase() || '/test2416') PRIMARY KEY(key2); -- { serverError 47 }
|
||||
CREATE TABLE 02416_test (key String, value UInt32) Engine=KeeperMap('/' || currentDatabase() || '/test2416') PRIMARY KEY(key, value); -- { serverError 36 }
|
||||
CREATE TABLE 02416_test (key String, value UInt32) Engine=KeeperMap('/' || currentDatabase() || '/test2416') PRIMARY KEY(concat(key, value)); -- { serverError 36 }
|
||||
CREATE TABLE 02416_test (key Tuple(String, UInt32), value UInt64) Engine=KeeperMap('/' || currentDatabase() || '/test2416') PRIMARY KEY(key);
|
||||
|
||||
DROP TABLE IF EXISTS 02416_test SYNC;
|
||||
CREATE TABLE 02416_test (key String, value UInt32) Engine=KeeperMap('/' || currentDatabase() || '/test2416') PRIMARY KEY(key);
|
||||
|
||||
INSERT INTO 02416_test SELECT '1_1', number FROM numbers(1000);
|
||||
SELECT COUNT(1) == 1 FROM 02416_test;
|
||||
|
||||
INSERT INTO 02416_test SELECT concat(toString(number), '_1'), number FROM numbers(1000);
|
||||
SELECT COUNT(1) == 1000 FROM 02416_test;
|
||||
SELECT uniqExact(key) == 32 FROM (SELECT * FROM 02416_test LIMIT 32 SETTINGS max_block_size = 1);
|
||||
SELECT SUM(value) == 1 + 99 + 900 FROM 02416_test WHERE key IN ('1_1', '99_1', '900_1');
|
||||
|
||||
DROP TABLE IF EXISTS 02416_test SYNC;
|
||||
DROP TABLE IF EXISTS 02416_test_memory;
|
||||
|
||||
CREATE TABLE 02416_test (k UInt32, value UInt64, dummy Tuple(UInt32, Float64), bm AggregateFunction(groupBitmap, UInt64)) Engine=KeeperMap('/' || currentDatabase() || '/test2416') PRIMARY KEY(k);
|
||||
CREATE TABLE 02416_test_memory AS 02416_test Engine = Memory;
|
||||
|
||||
INSERT INTO 02416_test SELECT number % 77 AS k, SUM(number) AS value, (1, 1.2), bitmapBuild(groupArray(number)) FROM numbers(10000) group by k;
|
||||
|
||||
INSERT INTO 02416_test_memory SELECT number % 77 AS k, SUM(number) AS value, (1, 1.2), bitmapBuild(groupArray(number)) FROM numbers(10000) group by k;
|
||||
|
||||
SELECT A.a = B.a, A.b = B.b, A.c = B.c, A.d = B.d, A.e = B.e FROM ( SELECT 0 AS a, groupBitmapMerge(bm) AS b , SUM(k) AS c, SUM(value) AS d, SUM(dummy.1) AS e FROM 02416_test) A ANY LEFT JOIN (SELECT 0 AS a, groupBitmapMerge(bm) AS b , SUM(k) AS c, SUM(value) AS d, SUM(dummy.1) AS e FROM 02416_test_memory) B USING a ORDER BY a;
|
||||
|
||||
TRUNCATE TABLE 02416_test;
|
||||
SELECT 0 == COUNT(1) FROM 02416_test;
|
||||
|
||||
DROP TABLE IF EXISTS 02416_test SYNC;
|
||||
DROP TABLE IF EXISTS 02416_test_memory;
|
@ -0,0 +1,10 @@
|
||||
1 11
|
||||
------
|
||||
1 11
|
||||
2 22
|
||||
------
|
||||
1 11
|
||||
2 22
|
||||
------
|
||||
1 11
|
||||
2 22
|
20
tests/queries/0_stateless/02417_keeper_map_create_drop.sql
Normal file
20
tests/queries/0_stateless/02417_keeper_map_create_drop.sql
Normal file
@ -0,0 +1,20 @@
|
||||
-- Tags: no-ordinary-database, no-fasttest
|
||||
|
||||
DROP TABLE IF EXISTS 02417_test SYNC;
|
||||
|
||||
CREATE TABLE 02417_test (key UInt64, value UInt64) Engine=KeeperMap('/' || currentDatabase() || '/test2417') PRIMARY KEY(key);
|
||||
INSERT INTO 02417_test VALUES (1, 11);
|
||||
SELECT * FROM 02417_test ORDER BY key;
|
||||
SELECT '------';
|
||||
|
||||
CREATE TABLE 02417_test_another (key UInt64, value UInt64) Engine=KeeperMap('/' || currentDatabase() || '/test2417') PRIMARY KEY(key);
|
||||
INSERT INTO 02417_test_another VALUES (2, 22);
|
||||
SELECT * FROM 02417_test_another ORDER BY key;
|
||||
SELECT '------';
|
||||
SELECT * FROM 02417_test ORDER BY key;
|
||||
SELECT '------';
|
||||
|
||||
DROP TABLE 02417_test SYNC;
|
||||
SELECT * FROM 02417_test_another ORDER BY key;
|
||||
|
||||
DROP TABLE 02417_test_another SYNC;
|
@ -0,0 +1,4 @@
|
||||
2
|
||||
3
|
||||
4
|
||||
4
|
23
tests/queries/0_stateless/02418_keeper_map_keys_limit.sql
Normal file
23
tests/queries/0_stateless/02418_keeper_map_keys_limit.sql
Normal file
@ -0,0 +1,23 @@
|
||||
-- Tags: no-ordinary-database, no-fasttest
|
||||
|
||||
DROP TABLE IF EXISTS 02418_test SYNC;
|
||||
|
||||
CREATE TABLE 02418_test (key UInt64, value Float64) Engine=KeeperMap('/' || currentDatabase() || '/test2418', 3) PRIMARY KEY(key);
|
||||
|
||||
INSERT INTO 02418_test VALUES (1, 1.1), (2, 2.2);
|
||||
SELECT count() FROM 02418_test;
|
||||
|
||||
INSERT INTO 02418_test VALUES (3, 3.3), (4, 4.4); -- { serverError 290 }
|
||||
|
||||
INSERT INTO 02418_test VALUES (1, 2.1), (2, 3.2), (3, 3.3);
|
||||
SELECT count() FROM 02418_test;
|
||||
|
||||
CREATE TABLE 02418_test_another (key UInt64, value Float64) Engine=KeeperMap('/' || currentDatabase() || '/test2418', 4) PRIMARY KEY(key);
|
||||
INSERT INTO 02418_test VALUES (4, 4.4); -- { serverError 290 }
|
||||
INSERT INTO 02418_test_another VALUES (4, 4.4);
|
||||
|
||||
SELECT count() FROM 02418_test;
|
||||
SELECT count() FROM 02418_test_another;
|
||||
|
||||
DROP TABLE 02418_test SYNC;
|
||||
DROP TABLE 02418_test_another SYNC;
|
@ -0,0 +1,6 @@
|
||||
1.1
|
||||
2.2
|
||||
1.1
|
||||
2.2
|
||||
1.1
|
||||
2.2
|
23
tests/queries/0_stateless/02419_keeper_map_primary_key.sh
Executable file
23
tests/queries/0_stateless/02419_keeper_map_primary_key.sh
Executable file
@ -0,0 +1,23 @@
|
||||
#!/usr/bin/env bash
|
||||
# Tags: no-ordinary-database, no-fasttest, long
|
||||
|
||||
CUR_DIR=$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)
|
||||
# shellcheck source=../shell_config.sh
|
||||
. "$CUR_DIR"/../shell_config.sh
|
||||
|
||||
$CLICKHOUSE_CLIENT -q "DROP TABLE IF EXISTS 02419_test SYNC;"
|
||||
|
||||
test_primary_key()
|
||||
{
|
||||
$CLICKHOUSE_CLIENT -nm -q "
|
||||
CREATE TABLE 02419_test (key UInt64, value Float64) Engine=KeeperMap('/' || currentDatabase() || '/test2418', 3) PRIMARY KEY($1);
|
||||
INSERT INTO 02419_test VALUES (1, 1.1), (2, 2.2);
|
||||
SELECT value FROM 02419_test WHERE key = 1;
|
||||
SELECT value FROM 02419_test WHERE key IN (2, 3);
|
||||
DROP TABLE 02419_test SYNC;
|
||||
"
|
||||
}
|
||||
|
||||
test_primary_key "sipHash64(key + 42) * 12212121212121"
|
||||
test_primary_key "reverse(concat(CAST(key, 'String'), 'some string'))"
|
||||
test_primary_key "hex(toFloat32(key))"
|
Loading…
Reference in New Issue
Block a user