Fix race in StorageSystemRocksDB truncate

This commit is contained in:
vdimir 2021-09-23 17:03:33 +03:00
parent 81a051e3ee
commit 9ca9a66a70
No known key found for this signature in database
GPG Key ID: 9B404D301C0CC7EB
3 changed files with 26 additions and 6 deletions

View File

@ -36,6 +36,8 @@
#include <rocksdb/convenience.h> #include <rocksdb/convenience.h>
#include <filesystem> #include <filesystem>
#include <mutex>
#include <shared_mutex>
namespace fs = std::filesystem; namespace fs = std::filesystem;
@ -213,9 +215,9 @@ public:
std::vector<rocksdb::Slice> slices_keys(num_keys); std::vector<rocksdb::Slice> slices_keys(num_keys);
const auto & sample_block = metadata_snapshot->getSampleBlock(); const auto & sample_block = metadata_snapshot->getSampleBlock();
const auto & key_column = sample_block.getByName(storage.primary_key); const auto & key_column = sample_block.getByName(storage.getPrimaryKey());
auto columns = sample_block.cloneEmptyColumns(); auto columns = sample_block.cloneEmptyColumns();
size_t primary_key_pos = sample_block.getPositionByName(storage.primary_key); size_t primary_key_pos = sample_block.getPositionByName(storage.getPrimaryKey());
size_t rows_processed = 0; size_t rows_processed = 0;
while (it < end && rows_processed < max_block_size) while (it < end && rows_processed < max_block_size)
@ -230,8 +232,7 @@ public:
} }
std::vector<String> values; std::vector<String> values;
auto statuses = storage.rocksdb_ptr->MultiGet(rocksdb::ReadOptions(), slices_keys, &values); auto statuses = storage.multiGet(slices_keys, values);
for (size_t i = 0; i < statuses.size(); ++i) for (size_t i = 0; i < statuses.size(); ++i)
{ {
if (statuses[i].ok()) if (statuses[i].ok())
@ -285,7 +286,10 @@ StorageEmbeddedRocksDB::StorageEmbeddedRocksDB(const StorageID & table_id_,
void StorageEmbeddedRocksDB::truncate(const ASTPtr &, const StorageMetadataPtr & , ContextPtr, TableExclusiveLockHolder &) void StorageEmbeddedRocksDB::truncate(const ASTPtr &, const StorageMetadataPtr & , ContextPtr, TableExclusiveLockHolder &)
{ {
std::unique_lock<std::shared_mutex> lock(rocksdb_ptr_mx);
rocksdb_ptr->Close(); rocksdb_ptr->Close();
rocksdb_ptr = nullptr;
fs::remove_all(rocksdb_dir); fs::remove_all(rocksdb_dir);
fs::create_directories(rocksdb_dir); fs::create_directories(rocksdb_dir);
initDb(); initDb();
@ -460,9 +464,20 @@ static StoragePtr create(const StorageFactory::Arguments & args)
std::shared_ptr<rocksdb::Statistics> StorageEmbeddedRocksDB::getRocksDBStatistics() const std::shared_ptr<rocksdb::Statistics> StorageEmbeddedRocksDB::getRocksDBStatistics() const
{ {
std::shared_lock<std::shared_mutex> lock(rocksdb_ptr_mx);
if (!rocksdb_ptr)
return nullptr;
return rocksdb_ptr->GetOptions().statistics; return rocksdb_ptr->GetOptions().statistics;
} }
std::vector<rocksdb::Status> StorageEmbeddedRocksDB::multiGet(const std::vector<rocksdb::Slice> & slices_keys, std::vector<String> & values) const
{
std::shared_lock<std::shared_mutex> lock(rocksdb_ptr_mx);
if (!rocksdb_ptr)
return {};
return rocksdb_ptr->MultiGet(rocksdb::ReadOptions(), slices_keys, &values);
}
void registerStorageEmbeddedRocksDB(StorageFactory & factory) void registerStorageEmbeddedRocksDB(StorageFactory & factory)
{ {
StorageFactory::StorageFeatures features{ StorageFactory::StorageFeatures features{

View File

@ -1,14 +1,17 @@
#pragma once #pragma once
#include <memory> #include <memory>
#include <shared_mutex>
#include <common/shared_ptr_helper.h> #include <common/shared_ptr_helper.h>
#include <Storages/IStorage.h> #include <Storages/IStorage.h>
#include <rocksdb/status.h>
namespace rocksdb namespace rocksdb
{ {
class DB; class DB;
class Statistics; class Statistics;
class Slice;
} }
@ -20,7 +23,6 @@ class Context;
class StorageEmbeddedRocksDB final : public shared_ptr_helper<StorageEmbeddedRocksDB>, public IStorage, WithContext class StorageEmbeddedRocksDB final : public shared_ptr_helper<StorageEmbeddedRocksDB>, public IStorage, WithContext
{ {
friend struct shared_ptr_helper<StorageEmbeddedRocksDB>; friend struct shared_ptr_helper<StorageEmbeddedRocksDB>;
friend class EmbeddedRocksDBSource;
friend class EmbeddedRocksDBSink; friend class EmbeddedRocksDBSink;
friend class EmbeddedRocksDBBlockInputStream; friend class EmbeddedRocksDBBlockInputStream;
public: public:
@ -50,6 +52,8 @@ public:
Strings getDataPaths() const override { return {rocksdb_dir}; } Strings getDataPaths() const override { return {rocksdb_dir}; }
std::shared_ptr<rocksdb::Statistics> getRocksDBStatistics() const; std::shared_ptr<rocksdb::Statistics> getRocksDBStatistics() const;
std::vector<rocksdb::Status> multiGet(const std::vector<rocksdb::Slice> & slices_keys, std::vector<String> & values) const;
const String & getPrimaryKey() const { return primary_key; }
protected: protected:
StorageEmbeddedRocksDB(const StorageID & table_id_, StorageEmbeddedRocksDB(const StorageID & table_id_,
@ -63,6 +67,7 @@ private:
const String primary_key; const String primary_key;
using RocksDBPtr = std::unique_ptr<rocksdb::DB>; using RocksDBPtr = std::unique_ptr<rocksdb::DB>;
RocksDBPtr rocksdb_ptr; RocksDBPtr rocksdb_ptr;
mutable std::shared_mutex rocksdb_ptr_mx;
String rocksdb_dir; String rocksdb_dir;
void initDb(); void initDb();

View File

@ -103,7 +103,7 @@ void StorageSystemRocksDB::fillData(MutableColumns & res_columns, ContextPtr con
auto statistics = tables[database][table]->getRocksDBStatistics(); auto statistics = tables[database][table]->getRocksDBStatistics();
if (!statistics) if (!statistics)
throw Exception(ErrorCodes::LOGICAL_ERROR, "rocksdb statistics is not enabled"); throw Exception(ErrorCodes::LOGICAL_ERROR, "RocksDB statistics is not avaliable");
for (auto [tick, name] : rocksdb::TickersNameMap) for (auto [tick, name] : rocksdb::TickersNameMap)
{ {