Rename Rocksdb to RocksDB

This commit is contained in:
Alexey Milovidov 2020-11-08 18:41:16 +03:00
parent 55a622ba2e
commit c89a363980
11 changed files with 62 additions and 62 deletions

View File

@ -1,15 +1,15 @@
---
toc_priority: 6
toc_title: EmbeddedRocksdb
toc_title: EmbeddedRocksDB
---
# EmbeddedRocksdb Engine {#EmbeddedRocksdb-engine}
# EmbeddedRocksDB Engine {#EmbeddedRocksDB-engine}
This engine allows integrating ClickHouse with [rocksdb](http://rocksdb.org/).
`EmbeddedRocksdb` lets you:
`EmbeddedRocksDB` lets you:
## Creating a Table {#table_engine-EmbeddedRocksdb-creating-a-table}
## Creating a Table {#table_engine-EmbeddedRocksDB-creating-a-table}
``` sql
CREATE TABLE [IF NOT EXISTS] [db.]table_name [ON CLUSTER cluster]
@ -17,7 +17,7 @@ CREATE TABLE [IF NOT EXISTS] [db.]table_name [ON CLUSTER cluster]
name1 [type1] [DEFAULT|MATERIALIZED|ALIAS expr1],
name2 [type2] [DEFAULT|MATERIALIZED|ALIAS expr2],
...
) ENGINE = EmbeddedRocksdb PRIMARY KEY(primary_key_name)
) ENGINE = EmbeddedRocksDB PRIMARY KEY(primary_key_name)
```
Required parameters:
@ -34,7 +34,7 @@ CREATE TABLE test
`v2` String,
`v3` Float32,
)
ENGINE = EmbeddedRocksdb
ENGINE = EmbeddedRocksDB
PRIMARY KEY key
```

View File

@ -79,7 +79,7 @@ if (USE_AMQPCPP)
endif()
if (USE_ROCKSDB)
add_headers_and_sources(dbms Storages/Rocksdb)
add_headers_and_sources(dbms Storages/RocksDB)
endif()
if (USE_AWS_S3)

View File

@ -1,7 +1,7 @@
#include <DataStreams/IBlockInputStream.h>
#include <Interpreters/Context.h>
#include <Storages/Rocksdb/StorageEmbeddedRocksdb.h>
#include <Storages/Rocksdb/EmbeddedRocksdbBlockInputStream.h>
#include <Storages/RocksDB/StorageEmbeddedRocksDB.h>
#include <Storages/RocksDB/EmbeddedRocksDBBlockInputStream.h>
#include <ext/enumerate.h>
@ -14,8 +14,8 @@ namespace ErrorCodes
extern const int LOGICAL_ERROR;
}
EmbeddedRocksdbBlockInputStream::EmbeddedRocksdbBlockInputStream(
StorageEmbeddedRocksdb & storage_,
EmbeddedRocksDBBlockInputStream::EmbeddedRocksDBBlockInputStream(
StorageEmbeddedRocksDB & storage_,
const StorageMetadataPtr & metadata_snapshot_,
size_t max_block_size_)
: storage(storage_)
@ -26,7 +26,7 @@ EmbeddedRocksdbBlockInputStream::EmbeddedRocksdbBlockInputStream(
primary_key_pos = sample_block.getPositionByName(storage.primary_key);
}
Block EmbeddedRocksdbBlockInputStream::readImpl()
Block EmbeddedRocksDBBlockInputStream::readImpl()
{
if (finished)
return {};

View File

@ -1,26 +1,26 @@
#pragma once
#include <DataStreams/IBlockInputStream.h>
#include <Storages/Rocksdb/StorageEmbeddedRocksdb.h>
#include <Storages/RocksDB/StorageEmbeddedRocksDB.h>
#include <rocksdb/db.h>
namespace DB
{
class EmbeddedRocksdbBlockInputStream : public IBlockInputStream
class EmbeddedRocksDBBlockInputStream : public IBlockInputStream
{
public:
EmbeddedRocksdbBlockInputStream(
StorageEmbeddedRocksdb & storage_, const StorageMetadataPtr & metadata_snapshot_, size_t max_block_size_);
EmbeddedRocksDBBlockInputStream(
StorageEmbeddedRocksDB & storage_, const StorageMetadataPtr & metadata_snapshot_, size_t max_block_size_);
String getName() const override { return storage.getName(); }
Block getHeader() const override { return sample_block; }
Block readImpl() override;
private:
StorageEmbeddedRocksdb & storage;
StorageEmbeddedRocksDB & storage;
StorageMetadataPtr metadata_snapshot;
const size_t max_block_size;

View File

@ -1,5 +1,5 @@
#include <Storages/Rocksdb/EmbeddedRocksdbBlockOutputStream.h>
#include <Storages/RocksDB/EmbeddedRocksDBBlockOutputStream.h>
#include <IO/WriteBufferFromString.h>
namespace DB
@ -10,12 +10,12 @@ namespace ErrorCodes
extern const int SYSTEM_ERROR;
}
Block EmbeddedRocksdbBlockOutputStream::getHeader() const
Block EmbeddedRocksDBBlockOutputStream::getHeader() const
{
return metadata_snapshot->getSampleBlock();
}
void EmbeddedRocksdbBlockOutputStream::write(const Block & block)
void EmbeddedRocksDBBlockOutputStream::write(const Block & block)
{
metadata_snapshot->check(block, true);
auto rows = block.rows();
@ -44,7 +44,7 @@ void EmbeddedRocksdbBlockOutputStream::write(const Block & block)
}
auto status = storage.rocksdb_ptr->Write(rocksdb::WriteOptions(), &batch);
if (!status.ok())
throw Exception("Rocksdb write error: " + status.ToString(), ErrorCodes::SYSTEM_ERROR);
throw Exception("RocksDB write error: " + status.ToString(), ErrorCodes::SYSTEM_ERROR);
}
}

View File

@ -1,17 +1,17 @@
#pragma once
#include <DataStreams/IBlockOutputStream.h>
#include <Storages/Rocksdb/StorageEmbeddedRocksdb.h>
#include <Storages/RocksDB/StorageEmbeddedRocksDB.h>
#include <Storages/StorageInMemoryMetadata.h>
namespace DB
{
class EmbeddedRocksdbBlockOutputStream : public IBlockOutputStream
class EmbeddedRocksDBBlockOutputStream : public IBlockOutputStream
{
public:
explicit EmbeddedRocksdbBlockOutputStream(
StorageEmbeddedRocksdb & storage_,
explicit EmbeddedRocksDBBlockOutputStream(
StorageEmbeddedRocksDB & storage_,
const StorageMetadataPtr & metadata_snapshot_)
: storage(storage_)
, metadata_snapshot(metadata_snapshot_)
@ -21,7 +21,7 @@ public:
void write(const Block & block) override;
private:
StorageEmbeddedRocksdb & storage;
StorageEmbeddedRocksDB & storage;
StorageMetadataPtr metadata_snapshot;
};

View File

@ -2,9 +2,9 @@
#include <DataTypes/DataTypesNumber.h>
#include <DataTypes/DataTypeDateTime.h>
#include <Storages/StorageFactory.h>
#include <Storages/Rocksdb/StorageEmbeddedRocksdb.h>
#include <Storages/Rocksdb/EmbeddedRocksdbBlockOutputStream.h>
#include <Storages/Rocksdb/EmbeddedRocksdbBlockInputStream.h>
#include <Storages/RocksDB/StorageEmbeddedRocksDB.h>
#include <Storages/RocksDB/EmbeddedRocksDBBlockOutputStream.h>
#include <Storages/RocksDB/EmbeddedRocksDBBlockInputStream.h>
#include <Parsers/ASTSelectQuery.h>
#include <Parsers/ASTIdentifier.h>
#include <Parsers/ASTLiteral.h>
@ -153,11 +153,11 @@ static std::pair<FieldVector, bool> getFilterKeys(const String & primary_key, co
}
class EmbeddedRocksdbSource : public SourceWithProgress
class EmbeddedRocksDBSource : public SourceWithProgress
{
public:
EmbeddedRocksdbSource(
const StorageEmbeddedRocksdb & storage_,
EmbeddedRocksDBSource(
const StorageEmbeddedRocksDB & storage_,
const StorageMetadataPtr & metadata_snapshot_,
const FieldVector & keys_,
const size_t start_,
@ -226,7 +226,7 @@ public:
}
private:
const StorageEmbeddedRocksdb & storage;
const StorageEmbeddedRocksDB & storage;
const StorageMetadataPtr metadata_snapshot;
const size_t start;
@ -238,7 +238,7 @@ private:
};
StorageEmbeddedRocksdb::StorageEmbeddedRocksdb(const StorageID & table_id_,
StorageEmbeddedRocksDB::StorageEmbeddedRocksDB(const StorageID & table_id_,
const String & relative_data_path_,
const StorageInMemoryMetadata & metadata_,
bool attach,
@ -255,7 +255,7 @@ StorageEmbeddedRocksdb::StorageEmbeddedRocksdb(const StorageID & table_id_,
initDb();
}
void StorageEmbeddedRocksdb::truncate(const ASTPtr &, const StorageMetadataPtr & , const Context &, TableExclusiveLockHolder &)
void StorageEmbeddedRocksDB::truncate(const ASTPtr &, const StorageMetadataPtr & , const Context &, TableExclusiveLockHolder &)
{
rocksdb_ptr->Close();
Poco::File(rocksdb_dir).remove(true);
@ -263,7 +263,7 @@ void StorageEmbeddedRocksdb::truncate(const ASTPtr &, const StorageMetadataPtr &
initDb();
}
void StorageEmbeddedRocksdb::initDb()
void StorageEmbeddedRocksDB::initDb()
{
rocksdb::Options options;
rocksdb::DB * db;
@ -276,10 +276,10 @@ void StorageEmbeddedRocksdb::initDb()
}
Pipe StorageEmbeddedRocksdb::read(
Pipe StorageEmbeddedRocksDB::read(
const Names & column_names,
const StorageMetadataPtr & metadata_snapshot,
const SelectQueryInfo & query_info,
SelectQueryInfo & query_info,
const Context & /*context*/,
QueryProcessingStage::Enum /*processed_stage*/,
size_t max_block_size,
@ -293,7 +293,7 @@ Pipe StorageEmbeddedRocksdb::read(
std::tie(keys, all_scan) = getFilterKeys(primary_key, primary_key_data_type, query_info);
if (all_scan)
{
auto reader = std::make_shared<EmbeddedRocksdbBlockInputStream>(
auto reader = std::make_shared<EmbeddedRocksDBBlockInputStream>(
*this, metadata_snapshot, max_block_size);
return Pipe(std::make_shared<SourceFromInputStream>(reader));
}
@ -322,22 +322,22 @@ Pipe StorageEmbeddedRocksdb::read(
end = start + batch_per_size > keys.size() ? keys.size() : start + batch_per_size;
pipes.emplace_back(
std::make_shared<EmbeddedRocksdbSource>(*this, metadata_snapshot, keys, start, end, max_block_size));
std::make_shared<EmbeddedRocksDBSource>(*this, metadata_snapshot, keys, start, end, max_block_size));
start += batch_per_size;
}
return Pipe::unitePipes(std::move(pipes));
}
}
BlockOutputStreamPtr StorageEmbeddedRocksdb::write(const ASTPtr & /*query*/, const StorageMetadataPtr & metadata_snapshot, const Context & /*context*/)
BlockOutputStreamPtr StorageEmbeddedRocksDB::write(const ASTPtr & /*query*/, const StorageMetadataPtr & metadata_snapshot, const Context & /*context*/)
{
return std::make_shared<EmbeddedRocksdbBlockOutputStream>(*this, metadata_snapshot);
return std::make_shared<EmbeddedRocksDBBlockOutputStream>(*this, metadata_snapshot);
}
static StoragePtr create(const StorageFactory::Arguments & args)
{
// TODO custom RocksdbSettings, table function
// TODO custom RocksDBSettings, table function
if (!args.engine_args.empty())
throw Exception(
"Engine " + args.engine_name + " doesn't support any arguments (" + toString(args.engine_args.size()) + " given)",
@ -348,25 +348,25 @@ static StoragePtr create(const StorageFactory::Arguments & args)
metadata.setConstraints(args.constraints);
if (!args.storage_def->primary_key)
throw Exception("StorageEmbeddedRocksdb must require one primary key", ErrorCodes::BAD_ARGUMENTS);
throw Exception("StorageEmbeddedRocksDB must require one primary key", ErrorCodes::BAD_ARGUMENTS);
metadata.primary_key = KeyDescription::getKeyFromAST(args.storage_def->primary_key->ptr(), metadata.columns, args.context);
auto primary_key_names = metadata.getColumnsRequiredForPrimaryKey();
if (primary_key_names.size() != 1)
{
throw Exception("StorageEmbeddedRocksdb must require one primary key", ErrorCodes::BAD_ARGUMENTS);
throw Exception("StorageEmbeddedRocksDB must require one primary key", ErrorCodes::BAD_ARGUMENTS);
}
return StorageEmbeddedRocksdb::create(args.table_id, args.relative_data_path, metadata, args.attach, args.context, primary_key_names[0]);
return StorageEmbeddedRocksDB::create(args.table_id, args.relative_data_path, metadata, args.attach, args.context, primary_key_names[0]);
}
void registerStorageEmbeddedRocksdb(StorageFactory & factory)
void registerStorageEmbeddedRocksDB(StorageFactory & factory)
{
StorageFactory::StorageFeatures features{
.supports_sort_order = true,
};
factory.registerStorage("EmbeddedRocksdb", create, features);
factory.registerStorage("EmbeddedRocksDB", create, features);
}

View File

@ -15,19 +15,19 @@ namespace DB
class Context;
class StorageEmbeddedRocksdb final : public ext::shared_ptr_helper<StorageEmbeddedRocksdb>, public IStorage
class StorageEmbeddedRocksDB final : public ext::shared_ptr_helper<StorageEmbeddedRocksDB>, public IStorage
{
friend struct ext::shared_ptr_helper<StorageEmbeddedRocksdb>;
friend class EmbeddedRocksdbSource;
friend class EmbeddedRocksdbBlockOutputStream;
friend class EmbeddedRocksdbBlockInputStream;
friend struct ext::shared_ptr_helper<StorageEmbeddedRocksDB>;
friend class EmbeddedRocksDBSource;
friend class EmbeddedRocksDBBlockOutputStream;
friend class EmbeddedRocksDBBlockInputStream;
public:
std::string getName() const override { return "EmbeddedRocksdb"; }
std::string getName() const override { return "EmbeddedRocksDB"; }
Pipe read(
const Names & column_names,
const StorageMetadataPtr & metadata_snapshot,
const SelectQueryInfo & query_info,
SelectQueryInfo & query_info,
const Context & context,
QueryProcessingStage::Enum processed_stage,
size_t max_block_size,
@ -44,7 +44,7 @@ public:
}
protected:
StorageEmbeddedRocksdb(const StorageID & table_id_,
StorageEmbeddedRocksDB(const StorageID & table_id_,
const String & relative_data_path_,
const StorageInMemoryMetadata & metadata,
bool attach,
@ -53,8 +53,8 @@ protected:
private:
const String primary_key;
using RocksdbPtr = std::unique_ptr<rocksdb::DB>;
RocksdbPtr rocksdb_ptr;
using RocksDBPtr = std::unique_ptr<rocksdb::DB>;
RocksDBPtr rocksdb_ptr;
String rocksdb_dir;
void initDb();

View File

@ -55,7 +55,7 @@ void registerStorages()
#endif
#if USE_ROCKSDB
registerStorageEmbeddedRocksdb(factory);
registerStorageEmbeddedRocksDB(factory);
#endif
}

View File

@ -55,7 +55,7 @@ void registerStorageRabbitMQ(StorageFactory & factory);
#endif
#if USE_ROCKSDB
void registerStorageEmbeddedRocksdb(StorageFactory & factory);
void registerStorageEmbeddedRocksDB(StorageFactory & factory);
#endif
void registerStorages();

View File

@ -1,5 +1,5 @@
DROP TABLE IF EXISTS test;
CREATE TABLE test (key String, value UInt32) Engine=EmbeddedRocksdb primary key(key);
CREATE TABLE test (key String, value UInt32) Engine=EmbeddedRocksDB primary key(key);
INSERT INTO test SELECT '1_1', number FROM numbers(10000);
SELECT count(1) == 1 FROM test;
@ -11,7 +11,7 @@ SELECT SUM(value) == 1 + 99 + 900 FROM test WHERE key in ('1_1', '99_1', '900_1'
DROP TABLE IF EXISTS test;
DROP TABLE IF EXISTS test_memory;
CREATE TABLE test (k UInt32, value UInt64, dummy Tuple(UInt32, Float64), bm AggregateFunction(groupBitmap, UInt64)) Engine=EmbeddedRocksdb primary key(k);
CREATE TABLE test (k UInt32, value UInt64, dummy Tuple(UInt32, Float64), bm AggregateFunction(groupBitmap, UInt64)) Engine=EmbeddedRocksDB primary key(k);
CREATE TABLE test_memory AS test Engine = Memory;
INSERT INTO test SELECT number % 77 AS k, SUM(number) AS value, (1, 1.2), bitmapBuild(groupArray(number)) FROM numbers(10000000) group by k;