Merge pull request #62305 from Avogar/fix-scalar-data-race

Fix data race on scalars in Context
This commit is contained in:
Kruglov Pavel 2024-04-05 15:24:26 +02:00 committed by GitHub
commit bdda4e31fb
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
5 changed files with 121 additions and 9 deletions

View File

@ -83,7 +83,7 @@ public:
static ColumnWithTypeAndName createScalar(ContextPtr context_)
{
if (const auto * block = context_->tryGetSpecialScalar(Scalar::scalar_name))
if (auto block = context_->tryGetSpecialScalar(Scalar::scalar_name))
return block->getByPosition(0);
else if (context_->hasQueryContext())
{

View File

@ -1550,14 +1550,17 @@ ClassifierPtr Context::getWorkloadClassifier() const
}
const Scalars & Context::getScalars() const
Scalars Context::getScalars() const
{
std::lock_guard lock(mutex);
return scalars;
}
const Block & Context::getScalar(const String & name) const
Block Context::getScalar(const String & name) const
{
std::lock_guard lock(mutex);
auto it = scalars.find(name);
if (scalars.end() == it)
{
@ -1568,12 +1571,13 @@ const Block & Context::getScalar(const String & name) const
return it->second;
}
const Block * Context::tryGetSpecialScalar(const String & name) const
std::optional<Block> Context::tryGetSpecialScalar(const String & name) const
{
std::lock_guard lock(mutex);
auto it = special_scalars.find(name);
if (special_scalars.end() == it)
return nullptr;
return &it->second;
return std::nullopt;
return it->second;
}
Tables Context::getExternalTables() const
@ -1653,6 +1657,7 @@ void Context::addScalar(const String & name, const Block & block)
if (isGlobalContext())
throw Exception(ErrorCodes::LOGICAL_ERROR, "Global context cannot have scalars");
std::lock_guard lock(mutex);
scalars[name] = block;
}
@ -1662,6 +1667,7 @@ void Context::addSpecialScalar(const String & name, const Block & block)
if (isGlobalContext())
throw Exception(ErrorCodes::LOGICAL_ERROR, "Global context cannot have local scalars");
std::lock_guard lock(mutex);
special_scalars[name] = block;
}
@ -1671,6 +1677,7 @@ bool Context::hasScalar(const String & name) const
if (isGlobalContext())
throw Exception(ErrorCodes::LOGICAL_ERROR, "Global context cannot have scalars");
std::lock_guard lock(mutex);
return scalars.contains(name);
}

View File

@ -680,12 +680,12 @@ public:
std::shared_ptr<TemporaryTableHolder> findExternalTable(const String & table_name) const;
std::shared_ptr<TemporaryTableHolder> removeExternalTable(const String & table_name);
const Scalars & getScalars() const;
const Block & getScalar(const String & name) const;
Scalars getScalars() const;
Block getScalar(const String & name) const;
void addScalar(const String & name, const Block & block);
bool hasScalar(const String & name) const;
const Block * tryGetSpecialScalar(const String & name) const;
std::optional<Block> tryGetSpecialScalar(const String & name) const;
void addSpecialScalar(const String & name, const Block & block);
const QueryAccessInfo & getQueryAccessInfo() const { return *getQueryAccessInfoPtr(); }

View File

@ -0,0 +1,104 @@
DROP TABLE IF EXISTS test;
DROP TABLE IF EXISTS test_tmp;
DROP TABLE IF EXISTS dst;
DROP TABLE IF EXISTS view;
CREATE TABLE test
(
`address` FixedString(20),
`deployer` FixedString(20),
`block_number` UInt256,
`block_hash` FixedString(32),
`block_timestamp` DateTime('UTC'),
`insertion_time` DateTime('UTC')
)
ENGINE = MergeTree
ORDER BY address
SETTINGS index_granularity = 8192;
CREATE TABLE test_tmp as test;
CREATE TABLE dst
(
`block_timestamp` AggregateFunction(max, Nullable(DateTime('UTC'))),
`block_hash` AggregateFunction(argMax, Nullable(FixedString(32)), DateTime('UTC')),
`block_number` AggregateFunction(argMax, Nullable(UInt256), DateTime('UTC')),
`deployer` AggregateFunction(argMax, Nullable(FixedString(20)), DateTime('UTC')),
`address` FixedString(20),
`name` AggregateFunction(argMax, Nullable(String), DateTime('UTC')),
`symbol` AggregateFunction(argMax, Nullable(String), DateTime('UTC')),
`decimals` AggregateFunction(argMax, Nullable(UInt8), DateTime('UTC')),
`is_proxy` AggregateFunction(argMax, Nullable(Bool), DateTime('UTC')),
`blacklist_flags` AggregateFunction(argMax, Array(Nullable(String)), DateTime('UTC')),
`whitelist_flags` AggregateFunction(argMax, Array(Nullable(String)), DateTime('UTC')),
`detected_standards` AggregateFunction(argMax, Array(Nullable(String)), DateTime('UTC')),
`amended_type` AggregateFunction(argMax, Nullable(String), DateTime('UTC')),
`comment` AggregateFunction(argMax, Nullable(String), DateTime('UTC')),
`_sources` AggregateFunction(groupUniqArray, String),
`_updated_at` AggregateFunction(max, DateTime('UTC')),
`_active` AggregateFunction(argMax, Bool, DateTime('UTC'))
)
ENGINE = MergeTree
ORDER BY address
SETTINGS index_granularity = 8192;
CREATE MATERIALIZED VIEW view TO dst
(
`block_timestamp` AggregateFunction(max, Nullable(DateTime('UTC'))),
`block_hash` AggregateFunction(argMax, Nullable(FixedString(32)), DateTime('UTC')),
`block_number` AggregateFunction(argMax, Nullable(UInt256), DateTime('UTC')),
`deployer` AggregateFunction(argMax, Nullable(FixedString(20)), DateTime('UTC')),
`address` FixedString(20),
`name` AggregateFunction(argMax, Nullable(String), DateTime('UTC')),
`symbol` AggregateFunction(argMax, Nullable(String), DateTime('UTC')),
`decimals` AggregateFunction(argMax, Nullable(UInt8), DateTime('UTC')),
`is_proxy` AggregateFunction(argMax, Nullable(Bool), DateTime('UTC')),
`blacklist_flags` AggregateFunction(argMax, Array(Nullable(String)), DateTime('UTC')),
`whitelist_flags` AggregateFunction(argMax, Array(Nullable(String)), DateTime('UTC')),
`detected_standards` AggregateFunction(argMax, Array(Nullable(String)), DateTime('UTC')),
`amended_type` AggregateFunction(argMax, Nullable(String), DateTime('UTC')),
`comment` AggregateFunction(argMax, Nullable(String), DateTime('UTC')),
`_sources` AggregateFunction(groupUniqArray, String),
`_updated_at` AggregateFunction(max, DateTime('UTC')),
`_active` AggregateFunction(argMax, Bool, DateTime('UTC'))
) AS
(WITH (
SELECT toDateTime('1970-01-01 00:00:00')
) AS default_timestamp
SELECT
maxState(CAST(block_timestamp, 'Nullable(DateTime(\'UTC\'))')) AS block_timestamp,
argMaxState(CAST(block_hash, 'Nullable(FixedString(32))'), insertion_time) AS block_hash,
argMaxState(CAST(block_number, 'Nullable(UInt256)'), insertion_time) AS block_number,
argMaxState(CAST(deployer, 'Nullable(FixedString(20))'), insertion_time) AS deployer,
address,
argMaxState(CAST(NULL, 'Nullable(String)'), CAST(default_timestamp, 'DateTime(\'UTC\')')) AS name,
argMaxState(CAST(NULL, 'Nullable(String)'), CAST(default_timestamp, 'DateTime(\'UTC\')')) AS symbol,
argMaxState(CAST(NULL, 'Nullable(UInt8)'), CAST(default_timestamp, 'DateTime(\'UTC\')')) AS decimals,
argMaxState(CAST(true, 'Nullable(Boolean)'), insertion_time) AS is_proxy,
argMaxState(CAST('[]', 'Array(Nullable(String))'), CAST(default_timestamp, 'DateTime(\'UTC\')')) AS blacklist_flags,
argMaxState(CAST('[]', 'Array(Nullable(String))'), CAST(default_timestamp, 'DateTime(\'UTC\')')) AS whitelist_flags,
argMaxState(CAST('[]', 'Array(Nullable(String))'), CAST(default_timestamp, 'DateTime(\'UTC\')')) AS detected_standards,
argMaxState(CAST(NULL, 'Nullable(String)'), CAST(default_timestamp, 'DateTime(\'UTC\')')) AS amended_type,
argMaxState(CAST(NULL, 'Nullable(String)'), CAST(default_timestamp, 'DateTime(\'UTC\')')) AS comment,
groupUniqArrayState('tokens_proxy_deployments') AS _sources,
maxState(insertion_time) AS _updated_at,
argMaxState(true, CAST(default_timestamp, 'DateTime(\'UTC\')')) AS _active
FROM test
WHERE insertion_time > toDateTime('2024-03-14 11:38:09')
GROUP BY address);
set max_insert_threads=4;
insert into test_tmp select * from generateRandom() limit 24;
insert into test_tmp select * from generateRandom() limit 25;
insert into test_tmp select * from generateRandom() limit 26;
insert into test_tmp select * from generateRandom() limit 30;
INSERT INTO test(address, deployer, block_number, block_hash, block_timestamp, insertion_time) SELECT * FROM test_tmp;
select count() from test;
DROP TABLE test;
DROP TABLE test_tmp;
DROP TABLE dst;
DROP TABLE view;