mirror of
https://github.com/ClickHouse/ClickHouse.git
synced 2024-11-30 03:22:14 +00:00
Merge pull request #29544 from Algunenano/join_deadlock
This commit is contained in:
commit
969999ff10
@ -25,14 +25,14 @@ ColumnPtr ExecutableFunctionJoinGet<or_null>::executeImpl(const ColumnsWithTypeA
|
||||
auto key = arguments[i];
|
||||
keys.emplace_back(std::move(key));
|
||||
}
|
||||
return storage_join->joinGet(keys, result_columns).column;
|
||||
return storage_join->joinGet(keys, result_columns, getContext()).column;
|
||||
}
|
||||
|
||||
template <bool or_null>
|
||||
ExecutableFunctionPtr FunctionJoinGet<or_null>::prepare(const ColumnsWithTypeAndName &) const
|
||||
{
|
||||
Block result_columns {{return_type->createColumn(), return_type, attr_name}};
|
||||
return std::make_unique<ExecutableFunctionJoinGet<or_null>>(table_lock, storage_join, result_columns);
|
||||
return std::make_unique<ExecutableFunctionJoinGet<or_null>>(getContext(), table_lock, storage_join, result_columns);
|
||||
}
|
||||
|
||||
static std::pair<std::shared_ptr<StorageJoin>, String>
|
||||
@ -89,7 +89,7 @@ FunctionBasePtr JoinGetOverloadResolver<or_null>::buildImpl(const ColumnsWithTyp
|
||||
auto return_type = storage_join->joinGetCheckAndGetReturnType(data_types, attr_name, or_null);
|
||||
auto table_lock = storage_join->lockForShare(getContext()->getInitialQueryId(), getContext()->getSettingsRef().lock_acquire_timeout);
|
||||
|
||||
return std::make_unique<FunctionJoinGet<or_null>>(table_lock, storage_join, attr_name, argument_types, return_type);
|
||||
return std::make_unique<FunctionJoinGet<or_null>>(getContext(), table_lock, storage_join, attr_name, argument_types, return_type);
|
||||
}
|
||||
|
||||
void registerFunctionJoinGet(FunctionFactory & factory)
|
||||
|
@ -14,13 +14,15 @@ class StorageJoin;
|
||||
using StorageJoinPtr = std::shared_ptr<StorageJoin>;
|
||||
|
||||
template <bool or_null>
|
||||
class ExecutableFunctionJoinGet final : public IExecutableFunction
|
||||
class ExecutableFunctionJoinGet final : public IExecutableFunction, WithContext
|
||||
{
|
||||
public:
|
||||
ExecutableFunctionJoinGet(TableLockHolder table_lock_,
|
||||
ExecutableFunctionJoinGet(ContextPtr context_,
|
||||
TableLockHolder table_lock_,
|
||||
StorageJoinPtr storage_join_,
|
||||
const DB::Block & result_columns_)
|
||||
: table_lock(std::move(table_lock_))
|
||||
: WithContext(context_)
|
||||
, table_lock(std::move(table_lock_))
|
||||
, storage_join(std::move(storage_join_))
|
||||
, result_columns(result_columns_)
|
||||
{}
|
||||
@ -42,15 +44,17 @@ private:
|
||||
};
|
||||
|
||||
template <bool or_null>
|
||||
class FunctionJoinGet final : public IFunctionBase
|
||||
class FunctionJoinGet final : public IFunctionBase, WithContext
|
||||
{
|
||||
public:
|
||||
static constexpr auto name = or_null ? "joinGetOrNull" : "joinGet";
|
||||
|
||||
FunctionJoinGet(TableLockHolder table_lock_,
|
||||
FunctionJoinGet(ContextPtr context_,
|
||||
TableLockHolder table_lock_,
|
||||
StorageJoinPtr storage_join_, String attr_name_,
|
||||
DataTypes argument_types_, DataTypePtr return_type_)
|
||||
: table_lock(std::move(table_lock_))
|
||||
: WithContext(context_)
|
||||
, table_lock(std::move(table_lock_))
|
||||
, storage_join(storage_join_)
|
||||
, attr_name(std::move(attr_name_))
|
||||
, argument_types(std::move(argument_types_))
|
||||
|
@ -938,7 +938,7 @@ JoinPtr SelectQueryExpressionAnalyzer::makeTableJoin(
|
||||
if (auto storage = analyzed_join->getStorageJoin())
|
||||
{
|
||||
std::tie(left_convert_actions, right_convert_actions) = analyzed_join->createConvertingActions(left_columns, {});
|
||||
return storage->getJoinLocked(analyzed_join);
|
||||
return storage->getJoinLocked(analyzed_join, getContext());
|
||||
}
|
||||
|
||||
joined_plan = buildJoinedPlan(getContext(), join_element, *analyzed_join, query_options);
|
||||
|
@ -744,7 +744,7 @@ bool HashJoin::addJoinedBlock(const Block & source_block, bool check_limits)
|
||||
size_t total_rows = 0;
|
||||
size_t total_bytes = 0;
|
||||
{
|
||||
if (storage_join_lock.mutex())
|
||||
if (storage_join_lock)
|
||||
throw DB::Exception("addJoinedBlock called when HashJoin locked to prevent updates",
|
||||
ErrorCodes::LOGICAL_ERROR);
|
||||
|
||||
|
@ -16,6 +16,7 @@
|
||||
#include <Common/ColumnsHashing.h>
|
||||
#include <Common/HashTable/HashMap.h>
|
||||
#include <Common/HashTable/FixedHashMap.h>
|
||||
#include <Common/RWLock.h>
|
||||
|
||||
#include <Columns/ColumnString.h>
|
||||
#include <Columns/ColumnFixedString.h>
|
||||
@ -334,9 +335,9 @@ public:
|
||||
|
||||
/// We keep correspondence between used_flags and hash table internal buffer.
|
||||
/// Hash table cannot be modified during HashJoin lifetime and must be protected with lock.
|
||||
void setLock(std::shared_mutex & rwlock)
|
||||
void setLock(RWLockImpl::LockHolder rwlock_holder)
|
||||
{
|
||||
storage_join_lock = std::shared_lock<std::shared_mutex>(rwlock);
|
||||
storage_join_lock = rwlock_holder;
|
||||
}
|
||||
|
||||
void reuseJoinedData(const HashJoin & join);
|
||||
@ -391,7 +392,7 @@ private:
|
||||
|
||||
/// Should be set via setLock to protect hash table from modification from StorageJoin
|
||||
/// If set HashJoin instance is not available for modification (addJoinedBlock)
|
||||
std::shared_lock<std::shared_mutex> storage_join_lock;
|
||||
RWLockImpl::LockHolder storage_join_lock = nullptr;
|
||||
|
||||
void dataMapInit(MapsVariant &);
|
||||
|
||||
|
@ -226,6 +226,7 @@ private:
|
||||
/// without locks.
|
||||
MultiVersionStorageMetadataPtr metadata;
|
||||
|
||||
protected:
|
||||
RWLockImpl::LockHolder tryLockTimed(
|
||||
const RWLock & rwlock, RWLockImpl::Type type, const String & query_id, const std::chrono::milliseconds & acquire_timeout) const;
|
||||
|
||||
|
@ -1,13 +1,13 @@
|
||||
#include <Storages/StorageJoin.h>
|
||||
#include <Storages/StorageFactory.h>
|
||||
#include <Storages/StorageSet.h>
|
||||
#include <Storages/TableLockHolder.h>
|
||||
#include <Interpreters/HashJoin.h>
|
||||
#include <Interpreters/Context.h>
|
||||
#include <Parsers/ASTCreateQuery.h>
|
||||
#include <Parsers/ASTIdentifier.h>
|
||||
#include <Core/ColumnNumbers.h>
|
||||
#include <DataTypes/NestedUtils.h>
|
||||
#include <Disks/IDisk.h>
|
||||
#include <Interpreters/joinDispatch.h>
|
||||
#include <Interpreters/MutationsInterpreter.h>
|
||||
#include <Interpreters/TableJoin.h>
|
||||
@ -68,17 +68,24 @@ StorageJoin::StorageJoin(
|
||||
restore();
|
||||
}
|
||||
|
||||
RWLockImpl::LockHolder StorageJoin::tryLockTimedWithContext(const RWLock & lock, RWLockImpl::Type type, ContextPtr context) const
|
||||
{
|
||||
const String query_id = context ? context->getInitialQueryId() : RWLockImpl::NO_QUERY;
|
||||
const std::chrono::milliseconds acquire_timeout
|
||||
= context ? context->getSettingsRef().lock_acquire_timeout : std::chrono::seconds(DBMS_DEFAULT_LOCK_ACQUIRE_TIMEOUT_SEC);
|
||||
return tryLockTimed(lock, type, query_id, acquire_timeout);
|
||||
}
|
||||
|
||||
SinkToStoragePtr StorageJoin::write(const ASTPtr & query, const StorageMetadataPtr & metadata_snapshot, ContextPtr context)
|
||||
{
|
||||
std::lock_guard mutate_lock(mutate_mutex);
|
||||
return StorageSetOrJoinBase::write(query, metadata_snapshot, context);
|
||||
}
|
||||
|
||||
void StorageJoin::truncate(
|
||||
const ASTPtr &, const StorageMetadataPtr & metadata_snapshot, ContextPtr, TableExclusiveLockHolder&)
|
||||
void StorageJoin::truncate(const ASTPtr &, const StorageMetadataPtr & metadata_snapshot, ContextPtr context, TableExclusiveLockHolder &)
|
||||
{
|
||||
std::lock_guard mutate_lock(mutate_mutex);
|
||||
std::unique_lock<std::shared_mutex> lock(rwlock);
|
||||
TableLockHolder holder = tryLockTimedWithContext(rwlock, RWLockImpl::Write, context);
|
||||
|
||||
disk->removeRecursive(path);
|
||||
disk->createDirectories(path);
|
||||
@ -128,7 +135,7 @@ void StorageJoin::mutate(const MutationCommands & commands, ContextPtr context)
|
||||
}
|
||||
|
||||
/// Now acquire exclusive lock and modify storage.
|
||||
std::unique_lock<std::shared_mutex> lock(rwlock);
|
||||
TableLockHolder holder = tryLockTimedWithContext(rwlock, RWLockImpl::Write, context);
|
||||
|
||||
join = std::move(new_data);
|
||||
increment = 1;
|
||||
@ -152,7 +159,7 @@ void StorageJoin::mutate(const MutationCommands & commands, ContextPtr context)
|
||||
}
|
||||
}
|
||||
|
||||
HashJoinPtr StorageJoin::getJoinLocked(std::shared_ptr<TableJoin> analyzed_join) const
|
||||
HashJoinPtr StorageJoin::getJoinLocked(std::shared_ptr<TableJoin> analyzed_join, ContextPtr context) const
|
||||
{
|
||||
auto metadata_snapshot = getInMemoryMetadataPtr();
|
||||
if (!analyzed_join->sameStrictnessAndKind(strictness, kind))
|
||||
@ -171,34 +178,36 @@ HashJoinPtr StorageJoin::getJoinLocked(std::shared_ptr<TableJoin> analyzed_join)
|
||||
analyzed_join->setRightKeys(key_names);
|
||||
|
||||
HashJoinPtr join_clone = std::make_shared<HashJoin>(analyzed_join, metadata_snapshot->getSampleBlock().sortColumns());
|
||||
join_clone->setLock(rwlock);
|
||||
|
||||
RWLockImpl::LockHolder holder = tryLockTimedWithContext(rwlock, RWLockImpl::Read, context);
|
||||
join_clone->setLock(holder);
|
||||
join_clone->reuseJoinedData(*join);
|
||||
|
||||
return join_clone;
|
||||
}
|
||||
|
||||
|
||||
void StorageJoin::insertBlock(const Block & block)
|
||||
void StorageJoin::insertBlock(const Block & block, ContextPtr context)
|
||||
{
|
||||
std::unique_lock<std::shared_mutex> lock(rwlock);
|
||||
TableLockHolder holder = tryLockTimedWithContext(rwlock, RWLockImpl::Write, context);
|
||||
join->addJoinedBlock(block, true);
|
||||
}
|
||||
|
||||
size_t StorageJoin::getSize() const
|
||||
size_t StorageJoin::getSize(ContextPtr context) const
|
||||
{
|
||||
std::shared_lock<std::shared_mutex> lock(rwlock);
|
||||
TableLockHolder holder = tryLockTimedWithContext(rwlock, RWLockImpl::Read, context);
|
||||
return join->getTotalRowCount();
|
||||
}
|
||||
|
||||
std::optional<UInt64> StorageJoin::totalRows(const Settings &) const
|
||||
std::optional<UInt64> StorageJoin::totalRows(const Settings &settings) const
|
||||
{
|
||||
std::shared_lock<std::shared_mutex> lock(rwlock);
|
||||
TableLockHolder holder = tryLockTimed(rwlock, RWLockImpl::Read, RWLockImpl::NO_QUERY, settings.lock_acquire_timeout);
|
||||
return join->getTotalRowCount();
|
||||
}
|
||||
|
||||
std::optional<UInt64> StorageJoin::totalBytes(const Settings &) const
|
||||
std::optional<UInt64> StorageJoin::totalBytes(const Settings &settings) const
|
||||
{
|
||||
std::shared_lock<std::shared_mutex> lock(rwlock);
|
||||
TableLockHolder holder = tryLockTimed(rwlock, RWLockImpl::Read, RWLockImpl::NO_QUERY, settings.lock_acquire_timeout);
|
||||
return join->getTotalByteCount();
|
||||
}
|
||||
|
||||
@ -207,9 +216,9 @@ DataTypePtr StorageJoin::joinGetCheckAndGetReturnType(const DataTypes & data_typ
|
||||
return join->joinGetCheckAndGetReturnType(data_types, column_name, or_null);
|
||||
}
|
||||
|
||||
ColumnWithTypeAndName StorageJoin::joinGet(const Block & block, const Block & block_with_columns_to_add) const
|
||||
ColumnWithTypeAndName StorageJoin::joinGet(const Block & block, const Block & block_with_columns_to_add, ContextPtr context) const
|
||||
{
|
||||
std::shared_lock<std::shared_mutex> lock(rwlock);
|
||||
TableLockHolder holder = tryLockTimedWithContext(rwlock, RWLockImpl::Read, context);
|
||||
return join->joinGet(block, block_with_columns_to_add);
|
||||
}
|
||||
|
||||
@ -370,10 +379,10 @@ size_t rawSize(const StringRef & t)
|
||||
class JoinSource : public SourceWithProgress
|
||||
{
|
||||
public:
|
||||
JoinSource(HashJoinPtr join_, std::shared_mutex & rwlock, UInt64 max_block_size_, Block sample_block_)
|
||||
JoinSource(HashJoinPtr join_, TableLockHolder lock_holder_, UInt64 max_block_size_, Block sample_block_)
|
||||
: SourceWithProgress(sample_block_)
|
||||
, join(join_)
|
||||
, lock(rwlock)
|
||||
, lock_holder(lock_holder_)
|
||||
, max_block_size(max_block_size_)
|
||||
, sample_block(std::move(sample_block_))
|
||||
{
|
||||
@ -421,7 +430,7 @@ protected:
|
||||
|
||||
private:
|
||||
HashJoinPtr join;
|
||||
std::shared_lock<std::shared_mutex> lock;
|
||||
TableLockHolder lock_holder;
|
||||
|
||||
UInt64 max_block_size;
|
||||
Block sample_block;
|
||||
@ -571,7 +580,7 @@ Pipe StorageJoin::read(
|
||||
const Names & column_names,
|
||||
const StorageMetadataPtr & metadata_snapshot,
|
||||
SelectQueryInfo & /*query_info*/,
|
||||
ContextPtr /*context*/,
|
||||
ContextPtr context,
|
||||
QueryProcessingStage::Enum /*processed_stage*/,
|
||||
size_t max_block_size,
|
||||
unsigned /*num_streams*/)
|
||||
@ -579,7 +588,8 @@ Pipe StorageJoin::read(
|
||||
metadata_snapshot->check(column_names, getVirtuals(), getStorageID());
|
||||
|
||||
Block source_sample_block = metadata_snapshot->getSampleBlockForColumns(column_names, getVirtuals(), getStorageID());
|
||||
return Pipe(std::make_shared<JoinSource>(join, rwlock, max_block_size, source_sample_block));
|
||||
RWLockImpl::LockHolder holder = tryLockTimedWithContext(rwlock, RWLockImpl::Read, context);
|
||||
return Pipe(std::make_shared<JoinSource>(join, std::move(holder), max_block_size, source_sample_block));
|
||||
}
|
||||
|
||||
}
|
||||
|
@ -2,7 +2,9 @@
|
||||
|
||||
#include <base/shared_ptr_helper.h>
|
||||
|
||||
#include <Common/RWLock.h>
|
||||
#include <Storages/StorageSet.h>
|
||||
#include <Storages/TableLockHolder.h>
|
||||
#include <Storages/JoinSettings.h>
|
||||
#include <Parsers/ASTTablesInSelectQuery.h>
|
||||
|
||||
@ -35,7 +37,7 @@ public:
|
||||
|
||||
/// Return instance of HashJoin holding lock that protects from insertions to StorageJoin.
|
||||
/// HashJoin relies on structure of hash table that's why we need to return it with locked mutex.
|
||||
HashJoinPtr getJoinLocked(std::shared_ptr<TableJoin> analyzed_join) const;
|
||||
HashJoinPtr getJoinLocked(std::shared_ptr<TableJoin> analyzed_join, ContextPtr context) const;
|
||||
|
||||
/// Get result type for function "joinGet(OrNull)"
|
||||
DataTypePtr joinGetCheckAndGetReturnType(const DataTypes & data_types, const String & column_name, bool or_null) const;
|
||||
@ -43,7 +45,7 @@ public:
|
||||
/// Execute function "joinGet(OrNull)" on data block.
|
||||
/// Takes rwlock for read to prevent parallel StorageJoin updates during processing data block
|
||||
/// (but not during processing whole query, it's safe for joinGet that doesn't involve `used_flags` from HashJoin)
|
||||
ColumnWithTypeAndName joinGet(const Block & block, const Block & block_with_columns_to_add) const;
|
||||
ColumnWithTypeAndName joinGet(const Block & block, const Block & block_with_columns_to_add, ContextPtr context) const;
|
||||
|
||||
SinkToStoragePtr write(const ASTPtr & query, const StorageMetadataPtr & metadata_snapshot, ContextPtr context) override;
|
||||
|
||||
@ -73,12 +75,13 @@ private:
|
||||
|
||||
/// Protect state for concurrent use in insertFromBlock and joinBlock.
|
||||
/// Lock is stored in HashJoin instance during query and blocks concurrent insertions.
|
||||
mutable std::shared_mutex rwlock;
|
||||
mutable RWLock rwlock = RWLockImpl::create();
|
||||
mutable std::mutex mutate_mutex;
|
||||
|
||||
void insertBlock(const Block & block) override;
|
||||
void insertBlock(const Block & block, ContextPtr context) override;
|
||||
void finishInsert() override {}
|
||||
size_t getSize() const override;
|
||||
size_t getSize(ContextPtr context) const override;
|
||||
RWLockImpl::LockHolder tryLockTimedWithContext(const RWLock & lock, RWLockImpl::Type type, ContextPtr context) const;
|
||||
|
||||
protected:
|
||||
StorageJoin(
|
||||
|
@ -34,11 +34,11 @@ namespace ErrorCodes
|
||||
}
|
||||
|
||||
|
||||
class SetOrJoinSink : public SinkToStorage
|
||||
class SetOrJoinSink : public SinkToStorage, WithContext
|
||||
{
|
||||
public:
|
||||
SetOrJoinSink(
|
||||
StorageSetOrJoinBase & table_, const StorageMetadataPtr & metadata_snapshot_,
|
||||
ContextPtr ctx, StorageSetOrJoinBase & table_, const StorageMetadataPtr & metadata_snapshot_,
|
||||
const String & backup_path_, const String & backup_tmp_path_,
|
||||
const String & backup_file_name_, bool persistent_);
|
||||
|
||||
@ -60,6 +60,7 @@ private:
|
||||
|
||||
|
||||
SetOrJoinSink::SetOrJoinSink(
|
||||
ContextPtr ctx,
|
||||
StorageSetOrJoinBase & table_,
|
||||
const StorageMetadataPtr & metadata_snapshot_,
|
||||
const String & backup_path_,
|
||||
@ -67,6 +68,7 @@ SetOrJoinSink::SetOrJoinSink(
|
||||
const String & backup_file_name_,
|
||||
bool persistent_)
|
||||
: SinkToStorage(metadata_snapshot_->getSampleBlock())
|
||||
, WithContext(ctx)
|
||||
, table(table_)
|
||||
, metadata_snapshot(metadata_snapshot_)
|
||||
, backup_path(backup_path_)
|
||||
@ -84,7 +86,7 @@ void SetOrJoinSink::consume(Chunk chunk)
|
||||
/// Sort columns in the block. This is necessary, since Set and Join count on the same column order in different blocks.
|
||||
Block sorted_block = getHeader().cloneWithColumns(chunk.detachColumns()).sortColumns();
|
||||
|
||||
table.insertBlock(sorted_block);
|
||||
table.insertBlock(sorted_block, getContext());
|
||||
if (persistent)
|
||||
backup_stream.write(sorted_block);
|
||||
}
|
||||
@ -104,10 +106,11 @@ void SetOrJoinSink::onFinish()
|
||||
}
|
||||
|
||||
|
||||
SinkToStoragePtr StorageSetOrJoinBase::write(const ASTPtr & /*query*/, const StorageMetadataPtr & metadata_snapshot, ContextPtr /*context*/)
|
||||
SinkToStoragePtr StorageSetOrJoinBase::write(const ASTPtr & /*query*/, const StorageMetadataPtr & metadata_snapshot, ContextPtr context)
|
||||
{
|
||||
UInt64 id = ++increment;
|
||||
return std::make_shared<SetOrJoinSink>(*this, metadata_snapshot, path, fs::path(path) / "tmp/", toString(id) + ".bin", persistent);
|
||||
return std::make_shared<SetOrJoinSink>(
|
||||
context, *this, metadata_snapshot, path, fs::path(path) / "tmp/", toString(id) + ".bin", persistent);
|
||||
}
|
||||
|
||||
|
||||
@ -155,10 +158,10 @@ StorageSet::StorageSet(
|
||||
}
|
||||
|
||||
|
||||
void StorageSet::insertBlock(const Block & block) { set->insertFromBlock(block.getColumnsWithTypeAndName()); }
|
||||
void StorageSet::insertBlock(const Block & block, ContextPtr) { set->insertFromBlock(block.getColumnsWithTypeAndName()); }
|
||||
void StorageSet::finishInsert() { set->finishInsert(); }
|
||||
|
||||
size_t StorageSet::getSize() const { return set->getTotalRowCount(); }
|
||||
size_t StorageSet::getSize(ContextPtr) const { return set->getTotalRowCount(); }
|
||||
std::optional<UInt64> StorageSet::totalRows(const Settings &) const { return set->getTotalRowCount(); }
|
||||
std::optional<UInt64> StorageSet::totalBytes(const Settings &) const { return set->getTotalByteCount(); }
|
||||
|
||||
@ -210,6 +213,7 @@ void StorageSetOrJoinBase::restore()
|
||||
|
||||
void StorageSetOrJoinBase::restoreFromFile(const String & file_path)
|
||||
{
|
||||
ContextPtr ctx = nullptr;
|
||||
auto backup_buf = disk->readFile(file_path);
|
||||
CompressedReadBuffer compressed_backup_buf(*backup_buf);
|
||||
NativeReader backup_stream(compressed_backup_buf, 0);
|
||||
@ -218,14 +222,14 @@ void StorageSetOrJoinBase::restoreFromFile(const String & file_path)
|
||||
while (Block block = backup_stream.read())
|
||||
{
|
||||
info.update(block);
|
||||
insertBlock(block);
|
||||
insertBlock(block, ctx);
|
||||
}
|
||||
|
||||
finishInsert();
|
||||
|
||||
/// TODO Add speed, compressed bytes, data volume in memory, compression ratio ... Generalize all statistics logging in project.
|
||||
LOG_INFO(&Poco::Logger::get("StorageSetOrJoinBase"), "Loaded from backup file {}. {} rows, {}. State has {} unique rows.",
|
||||
file_path, info.rows, ReadableSize(info.bytes), getSize());
|
||||
file_path, info.rows, ReadableSize(info.bytes), getSize(ctx));
|
||||
}
|
||||
|
||||
|
||||
|
@ -51,10 +51,10 @@ private:
|
||||
void restoreFromFile(const String & file_path);
|
||||
|
||||
/// Insert the block into the state.
|
||||
virtual void insertBlock(const Block & block) = 0;
|
||||
virtual void insertBlock(const Block & block, ContextPtr context) = 0;
|
||||
/// Call after all blocks were inserted.
|
||||
virtual void finishInsert() = 0;
|
||||
virtual size_t getSize() const = 0;
|
||||
virtual size_t getSize(ContextPtr context) const = 0;
|
||||
};
|
||||
|
||||
|
||||
@ -81,9 +81,9 @@ public:
|
||||
private:
|
||||
SetPtr set;
|
||||
|
||||
void insertBlock(const Block & block) override;
|
||||
void insertBlock(const Block & block, ContextPtr) override;
|
||||
void finishInsert() override;
|
||||
size_t getSize() const override;
|
||||
size_t getSize(ContextPtr) const override;
|
||||
|
||||
protected:
|
||||
StorageSet(
|
||||
|
71
tests/queries/0_stateless/02033_join_engine_deadlock_long.sh
Executable file
71
tests/queries/0_stateless/02033_join_engine_deadlock_long.sh
Executable file
@ -0,0 +1,71 @@
|
||||
#!/usr/bin/env bash
|
||||
# Tags: long, deadlock
|
||||
|
||||
CURDIR=$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)
|
||||
# shellcheck source=../shell_config.sh
|
||||
. "$CURDIR"/../shell_config.sh
|
||||
|
||||
create_table () {
|
||||
$CLICKHOUSE_CLIENT --query "
|
||||
CREATE TABLE join_block_test
|
||||
(
|
||||
id String,
|
||||
num Int64
|
||||
)
|
||||
ENGINE = Join(ANY, LEFT, id)
|
||||
"
|
||||
}
|
||||
|
||||
drop_table () {
|
||||
# Force a sync drop to free the memory before ending the test
|
||||
# Otherwise things get interesting if you run the test many times before the database is finally dropped
|
||||
$CLICKHOUSE_CLIENT --query "
|
||||
DROP TABLE join_block_test SYNC
|
||||
"
|
||||
}
|
||||
|
||||
populate_table_bg () {
|
||||
(
|
||||
$CLICKHOUSE_CLIENT --query "
|
||||
INSERT INTO join_block_test
|
||||
SELECT toString(number) as id, number * number as num
|
||||
FROM system.numbers LIMIT 3000000
|
||||
" >/dev/null
|
||||
) &
|
||||
}
|
||||
|
||||
read_table_bg () {
|
||||
(
|
||||
$CLICKHOUSE_CLIENT --query "
|
||||
SELECT *
|
||||
FROM
|
||||
(
|
||||
SELECT toString(number) AS user_id
|
||||
FROM system.numbers LIMIT 10000 OFFSET 20000
|
||||
) AS t1
|
||||
LEFT JOIN
|
||||
(
|
||||
SELECT
|
||||
*
|
||||
FROM join_block_test AS i1
|
||||
ANY LEFT JOIN
|
||||
(
|
||||
SELECT *
|
||||
FROM join_block_test
|
||||
) AS i2 ON i1.id = toString(i2.num)
|
||||
) AS t2 ON t1.user_id = t2.id
|
||||
" >/dev/null
|
||||
) &
|
||||
}
|
||||
|
||||
create_table
|
||||
for _ in {1..5};
|
||||
do
|
||||
populate_table_bg
|
||||
sleep 0.05
|
||||
read_table_bg
|
||||
sleep 0.05
|
||||
done
|
||||
|
||||
wait
|
||||
drop_table
|
Loading…
Reference in New Issue
Block a user