mirror of
https://github.com/ClickHouse/ClickHouse.git
synced 2024-11-28 18:42:26 +00:00
Merge pull request #29289 from vdimir/issue-29227-rocksdb-truncate
Fix race in StorageSystemRocksDB
This commit is contained in:
commit
1c1e2e6b28
@ -36,6 +36,7 @@
|
|||||||
#include <rocksdb/convenience.h>
|
#include <rocksdb/convenience.h>
|
||||||
|
|
||||||
#include <filesystem>
|
#include <filesystem>
|
||||||
|
#include <shared_mutex>
|
||||||
|
|
||||||
|
|
||||||
namespace fs = std::filesystem;
|
namespace fs = std::filesystem;
|
||||||
@ -213,9 +214,9 @@ public:
|
|||||||
std::vector<rocksdb::Slice> slices_keys(num_keys);
|
std::vector<rocksdb::Slice> slices_keys(num_keys);
|
||||||
|
|
||||||
const auto & sample_block = metadata_snapshot->getSampleBlock();
|
const auto & sample_block = metadata_snapshot->getSampleBlock();
|
||||||
const auto & key_column = sample_block.getByName(storage.primary_key);
|
const auto & key_column = sample_block.getByName(storage.getPrimaryKey());
|
||||||
auto columns = sample_block.cloneEmptyColumns();
|
auto columns = sample_block.cloneEmptyColumns();
|
||||||
size_t primary_key_pos = sample_block.getPositionByName(storage.primary_key);
|
size_t primary_key_pos = sample_block.getPositionByName(storage.getPrimaryKey());
|
||||||
|
|
||||||
size_t rows_processed = 0;
|
size_t rows_processed = 0;
|
||||||
while (it < end && rows_processed < max_block_size)
|
while (it < end && rows_processed < max_block_size)
|
||||||
@ -230,8 +231,7 @@ public:
|
|||||||
}
|
}
|
||||||
|
|
||||||
std::vector<String> values;
|
std::vector<String> values;
|
||||||
auto statuses = storage.rocksdb_ptr->MultiGet(rocksdb::ReadOptions(), slices_keys, &values);
|
auto statuses = storage.multiGet(slices_keys, values);
|
||||||
|
|
||||||
for (size_t i = 0; i < statuses.size(); ++i)
|
for (size_t i = 0; i < statuses.size(); ++i)
|
||||||
{
|
{
|
||||||
if (statuses[i].ok())
|
if (statuses[i].ok())
|
||||||
@ -285,7 +285,10 @@ StorageEmbeddedRocksDB::StorageEmbeddedRocksDB(const StorageID & table_id_,
|
|||||||
|
|
||||||
void StorageEmbeddedRocksDB::truncate(const ASTPtr &, const StorageMetadataPtr & , ContextPtr, TableExclusiveLockHolder &)
|
void StorageEmbeddedRocksDB::truncate(const ASTPtr &, const StorageMetadataPtr & , ContextPtr, TableExclusiveLockHolder &)
|
||||||
{
|
{
|
||||||
|
std::unique_lock<std::shared_mutex> lock(rocksdb_ptr_mx);
|
||||||
rocksdb_ptr->Close();
|
rocksdb_ptr->Close();
|
||||||
|
rocksdb_ptr = nullptr;
|
||||||
|
|
||||||
fs::remove_all(rocksdb_dir);
|
fs::remove_all(rocksdb_dir);
|
||||||
fs::create_directories(rocksdb_dir);
|
fs::create_directories(rocksdb_dir);
|
||||||
initDb();
|
initDb();
|
||||||
@ -460,9 +463,20 @@ static StoragePtr create(const StorageFactory::Arguments & args)
|
|||||||
|
|
||||||
// Returns the statistics object taken from the options of the open RocksDB
// instance, or nullptr when rocksdb_ptr is currently null.
// NOTE(review): this text looks like a side-by-side diff rendering (each unchanged
// line appears twice; lines that follow a lone `|` exist only in the new version).
// Confirm against the real source file before changing any code here.
std::shared_ptr<rocksdb::Statistics> StorageEmbeddedRocksDB::getRocksDBStatistics() const
|
std::shared_ptr<rocksdb::Statistics> StorageEmbeddedRocksDB::getRocksDBStatistics() const
|
||||||
{
|
{
|
||||||
|
// Shared lock on the same mutex that truncate() takes exclusively while it
// closes, resets and re-initializes rocksdb_ptr.
std::shared_lock<std::shared_mutex> lock(rocksdb_ptr_mx);
|
||||||
|
// truncate() assigns nullptr to rocksdb_ptr, so a null pointer is handled
// here instead of being dereferenced.
if (!rocksdb_ptr)
|
||||||
|
return nullptr;
|
||||||
return rocksdb_ptr->GetOptions().statistics;
|
return rocksdb_ptr->GetOptions().statistics;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Looks up `slices_keys` in the RocksDB instance with default ReadOptions while
// holding a shared lock on rocksdb_ptr_mx.
//   slices_keys - keys to look up.
//   values      - output vector handed to rocksdb::DB::MultiGet.
// Returns the statuses produced by MultiGet, or an empty vector when rocksdb_ptr
// is null; in that case `values` is left untouched.
// NOTE(review): a caller that loops over the returned statuses sees zero results
// for an empty vector rather than an error - confirm this is the intended behaviour.
std::vector<rocksdb::Status> StorageEmbeddedRocksDB::multiGet(const std::vector<rocksdb::Slice> & slices_keys, std::vector<String> & values) const
|
||||||
|
{
|
||||||
|
// Shared lock on the same mutex that truncate() takes exclusively.
std::shared_lock<std::shared_mutex> lock(rocksdb_ptr_mx);
|
||||||
|
if (!rocksdb_ptr)
|
||||||
|
return {};
|
||||||
|
return rocksdb_ptr->MultiGet(rocksdb::ReadOptions(), slices_keys, &values);
|
||||||
|
}
|
||||||
|
|
||||||
void registerStorageEmbeddedRocksDB(StorageFactory & factory)
|
void registerStorageEmbeddedRocksDB(StorageFactory & factory)
|
||||||
{
|
{
|
||||||
StorageFactory::StorageFeatures features{
|
StorageFactory::StorageFeatures features{
|
||||||
|
@ -1,8 +1,10 @@
|
|||||||
#pragma once
|
#pragma once
|
||||||
|
|
||||||
#include <memory>
|
#include <memory>
|
||||||
|
#include <shared_mutex>
|
||||||
#include <common/shared_ptr_helper.h>
|
#include <common/shared_ptr_helper.h>
|
||||||
#include <Storages/IStorage.h>
|
#include <Storages/IStorage.h>
|
||||||
|
#include <rocksdb/status.h>
|
||||||
|
|
||||||
|
|
||||||
namespace rocksdb
|
namespace rocksdb
|
||||||
@ -20,7 +22,6 @@ class Context;
|
|||||||
class StorageEmbeddedRocksDB final : public shared_ptr_helper<StorageEmbeddedRocksDB>, public IStorage, WithContext
|
class StorageEmbeddedRocksDB final : public shared_ptr_helper<StorageEmbeddedRocksDB>, public IStorage, WithContext
|
||||||
{
|
{
|
||||||
friend struct shared_ptr_helper<StorageEmbeddedRocksDB>;
|
friend struct shared_ptr_helper<StorageEmbeddedRocksDB>;
|
||||||
friend class EmbeddedRocksDBSource;
|
|
||||||
friend class EmbeddedRocksDBSink;
|
friend class EmbeddedRocksDBSink;
|
||||||
friend class EmbeddedRocksDBBlockInputStream;
|
friend class EmbeddedRocksDBBlockInputStream;
|
||||||
public:
|
public:
|
||||||
@ -50,6 +51,8 @@ public:
|
|||||||
Strings getDataPaths() const override { return {rocksdb_dir}; }
|
Strings getDataPaths() const override { return {rocksdb_dir}; }
|
||||||
|
|
||||||
std::shared_ptr<rocksdb::Statistics> getRocksDBStatistics() const;
|
std::shared_ptr<rocksdb::Statistics> getRocksDBStatistics() const;
|
||||||
|
std::vector<rocksdb::Status> multiGet(const std::vector<rocksdb::Slice> & slices_keys, std::vector<String> & values) const;
|
||||||
|
// Read-only accessor for the `primary_key` member; the block source uses the
// returned string as a column name when querying the sample block.
const String & getPrimaryKey() const { return primary_key; }
|
||||||
|
|
||||||
protected:
|
protected:
|
||||||
StorageEmbeddedRocksDB(const StorageID & table_id_,
|
StorageEmbeddedRocksDB(const StorageID & table_id_,
|
||||||
@ -63,6 +66,7 @@ private:
|
|||||||
const String primary_key;
|
const String primary_key;
|
||||||
using RocksDBPtr = std::unique_ptr<rocksdb::DB>;
|
using RocksDBPtr = std::unique_ptr<rocksdb::DB>;
|
||||||
RocksDBPtr rocksdb_ptr;
|
RocksDBPtr rocksdb_ptr;
|
||||||
|
mutable std::shared_mutex rocksdb_ptr_mx;
|
||||||
String rocksdb_dir;
|
String rocksdb_dir;
|
||||||
|
|
||||||
void initDb();
|
void initDb();
|
||||||
|
@ -43,7 +43,8 @@ void StorageSystemRocksDB::fillData(MutableColumns & res_columns, ContextPtr con
|
|||||||
const auto access = context->getAccess();
|
const auto access = context->getAccess();
|
||||||
const bool check_access_for_databases = !access->isGranted(AccessType::SHOW_TABLES);
|
const bool check_access_for_databases = !access->isGranted(AccessType::SHOW_TABLES);
|
||||||
|
|
||||||
std::map<String, std::map<String, StoragePtr>> tables;
|
using RocksDBStoragePtr = std::shared_ptr<StorageEmbeddedRocksDB>;
|
||||||
|
std::map<String, std::map<String, RocksDBStoragePtr>> tables;
|
||||||
for (const auto & db : DatabaseCatalog::instance().getDatabases())
|
for (const auto & db : DatabaseCatalog::instance().getDatabases())
|
||||||
{
|
{
|
||||||
const bool check_access_for_tables = check_access_for_databases && !access->isGranted(AccessType::SHOW_TABLES, db.first);
|
const bool check_access_for_tables = check_access_for_databases && !access->isGranted(AccessType::SHOW_TABLES, db.first);
|
||||||
@ -51,17 +52,16 @@ void StorageSystemRocksDB::fillData(MutableColumns & res_columns, ContextPtr con
|
|||||||
for (auto iterator = db.second->getTablesIterator(context); iterator->isValid(); iterator->next())
|
for (auto iterator = db.second->getTablesIterator(context); iterator->isValid(); iterator->next())
|
||||||
{
|
{
|
||||||
StoragePtr table = iterator->table();
|
StoragePtr table = iterator->table();
|
||||||
if (!table)
|
RocksDBStoragePtr rocksdb_table = table ? std::dynamic_pointer_cast<StorageEmbeddedRocksDB>(table) : nullptr;
|
||||||
|
if (!rocksdb_table)
|
||||||
continue;
|
continue;
|
||||||
|
|
||||||
if (!dynamic_cast<const StorageEmbeddedRocksDB *>(table.get()))
|
|
||||||
continue;
|
|
||||||
if (check_access_for_tables && !access->isGranted(AccessType::SHOW_TABLES, db.first, iterator->name()))
|
if (check_access_for_tables && !access->isGranted(AccessType::SHOW_TABLES, db.first, iterator->name()))
|
||||||
continue;
|
continue;
|
||||||
tables[db.first][iterator->name()] = table;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
|
tables[db.first][iterator->name()] = rocksdb_table;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
MutableColumnPtr col_database_mut = ColumnString::create();
|
MutableColumnPtr col_database_mut = ColumnString::create();
|
||||||
MutableColumnPtr col_table_mut = ColumnString::create();
|
MutableColumnPtr col_table_mut = ColumnString::create();
|
||||||
@ -101,10 +101,9 @@ void StorageSystemRocksDB::fillData(MutableColumns & res_columns, ContextPtr con
|
|||||||
String database = (*col_database_to_filter)[i].safeGet<const String &>();
|
String database = (*col_database_to_filter)[i].safeGet<const String &>();
|
||||||
String table = (*col_table_to_filter)[i].safeGet<const String &>();
|
String table = (*col_table_to_filter)[i].safeGet<const String &>();
|
||||||
|
|
||||||
auto & rocksdb_table = dynamic_cast<StorageEmbeddedRocksDB &>(*tables[database][table]);
|
auto statistics = tables[database][table]->getRocksDBStatistics();
|
||||||
auto statistics = rocksdb_table.getRocksDBStatistics();
|
|
||||||
if (!statistics)
|
if (!statistics)
|
||||||
throw Exception(ErrorCodes::LOGICAL_ERROR, "rocksdb statistics is not enabled");
|
throw Exception(ErrorCodes::LOGICAL_ERROR, "RocksDB statistics are not available");
|
||||||
|
|
||||||
for (auto [tick, name] : rocksdb::TickersNameMap)
|
for (auto [tick, name] : rocksdb::TickersNameMap)
|
||||||
{
|
{
|
||||||
|
49
tests/queries/0_stateless/02030_rocksdb_race_long.sh
Executable file
49
tests/queries/0_stateless/02030_rocksdb_race_long.sh
Executable file
@ -0,0 +1,49 @@
|
|||||||
|
#!/usr/bin/env bash
|
||||||
|
# Tags: race
|
||||||
|
|
||||||
|
unset CLICKHOUSE_LOG_COMMENT
|
||||||
|
|
||||||
|
CURDIR=$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)
|
||||||
|
# shellcheck source=../shell_config.sh
|
||||||
|
. "$CURDIR"/../shell_config.sh
|
||||||
|
|
||||||
|
set -o errexit
|
||||||
|
set -o pipefail
|
||||||
|
|
||||||
|
# Setup: recreate the rocksdb_race table (EmbeddedRocksDB engine, primary key `key`)
# and insert 100000 rows, all sent to the client as one multi-statement script.
echo "
|
||||||
|
DROP TABLE IF EXISTS rocksdb_race;
|
||||||
|
CREATE TABLE rocksdb_race (key String, value UInt32) Engine=EmbeddedRocksDB PRIMARY KEY(key);
|
||||||
|
INSERT INTO rocksdb_race SELECT '1_' || toString(number), number FROM numbers(100000);
|
||||||
|
" | $CLICKHOUSE_CLIENT -n
|
||||||
|
|
||||||
|
# Repeatedly runs `SELECT * FROM system.rocksdb` and discards the output (FORMAT Null).
# The loop has no exit condition of its own; the script runs it under `timeout`.
function read_stat_thread()
|
||||||
|
{
|
||||||
|
while true; do
|
||||||
|
echo "
|
||||||
|
SELECT * FROM system.rocksdb FORMAT Null;
|
||||||
|
" | $CLICKHOUSE_CLIENT -n
|
||||||
|
done
|
||||||
|
}
|
||||||
|
|
||||||
|
# Truncates the rocksdb_race table in an endless loop, sleeping 3 seconds before
# each TRUNCATE. The loop has no exit condition of its own; the script runs it
# under `timeout`.
function truncate_thread()
|
||||||
|
{
|
||||||
|
while true; do
|
||||||
|
sleep 3s;
|
||||||
|
echo "
|
||||||
|
TRUNCATE TABLE rocksdb_race;
|
||||||
|
" | $CLICKHOUSE_CLIENT -n
|
||||||
|
done
|
||||||
|
}
|
||||||
|
|
||||||
|
# https://stackoverflow.com/questions/9954794/execute-a-shell-function-with-timeout
|
||||||
|
export -f read_stat_thread;
|
||||||
|
export -f truncate_thread;
|
||||||
|
|
||||||
|
# Run the reader loop and the truncate loop concurrently as background jobs, each
# limited to TIMEOUT seconds by `timeout`, with their stderr discarded; `wait`
# then blocks until both background jobs have finished.
TIMEOUT=20
|
||||||
|
|
||||||
|
timeout $TIMEOUT bash -c read_stat_thread 2> /dev/null &
|
||||||
|
timeout $TIMEOUT bash -c truncate_thread 2> /dev/null &
|
||||||
|
|
||||||
|
wait
|
||||||
|
|
||||||
|
$CLICKHOUSE_CLIENT -q "DROP TABLE rocksdb_race"
|
Loading…
Reference in New Issue
Block a user