mirror of
https://github.com/ClickHouse/ClickHouse.git
synced 2024-11-22 15:42:02 +00:00
Extract common KV storage logic
This commit is contained in:
parent
110470809b
commit
b33f3a4e16
182
src/Storages/KVStorageUtils.cpp
Normal file
182
src/Storages/KVStorageUtils.cpp
Normal file
@ -0,0 +1,182 @@
|
||||
#include <Storages/KVStorageUtils.h>
|
||||
|
||||
#include <Parsers/ASTFunction.h>
|
||||
#include <Parsers/ASTIdentifier.h>
|
||||
#include <Parsers/ASTSubquery.h>
|
||||
#include <Parsers/ASTLiteral.h>
|
||||
#include <Parsers/ASTSelectQuery.h>
|
||||
|
||||
#include <Interpreters/Set.h>
|
||||
#include <Interpreters/convertFieldToType.h>
|
||||
#include <Interpreters/evaluateConstantExpression.h>
|
||||
|
||||
namespace DB
|
||||
{
|
||||
|
||||
namespace ErrorCodes
|
||||
{
|
||||
extern const int LOGICAL_ERROR;
|
||||
}
|
||||
|
||||
namespace
|
||||
{
|
||||
// returns keys may be filter by condition
|
||||
bool traverseASTFilter(
|
||||
const std::string & primary_key, const DataTypePtr & primary_key_type, const ASTPtr & elem, const PreparedSetsPtr & prepared_sets, const ContextPtr & context, FieldVectorPtr & res)
|
||||
{
|
||||
const auto * function = elem->as<ASTFunction>();
|
||||
if (!function)
|
||||
return false;
|
||||
|
||||
if (function->name == "and")
|
||||
{
|
||||
// one child has the key filter condition is ok
|
||||
for (const auto & child : function->arguments->children)
|
||||
if (traverseASTFilter(primary_key, primary_key_type, child, prepared_sets, context, res))
|
||||
return true;
|
||||
return false;
|
||||
}
|
||||
else if (function->name == "or")
|
||||
{
|
||||
// make sure every child has the key filter condition
|
||||
for (const auto & child : function->arguments->children)
|
||||
if (!traverseASTFilter(primary_key, primary_key_type, child, prepared_sets, context, res))
|
||||
return false;
|
||||
return true;
|
||||
}
|
||||
else if (function->name == "equals" || function->name == "in")
|
||||
{
|
||||
const auto & args = function->arguments->as<ASTExpressionList &>();
|
||||
const ASTIdentifier * ident;
|
||||
std::shared_ptr<IAST> value;
|
||||
|
||||
if (args.children.size() != 2)
|
||||
return false;
|
||||
|
||||
if (function->name == "in")
|
||||
{
|
||||
if (!prepared_sets)
|
||||
return false;
|
||||
|
||||
ident = args.children.at(0)->as<ASTIdentifier>();
|
||||
if (!ident)
|
||||
return false;
|
||||
|
||||
if (ident->name() != primary_key)
|
||||
return false;
|
||||
value = args.children.at(1);
|
||||
|
||||
PreparedSetKey set_key;
|
||||
if ((value->as<ASTSubquery>() || value->as<ASTIdentifier>()))
|
||||
set_key = PreparedSetKey::forSubquery(*value);
|
||||
else
|
||||
set_key = PreparedSetKey::forLiteral(*value, {primary_key_type});
|
||||
|
||||
SetPtr set = prepared_sets->get(set_key);
|
||||
if (!set)
|
||||
return false;
|
||||
|
||||
if (!set->hasExplicitSetElements())
|
||||
return false;
|
||||
|
||||
set->checkColumnsNumber(1);
|
||||
const auto & set_column = *set->getSetElements()[0];
|
||||
for (size_t row = 0; row < set_column.size(); ++row)
|
||||
res->push_back(set_column[row]);
|
||||
return true;
|
||||
}
|
||||
else
|
||||
{
|
||||
if ((ident = args.children.at(0)->as<ASTIdentifier>()))
|
||||
value = args.children.at(1);
|
||||
else if ((ident = args.children.at(1)->as<ASTIdentifier>()))
|
||||
value = args.children.at(0);
|
||||
else
|
||||
return false;
|
||||
|
||||
if (ident->name() != primary_key)
|
||||
return false;
|
||||
|
||||
const auto node = evaluateConstantExpressionAsLiteral(value, context);
|
||||
/// function->name == "equals"
|
||||
if (const auto * literal = node->as<ASTLiteral>())
|
||||
{
|
||||
auto converted_field = convertFieldToType(literal->value, *primary_key_type);
|
||||
if (!converted_field.isNull())
|
||||
res->push_back(converted_field);
|
||||
return true;
|
||||
}
|
||||
}
|
||||
}
|
||||
return false;
|
||||
}
|
||||
}
|
||||
|
||||
std::pair<FieldVectorPtr, bool> getFilterKeys(
|
||||
const String & primary_key, const DataTypePtr & primary_key_type, const SelectQueryInfo & query_info, const ContextPtr & context)
|
||||
{
|
||||
const auto & select = query_info.query->as<ASTSelectQuery &>();
|
||||
if (!select.where())
|
||||
return {{}, true};
|
||||
|
||||
FieldVectorPtr res = std::make_shared<FieldVector>();
|
||||
auto matched_keys = traverseASTFilter(primary_key, primary_key_type, select.where(), query_info.prepared_sets, context, res);
|
||||
return std::make_pair(res, !matched_keys);
|
||||
}
|
||||
|
||||
std::vector<std::string> serializeKeysToRawString(
|
||||
FieldVector::const_iterator & it,
|
||||
FieldVector::const_iterator end,
|
||||
DataTypePtr key_column_type,
|
||||
size_t max_block_size)
|
||||
{
|
||||
size_t num_keys = end - it;
|
||||
|
||||
std::vector<std::string> result;
|
||||
result.reserve(num_keys);
|
||||
|
||||
size_t rows_processed = 0;
|
||||
while (it < end && (max_block_size == 0 || rows_processed < max_block_size))
|
||||
{
|
||||
std::string & serialized_key = result.emplace_back();
|
||||
WriteBufferFromString wb(serialized_key);
|
||||
key_column_type->getDefaultSerialization()->serializeBinary(*it, wb);
|
||||
wb.finalize();
|
||||
|
||||
++it;
|
||||
++rows_processed;
|
||||
}
|
||||
return result;
|
||||
}
|
||||
|
||||
std::vector<std::string> serializeKeysToRawString(const ColumnWithTypeAndName & keys)
|
||||
{
|
||||
if (!keys.column)
|
||||
return {};
|
||||
|
||||
size_t num_keys = keys.column->size();
|
||||
std::vector<std::string> result;
|
||||
result.reserve(num_keys);
|
||||
|
||||
for (size_t i = 0; i < num_keys; ++i)
|
||||
{
|
||||
std::string & serialized_key = result.emplace_back();
|
||||
WriteBufferFromString wb(serialized_key);
|
||||
Field field;
|
||||
keys.column->get(i, field);
|
||||
/// TODO(@vdimir): use serializeBinaryBulk
|
||||
keys.type->getDefaultSerialization()->serializeBinary(field, wb);
|
||||
wb.finalize();
|
||||
}
|
||||
return result;
|
||||
}
|
||||
|
||||
/// In current implementation rocks db can have key with only one column.
|
||||
size_t getPrimaryKeyPos(const Block & header, const Names & primary_key)
|
||||
{
|
||||
if (primary_key.size() != 1)
|
||||
throw Exception(ErrorCodes::LOGICAL_ERROR, "Only one primary key is supported");
|
||||
return header.getPositionByName(primary_key[0]);
|
||||
}
|
||||
|
||||
}
|
48
src/Storages/KVStorageUtils.h
Normal file
48
src/Storages/KVStorageUtils.h
Normal file
@ -0,0 +1,48 @@
|
||||
#pragma once
|
||||
|
||||
#include <Common/logger_useful.h>
|
||||
#include <Storages/SelectQueryInfo.h>
|
||||
|
||||
#include <Interpreters/PreparedSets.h>
|
||||
#include <Core/Field.h>
|
||||
|
||||
#include <IO/ReadBufferFromString.h>
|
||||
|
||||
namespace DB
|
||||
{
|
||||
|
||||
using FieldVectorPtr = std::shared_ptr<FieldVector>;
|
||||
|
||||
class IDataType;
|
||||
using DataTypePtr = std::shared_ptr<const IDataType>;
|
||||
|
||||
/** Retrieve from the query a condition of the form `key = 'key'`, `key in ('xxx_'), from conjunctions in the WHERE clause.
|
||||
* TODO support key like search
|
||||
*/
|
||||
std::pair<FieldVectorPtr, bool> getFilterKeys(
|
||||
const std::string & primary_key, const DataTypePtr & primary_key_type, const SelectQueryInfo & query_info, const ContextPtr & context);
|
||||
|
||||
template <typename K, typename V>
|
||||
void fillColumns(const K & key, const V & value, size_t key_pos, const Block & header, MutableColumns & columns)
|
||||
{
|
||||
ReadBufferFromString key_buffer(key);
|
||||
ReadBufferFromString value_buffer(value);
|
||||
for (size_t i = 0; i < header.columns(); ++i)
|
||||
{
|
||||
const auto & serialization = header.getByPosition(i).type->getDefaultSerialization();
|
||||
serialization->deserializeBinary(*columns[i], i == key_pos ? key_buffer : value_buffer);
|
||||
}
|
||||
}
|
||||
|
||||
std::vector<std::string> serializeKeysToRawString(
|
||||
FieldVector::const_iterator & it,
|
||||
FieldVector::const_iterator end,
|
||||
DataTypePtr key_column_type,
|
||||
size_t max_block_size);
|
||||
|
||||
std::vector<std::string> serializeKeysToRawString(const ColumnWithTypeAndName & keys);
|
||||
|
||||
/// In current implementation key with only column is supported.
|
||||
size_t getPrimaryKeyPos(const Block & header, const Names & primary_key);
|
||||
|
||||
}
|
@ -3,30 +3,17 @@
|
||||
|
||||
#include <DataTypes/DataTypesNumber.h>
|
||||
|
||||
#include <Storages/SelectQueryInfo.h>
|
||||
#include <Storages/StorageFactory.h>
|
||||
#include <Storages/KVStorageUtils.h>
|
||||
|
||||
#include <Parsers/ASTSelectQuery.h>
|
||||
#include <Parsers/ASTIdentifier.h>
|
||||
#include <Parsers/ASTLiteral.h>
|
||||
#include <Parsers/ASTSubquery.h>
|
||||
#include <Parsers/ASTExpressionList.h>
|
||||
#include <Parsers/ASTFunction.h>
|
||||
#include <Parsers/ASTCreateQuery.h>
|
||||
|
||||
#include <IO/WriteBufferFromString.h>
|
||||
#include <IO/ReadBufferFromString.h>
|
||||
|
||||
#include <QueryPipeline/Pipe.h>
|
||||
#include <Processors/ISource.h>
|
||||
|
||||
#include <Interpreters/castColumn.h>
|
||||
#include <Interpreters/Context.h>
|
||||
#include <Interpreters/Set.h>
|
||||
#include <Interpreters/PreparedSets.h>
|
||||
#include <Interpreters/TreeRewriter.h>
|
||||
#include <Interpreters/convertFieldToType.h>
|
||||
#include <Interpreters/evaluateConstantExpression.h>
|
||||
|
||||
#include <Poco/Logger.h>
|
||||
#include <Poco/Util/AbstractConfiguration.h>
|
||||
@ -75,179 +62,6 @@ static RocksDBOptions getOptionsFromConfig(const Poco::Util::AbstractConfigurati
|
||||
return options;
|
||||
}
|
||||
|
||||
// returns keys may be filter by condition
|
||||
static bool traverseASTFilter(
|
||||
const String & primary_key, const DataTypePtr & primary_key_type, const ASTPtr & elem, const PreparedSetsPtr & prepared_sets, const ContextPtr & context, FieldVectorPtr & res)
|
||||
{
|
||||
const auto * function = elem->as<ASTFunction>();
|
||||
if (!function)
|
||||
return false;
|
||||
|
||||
if (function->name == "and")
|
||||
{
|
||||
// one child has the key filter condition is ok
|
||||
for (const auto & child : function->arguments->children)
|
||||
if (traverseASTFilter(primary_key, primary_key_type, child, prepared_sets, context, res))
|
||||
return true;
|
||||
return false;
|
||||
}
|
||||
else if (function->name == "or")
|
||||
{
|
||||
// make sure every child has the key filter condition
|
||||
for (const auto & child : function->arguments->children)
|
||||
if (!traverseASTFilter(primary_key, primary_key_type, child, prepared_sets, context, res))
|
||||
return false;
|
||||
return true;
|
||||
}
|
||||
else if (function->name == "equals" || function->name == "in")
|
||||
{
|
||||
const auto & args = function->arguments->as<ASTExpressionList &>();
|
||||
const ASTIdentifier * ident;
|
||||
std::shared_ptr<IAST> value;
|
||||
|
||||
if (args.children.size() != 2)
|
||||
return false;
|
||||
|
||||
if (function->name == "in")
|
||||
{
|
||||
if (!prepared_sets)
|
||||
return false;
|
||||
|
||||
ident = args.children.at(0)->as<ASTIdentifier>();
|
||||
if (!ident)
|
||||
return false;
|
||||
|
||||
if (ident->name() != primary_key)
|
||||
return false;
|
||||
value = args.children.at(1);
|
||||
|
||||
PreparedSetKey set_key;
|
||||
if ((value->as<ASTSubquery>() || value->as<ASTIdentifier>()))
|
||||
set_key = PreparedSetKey::forSubquery(*value);
|
||||
else
|
||||
set_key = PreparedSetKey::forLiteral(*value, {primary_key_type});
|
||||
|
||||
SetPtr set = prepared_sets->get(set_key);
|
||||
if (!set)
|
||||
return false;
|
||||
|
||||
if (!set->hasExplicitSetElements())
|
||||
return false;
|
||||
|
||||
set->checkColumnsNumber(1);
|
||||
const auto & set_column = *set->getSetElements()[0];
|
||||
for (size_t row = 0; row < set_column.size(); ++row)
|
||||
res->push_back(set_column[row]);
|
||||
return true;
|
||||
}
|
||||
else
|
||||
{
|
||||
if ((ident = args.children.at(0)->as<ASTIdentifier>()))
|
||||
value = args.children.at(1);
|
||||
else if ((ident = args.children.at(1)->as<ASTIdentifier>()))
|
||||
value = args.children.at(0);
|
||||
else
|
||||
return false;
|
||||
|
||||
if (ident->name() != primary_key)
|
||||
return false;
|
||||
|
||||
const auto node = evaluateConstantExpressionAsLiteral(value, context);
|
||||
/// function->name == "equals"
|
||||
if (const auto * literal = node->as<ASTLiteral>())
|
||||
{
|
||||
auto converted_field = convertFieldToType(literal->value, *primary_key_type);
|
||||
if (!converted_field.isNull())
|
||||
res->push_back(converted_field);
|
||||
return true;
|
||||
}
|
||||
}
|
||||
}
|
||||
return false;
|
||||
}
|
||||
|
||||
/** Retrieve from the query a condition of the form `key = 'key'`, `key in ('xxx_'), from conjunctions in the WHERE clause.
|
||||
* TODO support key like search
|
||||
*/
|
||||
static std::pair<FieldVectorPtr, bool> getFilterKeys(
|
||||
const String & primary_key, const DataTypePtr & primary_key_type, const SelectQueryInfo & query_info, const ContextPtr & context)
|
||||
{
|
||||
const auto & select = query_info.query->as<ASTSelectQuery &>();
|
||||
if (!select.where())
|
||||
return {{}, true};
|
||||
|
||||
FieldVectorPtr res = std::make_shared<FieldVector>();
|
||||
auto matched_keys = traverseASTFilter(primary_key, primary_key_type, select.where(), query_info.prepared_sets, context, res);
|
||||
return std::make_pair(res, !matched_keys);
|
||||
}
|
||||
|
||||
template <typename K, typename V>
|
||||
static void fillColumns(const K & key, const V & value, size_t key_pos, const Block & header, MutableColumns & columns)
|
||||
{
|
||||
ReadBufferFromString key_buffer(key);
|
||||
ReadBufferFromString value_buffer(value);
|
||||
for (size_t i = 0; i < header.columns(); ++i)
|
||||
{
|
||||
const auto & serialization = header.getByPosition(i).type->getDefaultSerialization();
|
||||
serialization->deserializeBinary(*columns[i], i == key_pos ? key_buffer : value_buffer);
|
||||
}
|
||||
}
|
||||
|
||||
static std::vector<std::string> serializeKeysToRawString(
|
||||
FieldVector::const_iterator & it,
|
||||
FieldVector::const_iterator end,
|
||||
DataTypePtr key_column_type,
|
||||
size_t max_block_size)
|
||||
{
|
||||
size_t num_keys = end - it;
|
||||
|
||||
std::vector<std::string> result;
|
||||
result.reserve(num_keys);
|
||||
|
||||
size_t rows_processed = 0;
|
||||
while (it < end && (max_block_size == 0 || rows_processed < max_block_size))
|
||||
{
|
||||
std::string & serialized_key = result.emplace_back();
|
||||
WriteBufferFromString wb(serialized_key);
|
||||
key_column_type->getDefaultSerialization()->serializeBinary(*it, wb);
|
||||
wb.finalize();
|
||||
|
||||
++it;
|
||||
++rows_processed;
|
||||
}
|
||||
return result;
|
||||
}
|
||||
|
||||
static std::vector<std::string> serializeKeysToRawString(const ColumnWithTypeAndName & keys)
|
||||
{
|
||||
if (!keys.column)
|
||||
return {};
|
||||
|
||||
size_t num_keys = keys.column->size();
|
||||
std::vector<std::string> result;
|
||||
result.reserve(num_keys);
|
||||
|
||||
for (size_t i = 0; i < num_keys; ++i)
|
||||
{
|
||||
std::string & serialized_key = result.emplace_back();
|
||||
WriteBufferFromString wb(serialized_key);
|
||||
Field field;
|
||||
keys.column->get(i, field);
|
||||
/// TODO(@vdimir): use serializeBinaryBulk
|
||||
keys.type->getDefaultSerialization()->serializeBinary(field, wb);
|
||||
wb.finalize();
|
||||
}
|
||||
return result;
|
||||
}
|
||||
|
||||
/// In current implementation rocks db can have key with only one column.
|
||||
static size_t getPrimaryKeyPos(const Block & header, const Names & primary_key)
|
||||
{
|
||||
if (primary_key.size() != 1)
|
||||
throw Exception(ErrorCodes::LOGICAL_ERROR, "RocksDB: only one primary key is supported");
|
||||
return header.getPositionByName(primary_key[0]);
|
||||
}
|
||||
|
||||
class EmbeddedRocksDBSource : public ISource
|
||||
{
|
||||
public:
|
||||
@ -597,7 +411,6 @@ Chunk StorageEmbeddedRocksDB::getBySerializedKeys(
|
||||
for (const auto & key : keys)
|
||||
slices_keys.emplace_back(key);
|
||||
|
||||
|
||||
auto statuses = multiGet(slices_keys, values);
|
||||
if (null_map)
|
||||
{
|
||||
|
Loading…
Reference in New Issue
Block a user