fix another Context-related segfault

Alexander Tokmakov 2020-02-26 17:13:41 +03:00
parent 355678d99d
commit df8a90319a
4 changed files with 49 additions and 32 deletions

View File

@@ -184,8 +184,8 @@ Pipes StorageMerge::read(
     /** Just in case, turn off the "transfer to PREWHERE" optimization,
       * since there is no certainty that it works when one of the tables is MergeTree and the others are not.
       */
-    Context modified_context = context;
-    modified_context.getSettingsRef().optimize_move_to_prewhere = false;
+    auto modified_context = std::make_shared<Context>(context);
+    modified_context->getSettingsRef().optimize_move_to_prewhere = false;

     /// What will the result structure be, depending on the query processing stage in the source tables?
     Block header = getQueryHeader(column_names, query_info, context, processed_stage);
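Why this fixes the segfault: the old code copied the Context onto the stack of StorageMerge::read, while the sources built from it can keep referring to that copy after read() returns. Below is a minimal, self-contained sketch of the lifetime pattern — Ctx, Source, and makeSource are hypothetical stand-ins, not ClickHouse types — showing how the shared_ptr copy survives the creating frame:

    #include <iostream>
    #include <memory>

    struct Ctx { bool optimize_move_to_prewhere = true; };

    struct Source
    {
        const Ctx * ctx;                    // processors keep a pointer into the context
        std::shared_ptr<const Ctx> holder;  // co-ownership prevents the dangle
        bool read() const { return ctx->optimize_move_to_prewhere; }
    };

    Source makeSource(const Ctx & parent)
    {
        // Before the fix: `Ctx modified = parent;` died with this stack frame,
        // and Source::ctx dangled. A shared_ptr copy outlives the return.
        auto modified = std::make_shared<Ctx>(parent);
        modified->optimize_move_to_prewhere = false;
        return Source{modified.get(), modified};
    }

    int main()
    {
        Ctx parent;
        Source s = makeSource(parent);
        std::cout << s.read() << '\n';  // safe: prints 0, the context is still alive
    }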
@@ -254,7 +254,7 @@ Pipes StorageMerge::read(
 Pipes StorageMerge::createSources(const SelectQueryInfo & query_info, const QueryProcessingStage::Enum & processed_stage,
     const UInt64 max_block_size, const Block & header, const StorageWithLockAndName & storage_with_lock,
     Names & real_column_names,
-    Context & modified_context, size_t streams_num, bool has_table_virtual_column,
+    const std::shared_ptr<Context> & modified_context, size_t streams_num, bool has_table_virtual_column,
     bool concat_streams)
 {
     auto & [storage, struct_lock, table_name] = storage_with_lock;
@@ -271,38 +271,38 @@ Pipes StorageMerge::createSources(const SelectQueryInfo & query_info, const Quer
         {
             /// This flag means that the pipeline must be tree-shaped,
             /// so we can't enable processors for InterpreterSelectQuery here.
-            auto stream = InterpreterSelectQuery(modified_query_info.query, modified_context, std::make_shared<OneBlockInputStream>(header),
+            auto stream = InterpreterSelectQuery(modified_query_info.query, *modified_context, std::make_shared<OneBlockInputStream>(header),
                 SelectQueryOptions(processed_stage).analyze()).execute().in;

             pipes.emplace_back(std::make_shared<SourceFromInputStream>(std::move(stream)));
             return pipes;
         }

-        pipes.emplace_back(
-            InterpreterSelectQuery(modified_query_info.query, modified_context,
+        auto pipe = InterpreterSelectQuery(modified_query_info.query, *modified_context,
                 std::make_shared<OneBlockInputStream>(header),
-                SelectQueryOptions(processed_stage).analyze()).executeWithProcessors().getPipe());
+                SelectQueryOptions(processed_stage).analyze()).executeWithProcessors().getPipe();
+        pipe.addInterpreterContext(modified_context);
+        pipes.emplace_back(std::move(pipe));
         return pipes;
     }

-    if (processed_stage <= storage->getQueryProcessingStage(modified_context))
+    if (processed_stage <= storage->getQueryProcessingStage(*modified_context))
     {
         /// If there are only virtual columns in the query, you must request at least one other column.
         if (real_column_names.empty())
             real_column_names.push_back(ExpressionActions::getSmallestColumn(storage->getColumns().getAllPhysical()));

-        pipes = storage->read(real_column_names, modified_query_info, modified_context, processed_stage, max_block_size, UInt32(streams_num));
+        pipes = storage->read(real_column_names, modified_query_info, *modified_context, processed_stage, max_block_size, UInt32(streams_num));
     }
-    else if (processed_stage > storage->getQueryProcessingStage(modified_context))
+    else if (processed_stage > storage->getQueryProcessingStage(*modified_context))
     {
         modified_query_info.query->as<ASTSelectQuery>()->replaceDatabaseAndTable(source_database, table_name);

         /// Maximum permissible parallelism is streams_num
-        modified_context.getSettingsRef().max_threads = UInt64(streams_num);
-        modified_context.getSettingsRef().max_streams_to_max_threads_ratio = 1;
+        modified_context->getSettingsRef().max_threads = UInt64(streams_num);
+        modified_context->getSettingsRef().max_streams_to_max_threads_ratio = 1;

-        InterpreterSelectQuery interpreter{modified_query_info.query, modified_context, SelectQueryOptions(processed_stage)};
+        InterpreterSelectQuery interpreter{modified_query_info.query, *modified_context, SelectQueryOptions(processed_stage)};

         if (query_info.force_tree_shaped_pipeline)
         {
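The processors pipeline is executed lazily, well after createSources returns, so every Pipe that references the per-query context must co-own it; that is what the new addInterpreterContext calls accomplish. A hedged sketch of the keep-alive idea with stand-in types (the std::function plays the role of a processor; none of this is the ClickHouse API):

    #include <functional>
    #include <iostream>
    #include <memory>

    struct Ctx { int max_threads = 4; };

    struct Pipe
    {
        std::function<int()> process;                    // stands in for the processor chain
        std::shared_ptr<const Ctx> interpreter_context;  // the lifetime anchor

        void addInterpreterContext(std::shared_ptr<const Ctx> ctx) { interpreter_context = std::move(ctx); }
    };

    Pipe createSource(const std::shared_ptr<Ctx> & modified_context)
    {
        Pipe pipe;
        // The processor captures a raw pointer: exactly the dangerous part.
        pipe.process = [raw = modified_context.get()] { return raw->max_threads; };
        pipe.addInterpreterContext(modified_context);  // without this line: use-after-free
        return pipe;
    }

    int main()
    {
        Pipe pipe;
        {
            auto ctx = std::make_shared<Ctx>();
            pipe = createSource(ctx);
        }   // the creator's shared_ptr is gone here
        std::cout << pipe.process() << '\n';  // still safe: the pipe co-owns the Ctx
    }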
@@ -342,9 +342,11 @@ Pipes StorageMerge::createSources(const SelectQueryInfo & query_info, const Quer
         /// Subordinate tables could have different but convertible types, like numeric types of different width.
         /// We must return streams with a structure equal to the structure of the Merge table.
-        convertingSourceStream(header, modified_context, modified_query_info.query, pipe, processed_stage);
+        convertingSourceStream(header, *modified_context, modified_query_info.query, pipe, processed_stage);

         pipe.addTableLock(struct_lock);
+        pipe.addInterpreterContext(modified_context);
     }
 }

View File

@@ -81,7 +81,7 @@ protected:
         const SelectQueryInfo & query_info, const QueryProcessingStage::Enum & processed_stage,
         const UInt64 max_block_size, const Block & header, const StorageWithLockAndName & storage_with_lock,
         Names & real_column_names,
-        Context & modified_context, size_t streams_num, bool has_table_virtual_column,
+        const std::shared_ptr<Context> & modified_context, size_t streams_num, bool has_table_virtual_column,
         bool concat_streams = false);

     void convertingSourceStream(const Block & header, const Context & context, ASTPtr & query,
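A note on the new signature: taking const std::shared_ptr<Context> & (rather than Context & or a by-value shared_ptr) lets the callee dereference the context with no refcount traffic, yet still copy the pointer into a pipe on the one line where co-ownership is actually needed. A small compilable sketch of the idiom, with hypothetical Context/Pipe stand-ins:

    #include <memory>

    struct Context { unsigned max_threads = 0; };
    struct Pipe { std::shared_ptr<Context> interpreter_context; };

    // By const reference: reading *ctx costs nothing extra; the atomic
    // refcount is touched only where ownership is genuinely shared.
    void createSource(const std::shared_ptr<Context> & ctx, Pipe & pipe, unsigned streams_num)
    {
        ctx->max_threads = streams_num;   // plain use through the pointer
        pipe.interpreter_context = ctx;   // explicit co-ownership, as addInterpreterContext does
    }

    int main()
    {
        auto ctx = std::make_shared<Context>();
        Pipe pipe;
        createSource(ctx, pipe, 8);
    }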

View File

@@ -3,10 +3,9 @@ CREATE TABLE test_01083.buffer (`n` Int8) ENGINE = Buffer(\'test_01083\', \'file
 CREATE TABLE test_01083.merge (`n` Int8) ENGINE = Merge(\'test_01083\', \'distributed\')
 CREATE TABLE test_01083.merge_tf AS merge(\'test_01083\', \'.*\')
 CREATE TABLE test_01083.distributed (`n` Int8) ENGINE = Distributed(\'test_shard_localhost\', \'test_01083\', \'file\')
-CREATE TABLE test_01083.distributed_tf AS cluster(\'test_shard_localhost\', \'test_01083\', \'file\')
-CREATE TABLE test_01083.url (`n` UInt64, `_path` String) ENGINE = URL(\'https://localhost:8443/?query=select+n,+_path+from+test_01083.file+format+CSV\', \'CSV\')
-CREATE TABLE test_01083.rich_syntax AS remote(\'localhos{x|y|t}\', cluster(\'test_shard_localhost\', remote(\'127.0.0.{1..4}\', \'test_01083\', \'url\')))
-1
-1
-1
-1
+CREATE TABLE test_01083.distributed_tf AS cluster(\'test_shard_localhost\', \'test_01083\', \'buffer\')
+CREATE TABLE test_01083.url (`n` UInt64, `col` String) ENGINE = URL(\'https://localhost:8443/?query=select+n,+_table+from+test_01083.merge+format+CSV\', \'CSV\')
+CREATE TABLE test_01083.rich_syntax AS remote(\'localhos{x|y|t}\', cluster(\'test_shard_localhost\', remote(\'127.0.0.{1..4}\', \'test_01083\', \'view\')))
+CREATE VIEW test_01083.view (`n` Int64) AS SELECT toInt64(n) AS n FROM (SELECT toString(n) AS n FROM test_01083.merge WHERE _table != \'qwerty\' ORDER BY _table ASC) UNION ALL SELECT * FROM test_01083.file
+CREATE DICTIONARY test_01083.dict (`n` UInt64, `col` String DEFAULT \'42\') PRIMARY KEY n SOURCE(CLICKHOUSE(HOST \'localhost\' PORT 9440 SECURE 1 USER \'default\' TABLE \'url\' DB \'test_01083\')) LIFETIME(MIN 0 MAX 1) LAYOUT(CACHE(SIZE_IN_CELLS 1))
+16

View File

@@ -7,24 +7,32 @@ CREATE TABLE buffer (n Int8) ENGINE = Buffer(currentDatabase(), file, 16, 10, 20
 CREATE TABLE merge (n Int8) ENGINE = Merge('', lower('DISTRIBUTED'));
 CREATE TABLE merge_tf as merge(currentDatabase(), '.*');
 CREATE TABLE distributed (n Int8) ENGINE = Distributed(test_shard_localhost, currentDatabase(), 'fi' || 'le');
-CREATE TABLE distributed_tf as cluster('test' || '_' || 'shard_localhost', '', 'fi' || 'le');
+CREATE TABLE distributed_tf as cluster('test' || '_' || 'shard_localhost', '', 'buf' || 'fer');

 INSERT INTO file VALUES (1);
-CREATE TABLE url (n UInt64, _path String) ENGINE=URL
+INSERT INTO buffer VALUES (1);
+DETACH TABLE buffer; -- trigger flushing
+ATTACH TABLE buffer;
+
+CREATE TABLE url (n UInt64, col String) ENGINE=URL
 (
     replace
     (
-        'https://localhost:8443/?query=' || 'select n, _path from ' || currentDatabase() || '.file format CSV', ' ', '+' -- replace `file` with `merge` here after #9246 is fixed
+        'https://localhost:8443/?query=' || 'select n, _table from ' || currentDatabase() || '.merge format CSV', ' ', '+'
     ),
     CSV
 );

+CREATE VIEW view AS SELECT toInt64(n) as n FROM (SELECT toString(n) as n from merge WHERE _table != 'qwerty' ORDER BY _table) UNION ALL SELECT * FROM file;
+
 -- The following line is needed just to disable checking stderr for emptiness
 SELECT nonexistentsomething; -- { serverError 47 }
-CREATE DICTIONARY dict (n UInt64, _path String DEFAULT '42') PRIMARY KEY n
+CREATE DICTIONARY dict (n UInt64, col String DEFAULT '42') PRIMARY KEY n
 SOURCE(CLICKHOUSE(HOST 'localhost' PORT 9440 SECURE 1 USER 'default' TABLE 'url' DB 'test_01083')) LIFETIME(1) LAYOUT(CACHE(SIZE_IN_CELLS 1));

+-- dict --> url --> merge |-> distributed -> file (1)
+--                        |-> distributed_tf -> buffer -> file (1)
 -- TODO make fuzz test from this
 CREATE TABLE rich_syntax as remote
 (
@@ -37,15 +45,16 @@ CREATE TABLE rich_syntax as remote
         '127.0.0.{1..4}',
         if
         (
-            toString(40 + 2.0) NOT IN ('hello', dictGetString(currentDatabase() || '.dict', '_path', toUInt64('0001'))),
+            toString(40 + 2) NOT IN ('hello', dictGetString(currentDatabase() || '.dict', 'col', toUInt64('0001'))),
             currentDatabase(),
             'FAIL'
         ),
-        extract('123url456', '[a-z]+')
+        extract('123view456', '[a-z]+')
     )
 )
 );

 SHOW CREATE file;
 SHOW CREATE buffer;
 SHOW CREATE merge;
@@ -54,7 +63,14 @@ SHOW CREATE distributed;
 SHOW CREATE distributed_tf;
 SHOW CREATE url;
 SHOW CREATE rich_syntax;
+SHOW CREATE view;
+SHOW CREATE dict;

-SELECT n from rich_syntax;
+INSERT INTO buffer VALUES (1);
+-- remote(localhost) --> cluster(test_shard_localhost) |-> remote(127.0.0.1) --> view |-> subquery --> merge |-> distributed --> file (1)
+--                                                     |                             |                       |-> distributed_tf -> buffer (1) -> file (1)
+--                                                     |                             |-> file (1)
+--                                                     |-> remote(127.0.0.2) --> ...
+SELECT sum(n) from rich_syntax;

 DROP DATABASE test_01083;