From c9e48f37cbb4d8dc60503756815737f04a86808e Mon Sep 17 00:00:00 2001 From: vdimir Date: Mon, 6 Mar 2023 10:54:43 +0000 Subject: [PATCH 1/2] Forbid insert select for the same StorageJoin --- src/Common/RWLock.cpp | 14 ++++++-- src/Common/RWLock.h | 2 +- src/Storages/IStorage.cpp | 2 +- src/Storages/StorageJoin.cpp | 33 ++++++++++++++----- src/Storages/StorageJoin.h | 3 ++ ...rage_join_insert_select_deadlock.reference | 0 ...40_storage_join_insert_select_deadlock.sql | 16 +++++++++ 7 files changed, 56 insertions(+), 14 deletions(-) create mode 100644 tests/queries/0_stateless/25340_storage_join_insert_select_deadlock.reference create mode 100644 tests/queries/0_stateless/25340_storage_join_insert_select_deadlock.sql diff --git a/src/Common/RWLock.cpp b/src/Common/RWLock.cpp index c2419d0c1b7..2d0fcfa3e74 100644 --- a/src/Common/RWLock.cpp +++ b/src/Common/RWLock.cpp @@ -97,7 +97,7 @@ private: * Note: "SM" in the commentaries below stands for STATE MODIFICATION */ RWLockImpl::LockHolder -RWLockImpl::getLock(RWLockImpl::Type type, const String & query_id, const std::chrono::milliseconds & lock_timeout_ms) +RWLockImpl::getLock(RWLockImpl::Type type, const String & query_id, const std::chrono::milliseconds & lock_timeout_ms, bool throw_in_fast_path) { const auto lock_deadline_tp = (lock_timeout_ms == std::chrono::milliseconds(0)) @@ -130,11 +130,19 @@ RWLockImpl::getLock(RWLockImpl::Type type, const String & query_id, const std::c if (owner_query_it != owner_queries.end()) { if (wrlock_owner != writers_queue.end()) - throw Exception(ErrorCodes::LOGICAL_ERROR, "RWLockImpl::getLock(): RWLock is already locked in exclusive mode"); + { + if (throw_in_fast_path) + throw Exception(ErrorCodes::LOGICAL_ERROR, "RWLockImpl::getLock(): RWLock is already locked in exclusive mode"); + return nullptr; + } /// Lock upgrading is not supported if (type == Write) - throw Exception(ErrorCodes::LOGICAL_ERROR, "RWLockImpl::getLock(): Cannot acquire exclusive lock while RWLock is already locked"); + { + if (throw_in_fast_path) + throw Exception(ErrorCodes::LOGICAL_ERROR, "RWLockImpl::getLock(): Cannot acquire exclusive lock while RWLock is already locked"); + return nullptr; + } /// N.B. Type is Read here, query_id is not empty and it_query is a valid iterator ++owner_query_it->second; /// SM1: nothrow diff --git a/src/Common/RWLock.h b/src/Common/RWLock.h index cb4cf7f9200..dd965b65026 100644 --- a/src/Common/RWLock.h +++ b/src/Common/RWLock.h @@ -56,7 +56,7 @@ public: /// Empty query_id means the lock is acquired from outside of query context (e.g. in a background thread). LockHolder getLock(Type type, const String & query_id, - const std::chrono::milliseconds & lock_timeout_ms = std::chrono::milliseconds(0)); + const std::chrono::milliseconds & lock_timeout_ms = std::chrono::milliseconds(0), bool throw_in_fast_path = true); /// Use as query_id to acquire a lock outside the query context. inline static const String NO_QUERY = String(); diff --git a/src/Storages/IStorage.cpp b/src/Storages/IStorage.cpp index 9bcfff65c95..d50f335c1c9 100644 --- a/src/Storages/IStorage.cpp +++ b/src/Storages/IStorage.cpp @@ -40,7 +40,7 @@ RWLockImpl::LockHolder IStorage::tryLockTimed( { const String type_str = type == RWLockImpl::Type::Read ? "READ" : "WRITE"; throw Exception(ErrorCodes::DEADLOCK_AVOIDED, - "{} locking attempt on \"{}\" has timed out! ({}ms) Possible deadlock avoided. Client should retry.", + "{} locking attempt on \"{}\" has timed out! ({}ms) Possible deadlock avoided. Client should retry", type_str, getStorageID(), acquire_timeout.count()); } return lock_holder; diff --git a/src/Storages/StorageJoin.cpp b/src/Storages/StorageJoin.cpp index ab1406f9bd6..79194eb0d2d 100644 --- a/src/Storages/StorageJoin.cpp +++ b/src/Storages/StorageJoin.cpp @@ -21,21 +21,24 @@ #include #include #include -#include /// toLower +#include +#include +namespace fs = std::filesystem; namespace DB { namespace ErrorCodes { - extern const int NOT_IMPLEMENTED; - extern const int LOGICAL_ERROR; - extern const int UNSUPPORTED_JOIN_KEYS; - extern const int NO_SUCH_COLUMN_IN_TABLE; - extern const int INCOMPATIBLE_TYPE_OF_JOIN; - extern const int NUMBER_OF_ARGUMENTS_DOESNT_MATCH; extern const int BAD_ARGUMENTS; + extern const int DEADLOCK_AVOIDED; + extern const int INCOMPATIBLE_TYPE_OF_JOIN; + extern const int LOGICAL_ERROR; + extern const int NO_SUCH_COLUMN_IN_TABLE; + extern const int NOT_IMPLEMENTED; + extern const int NUMBER_OF_ARGUMENTS_DOESNT_MATCH; + extern const int UNSUPPORTED_JOIN_KEYS; } StorageJoin::StorageJoin( @@ -78,6 +81,14 @@ RWLockImpl::LockHolder StorageJoin::tryLockTimedWithContext(const RWLock & lock, return tryLockTimed(lock, type, query_id, acquire_timeout); } +RWLockImpl::LockHolder StorageJoin::tryLockForCurrentQueryTimedWithContext(const RWLock & lock, RWLockImpl::Type type, ContextPtr context) const +{ + const String query_id = context ? context->getInitialQueryId() : RWLockImpl::NO_QUERY; + const std::chrono::milliseconds acquire_timeout + = context ? context->getSettingsRef().lock_acquire_timeout : std::chrono::seconds(DBMS_DEFAULT_LOCK_ACQUIRE_TIMEOUT_SEC); + return lock->getLock(type, query_id, acquire_timeout, false); +} + SinkToStoragePtr StorageJoin::write(const ASTPtr & query, const StorageMetadataPtr & metadata_snapshot, ContextPtr context) { std::lock_guard mutate_lock(mutate_mutex); @@ -95,7 +106,7 @@ void StorageJoin::truncate(const ASTPtr &, const StorageMetadataPtr &, ContextPt LOG_INFO(&Poco::Logger::get("StorageJoin"), "Path {} is already removed from disk {}", path, disk->getName()); disk->createDirectories(path); - disk->createDirectories(path + "tmp/"); + disk->createDirectories(fs::path(path) / "tmp/"); increment = 0; join = std::make_shared(table_join, getRightSampleBlock(), overwrite); @@ -238,8 +249,12 @@ void StorageJoin::insertBlock(const Block & block, ContextPtr context) { Block block_to_insert = block; convertRightBlock(block_to_insert); + TableLockHolder holder = tryLockForCurrentQueryTimedWithContext(rwlock, RWLockImpl::Write, context); + + /// Protection from `INSERT INTO test_table_join SELECT * FROM test_table_join` + if (!holder) + throw Exception(ErrorCodes::DEADLOCK_AVOIDED, "StorageJoin: cannot insert data because current query tries to read from this storage"); - TableLockHolder holder = tryLockTimedWithContext(rwlock, RWLockImpl::Write, context); join->addJoinedBlock(block_to_insert, true); } diff --git a/src/Storages/StorageJoin.h b/src/Storages/StorageJoin.h index 61ea743c841..4497d737a3b 100644 --- a/src/Storages/StorageJoin.h +++ b/src/Storages/StorageJoin.h @@ -100,12 +100,15 @@ private: /// Protect state for concurrent use in insertFromBlock and joinBlock. /// Lock is stored in HashJoin instance during query and blocks concurrent insertions. mutable RWLock rwlock = RWLockImpl::create(); + mutable std::mutex mutate_mutex; void insertBlock(const Block & block, ContextPtr context) override; void finishInsert() override {} size_t getSize(ContextPtr context) const override; RWLockImpl::LockHolder tryLockTimedWithContext(const RWLock & lock, RWLockImpl::Type type, ContextPtr context) const; + /// Same as tryLockTimedWithContext, but returns `nullptr` if lock is already acquired by current query. + RWLockImpl::LockHolder tryLockForCurrentQueryTimedWithContext(const RWLock & lock, RWLockImpl::Type type, ContextPtr context) const; void convertRightBlock(Block & block) const; }; diff --git a/tests/queries/0_stateless/25340_storage_join_insert_select_deadlock.reference b/tests/queries/0_stateless/25340_storage_join_insert_select_deadlock.reference new file mode 100644 index 00000000000..e69de29bb2d diff --git a/tests/queries/0_stateless/25340_storage_join_insert_select_deadlock.sql b/tests/queries/0_stateless/25340_storage_join_insert_select_deadlock.sql new file mode 100644 index 00000000000..59528511357 --- /dev/null +++ b/tests/queries/0_stateless/25340_storage_join_insert_select_deadlock.sql @@ -0,0 +1,16 @@ +DROP TABLE IF EXISTS test_table_join; + +CREATE TABLE test_table_join +( + id UInt64, + value String +) ENGINE = Join(Any, Left, id); + +INSERT INTO test_table_join VALUES (1, 'q'); + +INSERT INTO test_table_join SELECT * from test_table_join; -- { serverError DEADLOCK_AVOIDED } + +INSERT INTO test_table_join SELECT * FROM (SELECT 1 as id) AS t1 ANY LEFT JOIN test_table_join USING (id); -- { serverError DEADLOCK_AVOIDED } +INSERT INTO test_table_join SELECT id, toString(id) FROM (SELECT 1 as id) AS t1 ANY LEFT JOIN (SELECT id FROM test_table_join) AS t2 USING (id); -- { serverError DEADLOCK_AVOIDED } + +DROP TABLE IF EXISTS test_table_join; From 9f220054c829e1bbbf5a72fdedd7998281e115ef Mon Sep 17 00:00:00 2001 From: vdimir Date: Mon, 13 Mar 2023 10:49:51 +0000 Subject: [PATCH 2/2] Fix build --- src/Storages/StorageJoin.cpp | 2 +- src/Storages/StorageJoin.h | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/src/Storages/StorageJoin.cpp b/src/Storages/StorageJoin.cpp index 79194eb0d2d..dec741beb45 100644 --- a/src/Storages/StorageJoin.cpp +++ b/src/Storages/StorageJoin.cpp @@ -81,7 +81,7 @@ RWLockImpl::LockHolder StorageJoin::tryLockTimedWithContext(const RWLock & lock, return tryLockTimed(lock, type, query_id, acquire_timeout); } -RWLockImpl::LockHolder StorageJoin::tryLockForCurrentQueryTimedWithContext(const RWLock & lock, RWLockImpl::Type type, ContextPtr context) const +RWLockImpl::LockHolder StorageJoin::tryLockForCurrentQueryTimedWithContext(const RWLock & lock, RWLockImpl::Type type, ContextPtr context) { const String query_id = context ? context->getInitialQueryId() : RWLockImpl::NO_QUERY; const std::chrono::milliseconds acquire_timeout diff --git a/src/Storages/StorageJoin.h b/src/Storages/StorageJoin.h index 4497d737a3b..a5e85d8788a 100644 --- a/src/Storages/StorageJoin.h +++ b/src/Storages/StorageJoin.h @@ -108,7 +108,7 @@ private: size_t getSize(ContextPtr context) const override; RWLockImpl::LockHolder tryLockTimedWithContext(const RWLock & lock, RWLockImpl::Type type, ContextPtr context) const; /// Same as tryLockTimedWithContext, but returns `nullptr` if lock is already acquired by current query. - RWLockImpl::LockHolder tryLockForCurrentQueryTimedWithContext(const RWLock & lock, RWLockImpl::Type type, ContextPtr context) const; + static RWLockImpl::LockHolder tryLockForCurrentQueryTimedWithContext(const RWLock & lock, RWLockImpl::Type type, ContextPtr context); void convertRightBlock(Block & block) const; };