Merge pull request #21009 from vdimir/fix-race-storage-join

This commit is contained in:
Vladimir 2021-02-26 14:26:10 +03:00 committed by GitHub
commit 78024ae2d6
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
9 changed files with 179 additions and 96 deletions

View File

@ -25,16 +25,18 @@ ColumnPtr ExecutableFunctionJoinGet<or_null>::execute(const ColumnsWithTypeAndNa
auto key = arguments[i];
keys.emplace_back(std::move(key));
}
return join->joinGet(keys, result_columns).column;
return storage_join->joinGet(keys, result_columns).column;
}
template <bool or_null>
ExecutableFunctionImplPtr FunctionJoinGet<or_null>::prepare(const ColumnsWithTypeAndName &) const
{
return std::make_unique<ExecutableFunctionJoinGet<or_null>>(join, DB::Block{{return_type->createColumn(), return_type, attr_name}});
Block result_columns {{return_type->createColumn(), return_type, attr_name}};
return std::make_unique<ExecutableFunctionJoinGet<or_null>>(table_lock, storage_join, result_columns);
}
static auto getJoin(const ColumnsWithTypeAndName & arguments, const Context & context)
static std::pair<std::shared_ptr<StorageJoin>, String>
getJoin(const ColumnsWithTypeAndName & arguments, const Context & context)
{
String join_name;
if (const auto * name_col = checkAndGetColumnConst<ColumnString>(arguments[0].column.get()))
@ -87,13 +89,12 @@ FunctionBaseImplPtr JoinGetOverloadResolver<or_null>::build(const ColumnsWithTyp
+ ", should be greater or equal to 3",
ErrorCodes::NUMBER_OF_ARGUMENTS_DOESNT_MATCH);
auto [storage_join, attr_name] = getJoin(arguments, context);
auto join = storage_join->getJoin();
DataTypes data_types(arguments.size() - 2);
for (size_t i = 2; i < arguments.size(); ++i)
data_types[i - 2] = arguments[i].type;
auto return_type = join->joinGetCheckAndGetReturnType(data_types, attr_name, or_null);
auto return_type = storage_join->joinGetCheckAndGetReturnType(data_types, attr_name, or_null);
auto table_lock = storage_join->lockForShare(context.getInitialQueryId(), context.getSettingsRef().lock_acquire_timeout);
return std::make_unique<FunctionJoinGet<or_null>>(table_lock, storage_join, join, attr_name, data_types, return_type);
return std::make_unique<FunctionJoinGet<or_null>>(table_lock, storage_join, attr_name, data_types, return_type);
}
void registerFunctionJoinGet(FunctionFactory & factory)

View File

@ -9,14 +9,20 @@ namespace DB
class Context;
class HashJoin;
using HashJoinPtr = std::shared_ptr<HashJoin>;
class StorageJoin;
using StorageJoinPtr = std::shared_ptr<StorageJoin>;
template <bool or_null>
class ExecutableFunctionJoinGet final : public IExecutableFunctionImpl
{
public:
ExecutableFunctionJoinGet(HashJoinPtr join_, const DB::Block & result_columns_)
: join(std::move(join_)), result_columns(result_columns_) {}
ExecutableFunctionJoinGet(TableLockHolder table_lock_,
StorageJoinPtr storage_join_,
const DB::Block & result_columns_)
: table_lock(std::move(table_lock_))
, storage_join(std::move(storage_join_))
, result_columns(result_columns_)
{}
static constexpr auto name = or_null ? "joinGetOrNull" : "joinGet";
@ -29,7 +35,8 @@ public:
String getName() const override { return name; }
private:
HashJoinPtr join;
TableLockHolder table_lock;
StorageJoinPtr storage_join;
DB::Block result_columns;
};
@ -39,12 +46,11 @@ class FunctionJoinGet final : public IFunctionBaseImpl
public:
static constexpr auto name = or_null ? "joinGetOrNull" : "joinGet";
FunctionJoinGet(TableLockHolder table_lock_, StoragePtr storage_join_,
HashJoinPtr join_, String attr_name_,
FunctionJoinGet(TableLockHolder table_lock_,
StorageJoinPtr storage_join_, String attr_name_,
DataTypes argument_types_, DataTypePtr return_type_)
: table_lock(std::move(table_lock_))
, storage_join(std::move(storage_join_))
, join(std::move(join_))
, storage_join(storage_join_)
, attr_name(std::move(attr_name_))
, argument_types(std::move(argument_types_))
, return_type(std::move(return_type_))
@ -60,8 +66,7 @@ public:
private:
TableLockHolder table_lock;
StoragePtr storage_join;
HashJoinPtr join;
StorageJoinPtr storage_join;
const String attr_name;
DataTypes argument_types;
DataTypePtr return_type;

View File

@ -739,7 +739,7 @@ static JoinPtr tryGetStorageJoin(std::shared_ptr<TableJoin> analyzed_join)
{
if (auto * table = analyzed_join->joined_storage.get())
if (auto * storage_join = dynamic_cast<StorageJoin *>(table))
return storage_join->getJoin(analyzed_join);
return storage_join->getJoinLocked(analyzed_join);
return {};
}

View File

@ -421,25 +421,12 @@ bool HashJoin::empty() const
return data->type == Type::EMPTY;
}
size_t HashJoin::getTotalByteCount() const
{
std::shared_lock lock(data->rwlock);
return getTotalByteCountLocked();
}
size_t HashJoin::getTotalRowCount() const
{
std::shared_lock lock(data->rwlock);
return getTotalRowCountLocked();
}
bool HashJoin::alwaysReturnsEmptySet() const
{
std::shared_lock lock(data->rwlock);
return isInnerOrRight(getKind()) && data->empty && !overDictionary();
}
size_t HashJoin::getTotalRowCountLocked() const
size_t HashJoin::getTotalRowCount() const
{
size_t res = 0;
@ -456,7 +443,7 @@ size_t HashJoin::getTotalRowCountLocked() const
return res;
}
size_t HashJoin::getTotalByteCountLocked() const
size_t HashJoin::getTotalByteCount() const
{
size_t res = 0;
@ -652,7 +639,9 @@ bool HashJoin::addJoinedBlock(const Block & source_block, bool check_limits)
size_t total_bytes = 0;
{
std::unique_lock lock(data->rwlock);
if (storage_join_lock.mutex())
throw DB::Exception("addJoinedBlock called when HashJoin locked to prevent updates",
ErrorCodes::LOGICAL_ERROR);
data->blocks.emplace_back(std::move(structured_block));
Block * stored_block = &data->blocks.back();
@ -677,8 +666,8 @@ bool HashJoin::addJoinedBlock(const Block & source_block, bool check_limits)
return true;
/// TODO: Do not calculate them every time
total_rows = getTotalRowCountLocked();
total_bytes = getTotalByteCountLocked();
total_rows = getTotalRowCount();
total_bytes = getTotalByteCount();
}
return table_join->sizeLimits().check(total_rows, total_bytes, "JOIN", ErrorCodes::SET_SIZE_LIMIT_EXCEEDED);
@ -1216,11 +1205,8 @@ void HashJoin::joinBlockImplCross(Block & block, ExtraBlockPtr & not_processed)
block = block.cloneWithColumns(std::move(dst_columns));
}
DataTypePtr HashJoin::joinGetCheckAndGetReturnType(const DataTypes & data_types, const String & column_name, bool or_null) const
{
std::shared_lock lock(data->rwlock);
size_t num_keys = data_types.size();
if (right_table_keys.columns() != num_keys)
throw Exception(
@ -1250,11 +1236,16 @@ DataTypePtr HashJoin::joinGetCheckAndGetReturnType(const DataTypes & data_types,
return elem.type;
}
template <typename Maps>
ColumnWithTypeAndName HashJoin::joinGetImpl(const Block & block, const Block & block_with_columns_to_add, const Maps & maps_) const
/// TODO: return multiple columns as named tuple
/// TODO: return array of values when strictness == ASTTableJoin::Strictness::All
ColumnWithTypeAndName HashJoin::joinGet(const Block & block, const Block & block_with_columns_to_add) const
{
// Assemble the key block with correct names.
bool is_valid = (strictness == ASTTableJoin::Strictness::Any || strictness == ASTTableJoin::Strictness::RightAny)
&& kind == ASTTableJoin::Kind::Left;
if (!is_valid)
throw Exception("joinGet only supports StorageJoin of type Left Any", ErrorCodes::INCOMPATIBLE_TYPE_OF_JOIN);
/// Assemble the key block with correct names.
Block keys;
for (size_t i = 0; i < block.columns(); ++i)
{
@ -1263,32 +1254,15 @@ ColumnWithTypeAndName HashJoin::joinGetImpl(const Block & block, const Block & b
keys.insert(std::move(key));
}
static_assert(!MapGetter<ASTTableJoin::Kind::Left, ASTTableJoin::Strictness::Any>::flagged,
"joinGet are not protected from hash table changes between block processing");
joinBlockImpl<ASTTableJoin::Kind::Left, ASTTableJoin::Strictness::Any>(
keys, key_names_right, block_with_columns_to_add, maps_);
keys, key_names_right, block_with_columns_to_add, std::get<MapsOne>(data->maps));
return keys.getByPosition(keys.columns() - 1);
}
// TODO: return multiple columns as named tuple
// TODO: return array of values when strictness == ASTTableJoin::Strictness::All
ColumnWithTypeAndName HashJoin::joinGet(const Block & block, const Block & block_with_columns_to_add) const
{
std::shared_lock lock(data->rwlock);
if ((strictness == ASTTableJoin::Strictness::Any || strictness == ASTTableJoin::Strictness::RightAny) &&
kind == ASTTableJoin::Kind::Left)
{
return joinGetImpl(block, block_with_columns_to_add, std::get<MapsOne>(data->maps));
}
else
throw Exception("joinGet only supports StorageJoin of type Left Any", ErrorCodes::INCOMPATIBLE_TYPE_OF_JOIN);
}
void HashJoin::joinBlock(Block & block, ExtraBlockPtr & not_processed)
{
std::shared_lock lock(data->rwlock);
const Names & key_names_left = table_join->keyNamesLeft();
JoinCommon::checkTypesOfKeys(block, key_names_left, right_table_keys, key_names_right);

View File

@ -306,10 +306,6 @@ public:
struct RightTableData
{
/// Protect state for concurrent use in insertFromBlock and joinBlock.
/// @note that these methods could be called simultaneously only while use of StorageJoin.
mutable std::shared_mutex rwlock;
Type type = Type::EMPTY;
bool empty = true;
@ -322,6 +318,13 @@ public:
Arena pool;
};
/// We keep correspondence between used_flags and hash table internal buffer.
/// Hash table cannot be modified during HashJoin lifetime and must be protected with lock.
void setLock(std::shared_mutex & rwlock)
{
storage_join_lock = std::shared_lock<std::shared_mutex>(rwlock);
}
void reuseJoinedData(const HashJoin & join);
std::shared_ptr<RightTableData> getJoinedData() const
@ -353,6 +356,8 @@ private:
/// Flags that indicate that particular row already used in join.
/// Flag is stored for every record in hash map.
/// Number of this flags equals to hashtable buffer size (plus one for zero value).
/// Changes in hash table broke correspondence,
/// so we must guarantee constantness of hash table during HashJoin lifetime (using method setLock)
mutable JoinStuff::JoinUsedFlags used_flags;
Sizes key_sizes;
@ -371,6 +376,10 @@ private:
Block totals;
/// Should be set via setLock to protect hash table from modification from StorageJoin
/// If set HashJoin instance is not available for modification (addJoinedBlock)
std::shared_lock<std::shared_mutex> storage_join_lock;
void init(Type type_);
const Block & savedBlockSample() const { return data->sample_block; }
@ -388,15 +397,8 @@ private:
void joinBlockImplCross(Block & block, ExtraBlockPtr & not_processed) const;
template <typename Maps>
ColumnWithTypeAndName joinGetImpl(const Block & block, const Block & block_with_columns_to_add, const Maps & maps_) const;
static Type chooseMethod(const ColumnRawPtrs & key_columns, Sizes & key_sizes);
/// Call with already locked rwlock.
size_t getTotalRowCountLocked() const;
size_t getTotalByteCountLocked() const;
bool empty() const;
bool overDictionary() const;
};

View File

@ -79,7 +79,7 @@ void StorageJoin::truncate(
}
HashJoinPtr StorageJoin::getJoin(std::shared_ptr<TableJoin> analyzed_join) const
HashJoinPtr StorageJoin::getJoinLocked(std::shared_ptr<TableJoin> analyzed_join) const
{
auto metadata_snapshot = getInMemoryMetadataPtr();
if (!analyzed_join->sameStrictnessAndKind(strictness, kind))
@ -96,17 +96,47 @@ HashJoinPtr StorageJoin::getJoin(std::shared_ptr<TableJoin> analyzed_join) const
analyzed_join->setRightKeys(key_names);
HashJoinPtr join_clone = std::make_shared<HashJoin>(analyzed_join, metadata_snapshot->getSampleBlock().sortColumns());
join_clone->setLock(rwlock);
join_clone->reuseJoinedData(*join);
return join_clone;
}
void StorageJoin::insertBlock(const Block & block) { join->addJoinedBlock(block, true); }
void StorageJoin::insertBlock(const Block & block)
{
std::unique_lock<std::shared_mutex> lock(rwlock);
join->addJoinedBlock(block, true);
}
size_t StorageJoin::getSize() const { return join->getTotalRowCount(); }
std::optional<UInt64> StorageJoin::totalRows(const Settings &) const { return join->getTotalRowCount(); }
std::optional<UInt64> StorageJoin::totalBytes(const Settings &) const { return join->getTotalByteCount(); }
size_t StorageJoin::getSize() const
{
std::shared_lock<std::shared_mutex> lock(rwlock);
return join->getTotalRowCount();
}
std::optional<UInt64> StorageJoin::totalRows(const Settings &) const
{
std::shared_lock<std::shared_mutex> lock(rwlock);
return join->getTotalRowCount();
}
std::optional<UInt64> StorageJoin::totalBytes(const Settings &) const
{
std::shared_lock<std::shared_mutex> lock(rwlock);
return join->getTotalByteCount();
}
DataTypePtr StorageJoin::joinGetCheckAndGetReturnType(const DataTypes & data_types, const String & column_name, bool or_null) const
{
return join->joinGetCheckAndGetReturnType(data_types, column_name, or_null);
}
ColumnWithTypeAndName StorageJoin::joinGet(const Block & block, const Block & block_with_columns_to_add) const
{
std::shared_lock<std::shared_mutex> lock(rwlock);
return join->joinGet(block, block_with_columns_to_add);
}
void registerStorageJoin(StorageFactory & factory)
{
@ -264,24 +294,24 @@ size_t rawSize(const StringRef & t)
class JoinSource : public SourceWithProgress
{
public:
JoinSource(const HashJoin & parent_, UInt64 max_block_size_, Block sample_block_)
JoinSource(HashJoinPtr join_, std::shared_mutex & rwlock, UInt64 max_block_size_, Block sample_block_)
: SourceWithProgress(sample_block_)
, parent(parent_)
, lock(parent.data->rwlock)
, join(join_)
, lock(rwlock)
, max_block_size(max_block_size_)
, sample_block(std::move(sample_block_))
{
column_indices.resize(sample_block.columns());
auto & saved_block = parent.getJoinedData()->sample_block;
auto & saved_block = join->getJoinedData()->sample_block;
for (size_t i = 0; i < sample_block.columns(); ++i)
{
auto & [_, type, name] = sample_block.getByPosition(i);
if (parent.right_table_keys.has(name))
if (join->right_table_keys.has(name))
{
key_pos = i;
const auto & column = parent.right_table_keys.getByName(name);
const auto & column = join->right_table_keys.getByName(name);
restored_block.insert(column);
}
else
@ -300,19 +330,20 @@ public:
protected:
Chunk generate() override
{
if (parent.data->blocks.empty())
if (join->data->blocks.empty())
return {};
Chunk chunk;
if (!joinDispatch(parent.kind, parent.strictness, parent.data->maps,
if (!joinDispatch(join->kind, join->strictness, join->data->maps,
[&](auto kind, auto strictness, auto & map) { chunk = createChunk<kind, strictness>(map); }))
throw Exception("Logical error: unknown JOIN strictness", ErrorCodes::LOGICAL_ERROR);
return chunk;
}
private:
const HashJoin & parent;
HashJoinPtr join;
std::shared_lock<std::shared_mutex> lock;
UInt64 max_block_size;
Block sample_block;
Block restored_block; /// sample_block with parent column types
@ -330,7 +361,7 @@ private:
size_t rows_added = 0;
switch (parent.data->type)
switch (join->data->type)
{
#define M(TYPE) \
case HashJoin::Type::TYPE: \
@ -340,7 +371,7 @@ private:
#undef M
default:
throw Exception("Unsupported JOIN keys in StorageJoin. Type: " + toString(static_cast<UInt32>(parent.data->type)),
throw Exception("Unsupported JOIN keys in StorageJoin. Type: " + toString(static_cast<UInt32>(join->data->type)),
ErrorCodes::UNSUPPORTED_JOIN_KEYS);
}
@ -468,7 +499,8 @@ Pipe StorageJoin::read(
{
metadata_snapshot->check(column_names, getVirtuals(), getStorageID());
return Pipe(std::make_shared<JoinSource>(*join, max_block_size, metadata_snapshot->getSampleBlockForColumns(column_names, getVirtuals(), getStorageID())));
Block source_sample_block = metadata_snapshot->getSampleBlockForColumns(column_names, getVirtuals(), getStorageID());
return Pipe(std::make_shared<JoinSource>(join, rwlock, max_block_size, source_sample_block));
}
}

View File

@ -14,7 +14,6 @@ class TableJoin;
class HashJoin;
using HashJoinPtr = std::shared_ptr<HashJoin>;
/** Allows you save the state for later use on the right side of the JOIN.
* When inserted into a table, the data will be inserted into the state,
* and also written to the backup file, to restore after the restart.
@ -30,12 +29,17 @@ public:
void truncate(const ASTPtr &, const StorageMetadataPtr & metadata_snapshot, const Context &, TableExclusiveLockHolder &) override;
/// Access the innards.
HashJoinPtr & getJoin() { return join; }
HashJoinPtr getJoin(std::shared_ptr<TableJoin> analyzed_join) const;
/// Return instance of HashJoin holding lock that protects from insertions to StorageJoin.
/// HashJoin relies on structure of hash table that's why we need to return it with locked mutex.
HashJoinPtr getJoinLocked(std::shared_ptr<TableJoin> analyzed_join) const;
/// Verify that the data structure is suitable for implementing this type of JOIN.
void assertCompatible(ASTTableJoin::Kind kind_, ASTTableJoin::Strictness strictness_) const;
/// Get result type for function "joinGet(OrNull)"
DataTypePtr joinGetCheckAndGetReturnType(const DataTypes & data_types, const String & column_name, bool or_null) const;
/// Execute function "joinGet(OrNull)" on data block.
/// Takes rwlock for read to prevent parallel StorageJoin updates during processing data block
/// (but not during processing whole query, it's safe for joinGet that doesn't involve `used_flags` from HashJoin)
ColumnWithTypeAndName joinGet(const Block & block, const Block & block_with_columns_to_add) const;
Pipe read(
const Names & column_names,
@ -61,6 +65,10 @@ private:
std::shared_ptr<TableJoin> table_join;
HashJoinPtr join;
/// Protect state for concurrent use in insertFromBlock and joinBlock.
/// Lock is stored in HashJoin instance during query and blocks concurrent insertions.
mutable std::shared_mutex rwlock;
void insertBlock(const Block & block) override;
void finishInsert() override {}
size_t getSize() const override;

View File

@ -0,0 +1,61 @@
#!/usr/bin/env bash
unset CLICKHOUSE_LOG_COMMENT
CURDIR=$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)
# shellcheck source=../shell_config.sh
. "$CURDIR"/../shell_config.sh
set -o errexit
set -o pipefail
echo "
DROP TABLE IF EXISTS storage_join_race;
CREATE TABLE storage_join_race (x UInt64, y UInt64) Engine = Join(ALL, FULL, x);
" | $CLICKHOUSE_CLIENT -n
function read_thread_big()
{
while true; do
echo "
SELECT * FROM ( SELECT number AS x FROM numbers(100000) ) AS t1 ALL FULL JOIN storage_join_race USING (x) FORMAT Null;
" | $CLICKHOUSE_CLIENT -n
done
}
function read_thread_small()
{
while true; do
echo "
SELECT * FROM ( SELECT number AS x FROM numbers(10) ) AS t1 ALL FULL JOIN storage_join_race USING (x) FORMAT Null;
" | $CLICKHOUSE_CLIENT -n
done
}
function read_thread_select()
{
while true; do
echo "
SELECT * FROM storage_join_race FORMAT Null;
" | $CLICKHOUSE_CLIENT -n
done
}
# https://stackoverflow.com/questions/9954794/execute-a-shell-function-with-timeout
export -f read_thread_big;
export -f read_thread_small;
export -f read_thread_select;
TIMEOUT=20
timeout $TIMEOUT bash -c read_thread_big 2> /dev/null &
timeout $TIMEOUT bash -c read_thread_small 2> /dev/null &
timeout $TIMEOUT bash -c read_thread_select 2> /dev/null &
echo "
INSERT INTO storage_join_race SELECT number AS x, number AS y FROM numbers (10000000);
" | $CLICKHOUSE_CLIENT -n
wait
$CLICKHOUSE_CLIENT -q "DROP TABLE storage_join_race"