diff --git a/src/Storages/RocksDB/StorageEmbeddedRocksDB.cpp b/src/Storages/RocksDB/StorageEmbeddedRocksDB.cpp index 459c0879cda..7c5d36b37ce 100644 --- a/src/Storages/RocksDB/StorageEmbeddedRocksDB.cpp +++ b/src/Storages/RocksDB/StorageEmbeddedRocksDB.cpp @@ -36,6 +36,7 @@ #include #include +#include namespace fs = std::filesystem; @@ -213,9 +214,9 @@ public: std::vector slices_keys(num_keys); const auto & sample_block = metadata_snapshot->getSampleBlock(); - const auto & key_column = sample_block.getByName(storage.primary_key); + const auto & key_column = sample_block.getByName(storage.getPrimaryKey()); auto columns = sample_block.cloneEmptyColumns(); - size_t primary_key_pos = sample_block.getPositionByName(storage.primary_key); + size_t primary_key_pos = sample_block.getPositionByName(storage.getPrimaryKey()); size_t rows_processed = 0; while (it < end && rows_processed < max_block_size) @@ -230,8 +231,7 @@ public: } std::vector values; - auto statuses = storage.rocksdb_ptr->MultiGet(rocksdb::ReadOptions(), slices_keys, &values); - + auto statuses = storage.multiGet(slices_keys, values); for (size_t i = 0; i < statuses.size(); ++i) { if (statuses[i].ok()) @@ -285,7 +285,10 @@ StorageEmbeddedRocksDB::StorageEmbeddedRocksDB(const StorageID & table_id_, void StorageEmbeddedRocksDB::truncate(const ASTPtr &, const StorageMetadataPtr & , ContextPtr, TableExclusiveLockHolder &) { + std::unique_lock lock(rocksdb_ptr_mx); rocksdb_ptr->Close(); + rocksdb_ptr = nullptr; + fs::remove_all(rocksdb_dir); fs::create_directories(rocksdb_dir); initDb(); @@ -460,9 +463,20 @@ static StoragePtr create(const StorageFactory::Arguments & args) std::shared_ptr StorageEmbeddedRocksDB::getRocksDBStatistics() const { + std::shared_lock lock(rocksdb_ptr_mx); + if (!rocksdb_ptr) + return nullptr; return rocksdb_ptr->GetOptions().statistics; } +std::vector StorageEmbeddedRocksDB::multiGet(const std::vector & slices_keys, std::vector & values) const +{ + std::shared_lock lock(rocksdb_ptr_mx); + if (!rocksdb_ptr) + return {}; + return rocksdb_ptr->MultiGet(rocksdb::ReadOptions(), slices_keys, &values); +} + void registerStorageEmbeddedRocksDB(StorageFactory & factory) { StorageFactory::StorageFeatures features{ diff --git a/src/Storages/RocksDB/StorageEmbeddedRocksDB.h b/src/Storages/RocksDB/StorageEmbeddedRocksDB.h index 3f1b3b49492..71460a1667e 100644 --- a/src/Storages/RocksDB/StorageEmbeddedRocksDB.h +++ b/src/Storages/RocksDB/StorageEmbeddedRocksDB.h @@ -1,8 +1,10 @@ #pragma once #include +#include #include #include +#include namespace rocksdb @@ -20,7 +22,6 @@ class Context; class StorageEmbeddedRocksDB final : public shared_ptr_helper, public IStorage, WithContext { friend struct shared_ptr_helper; - friend class EmbeddedRocksDBSource; friend class EmbeddedRocksDBSink; friend class EmbeddedRocksDBBlockInputStream; public: @@ -50,6 +51,8 @@ public: Strings getDataPaths() const override { return {rocksdb_dir}; } std::shared_ptr getRocksDBStatistics() const; + std::vector multiGet(const std::vector & slices_keys, std::vector & values) const; + const String & getPrimaryKey() const { return primary_key; } protected: StorageEmbeddedRocksDB(const StorageID & table_id_, @@ -63,6 +66,7 @@ private: const String primary_key; using RocksDBPtr = std::unique_ptr; RocksDBPtr rocksdb_ptr; + mutable std::shared_mutex rocksdb_ptr_mx; String rocksdb_dir; void initDb(); diff --git a/src/Storages/RocksDB/StorageSystemRocksDB.cpp b/src/Storages/RocksDB/StorageSystemRocksDB.cpp index 7d31d5ddc21..cbb96ed4001 100644 --- a/src/Storages/RocksDB/StorageSystemRocksDB.cpp +++ b/src/Storages/RocksDB/StorageSystemRocksDB.cpp @@ -43,7 +43,8 @@ void StorageSystemRocksDB::fillData(MutableColumns & res_columns, ContextPtr con const auto access = context->getAccess(); const bool check_access_for_databases = !access->isGranted(AccessType::SHOW_TABLES); - std::map> tables; + using RocksDBStoragePtr = std::shared_ptr; + std::map> tables; for (const auto & db : DatabaseCatalog::instance().getDatabases()) { const bool check_access_for_tables = check_access_for_databases && !access->isGranted(AccessType::SHOW_TABLES, db.first); @@ -51,18 +52,17 @@ void StorageSystemRocksDB::fillData(MutableColumns & res_columns, ContextPtr con for (auto iterator = db.second->getTablesIterator(context); iterator->isValid(); iterator->next()) { StoragePtr table = iterator->table(); - if (!table) + RocksDBStoragePtr rocksdb_table = table ? std::dynamic_pointer_cast(table) : nullptr; + if (!rocksdb_table) continue; - if (!dynamic_cast(table.get())) - continue; if (check_access_for_tables && !access->isGranted(AccessType::SHOW_TABLES, db.first, iterator->name())) continue; - tables[db.first][iterator->name()] = table; + + tables[db.first][iterator->name()] = rocksdb_table; } } - MutableColumnPtr col_database_mut = ColumnString::create(); MutableColumnPtr col_table_mut = ColumnString::create(); @@ -101,10 +101,9 @@ void StorageSystemRocksDB::fillData(MutableColumns & res_columns, ContextPtr con String database = (*col_database_to_filter)[i].safeGet(); String table = (*col_table_to_filter)[i].safeGet(); - auto & rocksdb_table = dynamic_cast(*tables[database][table]); - auto statistics = rocksdb_table.getRocksDBStatistics(); + auto statistics = tables[database][table]->getRocksDBStatistics(); if (!statistics) - throw Exception(ErrorCodes::LOGICAL_ERROR, "rocksdb statistics is not enabled"); + throw Exception(ErrorCodes::LOGICAL_ERROR, "RocksDB statistics are not available"); for (auto [tick, name] : rocksdb::TickersNameMap) { diff --git a/tests/queries/0_stateless/02030_rocksdb_race_long.reference b/tests/queries/0_stateless/02030_rocksdb_race_long.reference new file mode 100644 index 00000000000..e69de29bb2d diff --git a/tests/queries/0_stateless/02030_rocksdb_race_long.sh b/tests/queries/0_stateless/02030_rocksdb_race_long.sh new file mode 100755 index 00000000000..88c30852c86 --- /dev/null +++ b/tests/queries/0_stateless/02030_rocksdb_race_long.sh @@ -0,0 +1,49 @@ +#!/usr/bin/env bash +# Tags: race + +unset CLICKHOUSE_LOG_COMMENT + +CURDIR=$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd) +# shellcheck source=../shell_config.sh +. "$CURDIR"/../shell_config.sh + +set -o errexit +set -o pipefail + +echo " + DROP TABLE IF EXISTS rocksdb_race; + CREATE TABLE rocksdb_race (key String, value UInt32) Engine=EmbeddedRocksDB PRIMARY KEY(key); + INSERT INTO rocksdb_race SELECT '1_' || toString(number), number FROM numbers(100000); +" | $CLICKHOUSE_CLIENT -n + +function read_stat_thread() +{ + while true; do + echo " + SELECT * FROM system.rocksdb FORMAT Null; + " | $CLICKHOUSE_CLIENT -n + done +} + +function truncate_thread() +{ + while true; do + sleep 3s; + echo " + TRUNCATE TABLE rocksdb_race; + " | $CLICKHOUSE_CLIENT -n + done +} + +# https://stackoverflow.com/questions/9954794/execute-a-shell-function-with-timeout +export -f read_stat_thread; +export -f truncate_thread; + +TIMEOUT=20 + +timeout $TIMEOUT bash -c read_stat_thread 2> /dev/null & +timeout $TIMEOUT bash -c truncate_thread 2> /dev/null & + +wait + +$CLICKHOUSE_CLIENT -q "DROP TABLE rocksdb_race"