Try to fix race in storage join: block parralel inserts

This commit is contained in:
vdimir 2021-02-20 18:00:59 +03:00
parent 9c48fcfdef
commit 6cc2fb5e9f
No known key found for this signature in database
GPG Key ID: F57B3E10A21DBB31
6 changed files with 41 additions and 30 deletions

View File

@ -25,7 +25,7 @@ ColumnPtr ExecutableFunctionJoinGet<or_null>::execute(const ColumnsWithTypeAndNa
auto key = arguments[i];
keys.emplace_back(std::move(key));
}
return join->joinGet(keys, result_columns).column;
return join->join->joinGet(keys, result_columns).column;
}
template <bool or_null>
@ -87,13 +87,13 @@ FunctionBaseImplPtr JoinGetOverloadResolver<or_null>::build(const ColumnsWithTyp
+ ", should be greater or equal to 3",
ErrorCodes::NUMBER_OF_ARGUMENTS_DOESNT_MATCH);
auto [storage_join, attr_name] = getJoin(arguments, context);
auto join = storage_join->getJoin();
auto join_holder = storage_join->getJoin();
DataTypes data_types(arguments.size() - 2);
for (size_t i = 2; i < arguments.size(); ++i)
data_types[i - 2] = arguments[i].type;
auto return_type = join->joinGetCheckAndGetReturnType(data_types, attr_name, or_null);
auto return_type = join_holder->join->joinGetCheckAndGetReturnType(data_types, attr_name, or_null);
auto table_lock = storage_join->lockForShare(context.getInitialQueryId(), context.getSettingsRef().lock_acquire_timeout);
return std::make_unique<FunctionJoinGet<or_null>>(table_lock, storage_join, join, attr_name, data_types, return_type);
return std::make_unique<FunctionJoinGet<or_null>>(table_lock, join_holder, attr_name, data_types, return_type);
}
void registerFunctionJoinGet(FunctionFactory & factory)

View File

@ -9,13 +9,14 @@ namespace DB
class Context;
class HashJoin;
class HashJoinHolder;
using HashJoinPtr = std::shared_ptr<HashJoin>;
template <bool or_null>
class ExecutableFunctionJoinGet final : public IExecutableFunctionImpl
{
public:
ExecutableFunctionJoinGet(HashJoinPtr join_, const DB::Block & result_columns_)
ExecutableFunctionJoinGet(std::shared_ptr<HashJoinHolder> join_, const DB::Block & result_columns_)
: join(std::move(join_)), result_columns(result_columns_) {}
static constexpr auto name = or_null ? "joinGetOrNull" : "joinGet";
@ -29,7 +30,7 @@ public:
String getName() const override { return name; }
private:
HashJoinPtr join;
std::shared_ptr<HashJoinHolder> join;
DB::Block result_columns;
};
@ -39,12 +40,11 @@ class FunctionJoinGet final : public IFunctionBaseImpl
public:
static constexpr auto name = or_null ? "joinGetOrNull" : "joinGet";
FunctionJoinGet(TableLockHolder table_lock_, StoragePtr storage_join_,
HashJoinPtr join_, String attr_name_,
FunctionJoinGet(TableLockHolder table_lock_,
std::shared_ptr<HashJoinHolder> join_, String attr_name_,
DataTypes argument_types_, DataTypePtr return_type_)
: table_lock(std::move(table_lock_))
, storage_join(std::move(storage_join_))
, join(std::move(join_))
, join(join_)
, attr_name(std::move(attr_name_))
, argument_types(std::move(argument_types_))
, return_type(std::move(return_type_))
@ -60,8 +60,7 @@ public:
private:
TableLockHolder table_lock;
StoragePtr storage_join;
HashJoinPtr join;
std::shared_ptr<HashJoinHolder> join;
const String attr_name;
DataTypes argument_types;
DataTypePtr return_type;

View File

@ -423,19 +423,16 @@ bool HashJoin::empty() const
size_t HashJoin::getTotalByteCount() const
{
std::shared_lock lock(data->rwlock);
return getTotalByteCountLocked();
}
size_t HashJoin::getTotalRowCount() const
{
std::shared_lock lock(data->rwlock);
return getTotalRowCountLocked();
}
bool HashJoin::alwaysReturnsEmptySet() const
{
std::shared_lock lock(data->rwlock);
return isInnerOrRight(getKind()) && data->empty && !overDictionary();
}
@ -652,7 +649,7 @@ bool HashJoin::addJoinedBlock(const Block & source_block, bool check_limits)
size_t total_bytes = 0;
{
std::unique_lock lock(data->rwlock);
assert(storage_join_lock.mutex() == nullptr);
data->blocks.emplace_back(std::move(structured_block));
Block * stored_block = &data->blocks.back();
@ -1219,8 +1216,6 @@ void HashJoin::joinBlockImplCross(Block & block, ExtraBlockPtr & not_processed)
DataTypePtr HashJoin::joinGetCheckAndGetReturnType(const DataTypes & data_types, const String & column_name, bool or_null) const
{
std::shared_lock lock(data->rwlock);
size_t num_keys = data_types.size();
if (right_table_keys.columns() != num_keys)
throw Exception(
@ -1273,8 +1268,6 @@ ColumnWithTypeAndName HashJoin::joinGetImpl(const Block & block, const Block & b
// TODO: return array of values when strictness == ASTTableJoin::Strictness::All
ColumnWithTypeAndName HashJoin::joinGet(const Block & block, const Block & block_with_columns_to_add) const
{
std::shared_lock lock(data->rwlock);
if ((strictness == ASTTableJoin::Strictness::Any || strictness == ASTTableJoin::Strictness::RightAny) &&
kind == ASTTableJoin::Kind::Left)
{
@ -1287,8 +1280,6 @@ ColumnWithTypeAndName HashJoin::joinGet(const Block & block, const Block & block
void HashJoin::joinBlock(Block & block, ExtraBlockPtr & not_processed)
{
std::shared_lock lock(data->rwlock);
const Names & key_names_left = table_join->keyNamesLeft();
JoinCommon::checkTypesOfKeys(block, key_names_left, right_table_keys, key_names_right);

View File

@ -308,7 +308,7 @@ public:
{
/// Protect state for concurrent use in insertFromBlock and joinBlock.
/// @note that these methods could be called simultaneously only while use of StorageJoin.
mutable std::shared_mutex rwlock;
// mutable std::shared_mutex rwlock;
Type type = Type::EMPTY;
bool empty = true;
@ -322,6 +322,11 @@ public:
Arena pool;
};
void setLock(std::shared_mutex & rwlock)
{
storage_join_lock = std::shared_lock<std::shared_mutex>(rwlock);
}
void reuseJoinedData(const HashJoin & join);
std::shared_ptr<RightTableData> getJoinedData() const
@ -371,6 +376,8 @@ private:
Block totals;
std::shared_lock<std::shared_mutex> storage_join_lock;
void init(Type type_);
const Block & savedBlockSample() const { return data->sample_block; }

View File

@ -97,11 +97,17 @@ HashJoinPtr StorageJoin::getJoin(std::shared_ptr<TableJoin> analyzed_join) const
HashJoinPtr join_clone = std::make_shared<HashJoin>(analyzed_join, metadata_snapshot->getSampleBlock().sortColumns());
join_clone->reuseJoinedData(*join);
join_clone->setLock(rwlock);
return join_clone;
}
void StorageJoin::insertBlock(const Block & block) { join->addJoinedBlock(block, true); }
void StorageJoin::insertBlock(const Block & block)
{
std::unique_lock<std::shared_mutex> lock(rwlock);
join->addJoinedBlock(block, true);
}
size_t StorageJoin::getSize() const { return join->getTotalRowCount(); }
std::optional<UInt64> StorageJoin::totalRows(const Settings &) const { return join->getTotalRowCount(); }
@ -267,7 +273,6 @@ public:
JoinSource(const HashJoin & parent_, UInt64 max_block_size_, Block sample_block_)
: SourceWithProgress(sample_block_)
, parent(parent_)
, lock(parent.data->rwlock)
, max_block_size(max_block_size_)
, sample_block(std::move(sample_block_))
{
@ -312,7 +317,6 @@ protected:
private:
const HashJoin & parent;
std::shared_lock<std::shared_mutex> lock;
UInt64 max_block_size;
Block sample_block;
Block restored_block; /// sample_block with parent column types

View File

@ -14,6 +14,18 @@ class TableJoin;
class HashJoin;
using HashJoinPtr = std::shared_ptr<HashJoin>;
class HashJoinHolder
{
std::shared_lock<std::shared_mutex> lock;
public:
HashJoinPtr join;
HashJoinHolder(std::shared_mutex & rwlock, HashJoinPtr join_)
: lock(rwlock)
, join(join_)
{
}
};
/** Allows you save the state for later use on the right side of the JOIN.
* When inserted into a table, the data will be inserted into the state,
@ -31,12 +43,9 @@ public:
void truncate(const ASTPtr &, const StorageMetadataPtr & metadata_snapshot, const Context &, TableExclusiveLockHolder &) override;
/// Access the innards.
HashJoinPtr & getJoin() { return join; }
std::shared_ptr<HashJoinHolder> getJoin() { return std::make_shared<HashJoinHolder>(rwlock, join); }
HashJoinPtr getJoin(std::shared_ptr<TableJoin> analyzed_join) const;
/// Verify that the data structure is suitable for implementing this type of JOIN.
void assertCompatible(ASTTableJoin::Kind kind_, ASTTableJoin::Strictness strictness_) const;
Pipe read(
const Names & column_names,
const StorageMetadataPtr & /*metadata_snapshot*/,
@ -60,6 +69,7 @@ private:
std::shared_ptr<TableJoin> table_join;
HashJoinPtr join;
mutable std::shared_mutex rwlock;
void insertBlock(const Block & block) override;
void finishInsert() override {}