Revert "Introduce bulk loading to StorageEmbeddedRocksDB"

Alexey Milovidov 2024-05-03 05:27:03 +03:00 committed by GitHub
parent 8408758705
commit c9fd1df4fc
12 changed files with 20 additions and 536 deletions


@@ -17,7 +17,6 @@ CREATE TABLE [IF NOT EXISTS] [db.]table_name [ON CLUSTER cluster]
name2 [type2] [DEFAULT|MATERIALIZED|ALIAS expr2],
...
) ENGINE = EmbeddedRocksDB([ttl, rocksdb_dir, read_only]) PRIMARY KEY(primary_key_name)
[ SETTINGS name=value, ... ]
```
Engine parameters:
@@ -30,11 +29,6 @@ Engine parameters:
- columns other than the primary key will be serialized in binary as the `rocksdb` value, in the corresponding order.
- queries with key `equals` or `in` filtering will be optimized to multi-key lookups from `rocksdb`.
Engine settings:
- `optimize_for_bulk_insert` - Table is optimized for bulk insertions (the insert pipeline will create SST files and import them into the rocksdb database instead of writing to memtables); default value: `1`.
- `bulk_insert_block_size` - Minimum size of SST files (in terms of rows) created by bulk insertion; default value: `1048449`.
Example:
``` sql
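-- The example body is truncated in this diff. A hedged reconstruction using the
-- settings documented above (table and column names are illustrative):
CREATE TABLE rocksdb_table
(
    key String,
    value UInt32
) ENGINE = EmbeddedRocksDB
PRIMARY KEY key
SETTINGS optimize_for_bulk_insert = 1, bulk_insert_block_size = 1048449;
```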


@@ -3,7 +3,6 @@
#include <Core/Block.h>
#include <Core/Names.h>
#include <Processors/Chunk.h>
#include <Core/ColumnsWithTypeAndName.h>
namespace DB
{


@@ -1,240 +0,0 @@
#include <atomic>
#include <cstddef>
#include <cstdlib>
#include <filesystem>
#include <memory>
#include <optional>
#include <random>
#include <IO/WriteBufferFromString.h>
#include <Storages/RocksDB/EmbeddedRocksDBBulkSink.h>
#include <Storages/RocksDB/StorageEmbeddedRocksDB.h>
#include <Columns/ColumnString.h>
#include <Core/SortDescription.h>
#include <DataTypes/DataTypeString.h>
#include <IO/WriteHelpers.h>
#include <Interpreters/Context.h>
#include <rocksdb/options.h>
#include <rocksdb/slice.h>
#include <rocksdb/sst_file_writer.h>
#include <rocksdb/status.h>
#include <rocksdb/utilities/db_ttl.h>
#include <Common/SipHash.h>
#include <Common/getRandomASCIIString.h>
#include <Common/CurrentThread.h>
#include <Common/MemoryTrackerBlockerInThread.h>
#include <Common/logger_useful.h>
#include <Common/scope_guard_safe.h>
#include <Common/setThreadName.h>
namespace DB
{
namespace ErrorCodes
{
extern const int ROCKSDB_ERROR;
}
static const IColumn::Permutation & getAscendingPermutation(const IColumn & column, IColumn::Permutation & perm)
{
column.getPermutation(IColumn::PermutationSortDirection::Ascending, IColumn::PermutationSortStability::Stable, 0, 1, perm);
return perm;
}
/// Build SST file from key-value pairs
static rocksdb::Status buildSSTFile(const String & path, const ColumnString & keys, const ColumnString & values, const std::optional<IColumn::Permutation> & perm_ = {})
{
/// rocksdb::SstFileWriter requires keys to be sorted in ascending order
IColumn::Permutation calculated_perm;
const IColumn::Permutation & perm = perm_ ? *perm_ : getAscendingPermutation(keys, calculated_perm);
rocksdb::SstFileWriter sst_file_writer(rocksdb::EnvOptions{}, rocksdb::Options{});
auto status = sst_file_writer.Open(path);
if (!status.ok())
return status;
auto rows = perm.size();
for (size_t idx = 0; idx < rows;)
{
/// For each run of equal keys, write only the last row
size_t next_idx = idx + 1;
while (next_idx < rows && keys.compareAt(perm[idx], perm[next_idx], keys, 1) == 0)
++next_idx;
auto row = perm[next_idx - 1];
status = sst_file_writer.Put(keys.getDataAt(row).toView(), values.getDataAt(row).toView());
if (!status.ok())
return status;
idx = next_idx;
}
return sst_file_writer.Finish();
}
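For reference, the RocksDB API pattern used by `buildSSTFile` (and by the ingestion in `consume` below) can be exercised standalone. This is a minimal sketch, not part of this commit; the paths and the `check` helper are illustrative. `SstFileWriter` requires keys in strictly ascending order with no duplicates, which is exactly why the code above sorts through a permutation and keeps only the last row per key.

```cpp
#include <cstdio>
#include <cstdlib>
#include <rocksdb/db.h>
#include <rocksdb/options.h>
#include <rocksdb/sst_file_writer.h>

static void check(const rocksdb::Status & s)
{
    if (!s.ok())
    {
        std::fprintf(stderr, "rocksdb error: %s\n", s.ToString().c_str());
        std::abort();
    }
}

int main()
{
    rocksdb::Options options;
    options.create_if_missing = true;

    /// Build an SST file; keys must be added in strictly increasing order.
    rocksdb::SstFileWriter writer(rocksdb::EnvOptions{}, options);
    check(writer.Open("/tmp/example.sst"));
    check(writer.Put("key1", "value1"));
    check(writer.Put("key2", "value2"));
    check(writer.Finish());

    /// Ingest the file directly, bypassing the memtable and the WAL.
    rocksdb::DB * db = nullptr;
    check(rocksdb::DB::Open(options, "/tmp/example_db", &db));
    rocksdb::IngestExternalFileOptions ingest;
    ingest.move_files = true; /// move (or hardlink) instead of copy
    check(db->IngestExternalFile({"/tmp/example.sst"}, ingest));
    delete db;
    return 0;
}
```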
EmbeddedRocksDBBulkSink::EmbeddedRocksDBBulkSink(
ContextPtr context_, StorageEmbeddedRocksDB & storage_, const StorageMetadataPtr & metadata_snapshot_)
: SinkToStorage(metadata_snapshot_->getSampleBlock()), WithContext(context_), storage(storage_), metadata_snapshot(metadata_snapshot_)
{
for (const auto & elem : getHeader())
{
if (elem.name == storage.primary_key)
break;
++primary_key_pos;
}
serializations = getHeader().getSerializations();
min_block_size_rows = std::max(storage.getSettings().bulk_insert_block_size, getContext()->getSettingsRef().min_insert_block_size_rows);
/// If max_insert_threads > 1 we may have multiple EmbeddedRocksDBBulkSink instances, and getContext()->getCurrentQueryId() is not
/// guaranteed to give a distinct path. Also, we cannot use the query id as the directory name here, because it can be defined
/// by the user and may not be suitable for a directory name
auto base_directory_name = TMP_INSERT_PREFIX + sipHash128String(getContext()->getCurrentQueryId());
insert_directory_queue = fs::path(storage.getDataPaths()[0]) / (base_directory_name + "-" + getRandomASCIIString(8));
fs::create_directory(insert_directory_queue);
}
EmbeddedRocksDBBulkSink::~EmbeddedRocksDBBulkSink()
{
try
{
if (fs::exists(insert_directory_queue))
fs::remove_all(insert_directory_queue);
}
catch (...)
{
tryLogCurrentException(__PRETTY_FUNCTION__, fmt::format("Error while removing temporary directory {}:", insert_directory_queue));
}
}
std::vector<Chunk> EmbeddedRocksDBBulkSink::squash(Chunk chunk)
{
/// End of input stream
if (chunk.getNumRows() == 0)
{
return std::move(chunks);
}
/// The just-read block is already big enough on its own.
if (isEnoughSize(chunk))
{
/// If there is no accumulated data, store the just-read block; it will be returned on a later call.
if (chunks.empty())
{
chunks.emplace_back(std::move(chunk));
return {};
}
/// Return the accumulated data (which may be small) and put the new block into the accumulator.
std::vector<Chunk> to_return;
std::swap(to_return, chunks);
chunks.emplace_back(std::move(chunk));
return to_return;
}
/// The accumulated data is already big enough.
if (isEnoughSize(chunks))
{
/// Return the accumulated data and put the new block into the accumulator.
std::vector<Chunk> to_return;
std::swap(to_return, chunks);
chunks.emplace_back(std::move(chunk));
return to_return;
}
chunks.emplace_back(std::move(chunk));
if (isEnoughSize(chunks))
{
std::vector<Chunk> to_return;
std::swap(to_return, chunks);
return to_return;
}
/// Squashed block is not ready.
return {};
}
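The branching above is easier to follow end to end on plain numbers. Below is a minimal, self-contained sketch of the same flushing policy over row counts only (all names are illustrative, not part of the codebase): a batch is flushed as soon as either the incoming chunk or the accumulated chunks reach the minimum size, and an end-of-stream (zero-row) call flushes whatever remains.

```cpp
#include <cstddef>
#include <iostream>
#include <numeric>
#include <utility>
#include <vector>

struct Squasher
{
    size_t min_rows;
    std::vector<size_t> pending; /// accumulated chunk sizes

    size_t pendingRows() const { return std::accumulate(pending.begin(), pending.end(), size_t{0}); }

    /// Returns the batch of chunk sizes to flush; an empty result means "keep accumulating".
    /// rows == 0 signals end of stream.
    std::vector<size_t> squash(size_t rows)
    {
        if (rows == 0)
            return std::exchange(pending, std::vector<size_t>{});
        if (rows >= min_rows && pending.empty())
        {
            pending.push_back(rows); /// remembered, flushed on a later call
            return {};
        }
        if (rows >= min_rows || pendingRows() >= min_rows)
        {
            auto to_return = std::exchange(pending, std::vector<size_t>{});
            pending.push_back(rows);
            return to_return;
        }
        pending.push_back(rows);
        if (pendingRows() >= min_rows)
            return std::exchange(pending, std::vector<size_t>{});
        return {};
    }
};

int main()
{
    Squasher squasher{1000}; /// min_rows = 1000
    for (size_t rows : {300, 300, 300, 300, 0}) /// the final 0 is the end-of-stream marker
        if (auto batch = squasher.squash(rows); !batch.empty())
            std::cout << "flush " << batch.size() << " chunks\n"; /// flushes once, at the 4th chunk (1200 rows)
}
```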
std::pair<ColumnString::Ptr, ColumnString::Ptr> EmbeddedRocksDBBulkSink::serializeChunks(const std::vector<Chunk> & input_chunks) const
{
auto serialized_key_column = ColumnString::create();
auto serialized_value_column = ColumnString::create();
{
auto & serialized_key_data = serialized_key_column->getChars();
auto & serialized_key_offsets = serialized_key_column->getOffsets();
auto & serialized_value_data = serialized_value_column->getChars();
auto & serialized_value_offsets = serialized_value_column->getOffsets();
WriteBufferFromVector<ColumnString::Chars> writer_key(serialized_key_data);
WriteBufferFromVector<ColumnString::Chars> writer_value(serialized_value_data);
for (const auto & chunk : input_chunks)
{
const auto & columns = chunk.getColumns();
auto rows = chunk.getNumRows();
for (size_t i = 0; i < rows; ++i)
{
for (size_t idx = 0; idx < columns.size(); ++idx)
serializations[idx]->serializeBinary(*columns[idx], i, idx == primary_key_pos ? writer_key : writer_value, {});
/// Each string in ColumnString must be null-terminated
writeChar('\0', writer_key);
writeChar('\0', writer_value);
serialized_key_offsets.emplace_back(writer_key.count());
serialized_value_offsets.emplace_back(writer_value.count());
}
}
writer_key.finalize();
writer_value.finalize();
}
return {std::move(serialized_key_column), std::move(serialized_value_column)};
}
void EmbeddedRocksDBBulkSink::consume(Chunk chunk_)
{
std::vector<Chunk> to_written = squash(std::move(chunk_));
if (to_written.empty())
return;
auto [serialized_key_column, serialized_value_column] = serializeChunks(to_written);
auto sst_file_path = getTemporarySSTFilePath();
if (auto status = buildSSTFile(sst_file_path, *serialized_key_column, *serialized_value_column); !status.ok())
throw Exception(ErrorCodes::ROCKSDB_ERROR, "RocksDB write error: {}", status.ToString());
/// Ingest the SST file
static rocksdb::IngestExternalFileOptions ingest_options;
ingest_options.move_files = true; /// The temporary file is on the same disk, so moving (or hardlinking) it is faster than copying
if (auto status = storage.rocksdb_ptr->IngestExternalFile({sst_file_path}, ingest_options); !status.ok())
throw Exception(ErrorCodes::ROCKSDB_ERROR, "RocksDB write error: {}", status.ToString());
if (fs::exists(sst_file_path))
fs::remove(sst_file_path);
}
void EmbeddedRocksDBBulkSink::onFinish()
{
/// If there is any data left, write it.
if (!chunks.empty())
consume({});
}
String EmbeddedRocksDBBulkSink::getTemporarySSTFilePath()
{
return fs::path(insert_directory_queue) / (toString(file_counter++) + ".sst");
}
bool EmbeddedRocksDBBulkSink::isEnoughSize(const std::vector<Chunk> & input_chunks) const
{
size_t total_rows = 0;
for (const auto & chunk : input_chunks)
total_rows += chunk.getNumRows();
return total_rows >= min_block_size_rows;
}
bool EmbeddedRocksDBBulkSink::isEnoughSize(const Chunk & chunk) const
{
return chunk.getNumRows() >= min_block_size_rows;
}
}


@@ -1,69 +0,0 @@
#pragma once
#include <condition_variable>
#include <filesystem>
#include <Processors/Sinks/SinkToStorage.h>
#include <rocksdb/db.h>
#include <rocksdb/status.h>
#include <Common/CurrentThread.h>
#include <Common/ThreadStatus.h>
#include <Common/ThreadPool.h>
#include <Columns/ColumnString.h>
#include <Processors/Chunk.h>
namespace DB
{
namespace fs = std::filesystem;
class StorageEmbeddedRocksDB;
class EmbeddedRocksDBBulkSink;
struct StorageInMemoryMetadata;
using StorageMetadataPtr = std::shared_ptr<const StorageInMemoryMetadata>;
/// Optimized for bulk importing into StorageEmbeddedRocksDB:
/// 1. No memtable: an SST file is built from each squashed chunk, then imported into rocksdb
/// 2. Chunks are squashed to reduce the number of SST files
class EmbeddedRocksDBBulkSink : public SinkToStorage, public WithContext
{
public:
EmbeddedRocksDBBulkSink(
ContextPtr context_,
StorageEmbeddedRocksDB & storage_,
const StorageMetadataPtr & metadata_snapshot_);
~EmbeddedRocksDBBulkSink() override;
void consume(Chunk chunk) override;
void onFinish() override;
String getName() const override { return "EmbeddedRocksDBBulkSink"; }
private:
/// Get a unique path to write temporary SST file
String getTemporarySSTFilePath();
/// Squash chunks to a minimum size
std::vector<Chunk> squash(Chunk chunk);
bool isEnoughSize(const std::vector<Chunk> & input_chunks) const;
bool isEnoughSize(const Chunk & chunk) const;
/// Serialize chunks to rocksdb key-value pairs
std::pair<ColumnString::Ptr, ColumnString::Ptr> serializeChunks(const std::vector<Chunk> & input_chunks) const;
StorageEmbeddedRocksDB & storage;
StorageMetadataPtr metadata_snapshot;
size_t primary_key_pos = 0;
Serializations serializations;
/// For squashing chunks
std::vector<Chunk> chunks;
size_t min_block_size_rows = 0;
/// For writing SST files
size_t file_counter = 0;
static constexpr auto TMP_INSERT_PREFIX = "tmp_insert_";
String insert_directory_queue;
};
}


@@ -1,41 +0,0 @@
#include "RocksDBSettings.h"
#include <Parsers/ASTCreateQuery.h>
#include <Parsers/ASTFunction.h>
namespace DB
{
namespace ErrorCodes
{
extern const int UNKNOWN_SETTING;
}
IMPLEMENT_SETTINGS_TRAITS(RockDBSettingsTraits, LIST_OF_ROCKSDB_SETTINGS)
void RocksDBSettings::loadFromQuery(ASTStorage & storage_def, ContextPtr /*context*/)
{
if (storage_def.settings)
{
try
{
auto changes = storage_def.settings->changes;
applyChanges(changes);
}
catch (Exception & e)
{
if (e.code() == ErrorCodes::UNKNOWN_SETTING)
e.addMessage("for storage " + storage_def.engine->name);
throw;
}
}
}
std::vector<String> RocksDBSettings::getAllRegisteredNames() const
{
std::vector<String> all_settings;
for (const auto & setting_field : all())
all_settings.push_back(setting_field.getName());
return all_settings;
}
}


@@ -1,39 +0,0 @@
#pragma once
#include <Core/BaseSettings.h>
#include <Core/Defines.h>
#include <Interpreters/Context_fwd.h>
#include <base/unit.h>
#include <Common/NamePrompter.h>
namespace Poco::Util
{
class AbstractConfiguration;
}
namespace DB
{
class ASTStorage;
struct Settings;
/** StorageEmbeddedRocksDB table settings
*/
#define ROCKSDB_SETTINGS(M, ALIAS) \
M(Bool, optimize_for_bulk_insert, true, "Table is optimized for bulk insertions (the insert pipeline will create SST files and import them into the rocksdb database instead of writing to memtables)", 0) \
M(UInt64, bulk_insert_block_size, DEFAULT_INSERT_BLOCK_SIZE, "Size of the block for bulk insert; if it is smaller than the query setting min_insert_block_size_rows, min_insert_block_size_rows is used instead", 0) \

#define LIST_OF_ROCKSDB_SETTINGS(M, ALIAS) ROCKSDB_SETTINGS(M, ALIAS)
DECLARE_SETTINGS_TRAITS(RockDBSettingsTraits, LIST_OF_ROCKSDB_SETTINGS)
struct RocksDBSettings : public BaseSettings<RockDBSettingsTraits>, public IHints<2>
{
void loadFromQuery(ASTStorage & storage_def, ContextPtr context);
std::vector<String> getAllRegisteredNames() const override;
};
}
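A hedged sketch of how these macro-generated settings are consumed (the helper function is hypothetical; it assumes ClickHouse's `BaseSettings` machinery declared above). Each `M(...)` entry becomes a typed field, and settings can also be addressed by name, which is the path that `SETTINGS ...` clauses and `ALTER ... MODIFY SETTING` take:

```cpp
#include <Storages/RocksDB/RocksDBSettings.h>

/// Hypothetical helper, for illustration only.
void tuneForBulkLoad(DB::RocksDBSettings & settings)
{
    /// Typed access through the field generated by the M() macro.
    if (!settings.optimize_for_bulk_insert.value)
        settings.set("optimize_for_bulk_insert", true); /// by-name access

    settings.set("bulk_insert_block_size", 500000);
}
```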


@@ -1,5 +1,6 @@
#include <Storages/checkAndGetLiteralArgument.h>
#include <Storages/RocksDB/StorageEmbeddedRocksDB.h>
#include <Storages/RocksDB/EmbeddedRocksDBSink.h>
#include <Storages/MutationCommands.h>
#include <DataTypes/DataTypesNumber.h>
@@ -27,15 +28,8 @@
#include <Poco/Util/AbstractConfiguration.h>
#include <Common/logger_useful.h>
#include <Common/Exception.h>
#include <Storages/AlterCommands.h>
#include <Storages/RocksDB/RocksDBSettings.h>
#include <IO/SharedThreadPools.h>
#include <Disks/DiskLocal.h>
#include <base/sort.h>
#include <rocksdb/advanced_options.h>
#include <rocksdb/env.h>
#include <rocksdb/options.h>
#include <rocksdb/table.h>
#include <rocksdb/convenience.h>
#include <rocksdb/utilities/db_ttl.h>
@@ -45,6 +39,8 @@
#include <utility>
namespace fs = std::filesystem;
namespace DB
{
@@ -178,7 +174,6 @@ StorageEmbeddedRocksDB::StorageEmbeddedRocksDB(const StorageID & table_id_,
const StorageInMemoryMetadata & metadata_,
LoadingStrictnessLevel mode,
ContextPtr context_,
std::unique_ptr<RocksDBSettings> settings_,
const String & primary_key_,
Int32 ttl_,
String rocksdb_dir_,
@@ -191,7 +186,6 @@ StorageEmbeddedRocksDB::StorageEmbeddedRocksDB(const StorageID & table_id_,
, read_only(read_only_)
{
setInMemoryMetadata(metadata_);
setSettings(std::move(settings_));
if (rocksdb_dir.empty())
{
rocksdb_dir = context_->getPath() + relative_data_path_;
@@ -242,20 +236,22 @@ void StorageEmbeddedRocksDB::mutate(const MutationCommands & commands, ContextPt
if (commands.front().type == MutationCommand::Type::DELETE)
{
MutationsInterpreter::Settings mutation_settings(true);
mutation_settings.return_all_columns = true;
mutation_settings.return_mutated_rows = true;
MutationsInterpreter::Settings settings(true);
settings.return_all_columns = true;
settings.return_mutated_rows = true;
auto interpreter = std::make_unique<MutationsInterpreter>(
storage_ptr,
metadata_snapshot,
commands,
context_,
mutation_settings);
settings);
auto pipeline = QueryPipelineBuilder::getPipeline(interpreter->execute());
PullingPipelineExecutor executor(pipeline);
auto sink = std::make_shared<EmbeddedRocksDBSink>(*this, metadata_snapshot);
auto header = interpreter->getUpdatedHeader();
auto primary_key_pos = header.getPositionByName(primary_key);
@@ -291,16 +287,16 @@ void StorageEmbeddedRocksDB::mutate(const MutationCommands & commands, ContextPt
if (commands.front().column_to_update_expression.contains(primary_key))
throw Exception(ErrorCodes::BAD_ARGUMENTS, "Primary key cannot be updated (cannot update column {})", primary_key);
MutationsInterpreter::Settings mutation_settings(true);
mutation_settings.return_all_columns = true;
mutation_settings.return_mutated_rows = true;
MutationsInterpreter::Settings settings(true);
settings.return_all_columns = true;
settings.return_mutated_rows = true;
auto interpreter = std::make_unique<MutationsInterpreter>(
storage_ptr,
metadata_snapshot,
commands,
context_,
mutation_settings);
settings);
auto pipeline = QueryPipelineBuilder::getPipeline(interpreter->execute());
PullingPipelineExecutor executor(pipeline);
@@ -356,6 +352,7 @@ void StorageEmbeddedRocksDB::initDB()
rocksdb::Options base;
base.create_if_missing = true;
base.compression = rocksdb::CompressionType::kZSTD;
base.statistics = rocksdb::CreateDBStatistics();
/// It is too verbose by default, and in fact we don't care about rocksdb logs at all.
base.info_log_level = rocksdb::ERROR_LEVEL;
@@ -585,11 +582,8 @@ void ReadFromEmbeddedRocksDB::applyFilters(ActionDAGNodes added_filter_nodes)
}
SinkToStoragePtr StorageEmbeddedRocksDB::write(
const ASTPtr & /*query*/, const StorageMetadataPtr & metadata_snapshot, ContextPtr query_context, bool /*async_insert*/)
const ASTPtr & /*query*/, const StorageMetadataPtr & metadata_snapshot, ContextPtr /*context*/, bool /*async_insert*/)
{
if (getSettings().optimize_for_bulk_insert)
return std::make_shared<EmbeddedRocksDBBulkSink>(query_context, *this, metadata_snapshot);
return std::make_shared<EmbeddedRocksDBSink>(*this, metadata_snapshot);
}
@@ -628,21 +622,7 @@ static StoragePtr create(const StorageFactory::Arguments & args)
{
throw Exception(ErrorCodes::BAD_ARGUMENTS, "StorageEmbeddedRocksDB must require one column in primary key");
}
auto settings = std::make_unique<RocksDBSettings>();
settings->loadFromQuery(*args.storage_def, args.getContext());
if (args.storage_def->settings)
metadata.settings_changes = args.storage_def->settings->ptr();
else
{
/// A workaround because EmbeddedRocksDB doesn't have default immutable settings,
/// but InterpreterAlterQuery requires settings_changes to be set to run ALTER MODIFY
/// SETTING queries. So we just add a setting with its default value.
auto settings_changes = std::make_shared<ASTSetQuery>();
settings_changes->is_standalone = false;
settings_changes->changes.insertSetting("optimize_for_bulk_insert", settings->optimize_for_bulk_insert.value);
metadata.settings_changes = settings_changes;
}
return std::make_shared<StorageEmbeddedRocksDB>(args.table_id, args.relative_data_path, metadata, args.mode, args.getContext(), std::move(settings), primary_key_names[0], ttl, std::move(rocksdb_dir), read_only);
return std::make_shared<StorageEmbeddedRocksDB>(args.table_id, args.relative_data_path, metadata, args.mode, args.getContext(), primary_key_names[0], ttl, std::move(rocksdb_dir), read_only);
}
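The `settings_changes` workaround above is what lets `ALTER TABLE ... MODIFY SETTING` pass validation even for tables created without an explicit `SETTINGS` clause. A hedged illustration (the table name is hypothetical):

```sql
ALTER TABLE rocksdb_table MODIFY SETTING optimize_for_bulk_insert = 0;
```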
std::shared_ptr<rocksdb::Statistics> StorageEmbeddedRocksDB::getRocksDBStatistics() const
@@ -733,9 +713,9 @@ Chunk StorageEmbeddedRocksDB::getBySerializedKeys(
return Chunk(std::move(columns), num_rows);
}
std::optional<UInt64> StorageEmbeddedRocksDB::totalRows(const Settings & query_settings) const
std::optional<UInt64> StorageEmbeddedRocksDB::totalRows(const Settings & settings) const
{
if (!query_settings.optimize_trivial_approximate_count_query)
if (!settings.optimize_trivial_approximate_count_query)
return {};
std::shared_lock lock(rocksdb_ptr_mx);
if (!rocksdb_ptr)
@@ -757,26 +737,9 @@ std::optional<UInt64> StorageEmbeddedRocksDB::totalBytes(const Settings & /*sett
return estimated_bytes;
}
void StorageEmbeddedRocksDB::alter(
const AlterCommands & params,
ContextPtr query_context,
AlterLockHolder & holder)
{
IStorage::alter(params, query_context, holder);
auto new_metadata = getInMemoryMetadataPtr();
if (new_metadata->settings_changes)
{
const auto & settings_changes = new_metadata->settings_changes->as<const ASTSetQuery &>();
auto new_settings = std::make_unique<RocksDBSettings>();
new_settings->applyChanges(settings_changes.changes);
setSettings(std::move(new_settings));
}
}
void registerStorageEmbeddedRocksDB(StorageFactory & factory)
{
StorageFactory::StorageFeatures features{
.supports_settings = true,
.supports_sort_order = true,
.supports_ttl = true,
.supports_parallel_insert = true,
@@ -784,12 +747,4 @@ void registerStorageEmbeddedRocksDB(StorageFactory & factory)
factory.registerStorage("EmbeddedRocksDB", create, features);
}
void StorageEmbeddedRocksDB::checkAlterIsPossible(const AlterCommands & commands, ContextPtr /* context */) const
{
for (const auto & command : commands)
if (!command.isCommentAlter() && !command.isSettingsAlter())
throw Exception(ErrorCodes::NOT_IMPLEMENTED, "Alter of type '{}' is not supported by storage {}", command.type, getName());
}
}


@@ -1,14 +1,11 @@
#pragma once
#include <memory>
#include <Common/MultiVersion.h>
#include <Common/SharedMutex.h>
#include <Storages/IStorage.h>
#include <Interpreters/IKeyValueEntity.h>
#include <rocksdb/status.h>
#include <Storages/IStorage.h>
#include <Storages/RocksDB/EmbeddedRocksDBSink.h>
#include <Storages/RocksDB/EmbeddedRocksDBBulkSink.h>
#include <Storages/RocksDB/RocksDBSettings.h>
namespace rocksdb
@@ -30,7 +27,6 @@ class Context;
class StorageEmbeddedRocksDB final : public IStorage, public IKeyValueEntity, WithContext
{
friend class EmbeddedRocksDBSink;
friend class EmbeddedRocksDBBulkSink;
friend class ReadFromEmbeddedRocksDB;
public:
StorageEmbeddedRocksDB(const StorageID & table_id_,
@@ -38,7 +34,6 @@
const StorageInMemoryMetadata & metadata,
LoadingStrictnessLevel mode,
ContextPtr context_,
std::unique_ptr<RocksDBSettings> settings_,
const String & primary_key_,
Int32 ttl_ = 0,
String rocksdb_dir_ = "",
@@ -64,7 +59,6 @@
void checkMutationIsPossible(const MutationCommands & commands, const Settings & settings) const override;
void mutate(const MutationCommands &, ContextPtr) override;
void drop() override;
void alter(const AlterCommands & params, ContextPtr query_context, AlterLockHolder &) override;
bool optimize(
const ASTPtr & query,
@@ -105,16 +99,7 @@
std::optional<UInt64> totalBytes(const Settings & settings) const override;
void checkAlterIsPossible(const AlterCommands & commands, ContextPtr /* context */) const override;
const RocksDBSettings & getSettings() const { return *storage_settings.get(); }
void setSettings(std::unique_ptr<RocksDBSettings> && settings_) { storage_settings.set(std::move(settings_)); }
private:
SinkToStoragePtr getSink(ContextPtr context, const StorageMetadataPtr & metadata_snapshot);
MultiVersion<RocksDBSettings> storage_settings;
const String primary_key;
using RocksDBPtr = std::unique_ptr<rocksdb::DB>;
RocksDBPtr rocksdb_ptr;


@@ -4,7 +4,7 @@
DROP TABLE IF EXISTS 01686_test;
CREATE TABLE 01686_test (key UInt64, value String) Engine=EmbeddedRocksDB PRIMARY KEY(key) SETTINGS optimize_for_bulk_insert = 0;
CREATE TABLE 01686_test (key UInt64, value String) Engine=EmbeddedRocksDB PRIMARY KEY(key);
SELECT value FROM system.rocksdb WHERE database = currentDatabase() and table = '01686_test' and name = 'number.keys.written';
INSERT INTO 01686_test SELECT number, format('Hello, world ({})', toString(number)) FROM numbers(10000);


@@ -1,10 +0,0 @@
0
1000
1000
1
1000
2
1000000
1000
0 999001
1000000


@@ -1,48 +0,0 @@
#!/usr/bin/env bash
# Tags: no-ordinary-database, use-rocksdb
CURDIR=$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)
# shellcheck source=../shell_config.sh
. "$CURDIR"/../shell_config.sh
# Normal insert path: we insert only 1000 rows, so the data should stay in the memtable
${CLICKHOUSE_CLIENT} --query "CREATE TABLE IF NOT EXISTS rocksdb_worm (key UInt64, value UInt64) ENGINE = EmbeddedRocksDB() PRIMARY KEY key SETTINGS optimize_for_bulk_insert = 0;"
${CLICKHOUSE_CLIENT} --query "INSERT INTO rocksdb_worm SELECT number, number+1 FROM numbers(1000);"
${CLICKHOUSE_CLIENT} --query "SELECT sum(value) FROM system.rocksdb WHERE database = currentDatabase() AND table = 'rocksdb_worm' AND name = 'no.file.opens';" # should be 0 because all data is still in memtable
${CLICKHOUSE_CLIENT} --query "SELECT count() FROM rocksdb_worm;"
# Enabling bulk insertion
${CLICKHOUSE_CLIENT} --query "ALTER TABLE rocksdb_worm MODIFY SETTING optimize_for_bulk_insert = 1;"
# Testing that key serialization is identical with and without the bulk sink
${CLICKHOUSE_CLIENT} --query "TRUNCATE TABLE rocksdb_worm;"
${CLICKHOUSE_CLIENT} --query "INSERT INTO rocksdb_worm SELECT number, number+2 FROM numbers(1000);" # should override previous keys
${CLICKHOUSE_CLIENT} --query "SELECT count() FROM rocksdb_worm WHERE value = key + 2;"
# With bulk insertion, there is no memtable, so a small insert should create a new file
${CLICKHOUSE_CLIENT} --query "TRUNCATE TABLE rocksdb_worm;"
${CLICKHOUSE_CLIENT} --query "INSERT INTO rocksdb_worm SELECT number, number+1 FROM numbers(1000);"
${CLICKHOUSE_CLIENT} --query "SELECT sum(value) FROM system.rocksdb WHERE database = currentDatabase() AND table = 'rocksdb_worm' AND name = 'no.file.opens';" # should be 1
${CLICKHOUSE_CLIENT} --query "SELECT count() FROM rocksdb_worm;"
# Testing insert with multiple sinks and fixed block size
${CLICKHOUSE_CLIENT} --query "TRUNCATE TABLE rocksdb_worm;"
${CLICKHOUSE_CLIENT} --query "ALTER TABLE rocksdb_worm MODIFY SETTING bulk_insert_block_size = 500000;"
${CLICKHOUSE_CLIENT} --query "INSERT INTO rocksdb_worm SELECT number, number+1 FROM numbers_mt(1000000) SETTINGS max_insert_threads = 2, max_block_size = 100000;"
${CLICKHOUSE_CLIENT} --query "SELECT sum(value) FROM system.rocksdb WHERE database = currentDatabase() AND table = 'rocksdb_worm' AND name = 'no.file.opens';" # should be 2 as max_block_size is set to 500000
${CLICKHOUSE_CLIENT} --query "SELECT count() FROM rocksdb_worm;"
# Testing insert with duplicated keys
${CLICKHOUSE_CLIENT} --query "TRUNCATE TABLE rocksdb_worm;"
${CLICKHOUSE_CLIENT} --query "INSERT INTO rocksdb_worm SELECT number % 1000, number+1 FROM numbers_mt(1000000) SETTINGS max_block_size = 100000, max_insert_threads = 1;"
${CLICKHOUSE_CLIENT} --query "SELECT count() FROM rocksdb_worm;"
${CLICKHOUSE_CLIENT} --query "SELECT * FROM rocksdb_worm WHERE key = 0;" # should be the latest value - 999001
# Testing insert with multiple threads
${CLICKHOUSE_CLIENT} --query "TRUNCATE TABLE rocksdb_worm;"
${CLICKHOUSE_CLIENT} --query "INSERT INTO rocksdb_worm SELECT number, number+1 FROM numbers_mt(1000000)" &
${CLICKHOUSE_CLIENT} --query "INSERT INTO rocksdb_worm SELECT number, number+1 FROM numbers_mt(1000000)" &
wait
${CLICKHOUSE_CLIENT} --query "SELECT count() FROM rocksdb_worm;"


@@ -1949,8 +1949,6 @@ mdadm
meanZTest
meanztest
mebibytes
memtable
memtables
mergeTreeIndex
mergeable
mergetree