mirror of
https://github.com/ClickHouse/ClickHouse.git
synced 2024-11-21 23:21:59 +00:00
CacheDictionary datarace on exception_ptr (#9379)
* datarace on exceptionptr fixed * better * monkey test added * comment to test added * add new line to reference file * bump tests * q
This commit is contained in:
parent
8d51824ddc
commit
e06432af60
@ -7,21 +7,51 @@
|
||||
|
||||
namespace DB
|
||||
{
|
||||
|
||||
class ProfilingScopedWriteUnlocker;
|
||||
|
||||
class ProfilingScopedWriteRWLock
|
||||
{
|
||||
public:
|
||||
ProfilingScopedWriteRWLock(std::shared_mutex & rwl, ProfileEvents::Event event) :
|
||||
friend class ProfilingScopedWriteUnlocker;
|
||||
|
||||
ProfilingScopedWriteRWLock(std::shared_mutex & rwl_, ProfileEvents::Event event_) :
|
||||
watch(),
|
||||
scoped_write_lock(rwl)
|
||||
event(event_),
|
||||
scoped_write_lock(rwl_)
|
||||
{
|
||||
ProfileEvents::increment(event, watch.elapsed());
|
||||
}
|
||||
|
||||
private:
|
||||
Stopwatch watch;
|
||||
ProfileEvents::Event event;
|
||||
std::unique_lock<std::shared_mutex> scoped_write_lock;
|
||||
};
|
||||
|
||||
/// Inversed RAII
|
||||
/// Used to unlock current writelock for various purposes.
|
||||
class ProfilingScopedWriteUnlocker
|
||||
{
|
||||
public:
|
||||
ProfilingScopedWriteUnlocker() = delete;
|
||||
|
||||
ProfilingScopedWriteUnlocker(ProfilingScopedWriteRWLock & parent_lock_) : parent_lock(parent_lock_)
|
||||
{
|
||||
parent_lock.scoped_write_lock.unlock();
|
||||
}
|
||||
|
||||
~ProfilingScopedWriteUnlocker()
|
||||
{
|
||||
Stopwatch watch;
|
||||
parent_lock.scoped_write_lock.lock();
|
||||
ProfileEvents::increment(parent_lock.event, watch.elapsed());
|
||||
}
|
||||
|
||||
private:
|
||||
ProfilingScopedWriteRWLock & parent_lock;
|
||||
};
|
||||
|
||||
class ProfilingScopedReadRWLock
|
||||
{
|
||||
public:
|
||||
|
@ -820,6 +820,9 @@ void CacheDictionary::update(BunchUpdateUnit & bunch_update_unit) const
|
||||
|
||||
const auto now = std::chrono::system_clock::now();
|
||||
|
||||
/// Non const because it will be unlocked.
|
||||
ProfilingScopedWriteRWLock write_lock{rw_lock, ProfileEvents::DictCacheLockWriteNs};
|
||||
|
||||
if (now > backoff_end_time.load())
|
||||
{
|
||||
try
|
||||
@ -832,13 +835,26 @@ void CacheDictionary::update(BunchUpdateUnit & bunch_update_unit) const
|
||||
}
|
||||
|
||||
Stopwatch watch;
|
||||
auto stream = source_ptr->loadIds(bunch_update_unit.getRequestedIds());
|
||||
|
||||
const ProfilingScopedWriteRWLock write_lock{rw_lock, ProfileEvents::DictCacheLockWriteNs};
|
||||
/// To perform parallel loading.
|
||||
BlockInputStreamPtr stream = nullptr;
|
||||
{
|
||||
ProfilingScopedWriteUnlocker unlocker(write_lock);
|
||||
stream = source_ptr->loadIds(bunch_update_unit.getRequestedIds());
|
||||
}
|
||||
|
||||
stream->readPrefix();
|
||||
while (const auto block = stream->read())
|
||||
|
||||
while (true)
|
||||
{
|
||||
Block block;
|
||||
{
|
||||
ProfilingScopedWriteUnlocker unlocker(write_lock);
|
||||
block = stream->read();
|
||||
if (!block)
|
||||
break;
|
||||
}
|
||||
|
||||
const auto id_column = typeid_cast<const ColumnUInt64 *>(block.safeGetByPosition(0).column.get());
|
||||
if (!id_column)
|
||||
throw Exception{name + ": id column has type different from UInt64.", ErrorCodes::TYPE_MISMATCH};
|
||||
@ -907,8 +923,6 @@ void CacheDictionary::update(BunchUpdateUnit & bunch_update_unit) const
|
||||
|
||||
size_t not_found_num = 0, found_num = 0;
|
||||
|
||||
const ProfilingScopedWriteRWLock write_lock{rw_lock, ProfileEvents::DictCacheLockWriteNs};
|
||||
|
||||
/// Check which ids have not been found and require setting null_value
|
||||
for (const auto & id_found_pair : remaining_ids)
|
||||
{
|
||||
|
@ -0,0 +1 @@
|
||||
OK
|
@ -0,0 +1,69 @@
|
||||
#!/usr/bin/env bash
|
||||
|
||||
# This is a monkey test used to trigger sanitizers.
|
||||
|
||||
CURDIR=$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)
|
||||
. $CURDIR/../shell_config.sh
|
||||
|
||||
$CLICKHOUSE_CLIENT --query="CREATE DATABASE dictdb_01076; "
|
||||
|
||||
$CLICKHOUSE_CLIENT --query="
|
||||
CREATE TABLE dictdb_01076.table_datarace
|
||||
(
|
||||
key_column UInt8,
|
||||
value Float64
|
||||
)
|
||||
ENGINE = MergeTree()
|
||||
ORDER BY key_column;
|
||||
"
|
||||
|
||||
$CLICKHOUSE_CLIENT --query="
|
||||
INSERT INTO dictdb_01076.table_datarace VALUES (1, 1.1), (2, 2.2), (3, 3.3);
|
||||
"
|
||||
|
||||
$CLICKHOUSE_CLIENT --query="
|
||||
CREATE DICTIONARY IF NOT EXISTS dictdb_01076.dict_datarace
|
||||
(
|
||||
key_column UInt64,
|
||||
value Float64 DEFAULT 77.77
|
||||
)
|
||||
PRIMARY KEY key_column
|
||||
SOURCE(CLICKHOUSE(HOST 'localhost' PORT 9000 USER 'default' TABLE 'table_datarace' DB 'dictdb_01076'))
|
||||
LIFETIME(1)
|
||||
LAYOUT(CACHE());
|
||||
"
|
||||
|
||||
function thread1()
|
||||
{
|
||||
for attempt_thread1 in {1..50}
|
||||
do
|
||||
# This query will be ended with exception, because source dictionary has UInt8 as a key type.
|
||||
$CLICKHOUSE_CLIENT --query="SELECT dictGetFloat64('dictdb_01076.dict_datarace', 'value', toUInt64(1));"
|
||||
done
|
||||
}
|
||||
|
||||
|
||||
function thread2()
|
||||
{
|
||||
for attempt_thread2 in {1..50}
|
||||
do
|
||||
# This query will be ended with exception, because source dictionary has UInt8 as a key type.
|
||||
$CLICKHOUSE_CLIENT --query="SELECT dictGetFloat64('dictdb_01076.dict_datarace', 'value', toUInt64(2));"
|
||||
done
|
||||
}
|
||||
|
||||
export -f thread1;
|
||||
export -f thread2;
|
||||
|
||||
TIMEOUT=5
|
||||
|
||||
timeout $TIMEOUT bash -c thread1 > /dev/null 2>&1 &
|
||||
timeout $TIMEOUT bash -c thread2 > /dev/null 2>&1 &
|
||||
|
||||
wait
|
||||
|
||||
echo OK
|
||||
|
||||
$CLICKHOUSE_CLIENT --query="DROP TABLE dictdb_01076.table_datarace;"
|
||||
$CLICKHOUSE_CLIENT --query="DROP DICTIONARY dictdb_01076.dict_datarace;"
|
||||
$CLICKHOUSE_CLIENT --query="DROP DATABASE dictdb_01076;"
|
Loading…
Reference in New Issue
Block a user