Merge pull request #47260 from ClickHouse/vdimir/storage_join_insert_select_deadlock

Forbid insert select for the same StorageJoin
This commit is contained in:
robot-ch-test-poll1 2023-03-15 12:52:29 +01:00 committed by GitHub
commit 43db7c59d9
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
7 changed files with 56 additions and 14 deletions

View File

@ -97,7 +97,7 @@ private:
* Note: "SM" in the commentaries below stands for STATE MODIFICATION
*/
RWLockImpl::LockHolder
RWLockImpl::getLock(RWLockImpl::Type type, const String & query_id, const std::chrono::milliseconds & lock_timeout_ms)
RWLockImpl::getLock(RWLockImpl::Type type, const String & query_id, const std::chrono::milliseconds & lock_timeout_ms, bool throw_in_fast_path)
{
const auto lock_deadline_tp =
(lock_timeout_ms == std::chrono::milliseconds(0))
@ -130,11 +130,19 @@ RWLockImpl::getLock(RWLockImpl::Type type, const String & query_id, const std::c
if (owner_query_it != owner_queries.end())
{
if (wrlock_owner != writers_queue.end())
{
if (throw_in_fast_path)
throw Exception(ErrorCodes::LOGICAL_ERROR, "RWLockImpl::getLock(): RWLock is already locked in exclusive mode");
return nullptr;
}
/// Lock upgrading is not supported
if (type == Write)
{
if (throw_in_fast_path)
throw Exception(ErrorCodes::LOGICAL_ERROR, "RWLockImpl::getLock(): Cannot acquire exclusive lock while RWLock is already locked");
return nullptr;
}
/// N.B. Type is Read here, query_id is not empty and it_query is a valid iterator
++owner_query_it->second; /// SM1: nothrow

View File

@ -56,7 +56,7 @@ public:
/// Empty query_id means the lock is acquired from outside of query context (e.g. in a background thread).
LockHolder getLock(Type type, const String & query_id,
const std::chrono::milliseconds & lock_timeout_ms = std::chrono::milliseconds(0));
const std::chrono::milliseconds & lock_timeout_ms = std::chrono::milliseconds(0), bool throw_in_fast_path = true);
/// Use as query_id to acquire a lock outside the query context.
inline static const String NO_QUERY = String();

View File

@ -40,7 +40,7 @@ RWLockImpl::LockHolder IStorage::tryLockTimed(
{
const String type_str = type == RWLockImpl::Type::Read ? "READ" : "WRITE";
throw Exception(ErrorCodes::DEADLOCK_AVOIDED,
"{} locking attempt on \"{}\" has timed out! ({}ms) Possible deadlock avoided. Client should retry.",
"{} locking attempt on \"{}\" has timed out! ({}ms) Possible deadlock avoided. Client should retry",
type_str, getStorageID(), acquire_timeout.count());
}
return lock_holder;

View File

@ -21,21 +21,24 @@
#include <QueryPipeline/Pipe.h>
#include <QueryPipeline/QueryPipelineBuilder.h>
#include <Processors/Executors/PullingPipelineExecutor.h>
#include <Poco/String.h> /// toLower
#include <Poco/String.h>
#include <filesystem>
namespace fs = std::filesystem;
namespace DB
{
namespace ErrorCodes
{
extern const int NOT_IMPLEMENTED;
extern const int LOGICAL_ERROR;
extern const int UNSUPPORTED_JOIN_KEYS;
extern const int NO_SUCH_COLUMN_IN_TABLE;
extern const int INCOMPATIBLE_TYPE_OF_JOIN;
extern const int NUMBER_OF_ARGUMENTS_DOESNT_MATCH;
extern const int BAD_ARGUMENTS;
extern const int DEADLOCK_AVOIDED;
extern const int INCOMPATIBLE_TYPE_OF_JOIN;
extern const int LOGICAL_ERROR;
extern const int NO_SUCH_COLUMN_IN_TABLE;
extern const int NOT_IMPLEMENTED;
extern const int NUMBER_OF_ARGUMENTS_DOESNT_MATCH;
extern const int UNSUPPORTED_JOIN_KEYS;
}
StorageJoin::StorageJoin(
@ -78,6 +81,14 @@ RWLockImpl::LockHolder StorageJoin::tryLockTimedWithContext(const RWLock & lock,
return tryLockTimed(lock, type, query_id, acquire_timeout);
}
RWLockImpl::LockHolder StorageJoin::tryLockForCurrentQueryTimedWithContext(const RWLock & lock, RWLockImpl::Type type, ContextPtr context)
{
const String query_id = context ? context->getInitialQueryId() : RWLockImpl::NO_QUERY;
const std::chrono::milliseconds acquire_timeout
= context ? context->getSettingsRef().lock_acquire_timeout : std::chrono::seconds(DBMS_DEFAULT_LOCK_ACQUIRE_TIMEOUT_SEC);
return lock->getLock(type, query_id, acquire_timeout, false);
}
SinkToStoragePtr StorageJoin::write(const ASTPtr & query, const StorageMetadataPtr & metadata_snapshot, ContextPtr context)
{
std::lock_guard mutate_lock(mutate_mutex);
@ -95,7 +106,7 @@ void StorageJoin::truncate(const ASTPtr &, const StorageMetadataPtr &, ContextPt
LOG_INFO(&Poco::Logger::get("StorageJoin"), "Path {} is already removed from disk {}", path, disk->getName());
disk->createDirectories(path);
disk->createDirectories(path + "tmp/");
disk->createDirectories(fs::path(path) / "tmp/");
increment = 0;
join = std::make_shared<HashJoin>(table_join, getRightSampleBlock(), overwrite);
@ -238,8 +249,12 @@ void StorageJoin::insertBlock(const Block & block, ContextPtr context)
{
Block block_to_insert = block;
convertRightBlock(block_to_insert);
TableLockHolder holder = tryLockForCurrentQueryTimedWithContext(rwlock, RWLockImpl::Write, context);
/// Protection from `INSERT INTO test_table_join SELECT * FROM test_table_join`
if (!holder)
throw Exception(ErrorCodes::DEADLOCK_AVOIDED, "StorageJoin: cannot insert data because current query tries to read from this storage");
TableLockHolder holder = tryLockTimedWithContext(rwlock, RWLockImpl::Write, context);
join->addJoinedBlock(block_to_insert, true);
}

View File

@ -100,12 +100,15 @@ private:
/// Protect state for concurrent use in insertFromBlock and joinBlock.
/// Lock is stored in HashJoin instance during query and blocks concurrent insertions.
mutable RWLock rwlock = RWLockImpl::create();
mutable std::mutex mutate_mutex;
void insertBlock(const Block & block, ContextPtr context) override;
void finishInsert() override {}
size_t getSize(ContextPtr context) const override;
RWLockImpl::LockHolder tryLockTimedWithContext(const RWLock & lock, RWLockImpl::Type type, ContextPtr context) const;
/// Same as tryLockTimedWithContext, but returns `nullptr` if lock is already acquired by current query.
static RWLockImpl::LockHolder tryLockForCurrentQueryTimedWithContext(const RWLock & lock, RWLockImpl::Type type, ContextPtr context);
void convertRightBlock(Block & block) const;
};

View File

@ -0,0 +1,16 @@
DROP TABLE IF EXISTS test_table_join;
CREATE TABLE test_table_join
(
id UInt64,
value String
) ENGINE = Join(Any, Left, id);
INSERT INTO test_table_join VALUES (1, 'q');
INSERT INTO test_table_join SELECT * from test_table_join; -- { serverError DEADLOCK_AVOIDED }
INSERT INTO test_table_join SELECT * FROM (SELECT 1 as id) AS t1 ANY LEFT JOIN test_table_join USING (id); -- { serverError DEADLOCK_AVOIDED }
INSERT INTO test_table_join SELECT id, toString(id) FROM (SELECT 1 as id) AS t1 ANY LEFT JOIN (SELECT id FROM test_table_join) AS t2 USING (id); -- { serverError DEADLOCK_AVOIDED }
DROP TABLE IF EXISTS test_table_join;