add primary key for StorageEmbeddedRocksdb

This commit is contained in:
sundy-li 2020-10-01 18:45:53 +08:00
parent 03ad32a3fa
commit b092ebe40d
4 changed files with 159 additions and 121 deletions

View File

@ -8,6 +8,7 @@
#include <Parsers/ASTLiteral.h>
#include <Parsers/ASTExpressionList.h>
#include <Parsers/ASTFunction.h>
#include <Parsers/ASTCreateQuery.h>
#include <DataTypes/NestedUtils.h>
#include <Common/typeid_cast.h>
#include <Common/StringUtils/StringUtils.h>
@ -21,9 +22,12 @@
#include <Columns/IColumn.h>
#include <Columns/ColumnString.h>
#include <Processors/Pipe.h>
#include <ext/enumerate.h>
#include <Poco/File.h>
#include <Poco/Path.h>
#include <Poco/Logger.h>
#include <common/logger_useful.h>
namespace DB
{
@ -36,13 +40,21 @@ namespace ErrorCodes
}
static bool extractKeyImpl(const IAST & elem, Strings & res)
// Returns true and appends to `res` the primary-key values selected by an `=` or `IN` condition on the primary key.
static bool traverseASTFilter(const String & primary_key, const ASTPtr & elem, FieldVector & res)
{
const auto * function = elem.as<ASTFunction>();
const auto * function = elem->as<ASTFunction>();
if (!function)
return false;
if (function->name == "equals" || function->name == "in")
if (function->name == "and")
{
for (const auto & child : function->arguments->children)
if (traverseASTFilter(primary_key, child, res))
return true;
return false;
}
else if (function->name == "equals" || function->name == "in")
{
const auto & args = function->arguments->as<ASTExpressionList &>();
const IAST * value;
@ -58,22 +70,22 @@ static bool extractKeyImpl(const IAST & elem, Strings & res)
else
return false;
if (ident->name != "key")
if (ident->name != primary_key)
return false;
if (const auto * literal = value->as<ASTLiteral>())
{
if (literal->value.getType() == Field::Types::String)
if (function->name == "equals")
{
res.push_back(literal->value.safeGet<String>());
res.push_back(literal->value);
return true;
}
else if (literal->value.getType() == Field::Types::Tuple)
else if (function->name == "in" && literal->value.getType() == Field::Types::Tuple)
{
auto tuple = literal->value.safeGet<Tuple>();
for (const auto & f : tuple)
{
res.push_back(f.safeGet<String>());
res.push_back(f);
}
return true;
}
@ -84,20 +96,22 @@ static bool extractKeyImpl(const IAST & elem, Strings & res)
}
/** Retrieve from the query a condition of the form `key = 'key'` or `key in ('xxx_')`, from conjunctions in the WHERE clause.
/** Retrieve from the query a condition of the form `key = 'key'`, `key in ('xxx_')`, from conjunctions in the WHERE clause.
* getFilterKeys returns the collected key values together with a flag that is true when
* all keys must be scanned: either there is no WHERE clause, or traverseASTFilter found
* no condition on the primary key.
* TODO support key like search
*/
static std::pair<Strings, bool> extractKey(const ASTPtr & query)
static std::pair<FieldVector, bool> getFilterKeys(const String & primary_key, const ASTPtr & query)
{
const auto & select = query->as<ASTSelectQuery &>();
// No WHERE clause: nothing to filter on, so report an empty key list and request a full scan.
if (!select.where())
{
return std::make_pair(Strings{}, true);
return std::make_pair(FieldVector{}, true);
}
Strings res;
extractKeyImpl(*select.where(), res);
return std::make_pair(res, false);
FieldVector res;
// matched_keys is true when the WHERE expression yielded key values for primary_key.
auto matched_keys = traverseASTFilter(primary_key, select.where(), res);
// Fall back to a full scan when no key condition was recognised.
return std::make_pair(res, !matched_keys);
}
class EmbeddedRocksdbBlockOutputStream : public IBlockOutputStream
{
public:
@ -114,24 +128,32 @@ public:
{
metadata_snapshot->check(block, true);
auto rows = block.rows();
auto key_col = block.getByName("key");
auto val_col = block.getByName("value");
const ColumnString * keys = checkAndGetColumn<ColumnString>(key_col.column.get());
WriteBufferFromOwnString wb_key;
WriteBufferFromOwnString wb_value;
rocksdb::WriteBatch batch;
auto columns = metadata_snapshot->getColumns();
for (size_t i = 0; i < rows; i++)
{
StringRef key = keys->getDataAt(i);
val_col.type->serializeBinary(*val_col.column, i, wb_value);
auto status = storage.rocksdb_ptr->rocksdb->Put(rocksdb::WriteOptions(), key.toString(), wb_value.str());
if (!status.ok())
throw Exception("Rocksdb write error: " + status.ToString(), ErrorCodes::SYSTEM_ERROR);
wb_key.restart();
wb_value.restart();
for (const auto & col : columns)
{
const auto & type = block.getByName(col.name).type;
const auto & column = block.getByName(col.name).column;
if (col.name == storage.primary_key)
type->serializeBinary(*column, i, wb_key);
else
type->serializeBinary(*column, i, wb_value);
}
batch.Put(wb_key.str(), wb_value.str());
}
auto status = storage.rocksdb_ptr->Write(rocksdb::WriteOptions(), &batch);
if (!status.ok())
throw Exception("Rocksdb write error: " + status.ToString(), ErrorCodes::SYSTEM_ERROR);
}
private:
@ -139,23 +161,17 @@ private:
StorageMetadataPtr metadata_snapshot;
};
StorageEmbeddedRocksdb::StorageEmbeddedRocksdb(const StorageFactory::Arguments & args)
: IStorage(args.table_id)
StorageEmbeddedRocksdb::StorageEmbeddedRocksdb(const StorageID & table_id_,
const String & relative_data_path_,
const StorageInMemoryMetadata & metadata_,
bool attach,
Context & context_,
const String & primary_key_)
: IStorage(table_id_), primary_key{primary_key_}
{
// must contain exactly two columns: key and value
if (args.columns.size() != 2)
throw Exception("Storage " + getName() + " requires exactly 2 columns", ErrorCodes::BAD_ARGUMENTS);
if (!args.columns.has("key") || !args.columns.has("value"))
throw Exception("Storage " + getName() + " requires columns are: key and value", ErrorCodes::BAD_ARGUMENTS);
StorageInMemoryMetadata storage_metadata;
storage_metadata.setColumns(args.columns);
storage_metadata.setConstraints(args.constraints);
setInMemoryMetadata(storage_metadata);
rocksdb_dir = args.context.getPath() + args.relative_data_path + "/rocksdb";
if (!args.attach)
setInMemoryMetadata(metadata_);
rocksdb_dir = context_.getPath() + relative_data_path_ + "/rocksdb";
if (!attach)
{
Poco::File(rocksdb_dir).createDirectories();
}
@ -164,10 +180,7 @@ StorageEmbeddedRocksdb::StorageEmbeddedRocksdb(const StorageFactory::Arguments &
void StorageEmbeddedRocksdb::truncate(const ASTPtr &, const StorageMetadataPtr & , const Context &, TableExclusiveLockHolder &)
{
if (rocksdb_ptr)
{
rocksdb_ptr->shutdown();
}
rocksdb_ptr->Close();
Poco::File(rocksdb_dir).remove(true);
Poco::File(rocksdb_dir).createDirectories();
initDb();
@ -176,20 +189,13 @@ void StorageEmbeddedRocksdb::truncate(const ASTPtr &, const StorageMetadataPtr &
// Opens the RocksDB database located at rocksdb_dir (creating it if it does not exist)
// and stores the resulting handle in rocksdb_ptr.
// Throws an Exception with ErrorCodes::SYSTEM_ERROR if the database cannot be opened.
void StorageEmbeddedRocksdb::initDb()
{
rocksdb::Options options;
rocksdb::BlockBasedTableOptions table_options;
rocksdb::DB * db;
options.create_if_missing = true;
options.statistics = rocksdb::CreateDBStatistics();
// 256 << 20 == 256 MiB of LRU block cache.
auto cache = rocksdb::NewLRUCache(256 << 20);
table_options.block_cache = cache;
options.table_factory.reset(rocksdb::NewBlockBasedTableFactory(table_options));
// On success DB::Open writes the newly opened database into `db`.
rocksdb::Status status = rocksdb::DB::Open(options, rocksdb_dir, &db);
if (status != rocksdb::Status::OK())
throw Exception("Fail to open rocksdb path at: " + rocksdb_dir, ErrorCodes::SYSTEM_ERROR);
// Hand the raw handle over to the owning member.
rocksdb_ptr = std::make_shared<Rocksdb>(db);
rocksdb_ptr = std::unique_ptr<rocksdb::DB>(db);
}
@ -204,49 +210,61 @@ Pipe StorageEmbeddedRocksdb::read(
{
metadata_snapshot->check(column_names, getVirtuals(), getStorageID());
Block sample_block = metadata_snapshot->getSampleBlock();
size_t key_pos = 0;
size_t value_pos = 1;
if (sample_block.getByPosition(0).name != "key")
std::swap(key_pos, value_pos);
MutableColumns columns = sample_block.cloneEmptyColumns();
Strings keys;
FieldVector keys;
bool all_scan = false;
std::tie(keys, all_scan) = extractKey(query_info.query);
if (keys.empty() && !all_scan)
throw Exception("StorageEmbeddedRocksdb engine must contain condition like key = 'key' or key in tuple(String) or key like 'xxx%' in WHERE clause or empty WHERE clause for all key value scan.", ErrorCodes::BAD_ARGUMENTS);
std::tie(keys, all_scan) = getFilterKeys(primary_key, query_info.query);
// TODO: pipeline
if (all_scan)
{
auto it = rocksdb_ptr->rocksdb->NewIterator(rocksdb::ReadOptions());
auto it = std::unique_ptr<rocksdb::Iterator>(rocksdb_ptr->NewIterator(rocksdb::ReadOptions()));
for (it->SeekToFirst(); it->Valid(); it->Next())
{
ReadBufferFromString rvalue(it->value().ToString());
columns[key_pos]->insert(it->key().ToString());
sample_block.getByName("value").type->deserializeBinary(*columns[value_pos], rvalue);
ReadBufferFromString key_buffer(it->key());
ReadBufferFromString value_buffer(it->value());
for (const auto & item : ext::enumerate(sample_block.getColumnsWithTypeAndName()))
{
if (item.second.name == primary_key)
item.second.type->deserializeBinary(*columns[item.first], key_buffer);
else
item.second.type->deserializeBinary(*columns[item.first], value_buffer);
}
}
assert(it->status().ok());
delete it;
}
else
{
Strings values;
std::vector<rocksdb::Slice> slices_keys;
for (auto & key : keys)
std::vector<String> values;
WriteBufferFromOwnString wb;
UInt64 offset = 0;
for (const auto & key : keys)
{
slices_keys.push_back(key);
sample_block.getByName(primary_key).type->serializeBinary(key, wb);
auto str_ref = wb.stringRef();
slices_keys.emplace_back(str_ref.data + offset, str_ref.size - offset);
offset = str_ref.size;
}
auto statuses = rocksdb_ptr->rocksdb->MultiGet(rocksdb::ReadOptions(), slices_keys, &values);
auto statuses = rocksdb_ptr->MultiGet(rocksdb::ReadOptions(), slices_keys, &values);
for (size_t i = 0; i < statuses.size(); ++i)
{
if (statuses[i].ok())
{
ReadBufferFromString rvalue(values[i]);
columns[key_pos]->insert(keys[i]);
sample_block.getByName("value").type->deserializeBinary(*columns[value_pos], rvalue);
ReadBufferFromString key_buffer(slices_keys[i]);
ReadBufferFromString value_buffer(values[i]);
for (const auto & item : ext::enumerate(sample_block.getColumnsWithTypeAndName()))
{
if (item.second.name == primary_key)
item.second.type->deserializeBinary(*columns[item.first], key_buffer);
else
item.second.type->deserializeBinary(*columns[item.first], value_buffer);
}
}
}
}
@ -263,23 +281,41 @@ BlockOutputStreamPtr StorageEmbeddedRocksdb::write(const ASTPtr & /*query*/, con
StorageEmbeddedRocksdb::~StorageEmbeddedRocksdb()
{
if (rocksdb_ptr)
}
// Factory callback for the EmbeddedRocksdb engine: validates the CREATE TABLE arguments,
// builds the in-memory metadata and constructs the storage.
// Throws NUMBER_OF_ARGUMENTS_DOESNT_MATCH if any engine arguments are given, and
// BAD_ARGUMENTS unless the table declares a primary key made of exactly one column.
static StoragePtr create(const StorageFactory::Arguments & args)
{
// TODO RocksdbSettings
if (!args.engine_args.empty())
throw Exception(
"Engine " + args.engine_name + " doesn't support any arguments (" + toString(args.engine_args.size()) + " given)",
ErrorCodes::NUMBER_OF_ARGUMENTS_DOESNT_MATCH);
StorageInMemoryMetadata metadata;
metadata.setColumns(args.columns);
metadata.setConstraints(args.constraints);
// A PRIMARY KEY clause is mandatory for this engine.
if (!args.storage_def->primary_key)
throw Exception("StorageEmbeddedRocksdb must require one primary key", ErrorCodes::BAD_ARGUMENTS);
metadata.primary_key = KeyDescription::getKeyFromAST(args.storage_def->primary_key->ptr(), metadata.columns, args.context);
auto primary_key_names = metadata.getColumnsRequiredForPrimaryKey();
// Only a single-column primary key is accepted; its name is passed on to the storage.
if (primary_key_names.size() != 1)
{
// NOTE(review): the next line looks like the body of the previous destructor shown interleaved by the diff view — confirm.
rocksdb_ptr->shutdown();
throw Exception("StorageEmbeddedRocksdb must require one primary key", ErrorCodes::BAD_ARGUMENTS);
}
return StorageEmbeddedRocksdb::create(args.table_id, args.relative_data_path, metadata, args.attach, args.context, primary_key_names[0]);
}
// Registers the "EmbeddedRocksdb" table engine with the storage factory.
void registerStorageEmbeddedRocksdb(StorageFactory & factory)
{
factory.registerStorage("EmbeddedRocksdb", [](const StorageFactory::Arguments & args)
{
// The engine takes no arguments.
if (!args.engine_args.empty())
throw Exception(
"Engine " + args.engine_name + " doesn't support any arguments (" + toString(args.engine_args.size()) + " given)",
ErrorCodes::NUMBER_OF_ARGUMENTS_DOESNT_MATCH);
return StorageEmbeddedRocksdb::create(args);
});
// presumably supports_sort_order is what lets CREATE TABLE accept a PRIMARY KEY clause for this engine — confirm against StorageFactory.
StorageFactory::StorageFeatures features{
.supports_sort_order = true,
};
factory.registerStorage("EmbeddedRocksdb", create, features);
}

View File

@ -15,24 +15,6 @@ namespace DB
class Context;
// Non-copyable holder for a raw rocksdb::DB pointer. It declares no destructor, so the
// database is released only when shutdown() is called explicitly.
struct Rocksdb
{
rocksdb::DB * rocksdb;
explicit Rocksdb(rocksdb::DB * rocksdb_) : rocksdb{rocksdb_} {}
Rocksdb(const Rocksdb &) = delete;
Rocksdb & operator=(const Rocksdb &) = delete;
// Closes and deletes the database if one is held. The pointer is not reset afterwards,
// so calling shutdown() a second time would use an already deleted object.
void shutdown()
{
if (rocksdb)
{
rocksdb->Close();
delete rocksdb;
}
}
};
class StorageEmbeddedRocksdb final : public ext::shared_ptr_helper<StorageEmbeddedRocksdb>, public IStorage
{
friend struct ext::shared_ptr_helper<StorageEmbeddedRocksdb>;
@ -55,12 +37,18 @@ public:
void truncate(const ASTPtr &, const StorageMetadataPtr & metadata_snapshot, const Context &, TableExclusiveLockHolder &) override;
protected:
StorageEmbeddedRocksdb(const StorageFactory::Arguments & args);
StorageEmbeddedRocksdb(const StorageID & table_id_,
const String & relative_data_path_,
const StorageInMemoryMetadata & metadata,
bool attach,
Context & context_,
const String & primary_key_);
private:
using RocksdbPtr = std::shared_ptr<Rocksdb>;
String rocksdb_dir;
const String primary_key;
using RocksdbPtr = std::unique_ptr<rocksdb::DB>;
RocksdbPtr rocksdb_ptr;
String rocksdb_dir;
mutable std::shared_mutex rwlock;
void initDb();

View File

@ -1,6 +1,7 @@
1
1000
0 435761734006
1 435761734006
2 435761734006
0
1
0 0 0 0 0
1
1
1
1

View File

@ -1,22 +1,35 @@
DROP TABLE IF EXISTS test;
create table test (key String, value UInt32) Engine=EmbeddedRocksdb;
CREATE TABLE test (key String, value UInt32) Engine=EmbeddedRocksdb primary key(key);
insert into test select '1_1', number from numbers(10000);
select count(1) from test;
INSERT INTO test SELECT '1_1', number FROM numbers(10000);
SELECT count(1) == 1 FROM test;
insert into test select concat(toString(number), '_1'), number from numbers(10000);
select sum(value) from test where key in ('1_1', '99_1', '900_1');
INSERT INTO test SELECT concat(toString(number), '_1'), number FROM numbers(10000);
SELECT SUM(value) == 1 + 99 + 900 FROM test WHERE key in ('1_1', '99_1', '900_1');
DROP TABLE IF EXISTS test;
create table test (key String, value UInt64) Engine=EmbeddedRocksdb;
DROP TABLE IF EXISTS test_memory;
insert into test select toString(number%3) as key, sum(number) as value from numbers(1000000) group by key;
CREATE TABLE test (k UInt32, value UInt64, dummy Tuple(UInt32, Float64), bm AggregateFunction(groupBitmap, UInt64)) Engine=EmbeddedRocksdb primary key(k);
CREATE TABLE test_memory AS test Engine = Memory;
select key, sum(value) from test group by key order by key;
INSERT INTO test SELECT number % 77 AS k, SUM(number) AS value, (1, 1.2), bitmapBuild(groupArray(number)) FROM numbers(10000000) group by k;
truncate table test;
select count(1) from test;
INSERT INTO test_memory SELECT number % 77 AS k, SUM(number) AS value, (1, 1.2), bitmapBuild(groupArray(number)) FROM numbers(10000000) group by k;
SELECT A.a - B.a, A.b - B.b, A.c - B.c, A.d - B.d, A.e - B.e FROM ( SELECT 0 AS a, groupBitmapMerge(bm) AS b , SUM(k) AS c, SUM(value) AS d, SUM(dummy.1) AS e FROM test) A ANY LEFT JOIN (SELECT 0 AS a, groupBitmapMerge(bm) AS b , SUM(k) AS c, SUM(value) AS d, SUM(dummy.1) AS e FROM test_memory) B USING a ORDER BY a;
SET max_rows_to_read = 2;
SELECT dummy == (1,1.2) FROM test WHERE k IN (1, 3);
SELECT k == 4 FROM test WHERE k = 4;
SELECT k, value FROM test WHERE k = 0 OR value > 0; -- { serverError 158 }
TRUNCATE TABLE test;
SELECT 0 == count(1) FROM test;
DROP TABLE IF EXISTS test;
DROP TABLE IF EXISTS test_memory;