Merge pull request #29544 from Algunenano/join_deadlock

This commit is contained in:
Vladimir C 2021-10-12 12:34:04 +03:00 committed by GitHub
commit 969999ff10
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
12 changed files with 148 additions and 54 deletions

View File

@ -25,14 +25,14 @@ ColumnPtr ExecutableFunctionJoinGet<or_null>::executeImpl(const ColumnsWithTypeA
auto key = arguments[i];
keys.emplace_back(std::move(key));
}
return storage_join->joinGet(keys, result_columns).column;
return storage_join->joinGet(keys, result_columns, getContext()).column;
}
template <bool or_null>
ExecutableFunctionPtr FunctionJoinGet<or_null>::prepare(const ColumnsWithTypeAndName &) const
{
Block result_columns {{return_type->createColumn(), return_type, attr_name}};
return std::make_unique<ExecutableFunctionJoinGet<or_null>>(table_lock, storage_join, result_columns);
return std::make_unique<ExecutableFunctionJoinGet<or_null>>(getContext(), table_lock, storage_join, result_columns);
}
static std::pair<std::shared_ptr<StorageJoin>, String>
@ -89,7 +89,7 @@ FunctionBasePtr JoinGetOverloadResolver<or_null>::buildImpl(const ColumnsWithTyp
auto return_type = storage_join->joinGetCheckAndGetReturnType(data_types, attr_name, or_null);
auto table_lock = storage_join->lockForShare(getContext()->getInitialQueryId(), getContext()->getSettingsRef().lock_acquire_timeout);
return std::make_unique<FunctionJoinGet<or_null>>(table_lock, storage_join, attr_name, argument_types, return_type);
return std::make_unique<FunctionJoinGet<or_null>>(getContext(), table_lock, storage_join, attr_name, argument_types, return_type);
}
void registerFunctionJoinGet(FunctionFactory & factory)

View File

@ -14,13 +14,15 @@ class StorageJoin;
using StorageJoinPtr = std::shared_ptr<StorageJoin>;
template <bool or_null>
class ExecutableFunctionJoinGet final : public IExecutableFunction
class ExecutableFunctionJoinGet final : public IExecutableFunction, WithContext
{
public:
ExecutableFunctionJoinGet(TableLockHolder table_lock_,
ExecutableFunctionJoinGet(ContextPtr context_,
TableLockHolder table_lock_,
StorageJoinPtr storage_join_,
const DB::Block & result_columns_)
: table_lock(std::move(table_lock_))
: WithContext(context_)
, table_lock(std::move(table_lock_))
, storage_join(std::move(storage_join_))
, result_columns(result_columns_)
{}
@ -42,15 +44,17 @@ private:
};
template <bool or_null>
class FunctionJoinGet final : public IFunctionBase
class FunctionJoinGet final : public IFunctionBase, WithContext
{
public:
static constexpr auto name = or_null ? "joinGetOrNull" : "joinGet";
FunctionJoinGet(TableLockHolder table_lock_,
FunctionJoinGet(ContextPtr context_,
TableLockHolder table_lock_,
StorageJoinPtr storage_join_, String attr_name_,
DataTypes argument_types_, DataTypePtr return_type_)
: table_lock(std::move(table_lock_))
: WithContext(context_)
, table_lock(std::move(table_lock_))
, storage_join(storage_join_)
, attr_name(std::move(attr_name_))
, argument_types(std::move(argument_types_))

View File

@ -938,7 +938,7 @@ JoinPtr SelectQueryExpressionAnalyzer::makeTableJoin(
if (auto storage = analyzed_join->getStorageJoin())
{
std::tie(left_convert_actions, right_convert_actions) = analyzed_join->createConvertingActions(left_columns, {});
return storage->getJoinLocked(analyzed_join);
return storage->getJoinLocked(analyzed_join, getContext());
}
joined_plan = buildJoinedPlan(getContext(), join_element, *analyzed_join, query_options);

View File

@ -744,7 +744,7 @@ bool HashJoin::addJoinedBlock(const Block & source_block, bool check_limits)
size_t total_rows = 0;
size_t total_bytes = 0;
{
if (storage_join_lock.mutex())
if (storage_join_lock)
throw DB::Exception("addJoinedBlock called when HashJoin locked to prevent updates",
ErrorCodes::LOGICAL_ERROR);

View File

@ -16,6 +16,7 @@
#include <Common/ColumnsHashing.h>
#include <Common/HashTable/HashMap.h>
#include <Common/HashTable/FixedHashMap.h>
#include <Common/RWLock.h>
#include <Columns/ColumnString.h>
#include <Columns/ColumnFixedString.h>
@ -334,9 +335,9 @@ public:
/// We keep correspondence between used_flags and hash table internal buffer.
/// Hash table cannot be modified during HashJoin lifetime and must be protected with lock.
void setLock(std::shared_mutex & rwlock)
void setLock(RWLockImpl::LockHolder rwlock_holder)
{
storage_join_lock = std::shared_lock<std::shared_mutex>(rwlock);
storage_join_lock = rwlock_holder;
}
void reuseJoinedData(const HashJoin & join);
@ -391,7 +392,7 @@ private:
/// Should be set via setLock to protect hash table from modification from StorageJoin
/// If set HashJoin instance is not available for modification (addJoinedBlock)
std::shared_lock<std::shared_mutex> storage_join_lock;
RWLockImpl::LockHolder storage_join_lock = nullptr;
void dataMapInit(MapsVariant &);

View File

@ -226,6 +226,7 @@ private:
/// without locks.
MultiVersionStorageMetadataPtr metadata;
protected:
RWLockImpl::LockHolder tryLockTimed(
const RWLock & rwlock, RWLockImpl::Type type, const String & query_id, const std::chrono::milliseconds & acquire_timeout) const;

View File

@ -1,13 +1,13 @@
#include <Storages/StorageJoin.h>
#include <Storages/StorageFactory.h>
#include <Storages/StorageSet.h>
#include <Storages/TableLockHolder.h>
#include <Interpreters/HashJoin.h>
#include <Interpreters/Context.h>
#include <Parsers/ASTCreateQuery.h>
#include <Parsers/ASTIdentifier.h>
#include <Core/ColumnNumbers.h>
#include <DataTypes/NestedUtils.h>
#include <Disks/IDisk.h>
#include <Interpreters/joinDispatch.h>
#include <Interpreters/MutationsInterpreter.h>
#include <Interpreters/TableJoin.h>
@ -68,17 +68,24 @@ StorageJoin::StorageJoin(
restore();
}
RWLockImpl::LockHolder StorageJoin::tryLockTimedWithContext(const RWLock & lock, RWLockImpl::Type type, ContextPtr context) const
{
const String query_id = context ? context->getInitialQueryId() : RWLockImpl::NO_QUERY;
const std::chrono::milliseconds acquire_timeout
= context ? context->getSettingsRef().lock_acquire_timeout : std::chrono::seconds(DBMS_DEFAULT_LOCK_ACQUIRE_TIMEOUT_SEC);
return tryLockTimed(lock, type, query_id, acquire_timeout);
}
SinkToStoragePtr StorageJoin::write(const ASTPtr & query, const StorageMetadataPtr & metadata_snapshot, ContextPtr context)
{
std::lock_guard mutate_lock(mutate_mutex);
return StorageSetOrJoinBase::write(query, metadata_snapshot, context);
}
void StorageJoin::truncate(
const ASTPtr &, const StorageMetadataPtr & metadata_snapshot, ContextPtr, TableExclusiveLockHolder&)
void StorageJoin::truncate(const ASTPtr &, const StorageMetadataPtr & metadata_snapshot, ContextPtr context, TableExclusiveLockHolder &)
{
std::lock_guard mutate_lock(mutate_mutex);
std::unique_lock<std::shared_mutex> lock(rwlock);
TableLockHolder holder = tryLockTimedWithContext(rwlock, RWLockImpl::Write, context);
disk->removeRecursive(path);
disk->createDirectories(path);
@ -128,7 +135,7 @@ void StorageJoin::mutate(const MutationCommands & commands, ContextPtr context)
}
/// Now acquire exclusive lock and modify storage.
std::unique_lock<std::shared_mutex> lock(rwlock);
TableLockHolder holder = tryLockTimedWithContext(rwlock, RWLockImpl::Write, context);
join = std::move(new_data);
increment = 1;
@ -152,7 +159,7 @@ void StorageJoin::mutate(const MutationCommands & commands, ContextPtr context)
}
}
HashJoinPtr StorageJoin::getJoinLocked(std::shared_ptr<TableJoin> analyzed_join) const
HashJoinPtr StorageJoin::getJoinLocked(std::shared_ptr<TableJoin> analyzed_join, ContextPtr context) const
{
auto metadata_snapshot = getInMemoryMetadataPtr();
if (!analyzed_join->sameStrictnessAndKind(strictness, kind))
@ -171,34 +178,36 @@ HashJoinPtr StorageJoin::getJoinLocked(std::shared_ptr<TableJoin> analyzed_join)
analyzed_join->setRightKeys(key_names);
HashJoinPtr join_clone = std::make_shared<HashJoin>(analyzed_join, metadata_snapshot->getSampleBlock().sortColumns());
join_clone->setLock(rwlock);
RWLockImpl::LockHolder holder = tryLockTimedWithContext(rwlock, RWLockImpl::Read, context);
join_clone->setLock(holder);
join_clone->reuseJoinedData(*join);
return join_clone;
}
void StorageJoin::insertBlock(const Block & block)
void StorageJoin::insertBlock(const Block & block, ContextPtr context)
{
std::unique_lock<std::shared_mutex> lock(rwlock);
TableLockHolder holder = tryLockTimedWithContext(rwlock, RWLockImpl::Write, context);
join->addJoinedBlock(block, true);
}
size_t StorageJoin::getSize() const
size_t StorageJoin::getSize(ContextPtr context) const
{
std::shared_lock<std::shared_mutex> lock(rwlock);
TableLockHolder holder = tryLockTimedWithContext(rwlock, RWLockImpl::Read, context);
return join->getTotalRowCount();
}
std::optional<UInt64> StorageJoin::totalRows(const Settings &) const
std::optional<UInt64> StorageJoin::totalRows(const Settings &settings) const
{
std::shared_lock<std::shared_mutex> lock(rwlock);
TableLockHolder holder = tryLockTimed(rwlock, RWLockImpl::Read, RWLockImpl::NO_QUERY, settings.lock_acquire_timeout);
return join->getTotalRowCount();
}
std::optional<UInt64> StorageJoin::totalBytes(const Settings &) const
std::optional<UInt64> StorageJoin::totalBytes(const Settings &settings) const
{
std::shared_lock<std::shared_mutex> lock(rwlock);
TableLockHolder holder = tryLockTimed(rwlock, RWLockImpl::Read, RWLockImpl::NO_QUERY, settings.lock_acquire_timeout);
return join->getTotalByteCount();
}
@ -207,9 +216,9 @@ DataTypePtr StorageJoin::joinGetCheckAndGetReturnType(const DataTypes & data_typ
return join->joinGetCheckAndGetReturnType(data_types, column_name, or_null);
}
ColumnWithTypeAndName StorageJoin::joinGet(const Block & block, const Block & block_with_columns_to_add) const
ColumnWithTypeAndName StorageJoin::joinGet(const Block & block, const Block & block_with_columns_to_add, ContextPtr context) const
{
std::shared_lock<std::shared_mutex> lock(rwlock);
TableLockHolder holder = tryLockTimedWithContext(rwlock, RWLockImpl::Read, context);
return join->joinGet(block, block_with_columns_to_add);
}
@ -370,10 +379,10 @@ size_t rawSize(const StringRef & t)
class JoinSource : public SourceWithProgress
{
public:
JoinSource(HashJoinPtr join_, std::shared_mutex & rwlock, UInt64 max_block_size_, Block sample_block_)
JoinSource(HashJoinPtr join_, TableLockHolder lock_holder_, UInt64 max_block_size_, Block sample_block_)
: SourceWithProgress(sample_block_)
, join(join_)
, lock(rwlock)
, lock_holder(lock_holder_)
, max_block_size(max_block_size_)
, sample_block(std::move(sample_block_))
{
@ -421,7 +430,7 @@ protected:
private:
HashJoinPtr join;
std::shared_lock<std::shared_mutex> lock;
TableLockHolder lock_holder;
UInt64 max_block_size;
Block sample_block;
@ -571,7 +580,7 @@ Pipe StorageJoin::read(
const Names & column_names,
const StorageMetadataPtr & metadata_snapshot,
SelectQueryInfo & /*query_info*/,
ContextPtr /*context*/,
ContextPtr context,
QueryProcessingStage::Enum /*processed_stage*/,
size_t max_block_size,
unsigned /*num_streams*/)
@ -579,7 +588,8 @@ Pipe StorageJoin::read(
metadata_snapshot->check(column_names, getVirtuals(), getStorageID());
Block source_sample_block = metadata_snapshot->getSampleBlockForColumns(column_names, getVirtuals(), getStorageID());
return Pipe(std::make_shared<JoinSource>(join, rwlock, max_block_size, source_sample_block));
RWLockImpl::LockHolder holder = tryLockTimedWithContext(rwlock, RWLockImpl::Read, context);
return Pipe(std::make_shared<JoinSource>(join, std::move(holder), max_block_size, source_sample_block));
}
}

View File

@ -2,7 +2,9 @@
#include <base/shared_ptr_helper.h>
#include <Common/RWLock.h>
#include <Storages/StorageSet.h>
#include <Storages/TableLockHolder.h>
#include <Storages/JoinSettings.h>
#include <Parsers/ASTTablesInSelectQuery.h>
@ -35,7 +37,7 @@ public:
/// Return instance of HashJoin holding lock that protects from insertions to StorageJoin.
/// HashJoin relies on structure of hash table that's why we need to return it with locked mutex.
HashJoinPtr getJoinLocked(std::shared_ptr<TableJoin> analyzed_join) const;
HashJoinPtr getJoinLocked(std::shared_ptr<TableJoin> analyzed_join, ContextPtr context) const;
/// Get result type for function "joinGet(OrNull)"
DataTypePtr joinGetCheckAndGetReturnType(const DataTypes & data_types, const String & column_name, bool or_null) const;
@ -43,7 +45,7 @@ public:
/// Execute function "joinGet(OrNull)" on data block.
/// Takes rwlock for read to prevent parallel StorageJoin updates during processing data block
/// (but not during processing whole query, it's safe for joinGet that doesn't involve `used_flags` from HashJoin)
ColumnWithTypeAndName joinGet(const Block & block, const Block & block_with_columns_to_add) const;
ColumnWithTypeAndName joinGet(const Block & block, const Block & block_with_columns_to_add, ContextPtr context) const;
SinkToStoragePtr write(const ASTPtr & query, const StorageMetadataPtr & metadata_snapshot, ContextPtr context) override;
@ -73,12 +75,13 @@ private:
/// Protect state for concurrent use in insertFromBlock and joinBlock.
/// Lock is stored in HashJoin instance during query and blocks concurrent insertions.
mutable std::shared_mutex rwlock;
mutable RWLock rwlock = RWLockImpl::create();
mutable std::mutex mutate_mutex;
void insertBlock(const Block & block) override;
void insertBlock(const Block & block, ContextPtr context) override;
void finishInsert() override {}
size_t getSize() const override;
size_t getSize(ContextPtr context) const override;
RWLockImpl::LockHolder tryLockTimedWithContext(const RWLock & lock, RWLockImpl::Type type, ContextPtr context) const;
protected:
StorageJoin(

View File

@ -34,11 +34,11 @@ namespace ErrorCodes
}
class SetOrJoinSink : public SinkToStorage
class SetOrJoinSink : public SinkToStorage, WithContext
{
public:
SetOrJoinSink(
StorageSetOrJoinBase & table_, const StorageMetadataPtr & metadata_snapshot_,
ContextPtr ctx, StorageSetOrJoinBase & table_, const StorageMetadataPtr & metadata_snapshot_,
const String & backup_path_, const String & backup_tmp_path_,
const String & backup_file_name_, bool persistent_);
@ -60,6 +60,7 @@ private:
SetOrJoinSink::SetOrJoinSink(
ContextPtr ctx,
StorageSetOrJoinBase & table_,
const StorageMetadataPtr & metadata_snapshot_,
const String & backup_path_,
@ -67,6 +68,7 @@ SetOrJoinSink::SetOrJoinSink(
const String & backup_file_name_,
bool persistent_)
: SinkToStorage(metadata_snapshot_->getSampleBlock())
, WithContext(ctx)
, table(table_)
, metadata_snapshot(metadata_snapshot_)
, backup_path(backup_path_)
@ -84,7 +86,7 @@ void SetOrJoinSink::consume(Chunk chunk)
/// Sort columns in the block. This is necessary, since Set and Join count on the same column order in different blocks.
Block sorted_block = getHeader().cloneWithColumns(chunk.detachColumns()).sortColumns();
table.insertBlock(sorted_block);
table.insertBlock(sorted_block, getContext());
if (persistent)
backup_stream.write(sorted_block);
}
@ -104,10 +106,11 @@ void SetOrJoinSink::onFinish()
}
SinkToStoragePtr StorageSetOrJoinBase::write(const ASTPtr & /*query*/, const StorageMetadataPtr & metadata_snapshot, ContextPtr /*context*/)
SinkToStoragePtr StorageSetOrJoinBase::write(const ASTPtr & /*query*/, const StorageMetadataPtr & metadata_snapshot, ContextPtr context)
{
UInt64 id = ++increment;
return std::make_shared<SetOrJoinSink>(*this, metadata_snapshot, path, fs::path(path) / "tmp/", toString(id) + ".bin", persistent);
return std::make_shared<SetOrJoinSink>(
context, *this, metadata_snapshot, path, fs::path(path) / "tmp/", toString(id) + ".bin", persistent);
}
@ -155,10 +158,10 @@ StorageSet::StorageSet(
}
void StorageSet::insertBlock(const Block & block) { set->insertFromBlock(block.getColumnsWithTypeAndName()); }
void StorageSet::insertBlock(const Block & block, ContextPtr) { set->insertFromBlock(block.getColumnsWithTypeAndName()); }
void StorageSet::finishInsert() { set->finishInsert(); }
size_t StorageSet::getSize() const { return set->getTotalRowCount(); }
size_t StorageSet::getSize(ContextPtr) const { return set->getTotalRowCount(); }
std::optional<UInt64> StorageSet::totalRows(const Settings &) const { return set->getTotalRowCount(); }
std::optional<UInt64> StorageSet::totalBytes(const Settings &) const { return set->getTotalByteCount(); }
@ -210,6 +213,7 @@ void StorageSetOrJoinBase::restore()
void StorageSetOrJoinBase::restoreFromFile(const String & file_path)
{
ContextPtr ctx = nullptr;
auto backup_buf = disk->readFile(file_path);
CompressedReadBuffer compressed_backup_buf(*backup_buf);
NativeReader backup_stream(compressed_backup_buf, 0);
@ -218,14 +222,14 @@ void StorageSetOrJoinBase::restoreFromFile(const String & file_path)
while (Block block = backup_stream.read())
{
info.update(block);
insertBlock(block);
insertBlock(block, ctx);
}
finishInsert();
/// TODO Add speed, compressed bytes, data volume in memory, compression ratio ... Generalize all statistics logging in project.
LOG_INFO(&Poco::Logger::get("StorageSetOrJoinBase"), "Loaded from backup file {}. {} rows, {}. State has {} unique rows.",
file_path, info.rows, ReadableSize(info.bytes), getSize());
file_path, info.rows, ReadableSize(info.bytes), getSize(ctx));
}

View File

@ -51,10 +51,10 @@ private:
void restoreFromFile(const String & file_path);
/// Insert the block into the state.
virtual void insertBlock(const Block & block) = 0;
virtual void insertBlock(const Block & block, ContextPtr context) = 0;
/// Call after all blocks were inserted.
virtual void finishInsert() = 0;
virtual size_t getSize() const = 0;
virtual size_t getSize(ContextPtr context) const = 0;
};
@ -81,9 +81,9 @@ public:
private:
SetPtr set;
void insertBlock(const Block & block) override;
void insertBlock(const Block & block, ContextPtr) override;
void finishInsert() override;
size_t getSize() const override;
size_t getSize(ContextPtr) const override;
protected:
StorageSet(

View File

@ -0,0 +1,71 @@
#!/usr/bin/env bash
# Tags: long, deadlock
CURDIR=$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)
# shellcheck source=../shell_config.sh
. "$CURDIR"/../shell_config.sh
create_table () {
$CLICKHOUSE_CLIENT --query "
CREATE TABLE join_block_test
(
id String,
num Int64
)
ENGINE = Join(ANY, LEFT, id)
"
}
drop_table () {
# Force a sync drop to free the memory before ending the test
# Otherwise things get interesting if you run the test many times before the database is finally dropped
$CLICKHOUSE_CLIENT --query "
DROP TABLE join_block_test SYNC
"
}
populate_table_bg () {
(
$CLICKHOUSE_CLIENT --query "
INSERT INTO join_block_test
SELECT toString(number) as id, number * number as num
FROM system.numbers LIMIT 3000000
" >/dev/null
) &
}
read_table_bg () {
(
$CLICKHOUSE_CLIENT --query "
SELECT *
FROM
(
SELECT toString(number) AS user_id
FROM system.numbers LIMIT 10000 OFFSET 20000
) AS t1
LEFT JOIN
(
SELECT
*
FROM join_block_test AS i1
ANY LEFT JOIN
(
SELECT *
FROM join_block_test
) AS i2 ON i1.id = toString(i2.num)
) AS t2 ON t1.user_id = t2.id
" >/dev/null
) &
}
create_table
for _ in {1..5};
do
populate_table_bg
sleep 0.05
read_table_bg
sleep 0.05
done
wait
drop_table