Merge pull request #36744 from kitaisreal/clickhouse-dictionary-context-copy

ClickHouseDictionarySource context copy
This commit is contained in:
Maksim Kita 2022-04-29 11:07:45 +02:00 committed by GitHub
commit 0b3f27b991
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
5 changed files with 47 additions and 46 deletions

View File

@ -169,12 +169,12 @@ Pipe ClickHouseDictionarySource::createStreamForQuery(const String & query, std:
/// Sample block should not contain first row default values /// Sample block should not contain first row default values
auto empty_sample_block = sample_block.cloneEmpty(); auto empty_sample_block = sample_block.cloneEmpty();
/// Copy context because results of scalar subqueries potentially could be cached
auto context_copy = Context::createCopy(context); auto context_copy = Context::createCopy(context);
context_copy->makeQueryContext(); context_copy->makeQueryContext();
if (configuration.is_local) if (configuration.is_local)
{ {
builder.init(executeQuery(query, context_copy, true).pipeline); builder.init(executeQuery(query, context_copy, true).pipeline);
auto converting = ActionsDAG::makeConvertingActions( auto converting = ActionsDAG::makeConvertingActions(
builder.getHeader().getColumnsWithTypeAndName(), builder.getHeader().getColumnsWithTypeAndName(),
@ -206,17 +206,21 @@ Pipe ClickHouseDictionarySource::createStreamForQuery(const String & query, std:
std::string ClickHouseDictionarySource::doInvalidateQuery(const std::string & request) const std::string ClickHouseDictionarySource::doInvalidateQuery(const std::string & request) const
{ {
LOG_TRACE(log, "Performing invalidate query"); LOG_TRACE(log, "Performing invalidate query");
/// Copy context because results of scalar subqueries potentially could be cached
auto context_copy = Context::createCopy(context);
context_copy->makeQueryContext();
if (configuration.is_local) if (configuration.is_local)
{ {
auto query_context = Context::createCopy(context); return readInvalidateQuery(executeQuery(request, context_copy, true).pipeline);
return readInvalidateQuery(executeQuery(request, query_context, true).pipeline);
} }
else else
{ {
/// We pass empty block to RemoteBlockInputStream, because we don't know the structure of the result. /// We pass empty block to RemoteBlockInputStream, because we don't know the structure of the result.
Block invalidate_sample_block; Block invalidate_sample_block;
QueryPipeline pipeline(std::make_shared<RemoteSource>( QueryPipeline pipeline(std::make_shared<RemoteSource>(
std::make_shared<RemoteQueryExecutor>(pool, request, invalidate_sample_block, context), false, false)); std::make_shared<RemoteQueryExecutor>(pool, request, invalidate_sample_block, context_copy), false, false));
return readInvalidateQuery(std::move(pipeline)); return readInvalidateQuery(std::move(pipeline));
} }
} }

View File

@ -1,7 +1,3 @@
1 1 1 1
2 2 2 2
3 3 3 3
3 3
0
0
4 4

View File

@ -52,42 +52,4 @@ SELECT * FROM 01780_db.dict3;
DROP DICTIONARY 01780_db.dict3; DROP DICTIONARY 01780_db.dict3;
DROP TABLE IF EXISTS 01780_db.dict4_source;
CREATE TABLE 01780_db.dict4_source
(
id UInt64,
value String
) ENGINE = TinyLog;
DROP TABLE IF EXISTS 01780_db.dict4_view;
CREATE VIEW 01780_db.dict4_view
(
id UInt64,
value String
) AS SELECT id, value FROM 01780_db.dict4_source WHERE id = (SELECT max(id) FROM 01780_db.dict4_source);
INSERT INTO 01780_db.dict4_source VALUES (1, '1'), (2, '2'), (3, '3');
DROP DICTIONARY IF EXISTS 01780_db.dict4;
CREATE DICTIONARY 01780_db.dict4
(
id UInt64,
value String
)
PRIMARY KEY id
SOURCE(CLICKHOUSE(HOST 'localhost' PORT 9000 TABLE 'dict4_view' DATABASE '01780_db'))
LIFETIME(MIN 0 MAX 1)
LAYOUT(COMPLEX_KEY_HASHED());
SELECT * FROM 01780_db.dict4;
INSERT INTO 01780_db.dict4_source VALUES (4, '4');
SELECT sleep(3);
SELECT sleep(3);
SELECT * FROM 01780_db.dict4;
DROP DICTIONARY 01780_db.dict4;
DROP DATABASE 01780_db; DROP DATABASE 01780_db;

View File

@ -0,0 +1,2 @@
3 3
4 4

View File

@ -0,0 +1,37 @@
DROP TABLE IF EXISTS test_dictionary_source_table;
CREATE TABLE test_dictionary_source_table
(
id UInt64,
value String
) ENGINE = TinyLog;
DROP TABLE IF EXISTS test_dictionary_view;
CREATE VIEW test_dictionary_view
(
id UInt64,
value String
) AS SELECT id, value FROM test_dictionary_source_table WHERE id = (SELECT max(id) FROM test_dictionary_source_table);
INSERT INTO test_dictionary_source_table VALUES (1, '1'), (2, '2'), (3, '3');
DROP DICTIONARY IF EXISTS test_dictionary;
CREATE DICTIONARY test_dictionary
(
id UInt64,
value String
)
PRIMARY KEY id
SOURCE(CLICKHOUSE(TABLE 'test_dictionary_view'))
LIFETIME(MIN 0 MAX 1)
LAYOUT(FLAT());
SELECT * FROM test_dictionary;
INSERT INTO test_dictionary_source_table VALUES (4, '4');
SYSTEM RELOAD DICTIONARY test_dictionary;
SELECT * FROM test_dictionary;
DROP DICTIONARY test_dictionary;
DROP VIEW test_dictionary_view;
DROP TABLE test_dictionary_source_table;