Add StorageEmbeddedRocksdb Engine

Author: root, 2020-09-02 09:13:59 +00:00 (committed by sundy-li)
Parent: 8e8476639a
Commit: e9de5b6ad4
13 changed files with 450 additions and 0 deletions

.gitmodules

@@ -193,3 +193,7 @@
[submodule "contrib/miniselect"]
path = contrib/miniselect
url = https://github.com/danlark1/miniselect
[submodule "contrib/rocksdb"]
path = contrib/rocksdb
url = https://github.com/facebook/rocksdb
branch = v6.11.4


@@ -456,6 +456,8 @@ include (cmake/find/simdjson.cmake)
include (cmake/find/rapidjson.cmake)
include (cmake/find/fastops.cmake)
include (cmake/find/odbc.cmake)
include (cmake/find/rocksdb.cmake)
if(NOT USE_INTERNAL_PARQUET_LIBRARY)
set (ENABLE_ORC OFF CACHE INTERNAL "")

cmake/find/rocksdb.cmake (new file)

@@ -0,0 +1,22 @@
option(ENABLE_ROCKSDB "Enable ROCKSDB" ${ENABLE_LIBRARIES})
option(USE_INTERNAL_ROCKSDB_LIBRARY "Set to FALSE to use system ROCKSDB library instead of bundled" ${NOT_UNBUNDLED})
if(ENABLE_ROCKSDB)
if (NOT EXISTS "${ClickHouse_SOURCE_DIR}/contrib/rocksdb")
message (WARNING "submodule contrib/rocksdb is missing. To fix, try running: \n git submodule update --init --recursive")
set (MISSING_ROCKSDB 1)
endif ()
if (USE_INTERNAL_ROCKSDB_LIBRARY AND NOT MISSING_ROCKSDB)
set (ROCKSDB_INCLUDE_DIR "${ClickHouse_SOURCE_DIR}/contrib/rocksdb/include")
set (ROCKSDB_CORE_INCLUDE_DIR "${ClickHouse_SOURCE_DIR}/contrib/rocksdb/include")
set (ROCKSDB_LIBRARY "rocksdb")
set (USE_INTERNAL_ROCKSDB_LIBRARY 1)
set (USE_ROCKSDB 1)
else()
find_package(ROCKSDB)
endif ()
endif()
message (STATUS "Using ROCKSDB=${USE_ROCKSDB}: ${ROCKSDB_INCLUDE_DIR} : ${ROCKSDB_LIBRARY}")


@@ -320,3 +320,10 @@ if (USE_KRB5)
add_subdirectory (cyrus-sasl-cmake)
endif()
endif()
if (USE_INTERNAL_ROCKSDB_LIBRARY)
set(WITH_TESTS OFF)
set(WITH_BENCHMARK_TOOLS OFF)
set(WITH_TOOLS OFF)
add_subdirectory (rocksdb)
endif()

contrib/rocksdb (new submodule)

@@ -0,0 +1 @@
Subproject commit 963314ffd681596ef2738a95249fe4c1163ef87a


@@ -78,6 +78,10 @@ if (USE_AMQPCPP)
add_headers_and_sources(dbms Storages/RabbitMQ)
endif()
if (USE_ROCKSDB)
add_headers_and_sources(dbms Storages/Rocksdb)
endif()
if (USE_AWS_S3)
add_headers_and_sources(dbms Common/S3)
add_headers_and_sources(dbms Disks/S3)
@@ -294,6 +298,7 @@ if (USE_KRB5)
dbms_target_link_libraries(PRIVATE ${KRB5_LIBRARY})
endif()
if(RE2_INCLUDE_DIR)
target_include_directories(clickhouse_common_io SYSTEM BEFORE PUBLIC ${RE2_INCLUDE_DIR})
endif()
@@ -402,6 +407,13 @@ if (USE_ORC)
dbms_target_include_directories(SYSTEM BEFORE PUBLIC ${ORC_INCLUDE_DIR} ${CMAKE_BINARY_DIR}/contrib/orc/c++/include)
endif ()
if (USE_ROCKSDB)
dbms_target_link_libraries(PUBLIC ${ROCKSDB_LIBRARY})
dbms_target_include_directories(SYSTEM BEFORE PUBLIC ${ROCKSDB_INCLUDE_DIR})
# target_link_libraries (clickhouse_common_io PUBLIC ${ROCKSDB_LIBRARY})
# target_include_directories (clickhouse_common_io SYSTEM BEFORE PUBLIC ${ROCKSDB_INCLUDE_DIR})
endif()
if (ENABLE_TESTS AND USE_GTEST)
macro (grep_gtest_sources BASE_DIR DST_VAR)
# Could match files that are not in tests/ directories


@@ -11,3 +11,5 @@
#cmakedefine01 USE_SSL
#cmakedefine01 USE_OPENCL
#cmakedefine01 USE_LDAP
#cmakedefine01 USE_ROCKSDB


@@ -0,0 +1,297 @@
#include <DataTypes/DataTypeString.h>
#include <DataTypes/DataTypesNumber.h>
#include <DataTypes/DataTypeDateTime.h>
#include <Storages/StorageFactory.h>
#include <Storages/Rocksdb/StorageEmbeddedRocksdb.h>
#include <Parsers/ASTSelectQuery.h>
#include <Parsers/ASTIdentifier.h>
#include <Parsers/ASTLiteral.h>
#include <Parsers/ASTExpressionList.h>
#include <Parsers/ASTFunction.h>
#include <DataTypes/NestedUtils.h>
#include <Common/typeid_cast.h>
#include <Common/StringUtils/StringUtils.h>
#include <IO/WriteBufferFromString.h>
#include <DataStreams/IBlockOutputStream.h>
#include <Processors/Sources/SourceFromSingleChunk.h>
#include <Interpreters/Context.h>
#include <Interpreters/TreeRewriter.h>
#include <Columns/IColumn.h>
#include <Columns/ColumnString.h>
#include <Processors/Pipe.h>
#include <Poco/File.h>
#include <Poco/Path.h>
namespace DB
{
namespace ErrorCodes
{
extern const int BAD_ARGUMENTS;
extern const int SYSTEM_ERROR;
extern const int NUMBER_OF_ARGUMENTS_DOESNT_MATCH;
}
static bool extractKeyImpl(const IAST & elem, Strings & res)
{
const auto * function = elem.as<ASTFunction>();
if (!function)
return false;
if (function->name == "equals" || function->name == "in")
{
const auto & args = function->arguments->as<ASTExpressionList &>();
const IAST * value;
if (args.children.size() != 2)
return false;
const ASTIdentifier * ident;
if ((ident = args.children.at(0)->as<ASTIdentifier>()))
value = args.children.at(1).get();
else if ((ident = args.children.at(1)->as<ASTIdentifier>()))
value = args.children.at(0).get();
else
return false;
if (ident->name != "key")
return false;
if (const auto * literal = value->as<ASTLiteral>())
{
if (literal->value.getType() == Field::Types::String)
{
res.push_back(literal->value.safeGet<String>());
return true;
}
else if (literal->value.getType() == Field::Types::Tuple)
{
auto tuple = literal->value.safeGet<Tuple>();
for (const auto & f : tuple)
{
res.push_back(f.safeGet<String>());
}
return true;
}
else return false;
}
}
return false;
}
/** Retrieve from the query a condition of the form `key = 'xxx'` or `key IN ('xxx', ...)` from the WHERE clause.
  */
static std::pair<Strings, bool> extractKey(const ASTPtr & query)
{
const auto & select = query->as<ASTSelectQuery &>();
if (!select.where())
{
return std::make_pair(Strings{}, true);
}
Strings res;
extractKeyImpl(*select.where(), res);
return std::make_pair(res, false);
}
class EmbeddedRocksdbBlockOutputStream : public IBlockOutputStream
{
public:
explicit EmbeddedRocksdbBlockOutputStream(
StorageEmbeddedRocksdb & storage_,
const StorageMetadataPtr & metadata_snapshot_)
: storage(storage_)
, metadata_snapshot(metadata_snapshot_)
{}
Block getHeader() const override { return metadata_snapshot->getSampleBlock(); }
void write(const Block & block) override
{
metadata_snapshot->check(block, true);
auto rows = block.rows();
auto key_col = block.getByName("key");
auto val_col = block.getByName("value");
const ColumnString * keys = checkAndGetColumn<ColumnString>(key_col.column.get());
WriteBufferFromOwnString wb_value;
for (size_t i = 0; i < rows; i++)
{
StringRef key = keys->getDataAt(i);
val_col.type->serializeBinary(*val_col.column, i, wb_value);
auto status = storage.rocksdb_ptr->rocksdb->Put(rocksdb::WriteOptions(), key.toString(), wb_value.str());
if (!status.ok())
throw Exception("Rocksdb write error: " + status.ToString(), ErrorCodes::SYSTEM_ERROR);
wb_value.restart();
}
// Debug logging: iterate over the whole database and log every key-value pair after each write.
rocksdb::Iterator * it = storage.rocksdb_ptr->rocksdb->NewIterator(rocksdb::ReadOptions());
for (it->SeekToFirst(); it->Valid(); it->Next())
{
LOG_DEBUG(&Poco::Logger::get("StorageEmbeddedRocksdb"), "Iterator `{}` returns `{}`, {}",
it->key().ToString(), it->value().ToString(), it->key().size());
}
// Check for any errors found during the scan
assert(it->status().ok());
delete it;
}
private:
StorageEmbeddedRocksdb & storage;
StorageMetadataPtr metadata_snapshot;
};
StorageEmbeddedRocksdb::StorageEmbeddedRocksdb(const StorageFactory::Arguments & args)
: IStorage(args.table_id)
{
// The table must contain exactly two columns: key and value.
if (args.columns.size() != 2)
throw Exception("Storage " + getName() + " requires exactly 2 columns", ErrorCodes::BAD_ARGUMENTS);
if (!args.columns.has("key") || !args.columns.has("value"))
throw Exception("Storage " + getName() + " requires columns are: key and value", ErrorCodes::BAD_ARGUMENTS);
StorageInMemoryMetadata storage_metadata;
storage_metadata.setColumns(args.columns);
storage_metadata.setConstraints(args.constraints);
setInMemoryMetadata(storage_metadata);
rocksdb_dir = args.context.getPath() + args.relative_data_path + "/rocksdb";
if (!args.attach)
{
Poco::File(rocksdb_dir).createDirectories();
}
initDb();
}
void StorageEmbeddedRocksdb::truncate(const ASTPtr &, const StorageMetadataPtr & , const Context &, TableExclusiveLockHolder &)
{
if (rocksdb_ptr)
{
rocksdb_ptr->shutdown();
}
Poco::File(rocksdb_dir).remove(true);
Poco::File(rocksdb_dir).createDirectories();
initDb();
}
void StorageEmbeddedRocksdb::initDb()
{
rocksdb::Options options;
rocksdb::BlockBasedTableOptions table_options;
rocksdb::DB * db;
options.create_if_missing = true;
options.statistics = rocksdb::CreateDBStatistics();
auto cache = rocksdb::NewLRUCache(256 << 20);
table_options.block_cache = cache;
options.table_factory.reset(rocksdb::NewBlockBasedTableFactory(table_options));
rocksdb::Status status = rocksdb::DB::Open(options, rocksdb_dir, &db);
if (!status.ok())
throw Exception("Failed to open rocksdb path: " + rocksdb_dir + ", error: " + status.ToString(), ErrorCodes::SYSTEM_ERROR);
rocksdb_ptr = std::make_shared<Rocksdb>(db);
}
Pipe StorageEmbeddedRocksdb::read(
const Names & column_names,
const StorageMetadataPtr & metadata_snapshot,
const SelectQueryInfo & query_info,
const Context & /*context*/,
QueryProcessingStage::Enum /*processed_stage*/,
size_t /*max_block_size*/,
unsigned /*num_streams*/)
{
metadata_snapshot->check(column_names, getVirtuals(), getStorageID());
Block sample_block = metadata_snapshot->getSampleBlock();
size_t key_pos = 0;
size_t value_pos = 1;
if (sample_block.getByPosition(0).name != "key")
std::swap(key_pos, value_pos);
MutableColumns columns = sample_block.cloneEmptyColumns();
Strings keys;
bool all_scan = false;
std::tie(keys, all_scan) = extractKey(query_info.query);
if (keys.empty() && !all_scan)
throw Exception("StorageEmbeddedRocksdb engine must contain condition like key = 'key' or key in tuple(String) or key like 'xxx%' in WHERE clause or empty WHERE clause for all key value scan.", ErrorCodes::BAD_ARGUMENTS);
// TODO pipline
if (all_scan)
{
auto it = rocksdb_ptr->rocksdb->NewIterator(rocksdb::ReadOptions());
for (it->SeekToFirst(); it->Valid(); it->Next())
{
ReadBufferFromString rvalue(it->value().ToString());
columns[key_pos]->insert(it->key().ToString());
sample_block.getByName("value").type->deserializeBinary(*columns[value_pos], rvalue);
}
assert(it->status().ok());
delete it;
}
else
{
Strings values;
std::vector<rocksdb::Slice> slices_keys;
for (auto & key : keys)
{
slices_keys.push_back(key);
}
auto statuses = rocksdb_ptr->rocksdb->MultiGet(rocksdb::ReadOptions(), slices_keys, &values);
for (size_t i = 0; i < statuses.size(); ++i)
{
if (statuses[i].ok())
{
ReadBufferFromString rvalue(values[i]);
columns[key_pos]->insert(keys[i]);
sample_block.getByName("value").type->deserializeBinary(*columns[value_pos], rvalue);
}
}
}
UInt64 num_rows = columns.at(0)->size();
Chunk chunk(std::move(columns), num_rows);
return Pipe(std::make_shared<SourceFromSingleChunk>(sample_block, std::move(chunk)));
}
BlockOutputStreamPtr StorageEmbeddedRocksdb::write(const ASTPtr & /*query*/, const StorageMetadataPtr & metadata_snapshot, const Context & /*context*/)
{
return std::make_shared<EmbeddedRocksdbBlockOutputStream>(*this, metadata_snapshot);
}
StorageEmbeddedRocksdb::~StorageEmbeddedRocksdb()
{
if (rocksdb_ptr)
{
rocksdb_ptr->shutdown();
}
}
void registerStorageEmbeddedRocksdb(StorageFactory & factory)
{
factory.registerStorage("EmbeddedRocksdb", [](const StorageFactory::Arguments & args)
{
if (!args.engine_args.empty())
throw Exception(
"Engine " + args.engine_name + " doesn't support any arguments (" + toString(args.engine_args.size()) + " given)",
ErrorCodes::NUMBER_OF_ARGUMENTS_DOESNT_MATCH);
return StorageEmbeddedRocksdb::create(args);
});
}
}
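Putting the pieces above together (the two-column check in the constructor, the key extraction in extractKey, and the full-scan path in read), usage looks roughly like the following. This is a hedged sketch, not part of the commit: the table name kv_demo and the literal values are illustrative assumptions.

create table kv_demo (key String, value UInt64) Engine=EmbeddedRocksdb;
insert into kv_demo values ('a', 1), ('b', 2), ('c', 3);
-- key = '...' and key in (...) are recognised by extractKey and served via MultiGet
select value from kv_demo where key = 'a';
select sum(value) from kv_demo where key in ('a', 'b');
-- an empty WHERE clause falls back to a full iterator scan
select count(1) from kv_demo;
-- any other predicate on key (e.g. key like 'a%') is rejected with BAD_ARGUMENTS
DROP TABLE kv_demo;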


@@ -0,0 +1,67 @@
#pragma once
#include <ext/shared_ptr_helper.h>
#include <Storages/IStorage.h>
#include <Storages/TableLockHolder.h>
#include <Disks/IDisk.h>
#include <shared_mutex>
#include "rocksdb/db.h"
#include "rocksdb/table.h"
namespace DB
{
class Context;
struct Rocksdb
{
rocksdb::DB * rocksdb;
explicit Rocksdb(rocksdb::DB * rocksdb_) : rocksdb{rocksdb_} {}
Rocksdb(const Rocksdb &) = delete;
Rocksdb & operator=(const Rocksdb &) = delete;
void shutdown()
{
if (rocksdb)
{
rocksdb->Close();
delete rocksdb;
}
}
};
class StorageEmbeddedRocksdb final : public ext::shared_ptr_helper<StorageEmbeddedRocksdb>, public IStorage
{
friend struct ext::shared_ptr_helper<StorageEmbeddedRocksdb>;
friend class EmbeddedRocksdbBlockOutputStream;
public:
std::string getName() const override { return "EmbeddedRocksdb"; }
~StorageEmbeddedRocksdb() override;
Pipe read(
const Names & column_names,
const StorageMetadataPtr & metadata_snapshot,
const SelectQueryInfo & query_info,
const Context & context,
QueryProcessingStage::Enum processed_stage,
size_t max_block_size,
unsigned num_streams) override;
BlockOutputStreamPtr write(const ASTPtr & query, const StorageMetadataPtr & /*metadata_snapshot*/, const Context & context) override;
void truncate(const ASTPtr &, const StorageMetadataPtr & metadata_snapshot, const Context &, TableExclusiveLockHolder &) override;
protected:
StorageEmbeddedRocksdb(const StorageFactory::Arguments & args);
private:
using RocksdbPtr = std::shared_ptr<Rocksdb>;
String rocksdb_dir;
RocksdbPtr rocksdb_ptr;
mutable std::shared_mutex rwlock;
void initDb();
};
}


@@ -53,6 +53,10 @@ void registerStorages()
#if USE_AMQPCPP
registerStorageRabbitMQ(factory);
#endif
#if USE_ROCKSDB
registerStorageEmbeddedRocksdb(factory);
#endif
}
}
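Because the factory lambda behind registerStorageEmbeddedRocksdb rejects any engine arguments, the ENGINE clause must be bare once the storage is registered. A minimal illustration (table names and the path literal are hypothetical, not from the commit):

create table kv_ok (key String, value UInt64) Engine=EmbeddedRocksdb;
-- throws NUMBER_OF_ARGUMENTS_DOESNT_MATCH: the engine doesn't support any arguments
create table kv_bad (key String, value UInt64) Engine=EmbeddedRocksdb('/tmp/db');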


@@ -54,6 +54,10 @@ void registerStorageKafka(StorageFactory & factory);
void registerStorageRabbitMQ(StorageFactory & factory);
#endif
#if USE_ROCKSDB
void registerStorageEmbeddedRocksdb(StorageFactory & factory);
#endif
void registerStorages();
}


@@ -0,0 +1,6 @@
1
1000
0 435761734006
1 435761734006
2 435761734006
0


@@ -0,0 +1,22 @@
DROP TABLE IF EXISTS test;
create table test (key String, value UInt32) Engine=EmbeddedRocksdb;
insert into test select '1_1', number from numbers(10000);
select count(1) from test;
insert into test select concat(toString(number), '_1'), number from numbers(10000);
select sum(value) from test where key in ('1_1', '99_1', '900_1');
DROP TABLE IF EXISTS test;
create table test (key String, value UInt64) Engine=EmbeddedRocksdb;
insert into test select toString(number%3) as key, sum(number) as value from numbers(1000000) group by key;
select key, sum(value) from test group by key order by key;
truncate table test;
select count(1) from test;
DROP TABLE IF EXISTS test;
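
The first value in the reference file (count = 1 after inserting 10000 rows that all share the key '1_1') relies on RocksDB's last-write-wins Put semantics: re-inserting an existing key overwrites its value instead of adding a row. A minimal sketch of that behaviour, with a hypothetical table name:

create table kv_overwrite (key String, value UInt32) Engine=EmbeddedRocksdb;
insert into kv_overwrite values ('k', 1);
insert into kv_overwrite values ('k', 2);
-- expected output: 1 2 (one key, last written value wins)
select count(1), sum(value) from kv_overwrite;
DROP TABLE kv_overwrite;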