ClickHouse/src/Storages/StorageKeeperMap.cpp

707 lines
23 KiB
C++
Raw Normal View History

2022-07-27 13:20:45 +00:00
#include <Storages/StorageKeeperMap.h>
2022-07-26 09:08:55 +00:00
#include <Columns/ColumnString.h>
2022-07-27 13:20:45 +00:00
#include <Core/NamesAndTypes.h>
2022-07-26 09:08:55 +00:00
#include <DataTypes/DataTypeString.h>
2022-07-27 13:20:45 +00:00
#include <Interpreters/evaluateConstantExpression.h>
#include <Parsers/ASTCreateQuery.h>
#include <Parsers/ASTExpressionList.h>
#include <Parsers/ASTFunction.h>
#include <Parsers/ASTIdentifier.h>
#include <Parsers/ASTSelectQuery.h>
2022-07-26 09:08:55 +00:00
#include <Processors/ISource.h>
#include <Processors/Sinks/SinkToStorage.h>
2022-07-27 13:20:45 +00:00
#include <Storages/ColumnsDescription.h>
#include <Storages/KVStorageUtils.h>
2022-07-26 09:08:55 +00:00
#include <Storages/StorageFactory.h>
2022-07-27 13:20:45 +00:00
#include <Storages/StorageInMemoryMetadata.h>
#include <Storages/checkAndGetLiteralArgument.h>
2022-08-23 13:15:31 +00:00
#include "Common/Exception.h"
2022-08-30 09:19:59 +00:00
#include "Common/ZooKeeper/IKeeper.h"
2022-07-26 09:08:55 +00:00
#include <Common/ZooKeeper/KeeperException.h>
#include <Common/ZooKeeper/Types.h>
2022-08-23 13:15:31 +00:00
#include <Common/ZooKeeper/ZooKeeper.h>
2022-07-27 13:20:45 +00:00
#include <Common/ZooKeeper/ZooKeeperConstants.h>
2022-08-24 17:27:07 +00:00
#include "Core/UUID.h"
2022-08-31 08:14:28 +00:00
#include "base/types.h"
2022-07-27 13:20:45 +00:00
#include <boost/algorithm/string/classification.hpp>
#include <Poco/Base64Decoder.h>
#include <Poco/Base64Encoder.h>
#include <Poco/MemoryStream.h>
#include <Poco/StreamCopier.h>
2022-07-26 09:08:55 +00:00
namespace DB
{
namespace ErrorCodes
{
    /// Error codes thrown from this file; they are only declared here and defined elsewhere in the project.
    extern const int BAD_ARGUMENTS;
    extern const int KEEPER_EXCEPTION;
    extern const int LIMIT_EXCEEDED;
    extern const int LOGICAL_ERROR;
    extern const int NUMBER_OF_ARGUMENTS_DOESNT_MATCH;
}
2022-07-27 13:20:45 +00:00
namespace
2022-07-26 09:08:55 +00:00
{
2022-07-27 13:20:45 +00:00
std::string base64Encode(const std::string & decoded)
{
std::ostringstream ostr; // STYLE_CHECK_ALLOW_STD_STRING_STREAM
ostr.exceptions(std::ios::failbit);
Poco::Base64Encoder encoder(ostr, Poco::BASE64_URL_ENCODING);
encoder.rdbuf()->setLineLength(0);
encoder << decoded;
encoder.close();
return ostr.str();
2022-07-26 09:08:55 +00:00
}
2022-07-27 13:20:45 +00:00
std::string base64Decode(const std::string & encoded)
2022-07-26 09:08:55 +00:00
{
2022-07-27 13:20:45 +00:00
std::string decoded;
Poco::MemoryInputStream istr(encoded.data(), encoded.size());
Poco::Base64Decoder decoder(istr, Poco::BASE64_URL_ENCODING);
Poco::StreamCopier::copyToString(decoder, decoded);
return decoded;
}
2022-08-03 13:34:14 +00:00
constexpr std::string_view default_host = "default";
2022-08-08 09:43:29 +00:00
std::string_view getBaseName(const std::string_view path)
{
auto last_slash = path.find_last_of('/');
if (last_slash == std::string_view::npos)
throw Exception(DB::ErrorCodes::LOGICAL_ERROR, "Failed to get basename of path '{}'", path);
return path.substr(last_slash + 1);
}
2022-07-26 09:08:55 +00:00
}
class StorageKeeperMapSink : public SinkToStorage
{
StorageKeeperMap & storage;
std::unordered_map<std::string, std::string> new_values;
2022-07-27 13:20:45 +00:00
size_t primary_key_pos;
2022-07-26 09:08:55 +00:00
public:
2022-07-27 13:20:45 +00:00
StorageKeeperMapSink(StorageKeeperMap & storage_, const StorageMetadataPtr & metadata_snapshot)
: SinkToStorage(metadata_snapshot->getSampleBlock()), storage(storage_)
{
auto primary_key = storage.getPrimaryKey();
assert(primary_key.size() == 1);
primary_key_pos = getHeader().getPositionByName(storage.getPrimaryKey()[0]);
}
2022-07-26 09:08:55 +00:00
std::string getName() const override { return "StorageKeeperMapSink"; }
void consume(Chunk chunk) override
{
2022-07-27 13:20:45 +00:00
auto rows = chunk.getNumRows();
auto block = getHeader().cloneWithColumns(chunk.detachColumns());
WriteBufferFromOwnString wb_key;
WriteBufferFromOwnString wb_value;
2022-07-26 09:08:55 +00:00
for (size_t i = 0; i < rows; ++i)
{
2022-07-27 13:20:45 +00:00
wb_key.restart();
wb_value.restart();
2022-07-26 09:08:55 +00:00
2022-07-27 13:20:45 +00:00
size_t idx = 0;
for (const auto & elem : block)
{
elem.type->getDefaultSerialization()->serializeBinary(*elem.column, i, idx == primary_key_pos ? wb_key : wb_value);
++idx;
}
2022-07-26 09:08:55 +00:00
2022-07-27 13:20:45 +00:00
auto key = base64Encode(wb_key.str());
new_values[std::move(key)] = std::move(wb_value.str());
2022-07-26 09:08:55 +00:00
}
}
void onFinish() override
{
2022-08-31 08:14:28 +00:00
auto zookeeper = storage.getClient();
2022-08-08 09:43:29 +00:00
2022-07-26 09:08:55 +00:00
Coordination::Requests requests;
2022-08-08 09:43:29 +00:00
2022-08-31 08:14:28 +00:00
auto keys_limit = storage.keysLimit();
size_t current_keys_num = 0;
size_t new_keys_num = 0;
if (keys_limit != 0)
{
Coordination::Stat root_stat;
zookeeper->get(storage.rootKeeperPath(), &root_stat);
// exclude metadata node
current_keys_num = root_stat.numChildren - 1;
}
2022-08-31 10:40:32 +00:00
std::vector<std::pair<const std::string *, std::future<Coordination::ExistsResponse>>> exist_responses;
2022-08-23 13:15:31 +00:00
for (const auto & [key, value] : new_values)
2022-08-08 09:43:29 +00:00
{
2022-08-23 13:15:31 +00:00
auto path = storage.fullPathForKey(key);
2022-08-08 09:43:29 +00:00
2022-08-31 10:40:32 +00:00
exist_responses.push_back({&key, zookeeper->asyncExists(path)});
}
for (auto & [key, response] : exist_responses)
{
if (response.get().error == Coordination::Error::ZOK)
2022-08-31 08:14:28 +00:00
{
2022-08-31 10:40:32 +00:00
requests.push_back(zkutil::makeSetRequest(storage.fullPathForKey(*key), new_values[*key], -1));
2022-08-31 08:14:28 +00:00
}
2022-08-23 13:15:31 +00:00
else
2022-08-31 08:14:28 +00:00
{
2022-08-31 10:40:32 +00:00
requests.push_back(zkutil::makeCreateRequest(storage.fullPathForKey(*key), new_values[*key], zkutil::CreateMode::Persistent));
2022-08-31 08:14:28 +00:00
++new_keys_num;
}
}
if (new_keys_num != 0)
{
auto will_be = current_keys_num + new_keys_num;
if (keys_limit != 0 && will_be > keys_limit)
throw Exception(
ErrorCodes::LIMIT_EXCEEDED,
"Limit would be exceeded by inserting {} new key(s). Limit is {}, while the number of keys would be {}",
new_keys_num,
keys_limit,
will_be);
2022-08-08 09:43:29 +00:00
}
2022-07-26 09:08:55 +00:00
zookeeper->multi(requests);
}
};
2022-07-27 13:20:45 +00:00
template <typename KeyContainer>
2022-07-26 09:08:55 +00:00
class StorageKeeperMapSource : public ISource
{
2022-07-27 13:20:45 +00:00
const StorageKeeperMap & storage;
2022-07-26 09:08:55 +00:00
size_t max_block_size;
2022-07-27 13:20:45 +00:00
using KeyContainerPtr = std::shared_ptr<KeyContainer>;
KeyContainerPtr container;
using KeyContainerIter = typename KeyContainer::const_iterator;
KeyContainerIter it;
KeyContainerIter end;
2022-07-26 09:08:55 +00:00
2022-07-27 13:20:45 +00:00
public:
StorageKeeperMapSource(
const StorageKeeperMap & storage_,
const Block & header,
size_t max_block_size_,
KeyContainerPtr container_,
KeyContainerIter begin_,
KeyContainerIter end_)
: ISource(header), storage(storage_), max_block_size(max_block_size_), container(std::move(container_)), it(begin_), end(end_)
{
2022-07-26 09:08:55 +00:00
}
2022-07-27 13:20:45 +00:00
std::string getName() const override { return "StorageKeeperMapSource"; }
2022-07-26 09:08:55 +00:00
2022-07-27 13:20:45 +00:00
Chunk generate() override
{
if (it >= end)
2022-07-26 09:08:55 +00:00
{
2022-07-27 13:20:45 +00:00
it = {};
return {};
2022-07-26 09:08:55 +00:00
}
2022-07-27 13:20:45 +00:00
using KeyType = typename KeyContainer::value_type;
if constexpr (std::same_as<KeyType, Field>)
{
const auto & sample_block = getPort().getHeader();
const auto & key_column_type = sample_block.getByName(storage.getPrimaryKey().at(0)).type;
auto raw_keys = serializeKeysToRawString(it, end, key_column_type, max_block_size);
for (auto & raw_key : raw_keys)
raw_key = base64Encode(raw_key);
2022-07-26 09:08:55 +00:00
2022-07-27 13:20:45 +00:00
return storage.getBySerializedKeys(raw_keys, nullptr);
}
else
2022-07-26 09:08:55 +00:00
{
2022-07-27 13:20:45 +00:00
size_t elem_num = std::min(max_block_size, static_cast<size_t>(end - it));
auto chunk = storage.getBySerializedKeys(std::span{it, it + elem_num}, nullptr);
it += elem_num;
return chunk;
2022-07-26 09:08:55 +00:00
}
}
2022-07-27 13:20:45 +00:00
};
2022-07-26 09:08:55 +00:00
2022-07-27 13:20:45 +00:00
StorageKeeperMap::StorageKeeperMap(
2022-08-23 13:15:31 +00:00
ContextPtr context_,
2022-07-27 13:20:45 +00:00
const StorageID & table_id,
const StorageInMemoryMetadata & metadata,
bool attach,
2022-07-27 13:20:45 +00:00
std::string_view primary_key_,
2022-08-23 13:15:31 +00:00
const std::string & root_path_,
2022-08-31 08:14:28 +00:00
bool create_missing_root_path,
UInt64 keys_limit_)
2022-08-23 13:15:31 +00:00
: IStorage(table_id)
, WithContext(context_->getGlobalContext())
, root_path(zkutil::extractZooKeeperPath(root_path_, false))
, primary_key(primary_key_)
, zookeeper_name(zkutil::extractZooKeeperName(root_path_))
2022-08-31 08:14:28 +00:00
, keys_limit(keys_limit_)
2022-08-23 13:15:31 +00:00
, log(&Poco::Logger::get("StorageKeeperMap"))
2022-07-27 13:20:45 +00:00
{
2022-08-24 17:27:07 +00:00
if (table_id.uuid == UUIDHelpers::Nil)
throw Exception(ErrorCodes::BAD_ARGUMENTS, "KeeperMap cannot be used with '{}' database because UUID is needed. Please use Atomic or Replicated database", table_id.getDatabaseName());
2022-07-27 13:20:45 +00:00
setInMemoryMetadata(metadata);
2022-07-26 09:08:55 +00:00
2022-08-10 08:52:36 +00:00
if (root_path.empty())
throw Exception("root_path should not be empty", ErrorCodes::BAD_ARGUMENTS);
if (!root_path.starts_with('/'))
throw Exception("root_path should start with '/'", ErrorCodes::BAD_ARGUMENTS);
2022-07-26 09:08:55 +00:00
2022-08-31 08:14:28 +00:00
auto config_keys_limit = context_->getConfigRef().getUInt64("keeper_map_keys_limit", 0);
if (config_keys_limit != 0 && keys_limit > config_keys_limit)
{
LOG_WARNING(
log,
"Keys limit for {} defined by argument ({}) is larger than the one defined by 'keeper_map_keys_limit' config ({}). Will use "
"config defined value",
getStorageID().getFullTableName(),
keys_limit,
config_keys_limit);
keys_limit = config_keys_limit;
}
else if (keys_limit > 0)
{
LOG_INFO(log, "Keys limit for {} will be set to {}", getStorageID().getFullTableName(), keys_limit);
}
2022-08-10 08:52:36 +00:00
std::filesystem::path root_path_fs{root_path};
2022-08-23 13:15:31 +00:00
auto metadata_path_fs = root_path_fs / "ch_metadata";
2022-08-10 08:52:36 +00:00
metadata_path = metadata_path_fs;
2022-08-23 13:15:31 +00:00
tables_path = metadata_path_fs / "tables";
2022-08-24 17:27:07 +00:00
table_path = fs::path(tables_path) / toString(table_id.uuid);
2022-08-23 13:15:31 +00:00
dropped_path = metadata_path_fs / "dropped";
2022-08-24 17:27:07 +00:00
dropped_lock_path = fs::path(dropped_path) / "lock";
2022-08-10 08:52:36 +00:00
if (attach)
{
2022-08-23 13:15:31 +00:00
checkTable<false>();
return;
}
2022-08-23 13:15:31 +00:00
auto client = getClient();
2022-08-10 08:52:36 +00:00
if (root_path != "/" && !client->exists(root_path))
2022-07-27 13:20:45 +00:00
{
2022-08-03 13:34:14 +00:00
if (!create_missing_root_path)
2022-07-26 09:08:55 +00:00
{
2022-08-03 13:34:14 +00:00
throw Exception(
ErrorCodes::BAD_ARGUMENTS,
"Path '{}' doesn't exist. Please create it or set 'create_missing_root_path' to true'",
2022-08-10 08:52:36 +00:00
root_path_);
2022-08-03 13:34:14 +00:00
}
else
{
2022-08-10 08:52:36 +00:00
LOG_TRACE(log, "Creating root path {}", root_path);
2022-08-24 17:27:07 +00:00
client->createAncestors(root_path);
client->createIfNotExists(root_path, "");
2022-08-03 13:34:14 +00:00
}
2022-07-26 09:08:55 +00:00
}
2022-08-08 09:43:29 +00:00
2022-08-24 17:27:07 +00:00
for (size_t i = 0; i < 1000; ++i)
2022-08-23 13:15:31 +00:00
{
2022-08-24 17:27:07 +00:00
if (client->exists(dropped_path))
{
LOG_INFO(log, "Removing leftover nodes");
auto code = client->tryCreate(dropped_lock_path, "", zkutil::CreateMode::Ephemeral);
2022-08-24 17:27:07 +00:00
if (code == Coordination::Error::ZNONODE || code == Coordination::Error::ZNODEEXISTS)
{
LOG_INFO(log, "Someone else removed leftovers");
}
else if (code != Coordination::Error::ZOK)
{
throw Coordination::Exception(code, dropped_lock_path);
}
else
{
auto metadata_drop_lock = zkutil::EphemeralNodeHolder::existing(dropped_lock_path, *client);
2022-08-31 08:23:44 +00:00
if (!dropTable(client, metadata_drop_lock))
2022-08-24 17:27:07 +00:00
continue;
}
}
2022-08-08 09:43:29 +00:00
2022-08-24 17:27:07 +00:00
client->createIfNotExists(metadata_path, "");
client->createIfNotExists(tables_path, "");
2022-08-23 13:15:31 +00:00
2022-08-24 17:27:07 +00:00
auto code = client->tryCreate(table_path, "", zkutil::CreateMode::Persistent);
2022-08-23 13:15:31 +00:00
2022-08-24 17:27:07 +00:00
if (code == Coordination::Error::ZOK)
2022-08-30 13:41:13 +00:00
{
table_is_valid = true;
2022-08-24 17:27:07 +00:00
return;
2022-08-30 13:41:13 +00:00
}
2022-08-23 13:15:31 +00:00
2022-08-24 17:27:07 +00:00
if (code == Coordination::Error::ZNONODE)
LOG_INFO(log, "Metadata nodes were deleted in background, will retry");
2022-08-30 13:50:02 +00:00
else
2022-08-24 17:27:07 +00:00
throw Coordination::Exception(code, table_path);
2022-08-23 13:15:31 +00:00
}
2022-08-30 13:50:02 +00:00
throw Exception(ErrorCodes::BAD_ARGUMENTS, "Cannot create metadata for table, because it is removed concurrently or because of wrong root_path ({})", root_path);
2022-07-26 09:08:55 +00:00
}
2022-07-27 13:20:45 +00:00
2022-07-26 09:08:55 +00:00
Pipe StorageKeeperMap::read(
const Names & column_names,
2022-07-27 13:20:45 +00:00
const StorageSnapshotPtr & storage_snapshot,
2022-07-26 09:08:55 +00:00
SelectQueryInfo & query_info,
2022-08-23 13:15:31 +00:00
ContextPtr context_,
2022-07-26 09:08:55 +00:00
QueryProcessingStage::Enum /*processed_stage*/,
size_t max_block_size,
2022-07-27 13:20:45 +00:00
unsigned num_streams)
2022-07-26 09:08:55 +00:00
{
2022-08-23 13:15:31 +00:00
checkTable<true>();
2022-07-27 13:20:45 +00:00
storage_snapshot->check(column_names);
FieldVectorPtr filtered_keys;
bool all_scan;
Block sample_block = storage_snapshot->metadata->getSampleBlock();
auto primary_key_type = sample_block.getByName(primary_key).type;
2022-08-23 13:15:31 +00:00
std::tie(filtered_keys, all_scan) = getFilterKeys(primary_key, primary_key_type, query_info, context_);
2022-07-27 13:20:45 +00:00
const auto process_keys = [&]<typename KeyContainerPtr>(KeyContainerPtr keys) -> Pipe
2022-07-26 09:08:55 +00:00
{
2022-07-27 13:20:45 +00:00
if (keys->empty())
return {};
::sort(keys->begin(), keys->end());
keys->erase(std::unique(keys->begin(), keys->end()), keys->end());
2022-07-26 09:08:55 +00:00
2022-07-27 13:20:45 +00:00
Pipes pipes;
2022-07-26 09:08:55 +00:00
2022-07-27 13:20:45 +00:00
size_t num_keys = keys->size();
size_t num_threads = std::min<size_t>(num_streams, keys->size());
assert(num_keys <= std::numeric_limits<uint32_t>::max());
assert(num_threads <= std::numeric_limits<uint32_t>::max());
for (size_t thread_idx = 0; thread_idx < num_threads; ++thread_idx)
{
size_t begin = num_keys * thread_idx / num_threads;
size_t end = num_keys * (thread_idx + 1) / num_threads;
using KeyContainer = typename KeyContainerPtr::element_type;
pipes.emplace_back(std::make_shared<StorageKeeperMapSource<KeyContainer>>(
*this, sample_block, max_block_size, keys, keys->begin() + begin, keys->begin() + end));
}
return Pipe::unitePipes(std::move(pipes));
};
2022-08-31 08:14:28 +00:00
auto client = getClient();
2022-07-27 13:20:45 +00:00
if (all_scan)
2022-08-10 08:52:36 +00:00
return process_keys(std::make_shared<std::vector<std::string>>(client->getChildren(root_path)));
2022-07-27 13:20:45 +00:00
return process_keys(std::move(filtered_keys));
2022-07-26 09:08:55 +00:00
}
2022-07-27 13:20:45 +00:00
SinkToStoragePtr StorageKeeperMap::write(const ASTPtr & /*query*/, const StorageMetadataPtr & metadata_snapshot, ContextPtr /*context*/)
2022-07-26 09:08:55 +00:00
{
2022-08-23 13:15:31 +00:00
checkTable<true>();
2022-07-27 13:20:45 +00:00
return std::make_shared<StorageKeeperMapSink>(*this, metadata_snapshot);
2022-07-26 09:08:55 +00:00
}
2022-08-23 13:15:31 +00:00
void StorageKeeperMap::truncate(const ASTPtr &, const StorageMetadataPtr &, ContextPtr, TableExclusiveLockHolder &)
{
2022-08-23 13:15:31 +00:00
checkTable<true>();
auto client = getClient();
2022-08-30 09:19:59 +00:00
client->tryRemoveChildrenRecursive(root_path, true, getBaseName(metadata_path));
}
2022-08-31 08:23:44 +00:00
bool StorageKeeperMap::dropTable(zkutil::ZooKeeperPtr zookeeper, const zkutil::EphemeralNodeHolder::Ptr & metadata_drop_lock)
2022-08-24 17:27:07 +00:00
{
2022-08-31 08:23:44 +00:00
zookeeper->removeChildrenRecursive(root_path, getBaseName(metadata_path));
2022-08-24 17:27:07 +00:00
bool completely_removed = false;
Coordination::Requests ops;
ops.emplace_back(zkutil::makeRemoveRequest(metadata_drop_lock->getPath(), -1));
ops.emplace_back(zkutil::makeRemoveRequest(dropped_path, -1));
ops.emplace_back(zkutil::makeRemoveRequest(metadata_path, -1));
Coordination::Responses responses;
auto code = zookeeper->tryMulti(ops, responses);
using enum Coordination::Error;
switch (code)
{
case ZOK:
{
metadata_drop_lock->setAlreadyRemoved();
completely_removed = true;
LOG_INFO(log, "Metadata in {} was successfully removed from ZooKeeper", metadata_path);
break;
}
case ZNONODE:
throw Exception(ErrorCodes::LOGICAL_ERROR, "There is a race condition between creation and removal of metadata. It's a bug");
case ZNOTEMPTY:
LOG_ERROR(log, "Metadata was not completely removed from ZooKeeper");
break;
default:
zkutil::KeeperMultiException::check(code, ops, responses);
2022-08-30 09:19:59 +00:00
break;
2022-08-24 17:27:07 +00:00
}
return completely_removed;
}
void StorageKeeperMap::drop()
{
2022-08-23 13:15:31 +00:00
checkTable<true>();
auto client = getClient();
2022-08-23 13:15:31 +00:00
2022-08-30 09:19:59 +00:00
client->remove(table_path);
if (!client->getChildren(tables_path).empty())
return;
Coordination::Requests ops;
Coordination::Responses responses;
2022-08-23 13:15:31 +00:00
2022-08-30 09:19:59 +00:00
ops.emplace_back(zkutil::makeRemoveRequest(tables_path, -1));
ops.emplace_back(zkutil::makeCreateRequest(dropped_path, "", zkutil::CreateMode::Persistent));
ops.emplace_back(zkutil::makeCreateRequest(dropped_lock_path, "", zkutil::CreateMode::Ephemeral));
auto code = client->tryMulti(ops, responses);
if (code == Coordination::Error::ZNONODE || code == Coordination::Error::ZNODEEXISTS)
2022-08-23 13:15:31 +00:00
{
2022-08-30 09:19:59 +00:00
LOG_INFO(log, "Metadata is being removed by another table");
2022-08-23 13:15:31 +00:00
return;
}
2022-08-30 09:19:59 +00:00
else if (code == Coordination::Error::ZNOTEMPTY)
{
LOG_WARNING(log, "Another table is using the same path, metadata will not be deleted");
return;
}
else if (code != Coordination::Error::ZOK)
zkutil::KeeperMultiException::check(code, ops, responses);
2022-08-23 13:15:31 +00:00
2022-08-30 09:19:59 +00:00
auto metadata_drop_lock = zkutil::EphemeralNodeHolder::existing(dropped_lock_path, *client);
2022-08-31 08:23:44 +00:00
dropTable(client, metadata_drop_lock);
}
2022-08-31 08:14:28 +00:00
zkutil::ZooKeeperPtr StorageKeeperMap::getClient() const
2022-07-26 09:08:55 +00:00
{
2022-08-23 13:15:31 +00:00
std::lock_guard lock{zookeeper_mutex};
if (!zookeeper_client || zookeeper_client->expired())
2022-08-03 13:34:14 +00:00
{
2022-08-23 13:15:31 +00:00
zookeeper_client = nullptr;
if (zookeeper_name == "default")
zookeeper_client = getContext()->getZooKeeper();
else
zookeeper_client = getContext()->getAuxiliaryZooKeeper(zookeeper_name);
2022-08-31 08:29:40 +00:00
zookeeper_client->sync(rootKeeperPath());
2022-08-03 13:34:14 +00:00
}
2022-07-26 09:08:55 +00:00
return zookeeper_client;
}
const std::string & StorageKeeperMap::rootKeeperPath() const
{
2022-08-10 08:52:36 +00:00
return root_path;
2022-07-26 09:08:55 +00:00
}
2022-07-27 13:20:45 +00:00
std::string StorageKeeperMap::fullPathForKey(const std::string_view key) const
{
2022-08-10 08:52:36 +00:00
return fmt::format("{}/{}", root_path, key);
2022-07-27 13:20:45 +00:00
}
2022-08-31 08:14:28 +00:00
UInt64 StorageKeeperMap::keysLimit() const
{
return keys_limit;
}
2022-08-23 13:15:31 +00:00
std::optional<bool> StorageKeeperMap::isTableValid() const
2022-08-08 09:43:29 +00:00
{
2022-08-23 13:15:31 +00:00
std::lock_guard lock{init_mutex};
if (table_is_valid.has_value())
return *table_is_valid;
try
{
// validate all metadata nodes are present
Coordination::Requests requests;
2022-08-30 09:19:59 +00:00
requests.push_back(zkutil::makeCheckRequest(table_path, -1));
2022-08-23 13:15:31 +00:00
Coordination::Responses responses;
auto client = getClient();
auto res = client->tryMulti(requests, responses);
table_is_valid = res == Coordination::Error::ZOK;
}
catch (const Coordination::Exception & e)
{
tryLogCurrentException(log);
2022-08-30 09:19:59 +00:00
if (!Coordination::isHardwareError(e.code))
2022-08-23 13:15:31 +00:00
table_is_valid = false;
}
catch (const Exception &)
{
tryLogCurrentException(log);
table_is_valid = false;
}
return table_is_valid;
2022-08-08 09:43:29 +00:00
}
/// Looks up the rows for the key values in keys[0] and returns one row per key.
/// null_map[i] is set to 0 for keys that are not present.
/// Keys are serialized and then Base64-encoded exactly as StorageKeeperMapSink names its nodes,
/// because getBySerializedKeys expects that encoded form (it decodes it back to fill the key column).
/// Throws LOGICAL_ERROR when more than one key column is passed.
Chunk StorageKeeperMap::getByKeys(const ColumnsWithTypeAndName & keys, PaddedPODArray<UInt8> & null_map, const Names &) const
{
    if (keys.size() != 1)
        throw Exception(ErrorCodes::LOGICAL_ERROR, "StorageKeeperMap supports only one key, got: {}", keys.size());

    auto raw_keys = serializeKeysToRawString(keys[0]);

    if (raw_keys.size() != keys[0].column->size())
        throw DB::Exception(ErrorCodes::LOGICAL_ERROR, "Assertion failed: {} != {}", raw_keys.size(), keys[0].column->size());

    /// Node names are the URL-safe Base64 of the serialized key; without this the lookups hit the wrong paths.
    for (auto & raw_key : raw_keys)
        raw_key = base64Encode(raw_key);

    return getBySerializedKeys(raw_keys, &null_map);
}
/// Fetches the nodes for already-encoded keys (node names under root_path) and builds a chunk from them.
/// For each found node, the key column is filled with the decoded key and the other columns with the node data.
/// If `null_map` is given, the result has exactly one row per key. Missing keys, and a key that names the
/// metadata node, produce a row of default values and null_map[i] = 0. Without `null_map`, such keys are skipped.
/// Throws KEEPER_EXCEPTION on timeout or on any Keeper error other than ZNONODE.
Chunk StorageKeeperMap::getBySerializedKeys(const std::span<const std::string> keys, PaddedPODArray<UInt8> * null_map) const
{
    Block sample_block = getInMemoryMetadataPtr()->getSampleBlock();
    MutableColumns columns = sample_block.cloneEmptyColumns();
    size_t primary_key_pos = getPrimaryKeyPos(sample_block, getPrimaryKey());

    if (null_map)
    {
        null_map->clear();
        null_map->resize_fill(keys.size(), 1);
    }

    /// Records key `key_idx` as absent. With a null map, a default row keeps the rows aligned with the keys.
    const auto mark_missing = [&](size_t key_idx)
    {
        if (!null_map)
            return;

        (*null_map)[key_idx] = 0;
        for (size_t col_idx = 0; col_idx < sample_block.columns(); ++col_idx)
            columns[col_idx]->insert(sample_block.getByPosition(col_idx).type->getDefault());
    };

    auto client = getClient();

    std::vector<std::future<Coordination::GetResponse>> values;
    values.reserve(keys.size());
    for (const auto & key : keys)
    {
        const auto full_path = fullPathForKey(key);
        /// The metadata node lives next to the data nodes but is not a key: never read it as one.
        if (full_path == metadata_path)
        {
            values.emplace_back();
            continue;
        }
        values.emplace_back(client->asyncTryGet(full_path));
    }

    auto wait_until = std::chrono::system_clock::now() + std::chrono::milliseconds(Coordination::DEFAULT_OPERATION_TIMEOUT_MS);
    for (size_t i = 0; i < keys.size(); ++i)
    {
        auto & value = values[i];
        if (!value.valid())
        {
            mark_missing(i);
            continue;
        }

        /// A Keeper timeout is an environmental failure, not a logical error.
        if (value.wait_until(wait_until) != std::future_status::ready)
            throw DB::Exception(ErrorCodes::KEEPER_EXCEPTION, "Failed to fetch values: timeout");

        auto response = value.get();
        Coordination::Error code = response.error;
        if (code == Coordination::Error::ZOK)
            fillColumns(base64Decode(keys[i]), response.data, primary_key_pos, sample_block, columns);
        else if (code == Coordination::Error::ZNONODE)
            mark_missing(i);
        else
            throw DB::Exception(ErrorCodes::KEEPER_EXCEPTION, "Failed to fetch value: {}", code);
    }

    size_t num_rows = columns.at(0)->size();
    return Chunk(std::move(columns), num_rows);
}
/// Returns the table's sample block, or an empty Block if no in-memory metadata is set.
Block StorageKeeperMap::getSampleBlock(const Names &) const
{
    if (auto metadata = getInMemoryMetadataPtr())
        return metadata->getSampleBlock();

    return Block();
}
2022-07-27 13:20:45 +00:00
namespace
{
2022-08-23 13:15:31 +00:00
2022-08-31 08:14:28 +00:00
StoragePtr create(const StorageFactory::Arguments & args)
{
ASTs & engine_args = args.engine_args;
if (engine_args.empty() || engine_args.size() > 3)
throw Exception(
ErrorCodes::NUMBER_OF_ARGUMENTS_DOESNT_MATCH,
"Storage KeeperMap requires 1-2 arguments:\n"
"root_path: path in the Keeper where the values will be stored (required)\n"
"create_missing_root_path: 1 if the root path should be created if it's missing, otherwise throw exception (default: 1)\n",
"keys_limit: number of keys allowed to be stored, 0 is no limit (default: 0)\n",
default_host);
auto root_path = checkAndGetLiteralArgument<std::string>(engine_args[0], "root_path");
bool create_missing_root_path = true;
if (engine_args.size() > 1)
create_missing_root_path = checkAndGetLiteralArgument<UInt64>(engine_args[1], "create_missing_root_path");
UInt64 keys_limit = 0;
if (engine_args.size() > 2)
keys_limit = checkAndGetLiteralArgument<UInt64>(engine_args[2], "keys_limit");
StorageInMemoryMetadata metadata;
metadata.setColumns(args.columns);
metadata.setConstraints(args.constraints);
if (!args.storage_def->primary_key)
throw Exception("StorageKeeperMap requires one column in primary key", ErrorCodes::BAD_ARGUMENTS);
metadata.primary_key = KeyDescription::getKeyFromAST(args.storage_def->primary_key->ptr(), metadata.columns, args.getContext());
auto primary_key_names = metadata.getColumnsRequiredForPrimaryKey();
if (primary_key_names.size() != 1)
throw Exception("StorageKeeperMap requires one column in primary key", ErrorCodes::BAD_ARGUMENTS);
return std::make_shared<StorageKeeperMap>(
args.getContext(), args.table_id, metadata, args.query.attach, primary_key_names[0], root_path, create_missing_root_path, keys_limit);
}
2022-08-23 13:15:31 +00:00
2022-07-27 13:20:45 +00:00
}
2022-07-26 09:08:55 +00:00
void registerStorageKeeperMap(StorageFactory & factory)
{
factory.registerStorage(
"KeeperMap",
2022-07-27 13:20:45 +00:00
create,
2022-07-26 09:08:55 +00:00
{
2022-07-27 13:20:45 +00:00
.supports_sort_order = true,
.supports_parallel_insert = true,
});
2022-07-26 09:08:55 +00:00
}
}