This commit is contained in:
Alexey Milovidov 2023-10-14 02:52:53 +02:00
parent 17616ca325
commit bbd67d262a
4 changed files with 108 additions and 9 deletions

View File

@ -156,12 +156,62 @@ StorageSet::StorageSet(
}
void StorageSet::insertBlock(const Block & block, ContextPtr) { set->insertFromBlock(block.getColumnsWithTypeAndName()); }
void StorageSet::finishInsert() { set->finishInsert(); }
SetPtr StorageSet::getSet() const
{
std::lock_guard lock(mutex);
return set;
}
size_t StorageSet::getSize(ContextPtr) const { return set->getTotalRowCount(); }
std::optional<UInt64> StorageSet::totalRows(const Settings &) const { return set->getTotalRowCount(); }
std::optional<UInt64> StorageSet::totalBytes(const Settings &) const { return set->getTotalByteCount(); }
void StorageSet::insertBlock(const Block & block, ContextPtr)
{
SetPtr current_set;
{
std::lock_guard lock(mutex);
current_set = set;
}
current_set->insertFromBlock(block.getColumnsWithTypeAndName());
}
void StorageSet::finishInsert()
{
SetPtr current_set;
{
std::lock_guard lock(mutex);
current_set = set;
}
current_set->finishInsert();
}
size_t StorageSet::getSize(ContextPtr) const
{
SetPtr current_set;
{
std::lock_guard lock(mutex);
current_set = set;
}
return current_set->getTotalRowCount();
}
std::optional<UInt64> StorageSet::totalRows(const Settings &) const
{
SetPtr current_set;
{
std::lock_guard lock(mutex);
current_set = set;
}
return current_set->getTotalRowCount();
}
std::optional<UInt64> StorageSet::totalBytes(const Settings &) const
{
SetPtr current_set;
{
std::lock_guard lock(mutex);
current_set = set;
}
return current_set->getTotalRowCount();
}
void StorageSet::truncate(const ASTPtr &, const StorageMetadataPtr & metadata_snapshot, ContextPtr, TableExclusiveLockHolder &)
{
@ -176,8 +226,13 @@ void StorageSet::truncate(const ASTPtr &, const StorageMetadataPtr & metadata_sn
Block header = metadata_snapshot->getSampleBlock();
increment = 0;
set = std::make_shared<Set>(SizeLimits(), 0, true);
set->setHeader(header.getColumnsWithTypeAndName());
auto new_set = std::make_shared<Set>(SizeLimits(), 0, true);
new_set->setHeader(header.getColumnsWithTypeAndName());
{
std::lock_guard lock(mutex);
set = new_set;
}
}

View File

@ -79,7 +79,7 @@ public:
String getName() const override { return "Set"; }
/// Access the insides.
SetPtr & getSet() { return set; }
SetPtr getSet() const;
void truncate(const ASTPtr &, const StorageMetadataPtr & metadata_snapshot, ContextPtr, TableExclusiveLockHolder &) override;
@ -87,7 +87,9 @@ public:
std::optional<UInt64> totalBytes(const Settings & settings) const override;
private:
SetPtr set;
/// Allows to concurrently truncate the set and work (read/fill) the existing set.
mutable std::mutex mutex;
SetPtr set TSA_GUARDED_BY(mutex);
void insertBlock(const Block & block, ContextPtr) override;
void finishInsert() override;

View File

@ -0,0 +1,42 @@
#!/usr/bin/env bash
# Tags: race, no-debug
CURDIR=$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)
# shellcheck source=../shell_config.sh
. "$CURDIR"/../shell_config.sh
$CLICKHOUSE_CLIENT -mn -q """
DROP TABLE IF EXISTS t1_02867;
CREATE TABLE t1_02867 (x UInt64) ENGINE=Set();
"""
function repeat_select() {
n=0
while [ "$n" -lt 20 ];
do
n=$(( n + 1 ))
$CLICKHOUSE_CLIENT -q "SELECT count() as a FROM numbers(10) WHERE number IN t1_02867" > /dev/null 2> /dev/null || exit
done
}
function repeat_truncate_insert() {
n=0
while [ "$n" -lt 20 ];
do
n=$(( n + 1 ))
$CLICKHOUSE_CLIENT -q "TRUNCATE t1_02867;" > /dev/null 2> /dev/null || exit
done
}
repeat_select &
repeat_truncate_insert &
repeat_select &
repeat_truncate_insert &
repeat_select &
repeat_truncate_insert &
repeat_select &
repeat_truncate_insert &
sleep 10
$CLICKHOUSE_CLIENT -mn -q "DROP TABLE IF EXISTS t1_02867;"