mirror of
https://github.com/ClickHouse/ClickHouse.git
synced 2024-12-12 17:32:32 +00:00
Initial implementation of KeeperMap
This commit is contained in:
parent
0921548a37
commit
7ddadd25a3
@ -720,7 +720,7 @@ InterpreterCreateQuery::TableProperties InterpreterCreateQuery::getTableProperti
|
||||
}
|
||||
/// We can have queries like "CREATE TABLE <table> ENGINE=<engine>" if <engine>
|
||||
/// supports schema inference (will determine table structure in it's constructor).
|
||||
else if (!StorageFactory::instance().checkIfStorageSupportsSchemaInterface(create.storage->engine->name)) // NOLINT
|
||||
else if (!StorageFactory::instance().checkIfStorageSupportsSchemaInterface(create.storage->engine->name) && create.storage->engine->name != "KeeperMap") // NOLINT
|
||||
throw Exception("Incorrect CREATE query: required list of column descriptions or AS section or SELECT.", ErrorCodes::INCORRECT_QUERY);
|
||||
|
||||
/// Even if query has list of columns, canonicalize it (unfold Nested columns).
|
||||
|
175
src/Storages/KVStorageUtils.cpp
Normal file
175
src/Storages/KVStorageUtils.cpp
Normal file
@ -0,0 +1,175 @@
|
||||
#include <Storages/KVStorageUtils.h>
|
||||
|
||||
#include <Parsers/ASTFunction.h>
|
||||
#include <Parsers/ASTIdentifier.h>
|
||||
#include <Parsers/ASTSubquery.h>
|
||||
#include <Parsers/ASTLiteral.h>
|
||||
#include <Parsers/ASTSelectQuery.h>
|
||||
|
||||
#include <Interpreters/Set.h>
|
||||
#include <Interpreters/convertFieldToType.h>
|
||||
#include <Interpreters/evaluateConstantExpression.h>
|
||||
|
||||
namespace DB
|
||||
{
|
||||
|
||||
namespace
|
||||
{
|
||||
// returns keys may be filter by condition
|
||||
bool traverseASTFilter(
|
||||
const std::string & primary_key, const DataTypePtr & primary_key_type, const ASTPtr & elem, const PreparedSets & sets, const ContextPtr & context, FieldVectorPtr & res)
|
||||
{
|
||||
const auto * function = elem->as<ASTFunction>();
|
||||
if (!function)
|
||||
return false;
|
||||
|
||||
if (function->name == "and")
|
||||
{
|
||||
// one child has the key filter condition is ok
|
||||
for (const auto & child : function->arguments->children)
|
||||
if (traverseASTFilter(primary_key, primary_key_type, child, sets, context, res))
|
||||
return true;
|
||||
return false;
|
||||
}
|
||||
else if (function->name == "or")
|
||||
{
|
||||
// make sure every child has the key filter condition
|
||||
for (const auto & child : function->arguments->children)
|
||||
if (!traverseASTFilter(primary_key, primary_key_type, child, sets, context, res))
|
||||
return false;
|
||||
return true;
|
||||
}
|
||||
else if (function->name == "equals" || function->name == "in")
|
||||
{
|
||||
const auto & args = function->arguments->as<ASTExpressionList &>();
|
||||
const ASTIdentifier * ident;
|
||||
std::shared_ptr<IAST> value;
|
||||
|
||||
if (args.children.size() != 2)
|
||||
return false;
|
||||
|
||||
if (function->name == "in")
|
||||
{
|
||||
ident = args.children.at(0)->as<ASTIdentifier>();
|
||||
if (!ident)
|
||||
return false;
|
||||
|
||||
if (ident->name() != primary_key)
|
||||
return false;
|
||||
value = args.children.at(1);
|
||||
|
||||
PreparedSetKey set_key;
|
||||
if ((value->as<ASTSubquery>() || value->as<ASTIdentifier>()))
|
||||
set_key = PreparedSetKey::forSubquery(*value);
|
||||
else
|
||||
set_key = PreparedSetKey::forLiteral(*value, {primary_key_type});
|
||||
|
||||
auto set_it = sets.find(set_key);
|
||||
if (set_it == sets.end())
|
||||
return false;
|
||||
SetPtr prepared_set = set_it->second;
|
||||
|
||||
if (!prepared_set->hasExplicitSetElements())
|
||||
return false;
|
||||
|
||||
prepared_set->checkColumnsNumber(1);
|
||||
const auto & set_column = *prepared_set->getSetElements()[0];
|
||||
for (size_t row = 0; row < set_column.size(); ++row)
|
||||
res->push_back(set_column[row]);
|
||||
return true;
|
||||
}
|
||||
else
|
||||
{
|
||||
if ((ident = args.children.at(0)->as<ASTIdentifier>()))
|
||||
value = args.children.at(1);
|
||||
else if ((ident = args.children.at(1)->as<ASTIdentifier>()))
|
||||
value = args.children.at(0);
|
||||
else
|
||||
return false;
|
||||
|
||||
if (ident->name() != primary_key)
|
||||
return false;
|
||||
|
||||
const auto node = evaluateConstantExpressionAsLiteral(value, context);
|
||||
/// function->name == "equals"
|
||||
if (const auto * literal = node->as<ASTLiteral>())
|
||||
{
|
||||
auto converted_field = convertFieldToType(literal->value, *primary_key_type);
|
||||
if (!converted_field.isNull())
|
||||
res->push_back(converted_field);
|
||||
return true;
|
||||
}
|
||||
}
|
||||
}
|
||||
return false;
|
||||
}
|
||||
}
|
||||
|
||||
std::pair<FieldVectorPtr, bool> getFilterKeys(
|
||||
const String & primary_key, const DataTypePtr & primary_key_type, const SelectQueryInfo & query_info, const ContextPtr & context)
|
||||
{
|
||||
const auto & select = query_info.query->as<ASTSelectQuery &>();
|
||||
if (!select.where())
|
||||
return {{}, true};
|
||||
|
||||
FieldVectorPtr res = std::make_shared<FieldVector>();
|
||||
auto matched_keys = traverseASTFilter(primary_key, primary_key_type, select.where(), query_info.sets, context, res);
|
||||
return std::make_pair(res, !matched_keys);
|
||||
}
|
||||
|
||||
std::vector<std::string> serializeKeysToRawString(
|
||||
FieldVector::const_iterator & it,
|
||||
FieldVector::const_iterator end,
|
||||
DataTypePtr key_column_type,
|
||||
size_t max_block_size)
|
||||
{
|
||||
size_t num_keys = end - it;
|
||||
|
||||
std::vector<std::string> result;
|
||||
result.reserve(num_keys);
|
||||
|
||||
size_t rows_processed = 0;
|
||||
while (it < end && (max_block_size == 0 || rows_processed < max_block_size))
|
||||
{
|
||||
std::string & serialized_key = result.emplace_back();
|
||||
WriteBufferFromString wb(serialized_key);
|
||||
key_column_type->getDefaultSerialization()->serializeBinary(*it, wb);
|
||||
wb.finalize();
|
||||
|
||||
++it;
|
||||
++rows_processed;
|
||||
}
|
||||
return result;
|
||||
}
|
||||
|
||||
std::vector<std::string> serializeKeysToRawString(const ColumnWithTypeAndName & keys)
|
||||
{
|
||||
if (!keys.column)
|
||||
return {};
|
||||
|
||||
size_t num_keys = keys.column->size();
|
||||
std::vector<std::string> result;
|
||||
result.reserve(num_keys);
|
||||
|
||||
for (size_t i = 0; i < num_keys; ++i)
|
||||
{
|
||||
std::string & serialized_key = result.emplace_back();
|
||||
WriteBufferFromString wb(serialized_key);
|
||||
Field field;
|
||||
keys.column->get(i, field);
|
||||
/// TODO(@vdimir): use serializeBinaryBulk
|
||||
keys.type->getDefaultSerialization()->serializeBinary(field, wb);
|
||||
wb.finalize();
|
||||
}
|
||||
return result;
|
||||
}
|
||||
|
||||
/// In current implementation rocks db can have key with only one column.
|
||||
size_t getPrimaryKeyPos(const Block & header, const Names & primary_key)
|
||||
{
|
||||
if (primary_key.size() != 1)
|
||||
throw Exception(ErrorCodes::LOGICAL_ERROR, "RocksDB: only one primary key is supported");
|
||||
return header.getPositionByName(primary_key[0]);
|
||||
}
|
||||
|
||||
}
|
47
src/Storages/KVStorageUtils.h
Normal file
47
src/Storages/KVStorageUtils.h
Normal file
@ -0,0 +1,47 @@
|
||||
#pragma once
|
||||
|
||||
#include <Storages/SelectQueryInfo.h>
|
||||
|
||||
#include <Interpreters/PreparedSets.h>
|
||||
#include <Core/Field.h>
|
||||
|
||||
#include <IO/ReadBufferFromString.h>
|
||||
|
||||
namespace DB
|
||||
{
|
||||
|
||||
using FieldVectorPtr = std::shared_ptr<FieldVector>;
|
||||
|
||||
class IDataType;
|
||||
using DataTypePtr = std::shared_ptr<const IDataType>;
|
||||
|
||||
/** Retrieve from the query a condition of the form `key = 'key'`, `key in ('xxx_'), from conjunctions in the WHERE clause.
|
||||
* TODO support key like search
|
||||
*/
|
||||
std::pair<FieldVectorPtr, bool> getFilterKeys(
|
||||
const std::string & primary_key, const DataTypePtr & primary_key_type, const SelectQueryInfo & query_info, const ContextPtr & context);
|
||||
|
||||
template <typename K, typename V>
|
||||
void fillColumns(const K & key, const V & value, size_t key_pos, const Block & header, MutableColumns & columns)
|
||||
{
|
||||
ReadBufferFromString key_buffer(key);
|
||||
ReadBufferFromString value_buffer(value);
|
||||
for (size_t i = 0; i < header.columns(); ++i)
|
||||
{
|
||||
const auto & serialization = header.getByPosition(i).type->getDefaultSerialization();
|
||||
serialization->deserializeBinary(*columns[i], i == key_pos ? key_buffer : value_buffer);
|
||||
}
|
||||
}
|
||||
|
||||
std::vector<std::string> serializeKeysToRawString(
|
||||
FieldVector::const_iterator & it,
|
||||
FieldVector::const_iterator end,
|
||||
DataTypePtr key_column_type,
|
||||
size_t max_block_size);
|
||||
|
||||
std::vector<std::string> serializeKeysToRawString(const ColumnWithTypeAndName & keys);
|
||||
|
||||
/// In current implementation key with only column is supported.
|
||||
size_t getPrimaryKeyPos(const Block & header, const Names & primary_key);
|
||||
|
||||
}
|
@ -3,30 +3,17 @@
|
||||
|
||||
#include <DataTypes/DataTypesNumber.h>
|
||||
|
||||
#include <Storages/SelectQueryInfo.h>
|
||||
#include <Storages/StorageFactory.h>
|
||||
#include <Storages/KVStorageUtils.h>
|
||||
|
||||
#include <Parsers/ASTSelectQuery.h>
|
||||
#include <Parsers/ASTIdentifier.h>
|
||||
#include <Parsers/ASTLiteral.h>
|
||||
#include <Parsers/ASTSubquery.h>
|
||||
#include <Parsers/ASTExpressionList.h>
|
||||
#include <Parsers/ASTFunction.h>
|
||||
#include <Parsers/ASTCreateQuery.h>
|
||||
|
||||
#include <IO/WriteBufferFromString.h>
|
||||
#include <IO/ReadBufferFromString.h>
|
||||
|
||||
#include <QueryPipeline/Pipe.h>
|
||||
#include <Processors/ISource.h>
|
||||
|
||||
#include <Interpreters/castColumn.h>
|
||||
#include <Interpreters/Context.h>
|
||||
#include <Interpreters/Set.h>
|
||||
#include <Interpreters/PreparedSets.h>
|
||||
#include <Interpreters/TreeRewriter.h>
|
||||
#include <Interpreters/convertFieldToType.h>
|
||||
#include <Interpreters/evaluateConstantExpression.h>
|
||||
|
||||
#include <Poco/Logger.h>
|
||||
#include <Poco/Util/AbstractConfiguration.h>
|
||||
@ -75,177 +62,6 @@ static RocksDBOptions getOptionsFromConfig(const Poco::Util::AbstractConfigurati
|
||||
return options;
|
||||
}
|
||||
|
||||
// returns keys may be filter by condition
|
||||
static bool traverseASTFilter(
|
||||
const String & primary_key, const DataTypePtr & primary_key_type, const ASTPtr & elem, const PreparedSets & sets, const ContextPtr & context, FieldVectorPtr & res)
|
||||
{
|
||||
const auto * function = elem->as<ASTFunction>();
|
||||
if (!function)
|
||||
return false;
|
||||
|
||||
if (function->name == "and")
|
||||
{
|
||||
// one child has the key filter condition is ok
|
||||
for (const auto & child : function->arguments->children)
|
||||
if (traverseASTFilter(primary_key, primary_key_type, child, sets, context, res))
|
||||
return true;
|
||||
return false;
|
||||
}
|
||||
else if (function->name == "or")
|
||||
{
|
||||
// make sure every child has the key filter condition
|
||||
for (const auto & child : function->arguments->children)
|
||||
if (!traverseASTFilter(primary_key, primary_key_type, child, sets, context, res))
|
||||
return false;
|
||||
return true;
|
||||
}
|
||||
else if (function->name == "equals" || function->name == "in")
|
||||
{
|
||||
const auto & args = function->arguments->as<ASTExpressionList &>();
|
||||
const ASTIdentifier * ident;
|
||||
std::shared_ptr<IAST> value;
|
||||
|
||||
if (args.children.size() != 2)
|
||||
return false;
|
||||
|
||||
if (function->name == "in")
|
||||
{
|
||||
ident = args.children.at(0)->as<ASTIdentifier>();
|
||||
if (!ident)
|
||||
return false;
|
||||
|
||||
if (ident->name() != primary_key)
|
||||
return false;
|
||||
value = args.children.at(1);
|
||||
|
||||
PreparedSetKey set_key;
|
||||
if ((value->as<ASTSubquery>() || value->as<ASTIdentifier>()))
|
||||
set_key = PreparedSetKey::forSubquery(*value);
|
||||
else
|
||||
set_key = PreparedSetKey::forLiteral(*value, {primary_key_type});
|
||||
|
||||
auto set_it = sets.find(set_key);
|
||||
if (set_it == sets.end())
|
||||
return false;
|
||||
SetPtr prepared_set = set_it->second;
|
||||
|
||||
if (!prepared_set->hasExplicitSetElements())
|
||||
return false;
|
||||
|
||||
prepared_set->checkColumnsNumber(1);
|
||||
const auto & set_column = *prepared_set->getSetElements()[0];
|
||||
for (size_t row = 0; row < set_column.size(); ++row)
|
||||
res->push_back(set_column[row]);
|
||||
return true;
|
||||
}
|
||||
else
|
||||
{
|
||||
if ((ident = args.children.at(0)->as<ASTIdentifier>()))
|
||||
value = args.children.at(1);
|
||||
else if ((ident = args.children.at(1)->as<ASTIdentifier>()))
|
||||
value = args.children.at(0);
|
||||
else
|
||||
return false;
|
||||
|
||||
if (ident->name() != primary_key)
|
||||
return false;
|
||||
|
||||
const auto node = evaluateConstantExpressionAsLiteral(value, context);
|
||||
/// function->name == "equals"
|
||||
if (const auto * literal = node->as<ASTLiteral>())
|
||||
{
|
||||
auto converted_field = convertFieldToType(literal->value, *primary_key_type);
|
||||
if (!converted_field.isNull())
|
||||
res->push_back(converted_field);
|
||||
return true;
|
||||
}
|
||||
}
|
||||
}
|
||||
return false;
|
||||
}
|
||||
|
||||
/** Retrieve from the query a condition of the form `key = 'key'`, `key in ('xxx_'), from conjunctions in the WHERE clause.
|
||||
* TODO support key like search
|
||||
*/
|
||||
static std::pair<FieldVectorPtr, bool> getFilterKeys(
|
||||
const String & primary_key, const DataTypePtr & primary_key_type, const SelectQueryInfo & query_info, const ContextPtr & context)
|
||||
{
|
||||
const auto & select = query_info.query->as<ASTSelectQuery &>();
|
||||
if (!select.where())
|
||||
return {{}, true};
|
||||
|
||||
FieldVectorPtr res = std::make_shared<FieldVector>();
|
||||
auto matched_keys = traverseASTFilter(primary_key, primary_key_type, select.where(), query_info.sets, context, res);
|
||||
return std::make_pair(res, !matched_keys);
|
||||
}
|
||||
|
||||
template <typename K, typename V>
|
||||
static void fillColumns(const K & key, const V & value, size_t key_pos, const Block & header, MutableColumns & columns)
|
||||
{
|
||||
ReadBufferFromString key_buffer(key);
|
||||
ReadBufferFromString value_buffer(value);
|
||||
for (size_t i = 0; i < header.columns(); ++i)
|
||||
{
|
||||
const auto & serialization = header.getByPosition(i).type->getDefaultSerialization();
|
||||
serialization->deserializeBinary(*columns[i], i == key_pos ? key_buffer : value_buffer);
|
||||
}
|
||||
}
|
||||
|
||||
static std::vector<std::string> serializeKeysToRawString(
|
||||
FieldVector::const_iterator & it,
|
||||
FieldVector::const_iterator end,
|
||||
DataTypePtr key_column_type,
|
||||
size_t max_block_size)
|
||||
{
|
||||
size_t num_keys = end - it;
|
||||
|
||||
std::vector<std::string> result;
|
||||
result.reserve(num_keys);
|
||||
|
||||
size_t rows_processed = 0;
|
||||
while (it < end && (max_block_size == 0 || rows_processed < max_block_size))
|
||||
{
|
||||
std::string & serialized_key = result.emplace_back();
|
||||
WriteBufferFromString wb(serialized_key);
|
||||
key_column_type->getDefaultSerialization()->serializeBinary(*it, wb);
|
||||
wb.finalize();
|
||||
|
||||
++it;
|
||||
++rows_processed;
|
||||
}
|
||||
return result;
|
||||
}
|
||||
|
||||
static std::vector<std::string> serializeKeysToRawString(const ColumnWithTypeAndName & keys)
|
||||
{
|
||||
if (!keys.column)
|
||||
return {};
|
||||
|
||||
size_t num_keys = keys.column->size();
|
||||
std::vector<std::string> result;
|
||||
result.reserve(num_keys);
|
||||
|
||||
for (size_t i = 0; i < num_keys; ++i)
|
||||
{
|
||||
std::string & serialized_key = result.emplace_back();
|
||||
WriteBufferFromString wb(serialized_key);
|
||||
Field field;
|
||||
keys.column->get(i, field);
|
||||
/// TODO(@vdimir): use serializeBinaryBulk
|
||||
keys.type->getDefaultSerialization()->serializeBinary(field, wb);
|
||||
wb.finalize();
|
||||
}
|
||||
return result;
|
||||
}
|
||||
|
||||
/// In current implementation rocks db can have key with only one column.
|
||||
static size_t getPrimaryKeyPos(const Block & header, const Names & primary_key)
|
||||
{
|
||||
if (primary_key.size() != 1)
|
||||
throw Exception(ErrorCodes::LOGICAL_ERROR, "RocksDB: only one primary key is supported");
|
||||
return header.getPositionByName(primary_key[0]);
|
||||
}
|
||||
|
||||
class EmbeddedRocksDBSource : public ISource
|
||||
{
|
||||
public:
|
||||
|
329
src/Storages/StorageKeeperMap.cpp
Normal file
329
src/Storages/StorageKeeperMap.cpp
Normal file
@ -0,0 +1,329 @@
|
||||
#include <Columns/ColumnString.h>
|
||||
#include <DataTypes/DataTypeString.h>
|
||||
#include <Processors/ISource.h>
|
||||
#include <Processors/Sinks/SinkToStorage.h>
|
||||
#include <Storages/StorageFactory.h>
|
||||
#include <Storages/StorageKeeperMap.h>
|
||||
#include <boost/algorithm/string/classification.hpp>
|
||||
#include <Common/ZooKeeper/KeeperException.h>
|
||||
#include <Common/ZooKeeper/Types.h>
|
||||
#include "Parsers/ASTExpressionList.h"
|
||||
#include "Parsers/ASTFunction.h"
|
||||
#include <Parsers/ASTIdentifier.h>
|
||||
#include <Interpreters/evaluateConstantExpression.h>
|
||||
#include "Parsers/ASTSelectQuery.h"
|
||||
#include "Storages/MergeTree/IMergeTreeDataPart.h"
|
||||
#include <Core/NamesAndTypes.h>
|
||||
#include <Storages/ColumnsDescription.h>
|
||||
#include <Storages/checkAndGetLiteralArgument.h>
|
||||
|
||||
namespace DB
|
||||
{
|
||||
|
||||
namespace ErrorCodes
|
||||
{
|
||||
extern const int NUMBER_OF_ARGUMENTS_DOESNT_MATCH;
|
||||
extern const int BAD_ARGUMENTS;
|
||||
}
|
||||
|
||||
StorageKeeperMap::StorageKeeperMap(std::string_view keeper_path_, ContextPtr context, const StorageID & table_id)
|
||||
: IStorage(table_id), keeper_path(keeper_path_), zookeeper_client(context->getZooKeeper()->startNewSession())
|
||||
{
|
||||
StorageInMemoryMetadata storage_metadata;
|
||||
storage_metadata.setColumns(ColumnsDescription{getNamesAndTypes()});
|
||||
setInMemoryMetadata(storage_metadata);
|
||||
|
||||
if (keeper_path.empty())
|
||||
throw Exception("keeper_path should not be empty", ErrorCodes::BAD_ARGUMENTS);
|
||||
if (!keeper_path.starts_with('/'))
|
||||
throw Exception("keeper_path should start with '/'", ErrorCodes::BAD_ARGUMENTS);
|
||||
|
||||
if (keeper_path != "/")
|
||||
{
|
||||
LOG_TRACE(&Poco::Logger::get("StorageKeeperMap"), "Creating root path {}", keeper_path);
|
||||
|
||||
size_t cur_pos = 0;
|
||||
do
|
||||
{
|
||||
size_t search_start = cur_pos + 1;
|
||||
cur_pos = keeper_path.find('/', search_start);
|
||||
if (search_start == cur_pos)
|
||||
throw Exception("keeper_path is invalid, contains subsequent '/'", ErrorCodes::BAD_ARGUMENTS);
|
||||
|
||||
auto path = keeper_path.substr(0, cur_pos);
|
||||
auto status = getClient()->tryCreate(path, "", zkutil::CreateMode::Persistent);
|
||||
if (status != Coordination::Error::ZOK && status != Coordination::Error::ZNODEEXISTS)
|
||||
throw zkutil::KeeperException(status, path);
|
||||
} while (cur_pos != std::string_view::npos);
|
||||
}
|
||||
}
|
||||
|
||||
NamesAndTypesList StorageKeeperMap::getNamesAndTypes()
|
||||
{
|
||||
return {{"key", std::make_shared<DataTypeString>()}, {"value", std::make_shared<DataTypeString>()}};
|
||||
}
|
||||
|
||||
class StorageKeeperMapSink : public SinkToStorage
|
||||
{
|
||||
StorageKeeperMap & storage;
|
||||
std::unordered_map<std::string, std::string> new_values;
|
||||
|
||||
public:
|
||||
StorageKeeperMapSink(const Block & header, StorageKeeperMap & storage_) : SinkToStorage(header), storage(storage_) { }
|
||||
|
||||
std::string getName() const override { return "StorageKeeperMapSink"; }
|
||||
|
||||
void consume(Chunk chunk) override
|
||||
{
|
||||
auto block = getHeader().cloneWithColumns(chunk.getColumns());
|
||||
|
||||
size_t rows = block.rows();
|
||||
for (size_t i = 0; i < rows; ++i)
|
||||
{
|
||||
std::string key = block.getByPosition(0).column->getDataAt(i).toString();
|
||||
|
||||
if (key.find('/') != std::string::npos)
|
||||
throw Exception(ErrorCodes::BAD_ARGUMENTS, "Key cannot contain '/'. Key: '{}'", key);
|
||||
|
||||
std::string value = block.getByPosition(1).column->getDataAt(i).toString();
|
||||
|
||||
new_values[std::move(key)] = std::move(value);
|
||||
}
|
||||
}
|
||||
|
||||
void onFinish() override
|
||||
{
|
||||
auto & zookeeper = storage.getClient();
|
||||
Coordination::Requests requests;
|
||||
for (const auto & [key, value] : new_values)
|
||||
{
|
||||
auto path = fmt::format("{}/{}", storage.rootKeeperPath(), key);
|
||||
|
||||
if (zookeeper->exists(path))
|
||||
requests.push_back(zkutil::makeSetRequest(path, value, -1));
|
||||
else
|
||||
requests.push_back(zkutil::makeCreateRequest(path, value, zkutil::CreateMode::Persistent));
|
||||
}
|
||||
|
||||
zookeeper->multi(requests);
|
||||
}
|
||||
};
|
||||
|
||||
enum class FilterType
|
||||
{
|
||||
EXACT = 0
|
||||
};
|
||||
|
||||
struct KeyFilter
|
||||
{
|
||||
std::string filter;
|
||||
FilterType type;
|
||||
};
|
||||
|
||||
class StorageKeeperMapSource : public ISource
|
||||
{
|
||||
StorageKeeperMap & storage;
|
||||
std::vector<std::string> keys;
|
||||
size_t current_idx = 0;
|
||||
Block sample_block;
|
||||
Names column_names;
|
||||
bool has_value_column{false};
|
||||
size_t max_block_size;
|
||||
std::optional<KeyFilter> filter;
|
||||
|
||||
Chunk generateSingleKey()
|
||||
{
|
||||
assert(filter && filter->type == FilterType::EXACT);
|
||||
static bool processed = false;
|
||||
|
||||
if (processed)
|
||||
return {};
|
||||
|
||||
auto zookeeper = storage.getClient();
|
||||
|
||||
std::string value;
|
||||
auto path = fmt::format("{}/{}", storage.rootKeeperPath(), filter->filter);
|
||||
auto res = zookeeper->tryGet(path, value);
|
||||
if (!res)
|
||||
return {};
|
||||
|
||||
MutableColumns columns(sample_block.cloneEmptyColumns());
|
||||
insertRowForKey(columns, filter->filter, value);
|
||||
processed = true;
|
||||
|
||||
return Chunk{std::move(columns), 1};
|
||||
}
|
||||
|
||||
bool insertRowForKey(MutableColumns & columns, const std::string & key, const std::string & value)
|
||||
{
|
||||
|
||||
for (size_t column_index = 0; column_index < column_names.size(); ++column_index)
|
||||
{
|
||||
if (column_names[column_index] == "key")
|
||||
assert_cast<ColumnString &>(*columns[column_index]).insertData(key.data(), key.size());
|
||||
else if (column_names[column_index] == "value")
|
||||
assert_cast<ColumnString &>(*columns[column_index]).insertData(value.data(), value.size());
|
||||
}
|
||||
return true;
|
||||
}
|
||||
|
||||
public:
|
||||
StorageKeeperMapSource(const Block & sample_block_, StorageKeeperMap & storage_, size_t max_block_size_, std::optional<KeyFilter> filter_)
|
||||
: ISource(sample_block_)
|
||||
, storage(storage_)
|
||||
, sample_block(sample_block_.cloneEmpty())
|
||||
, column_names(sample_block_.getNames())
|
||||
, max_block_size(max_block_size_)
|
||||
, filter(std::move(filter_))
|
||||
{
|
||||
has_value_column = std::any_of(column_names.begin(), column_names.end(), [](const auto & name) { return name == "value"; });
|
||||
|
||||
// TODO(antonio2368): Do it lazily in generate
|
||||
if (!filter || filter->type != FilterType::EXACT)
|
||||
{
|
||||
auto zookeeper = storage.getClient();
|
||||
keys = zookeeper->getChildren(storage.rootKeeperPath());
|
||||
}
|
||||
}
|
||||
|
||||
std::string getName() const override { return "StorageKeeperMapSource"; }
|
||||
|
||||
Chunk generate() override
|
||||
{
|
||||
if (filter && filter->type == FilterType::EXACT)
|
||||
return generateSingleKey();
|
||||
|
||||
auto zookeeper = storage.getClient();
|
||||
|
||||
MutableColumns columns(sample_block.cloneEmptyColumns());
|
||||
size_t num_rows = 0;
|
||||
for (; num_rows < max_block_size && current_idx != keys.size(); ++current_idx)
|
||||
{
|
||||
const auto & key = keys[current_idx];
|
||||
std::string value;
|
||||
if (has_value_column)
|
||||
{
|
||||
auto path = fmt::format("{}/{}", storage.rootKeeperPath(), key);
|
||||
auto res = zookeeper->tryGet(path, value);
|
||||
if (!res)
|
||||
continue;
|
||||
}
|
||||
|
||||
if (insertRowForKey(columns, key, value))
|
||||
++num_rows;
|
||||
}
|
||||
|
||||
if (num_rows == 0)
|
||||
return {};
|
||||
|
||||
return {std::move(columns), num_rows};
|
||||
}
|
||||
};
|
||||
|
||||
std::optional<KeyFilter> tryGetKeyFilter(const IAST & elem, const ContextPtr context)
|
||||
{
|
||||
const auto * function = elem.as<ASTFunction>();
|
||||
if (!function)
|
||||
return std::nullopt;
|
||||
|
||||
if (function->name != "equals")
|
||||
return std::nullopt;
|
||||
|
||||
const auto & args = function->arguments->as<ASTExpressionList &>();
|
||||
const ASTIdentifier * ident;
|
||||
ASTPtr value;
|
||||
if ((ident = args.children.at(0)->as<ASTIdentifier>()))
|
||||
value = args.children.at(1);
|
||||
else if ((ident = args.children.at(1)->as<ASTIdentifier>()))
|
||||
value = args.children.at(0);
|
||||
else
|
||||
return std::nullopt;
|
||||
|
||||
if (ident->name() != "key")
|
||||
return std::nullopt;
|
||||
|
||||
auto evaluated = evaluateConstantExpressionAsLiteral(value, context);
|
||||
const auto * literal = evaluated->as<ASTLiteral>();
|
||||
if (!literal)
|
||||
return std::nullopt;
|
||||
|
||||
if (literal->value.getType() != Field::Types::String)
|
||||
return std::nullopt;
|
||||
|
||||
return KeyFilter{literal->value.safeGet<std::string>(), FilterType::EXACT};
|
||||
}
|
||||
|
||||
Pipe StorageKeeperMap::read(
|
||||
const Names & column_names,
|
||||
const StorageSnapshotPtr & /*storage_snapshot*/,
|
||||
SelectQueryInfo & query_info,
|
||||
ContextPtr context,
|
||||
QueryProcessingStage::Enum /*processed_stage*/,
|
||||
size_t max_block_size,
|
||||
unsigned /*num_streams*/)
|
||||
{
|
||||
Block sample_block;
|
||||
for (const std::string & column_name : column_names)
|
||||
{
|
||||
sample_block.insert({std::make_shared<DataTypeString>(), column_name});
|
||||
}
|
||||
|
||||
const auto & select = query_info.query->as<ASTSelectQuery &>();
|
||||
std::optional<KeyFilter> key_filter;
|
||||
if (select.where())
|
||||
key_filter = tryGetKeyFilter(*select.where(), context);
|
||||
|
||||
return Pipe(std::make_shared<StorageKeeperMapSource>(sample_block, *this, max_block_size, std::move(key_filter)));
|
||||
}
|
||||
|
||||
SinkToStoragePtr StorageKeeperMap::write(const ASTPtr & /*query*/, const StorageMetadataPtr & /*metadata_snapshot*/, ContextPtr /*context*/)
|
||||
{
|
||||
auto columns = getNamesAndTypes();
|
||||
Block write_header;
|
||||
for (const auto & [name, type] : columns)
|
||||
{
|
||||
write_header.insert(ColumnWithTypeAndName(type, name));
|
||||
}
|
||||
|
||||
return std::make_shared<StorageKeeperMapSink>(write_header, *this);
|
||||
}
|
||||
|
||||
zkutil::ZooKeeperPtr & StorageKeeperMap::getClient()
|
||||
{
|
||||
if (zookeeper_client->expired())
|
||||
zookeeper_client = zookeeper_client->startNewSession();
|
||||
|
||||
return zookeeper_client;
|
||||
}
|
||||
|
||||
const std::string & StorageKeeperMap::rootKeeperPath() const
|
||||
{
|
||||
return keeper_path;
|
||||
}
|
||||
|
||||
void registerStorageKeeperMap(StorageFactory & factory)
|
||||
{
|
||||
factory.registerStorage(
|
||||
"KeeperMap",
|
||||
[](const StorageFactory::Arguments & args)
|
||||
{
|
||||
if (!args.attach && !args.columns.empty())
|
||||
throw Exception(
|
||||
ErrorCodes::BAD_ARGUMENTS,
|
||||
"Storage KeeperMap does not accept column definition as it has predefined columns (key String, value String)");
|
||||
|
||||
ASTs & engine_args = args.engine_args;
|
||||
if (engine_args.empty() || engine_args.size() > 1)
|
||||
throw Exception(
|
||||
ErrorCodes::NUMBER_OF_ARGUMENTS_DOESNT_MATCH,
|
||||
"Storage KeeperMap requires 1 argument: "
|
||||
"keeper_path, path in the Keeper where the values will be stored");
|
||||
|
||||
auto keeper_path = checkAndGetLiteralArgument<String>(engine_args[0], "keeper_path");
|
||||
|
||||
return std::make_shared<StorageKeeperMap>(keeper_path, args.getContext(), args.table_id);
|
||||
},
|
||||
{});
|
||||
}
|
||||
|
||||
}
|
52
src/Storages/StorageKeeperMap.h
Normal file
52
src/Storages/StorageKeeperMap.h
Normal file
@ -0,0 +1,52 @@
|
||||
#pragma once
|
||||
|
||||
#include <Common/ZooKeeper/ZooKeeper.h>
|
||||
#include <Interpreters/Context.h>
|
||||
#include <QueryPipeline/Pipe.h>
|
||||
#include <Storages/IStorage.h>
|
||||
|
||||
namespace DB
|
||||
{
|
||||
|
||||
// KV store using (Zoo|CH)Keeper
|
||||
class StorageKeeperMap final : public IStorage
|
||||
{
|
||||
public:
|
||||
// TODO(antonio2368): add setting to control creating if keeper_path doesn't exist
|
||||
StorageKeeperMap(
|
||||
std::string_view keeper_path_,
|
||||
ContextPtr context,
|
||||
const StorageID & table_id
|
||||
);
|
||||
|
||||
Pipe read(
|
||||
const Names & column_names,
|
||||
const StorageSnapshotPtr & storage_snapshot,
|
||||
SelectQueryInfo & query_info,
|
||||
ContextPtr context,
|
||||
QueryProcessingStage::Enum processed_stage,
|
||||
size_t max_block_size,
|
||||
unsigned num_streams) override;
|
||||
|
||||
SinkToStoragePtr write(
|
||||
const ASTPtr & query,
|
||||
const StorageMetadataPtr & /*metadata_snapshot*/,
|
||||
ContextPtr context) override;
|
||||
|
||||
std::string getName() const override
|
||||
{
|
||||
return "KeeperMap";
|
||||
}
|
||||
|
||||
static NamesAndTypesList getNamesAndTypes();
|
||||
|
||||
zkutil::ZooKeeperPtr & getClient();
|
||||
|
||||
const std::string & rootKeeperPath() const;
|
||||
private:
|
||||
|
||||
std::string keeper_path;
|
||||
zkutil::ZooKeeperPtr zookeeper_client;
|
||||
};
|
||||
|
||||
}
|
@ -88,6 +88,7 @@ void registerStorageFileLog(StorageFactory & factory);
|
||||
void registerStorageSQLite(StorageFactory & factory);
|
||||
#endif
|
||||
|
||||
void registerStorageKeeperMap(StorageFactory & factory);
|
||||
|
||||
void registerStorages()
|
||||
{
|
||||
@ -171,6 +172,8 @@ void registerStorages()
|
||||
#if USE_SQLITE
|
||||
registerStorageSQLite(factory);
|
||||
#endif
|
||||
|
||||
registerStorageKeeperMap(factory);
|
||||
}
|
||||
|
||||
}
|
||||
|
Loading…
Reference in New Issue
Block a user