mirror of
https://github.com/ClickHouse/ClickHouse.git
synced 2024-11-27 01:51:59 +00:00
Forbid insert select for the same StorageJoin
This commit is contained in:
parent
e23f624968
commit
c9e48f37cb
@ -97,7 +97,7 @@ private:
|
||||
* Note: "SM" in the commentaries below stands for STATE MODIFICATION
|
||||
*/
|
||||
RWLockImpl::LockHolder
|
||||
RWLockImpl::getLock(RWLockImpl::Type type, const String & query_id, const std::chrono::milliseconds & lock_timeout_ms)
|
||||
RWLockImpl::getLock(RWLockImpl::Type type, const String & query_id, const std::chrono::milliseconds & lock_timeout_ms, bool throw_in_fast_path)
|
||||
{
|
||||
const auto lock_deadline_tp =
|
||||
(lock_timeout_ms == std::chrono::milliseconds(0))
|
||||
@ -130,11 +130,19 @@ RWLockImpl::getLock(RWLockImpl::Type type, const String & query_id, const std::c
|
||||
if (owner_query_it != owner_queries.end())
|
||||
{
|
||||
if (wrlock_owner != writers_queue.end())
|
||||
throw Exception(ErrorCodes::LOGICAL_ERROR, "RWLockImpl::getLock(): RWLock is already locked in exclusive mode");
|
||||
{
|
||||
if (throw_in_fast_path)
|
||||
throw Exception(ErrorCodes::LOGICAL_ERROR, "RWLockImpl::getLock(): RWLock is already locked in exclusive mode");
|
||||
return nullptr;
|
||||
}
|
||||
|
||||
/// Lock upgrading is not supported
|
||||
if (type == Write)
|
||||
throw Exception(ErrorCodes::LOGICAL_ERROR, "RWLockImpl::getLock(): Cannot acquire exclusive lock while RWLock is already locked");
|
||||
{
|
||||
if (throw_in_fast_path)
|
||||
throw Exception(ErrorCodes::LOGICAL_ERROR, "RWLockImpl::getLock(): Cannot acquire exclusive lock while RWLock is already locked");
|
||||
return nullptr;
|
||||
}
|
||||
|
||||
/// N.B. Type is Read here, query_id is not empty and it_query is a valid iterator
|
||||
++owner_query_it->second; /// SM1: nothrow
|
||||
|
@ -56,7 +56,7 @@ public:
|
||||
|
||||
/// Empty query_id means the lock is acquired from outside of query context (e.g. in a background thread).
|
||||
LockHolder getLock(Type type, const String & query_id,
|
||||
const std::chrono::milliseconds & lock_timeout_ms = std::chrono::milliseconds(0));
|
||||
const std::chrono::milliseconds & lock_timeout_ms = std::chrono::milliseconds(0), bool throw_in_fast_path = true);
|
||||
|
||||
/// Use as query_id to acquire a lock outside the query context.
|
||||
inline static const String NO_QUERY = String();
|
||||
|
@ -40,7 +40,7 @@ RWLockImpl::LockHolder IStorage::tryLockTimed(
|
||||
{
|
||||
const String type_str = type == RWLockImpl::Type::Read ? "READ" : "WRITE";
|
||||
throw Exception(ErrorCodes::DEADLOCK_AVOIDED,
|
||||
"{} locking attempt on \"{}\" has timed out! ({}ms) Possible deadlock avoided. Client should retry.",
|
||||
"{} locking attempt on \"{}\" has timed out! ({}ms) Possible deadlock avoided. Client should retry",
|
||||
type_str, getStorageID(), acquire_timeout.count());
|
||||
}
|
||||
return lock_holder;
|
||||
|
@ -21,21 +21,24 @@
|
||||
#include <QueryPipeline/Pipe.h>
|
||||
#include <QueryPipeline/QueryPipelineBuilder.h>
|
||||
#include <Processors/Executors/PullingPipelineExecutor.h>
|
||||
#include <Poco/String.h> /// toLower
|
||||
#include <Poco/String.h>
|
||||
#include <filesystem>
|
||||
|
||||
namespace fs = std::filesystem;
|
||||
|
||||
namespace DB
|
||||
{
|
||||
|
||||
namespace ErrorCodes
|
||||
{
|
||||
extern const int NOT_IMPLEMENTED;
|
||||
extern const int LOGICAL_ERROR;
|
||||
extern const int UNSUPPORTED_JOIN_KEYS;
|
||||
extern const int NO_SUCH_COLUMN_IN_TABLE;
|
||||
extern const int INCOMPATIBLE_TYPE_OF_JOIN;
|
||||
extern const int NUMBER_OF_ARGUMENTS_DOESNT_MATCH;
|
||||
extern const int BAD_ARGUMENTS;
|
||||
extern const int DEADLOCK_AVOIDED;
|
||||
extern const int INCOMPATIBLE_TYPE_OF_JOIN;
|
||||
extern const int LOGICAL_ERROR;
|
||||
extern const int NO_SUCH_COLUMN_IN_TABLE;
|
||||
extern const int NOT_IMPLEMENTED;
|
||||
extern const int NUMBER_OF_ARGUMENTS_DOESNT_MATCH;
|
||||
extern const int UNSUPPORTED_JOIN_KEYS;
|
||||
}
|
||||
|
||||
StorageJoin::StorageJoin(
|
||||
@ -78,6 +81,14 @@ RWLockImpl::LockHolder StorageJoin::tryLockTimedWithContext(const RWLock & lock,
|
||||
return tryLockTimed(lock, type, query_id, acquire_timeout);
|
||||
}
|
||||
|
||||
RWLockImpl::LockHolder StorageJoin::tryLockForCurrentQueryTimedWithContext(const RWLock & lock, RWLockImpl::Type type, ContextPtr context) const
|
||||
{
|
||||
const String query_id = context ? context->getInitialQueryId() : RWLockImpl::NO_QUERY;
|
||||
const std::chrono::milliseconds acquire_timeout
|
||||
= context ? context->getSettingsRef().lock_acquire_timeout : std::chrono::seconds(DBMS_DEFAULT_LOCK_ACQUIRE_TIMEOUT_SEC);
|
||||
return lock->getLock(type, query_id, acquire_timeout, false);
|
||||
}
|
||||
|
||||
SinkToStoragePtr StorageJoin::write(const ASTPtr & query, const StorageMetadataPtr & metadata_snapshot, ContextPtr context)
|
||||
{
|
||||
std::lock_guard mutate_lock(mutate_mutex);
|
||||
@ -95,7 +106,7 @@ void StorageJoin::truncate(const ASTPtr &, const StorageMetadataPtr &, ContextPt
|
||||
LOG_INFO(&Poco::Logger::get("StorageJoin"), "Path {} is already removed from disk {}", path, disk->getName());
|
||||
|
||||
disk->createDirectories(path);
|
||||
disk->createDirectories(path + "tmp/");
|
||||
disk->createDirectories(fs::path(path) / "tmp/");
|
||||
|
||||
increment = 0;
|
||||
join = std::make_shared<HashJoin>(table_join, getRightSampleBlock(), overwrite);
|
||||
@ -238,8 +249,12 @@ void StorageJoin::insertBlock(const Block & block, ContextPtr context)
|
||||
{
|
||||
Block block_to_insert = block;
|
||||
convertRightBlock(block_to_insert);
|
||||
TableLockHolder holder = tryLockForCurrentQueryTimedWithContext(rwlock, RWLockImpl::Write, context);
|
||||
|
||||
/// Protection from `INSERT INTO test_table_join SELECT * FROM test_table_join`
|
||||
if (!holder)
|
||||
throw Exception(ErrorCodes::DEADLOCK_AVOIDED, "StorageJoin: cannot insert data because current query tries to read from this storage");
|
||||
|
||||
TableLockHolder holder = tryLockTimedWithContext(rwlock, RWLockImpl::Write, context);
|
||||
join->addJoinedBlock(block_to_insert, true);
|
||||
}
|
||||
|
||||
|
@ -100,12 +100,15 @@ private:
|
||||
/// Protect state for concurrent use in insertFromBlock and joinBlock.
|
||||
/// Lock is stored in HashJoin instance during query and blocks concurrent insertions.
|
||||
mutable RWLock rwlock = RWLockImpl::create();
|
||||
|
||||
mutable std::mutex mutate_mutex;
|
||||
|
||||
void insertBlock(const Block & block, ContextPtr context) override;
|
||||
void finishInsert() override {}
|
||||
size_t getSize(ContextPtr context) const override;
|
||||
RWLockImpl::LockHolder tryLockTimedWithContext(const RWLock & lock, RWLockImpl::Type type, ContextPtr context) const;
|
||||
/// Same as tryLockTimedWithContext, but returns `nullptr` if lock is already acquired by current query.
|
||||
RWLockImpl::LockHolder tryLockForCurrentQueryTimedWithContext(const RWLock & lock, RWLockImpl::Type type, ContextPtr context) const;
|
||||
|
||||
void convertRightBlock(Block & block) const;
|
||||
};
|
||||
|
@ -0,0 +1,16 @@
|
||||
DROP TABLE IF EXISTS test_table_join;
|
||||
|
||||
CREATE TABLE test_table_join
|
||||
(
|
||||
id UInt64,
|
||||
value String
|
||||
) ENGINE = Join(Any, Left, id);
|
||||
|
||||
INSERT INTO test_table_join VALUES (1, 'q');
|
||||
|
||||
INSERT INTO test_table_join SELECT * from test_table_join; -- { serverError DEADLOCK_AVOIDED }
|
||||
|
||||
INSERT INTO test_table_join SELECT * FROM (SELECT 1 as id) AS t1 ANY LEFT JOIN test_table_join USING (id); -- { serverError DEADLOCK_AVOIDED }
|
||||
INSERT INTO test_table_join SELECT id, toString(id) FROM (SELECT 1 as id) AS t1 ANY LEFT JOIN (SELECT id FROM test_table_join) AS t2 USING (id); -- { serverError DEADLOCK_AVOIDED }
|
||||
|
||||
DROP TABLE IF EXISTS test_table_join;
|
Loading…
Reference in New Issue
Block a user