diff --git a/dbms/src/DataStreams/BlockIO.cpp b/dbms/src/DataStreams/BlockIO.cpp
new file mode 100644
index 00000000000..60a0b415237
--- /dev/null
+++ b/dbms/src/DataStreams/BlockIO.cpp
@@ -0,0 +1,56 @@
+#include <DataStreams/BlockIO.h>
+#include <Interpreters/ProcessList.h>
+
+namespace DB
+{
+
+void BlockIO::reset()
+{
+    /** process_list_entry should be destroyed after in, after out and after pipeline,
+      * since in, out and pipeline contain pointer to objects inside process_list_entry (query-level MemoryTracker for example),
+      * which could be used before destroying of in and out.
+      *
+      * However, QueryStatus inside process_list_entry holds shared pointers to streams for some reason.
+      * Streams must be destroyed before storage locks, storages and contexts inside pipeline,
+      * so releaseQueryStreams() is required.
+      */
+    /// TODO simplify it all
+
+    out.reset();
+    in.reset();
+    if (process_list_entry)
+        process_list_entry->get().releaseQueryStreams();
+    pipeline = QueryPipeline();
+    process_list_entry.reset();
+
+    /// TODO Do we also need to reset callbacks? In which order?
+}
+
+BlockIO & BlockIO::operator= (BlockIO && rhs)
+{
+    if (this == &rhs)
+        return *this;
+
+    /// Explicitly reset fields, so everything is destroyed in the right order
+    reset();
+
+    process_list_entry = std::move(rhs.process_list_entry);
+    in = std::move(rhs.in);
+    out = std::move(rhs.out);
+    pipeline = std::move(rhs.pipeline);
+
+    finish_callback = std::move(rhs.finish_callback);
+    exception_callback = std::move(rhs.exception_callback);
+
+    null_format = std::move(rhs.null_format);
+
+    return *this;
+}
+
+BlockIO::~BlockIO()
+{
+    reset();
+}
+
+}
+
diff --git a/dbms/src/DataStreams/BlockIO.h b/dbms/src/DataStreams/BlockIO.h
index 73e25543a4b..08a5f819fd6 100644
--- a/dbms/src/DataStreams/BlockIO.h
+++ b/dbms/src/DataStreams/BlockIO.h
@@ -15,13 +15,14 @@ class ProcessListEntry;
 struct BlockIO
 {
     BlockIO() = default;
-    BlockIO(const BlockIO &) = default;
-    ~BlockIO() = default;
+    BlockIO(BlockIO &&) = default;
+
+    BlockIO & operator= (BlockIO && rhs);
+    ~BlockIO();
+
+    BlockIO(const BlockIO &) = delete;
+    BlockIO & operator= (const BlockIO & rhs) = delete;
 
-    /** process_list_entry should be destroyed after in, after out and after pipeline,
-      * since in, out and pipeline contain pointer to objects inside process_list_entry (query-level MemoryTracker for example),
-      * which could be used before destroying of in and out.
-      */
     std::shared_ptr<ProcessListEntry> process_list_entry;
 
     BlockOutputStreamPtr out;
@@ -49,28 +50,8 @@ struct BlockIO
             exception_callback();
     }
 
-    BlockIO & operator= (const BlockIO & rhs)
-    {
-        if (this == &rhs)
-            return *this;
-
-        out.reset();
-        in.reset();
-        pipeline = QueryPipeline();
-        process_list_entry.reset();
-
-        process_list_entry = rhs.process_list_entry;
-        in = rhs.in;
-        out = rhs.out;
-        pipeline = rhs.pipeline;
-
-        finish_callback = rhs.finish_callback;
-        exception_callback = rhs.exception_callback;
-
-        null_format = rhs.null_format;
-
-        return *this;
-    }
+private:
+    void reset();
 };
 
 }
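The ordering rule that BlockIO::reset() encodes comes straight from C++ member-destruction semantics: members are destroyed in reverse declaration order, and a stream whose destructor still touches an object owned by process_list_entry must be released before that entry. A minimal, self-contained sketch of the hazard follows; the Tracker, Stream and QueryResult names are hypothetical stand-ins for this illustration, not types from the patch.

#include <iostream>
#include <memory>

/// Tracker plays the role of the object owned by process_list_entry
/// (e.g. the query-level MemoryTracker); Stream plays the role of in/out.
struct Tracker
{
    void account() { std::cout << "Tracker used\n"; }
    ~Tracker() { std::cout << "Tracker destroyed\n"; }
};

struct Stream
{
    explicit Stream(Tracker * tracker_) : tracker(tracker_) {}
    ~Stream()
    {
        if (tracker)
            tracker->account();   /// Would be a use-after-free if the Tracker were already gone.
        std::cout << "Stream destroyed\n";
    }
    Tracker * tracker = nullptr;  /// Raw pointer: does not keep the Tracker alive.
};

struct QueryResult
{
    /// Declared first => destroyed last, so the Stream destructor may still touch it.
    std::shared_ptr<Tracker> process_list_entry = std::make_shared<Tracker>();
    std::unique_ptr<Stream> in = std::make_unique<Stream>(process_list_entry.get());

    /// Analogue of BlockIO::reset(): release members explicitly, streams first,
    /// the entry they point into last, instead of relying on declaration order alone.
    void reset()
    {
        in.reset();
        process_list_entry.reset();
    }
};

int main()
{
    QueryResult io;
    io.reset();   /// Prints "Tracker used", "Stream destroyed", then "Tracker destroyed".
}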
diff --git a/dbms/src/Interpreters/executeQuery.cpp b/dbms/src/Interpreters/executeQuery.cpp
index 7f9d872b023..8cd7edc85b8 100644
--- a/dbms/src/Interpreters/executeQuery.cpp
+++ b/dbms/src/Interpreters/executeQuery.cpp
@@ -555,7 +555,7 @@ static std::tuple<ASTPtr, BlockIO> executeQueryImpl(
         throw;
     }
 
-    return std::make_tuple(ast, res);
+    return std::make_tuple(ast, std::move(res));
 }
 
diff --git a/dbms/src/Processors/QueryPipeline.cpp b/dbms/src/Processors/QueryPipeline.cpp
index a9388f46f87..6d49d73b7fd 100644
--- a/dbms/src/Processors/QueryPipeline.cpp
+++ b/dbms/src/Processors/QueryPipeline.cpp
@@ -675,4 +675,31 @@ PipelineExecutorPtr QueryPipeline::execute()
     return std::make_shared<PipelineExecutor>(processors, process_list_element);
 }
 
+QueryPipeline & QueryPipeline::operator= (QueryPipeline && rhs)
+{
+    /// Reset primitive fields
+    process_list_element = rhs.process_list_element;
+    rhs.process_list_element = nullptr;
+    max_threads = rhs.max_threads;
+    rhs.max_threads = 0;
+    output_format = rhs.output_format;
+    rhs.output_format = nullptr;
+    has_resize = rhs.has_resize;
+    rhs.has_resize = false;
+    extremes_port = rhs.extremes_port;
+    rhs.extremes_port = nullptr;
+    totals_having_port = rhs.totals_having_port;
+    rhs.totals_having_port = nullptr;
+
+    /// Move these fields in destruction order (it's important)
+    streams = std::move(rhs.streams);
+    processors = std::move(rhs.processors);
+    current_header = std::move(rhs.current_header);
+    table_locks = std::move(rhs.table_locks);
+    storage_holders = std::move(rhs.storage_holders);
+    interpreter_context = std::move(rhs.interpreter_context);
+
+    return *this;
+}
+
 }
diff --git a/dbms/src/Processors/QueryPipeline.h b/dbms/src/Processors/QueryPipeline.h
index 78c4b35be4e..c5207188166 100644
--- a/dbms/src/Processors/QueryPipeline.h
+++ b/dbms/src/Processors/QueryPipeline.h
@@ -23,6 +23,12 @@ class QueryPipeline
 {
 public:
     QueryPipeline() = default;
+    QueryPipeline(QueryPipeline &&) = default;
+    ~QueryPipeline() = default;
+    QueryPipeline(const QueryPipeline &) = delete;
+    QueryPipeline & operator= (const QueryPipeline & rhs) = delete;
+
+    QueryPipeline & operator= (QueryPipeline && rhs);
 
     /// All pipes must have same header.
     void init(Pipes pipes);
@@ -97,6 +103,17 @@ public:
     Pipe getPipe() &&;
 
 private:
+    /// Destruction order: processors, header, locks, temporary storages, local contexts
+
+    /// Some Streams (or Processors) may implicitly use Context or temporary Storage created by Interpreter.
+    /// But lifetime of Streams is not nested in lifetime of Interpreters, so we have to store it here,
+    /// because QueryPipeline is alive until query is finished.
+    std::vector<std::shared_ptr<Context>> interpreter_context;
+    std::vector<StoragePtr> storage_holders;
+    TableStructureReadLocks table_locks;
+
+    /// Common header for each stream.
+    Block current_header;
 
     /// All added processors.
     Processors processors;
@@ -111,17 +128,6 @@ private:
     /// If resize processor was added to pipeline.
     bool has_resize = false;
 
-    /// Common header for each stream.
-    Block current_header;
-
-    TableStructureReadLocks table_locks;
-
-    /// Some Streams (or Processors) may implicitly use Context or temporary Storage created by Interpreter.
-    /// But lifetime of Streams is not nested in lifetime of Interpreters, so we have to store it here,
-    /// because QueryPipeline is alive until query is finished.
-    std::vector<std::shared_ptr<Context>> interpreter_context;
-    std::vector<StoragePtr> storage_holders;
-
     IOutputFormat * output_format = nullptr;
 
     size_t max_threads = 0;
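A defaulted move-assignment operator would move-assign the members in declaration order and leave the source's raw pointers and counters untouched, which is exactly what QueryPipeline cannot afford here: the old owning members have to be released in a deliberate order and the moved-from pipeline must be left empty. The condensed sketch below shows the shape of such a hand-written operator; PipelineLike and its members are illustrative placeholders, not the real QueryPipeline fields.

#include <cstddef>
#include <memory>
#include <utility>
#include <vector>

class PipelineLike
{
public:
    PipelineLike() = default;
    PipelineLike(PipelineLike &&) = default;
    PipelineLike(const PipelineLike &) = delete;
    PipelineLike & operator= (const PipelineLike &) = delete;

    PipelineLike & operator= (PipelineLike && rhs)
    {
        /// Primitive fields: copy the value, then clear it in the source so the
        /// moved-from pipeline does not keep non-owning pointers around.
        output_format = rhs.output_format;
        rhs.output_format = nullptr;
        max_threads = rhs.max_threads;
        rhs.max_threads = 0;

        /// Owning fields: assign in the order in which the *old* contents should be
        /// released; each move assignment drops this object's previous value before
        /// adopting the one from rhs.
        processors = std::move(rhs.processors);
        interpreter_context = std::move(rhs.interpreter_context);
        return *this;
    }

private:
    int * output_format = nullptr;                          /// stands in for IOutputFormat *
    std::size_t max_threads = 0;
    std::vector<std::unique_ptr<int>> processors;           /// stands in for Processors
    std::vector<std::shared_ptr<int>> interpreter_context;  /// stands in for the kept-alive contexts
};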
diff --git a/dbms/src/Storages/StorageMerge.cpp b/dbms/src/Storages/StorageMerge.cpp
index e456f080df6..b9461075bfd 100644
--- a/dbms/src/Storages/StorageMerge.cpp
+++ b/dbms/src/Storages/StorageMerge.cpp
@@ -185,8 +185,8 @@ Pipes StorageMerge::read(
     /** Just in case, turn off optimization "transfer to PREWHERE",
       * since there is no certainty that it works when one of table is MergeTree and other is not.
       */
-    Context modified_context = context;
-    modified_context.getSettingsRef().optimize_move_to_prewhere = false;
+    auto modified_context = std::make_shared<Context>(context);
+    modified_context->getSettingsRef().optimize_move_to_prewhere = false;
 
     /// What will be result structure depending on query processed stage in source tables?
     Block header = getQueryHeader(column_names, query_info, context, processed_stage);
@@ -255,7 +255,7 @@ Pipes StorageMerge::read(
 
 Pipes StorageMerge::createSources(const SelectQueryInfo & query_info, const QueryProcessingStage::Enum & processed_stage,
     const UInt64 max_block_size, const Block & header, const StorageWithLockAndName & storage_with_lock, Names & real_column_names,
-    Context & modified_context, size_t streams_num, bool has_table_virtual_column,
+    const std::shared_ptr<Context> & modified_context, size_t streams_num, bool has_table_virtual_column,
     bool concat_streams)
 {
     auto & [storage, struct_lock, table_name] = storage_with_lock;
@@ -272,38 +272,38 @@ Pipes StorageMerge::createSources(const SelectQueryInfo & query_info, const Quer
         {
             /// This flag means that pipeline must be tree-shaped,
             /// so we can't enable processors for InterpreterSelectQuery here.
-            auto stream = InterpreterSelectQuery(modified_query_info.query, modified_context, std::make_shared<OneBlockInputStream>(header),
+            auto stream = InterpreterSelectQuery(modified_query_info.query, *modified_context, std::make_shared<OneBlockInputStream>(header),
                                                  SelectQueryOptions(processed_stage).analyze()).execute().in;
 
             pipes.emplace_back(std::make_shared<SourceFromInputStream>(std::move(stream)));
             return pipes;
         }
 
-        pipes.emplace_back(
-            InterpreterSelectQuery(modified_query_info.query, modified_context,
-                                   std::make_shared<OneBlockInputStream>(header),
-                                   SelectQueryOptions(processed_stage).analyze()).executeWithProcessors().getPipe());
-
+        auto pipe = InterpreterSelectQuery(modified_query_info.query, *modified_context,
+                                           std::make_shared<OneBlockInputStream>(header),
+                                           SelectQueryOptions(processed_stage).analyze()).executeWithProcessors().getPipe();
+        pipe.addInterpreterContext(modified_context);
+        pipes.emplace_back(std::move(pipe));
         return pipes;
     }
 
-    if (processed_stage <= storage->getQueryProcessingStage(modified_context))
+    if (processed_stage <= storage->getQueryProcessingStage(*modified_context))
     {
         /// If there are only virtual columns in query, you must request at least one other column.
         if (real_column_names.empty())
             real_column_names.push_back(ExpressionActions::getSmallestColumn(storage->getColumns().getAllPhysical()));
 
-        pipes = storage->read(real_column_names, modified_query_info, modified_context, processed_stage, max_block_size, UInt32(streams_num));
+        pipes = storage->read(real_column_names, modified_query_info, *modified_context, processed_stage, max_block_size, UInt32(streams_num));
     }
-    else if (processed_stage > storage->getQueryProcessingStage(modified_context))
+    else if (processed_stage > storage->getQueryProcessingStage(*modified_context))
     {
         modified_query_info.query->as<ASTSelectQuery>()->replaceDatabaseAndTable(source_database, table_name);
 
         /// Maximum permissible parallelism is streams_num
-        modified_context.getSettingsRef().max_threads = UInt64(streams_num);
-        modified_context.getSettingsRef().max_streams_to_max_threads_ratio = 1;
+        modified_context->getSettingsRef().max_threads = UInt64(streams_num);
+        modified_context->getSettingsRef().max_streams_to_max_threads_ratio = 1;
 
-        InterpreterSelectQuery interpreter{modified_query_info.query, modified_context, SelectQueryOptions(processed_stage)};
+        InterpreterSelectQuery interpreter{modified_query_info.query, *modified_context, SelectQueryOptions(processed_stage)};
 
         if (query_info.force_tree_shaped_pipeline)
         {
@@ -343,9 +343,11 @@ Pipes StorageMerge::createSources(const SelectQueryInfo & query_info, const Quer
 
         /// Subordinary tables could have different but convertible types, like numeric types of different width.
         /// We must return streams with structure equals to structure of Merge table.
-        convertingSourceStream(header, modified_context, modified_query_info.query, pipe, processed_stage);
+        convertingSourceStream(header, *modified_context, modified_query_info.query, pipe, processed_stage);
 
         pipe.addTableLock(struct_lock);
+        pipe.addInterpreterContext(modified_context);
+
     }
 }
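The shared_ptr<Context> threaded through createSources exists so that the per-table sources, which may keep referring to the tweaked settings, cannot outlive the Context they read from: the pipe takes shared ownership via addInterpreterContext and only drops it when the pipe itself is destroyed. Below is a minimal sketch of that keep-alive pattern; ContextLike, PipeLike and makeSource are hypothetical stand-ins, not the real ClickHouse classes or API.

#include <cstddef>
#include <memory>
#include <vector>

struct ContextLike
{
    bool optimize_move_to_prewhere = true;
    std::size_t max_threads = 0;
};

class PipeLike
{
public:
    void addInterpreterContext(std::shared_ptr<ContextLike> context)
    {
        /// Shared ownership: the Context now lives at least as long as this pipe.
        interpreter_context.emplace_back(std::move(context));
    }

private:
    std::vector<std::shared_ptr<ContextLike>> interpreter_context;
};

/// Mirrors the StorageMerge change: copy the query Context, adjust settings on the copy,
/// build the source with it, and hand the shared_ptr to the resulting pipe.
PipeLike makeSource(const ContextLike & query_context)
{
    auto modified_context = std::make_shared<ContextLike>(query_context);
    modified_context->optimize_move_to_prewhere = false;

    PipeLike pipe;                                  /// would come from the interpreter
    pipe.addInterpreterContext(modified_context);   /// keeps the copy alive with the pipe
    return pipe;
}

int main()
{
    ContextLike query_context;
    PipeLike pipe = makeSource(query_context);      /// modified_context stays alive inside pipe
}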
diff --git a/dbms/src/Storages/StorageMerge.h b/dbms/src/Storages/StorageMerge.h
index b0a331eae9e..6242a62f8e8 100644
--- a/dbms/src/Storages/StorageMerge.h
+++ b/dbms/src/Storages/StorageMerge.h
@@ -81,7 +81,7 @@ protected:
         const SelectQueryInfo & query_info, const QueryProcessingStage::Enum & processed_stage,
         const UInt64 max_block_size, const Block & header, const StorageWithLockAndName & storage_with_lock,
         Names & real_column_names,
-        Context & modified_context, size_t streams_num, bool has_table_virtual_column,
+        const std::shared_ptr<Context> & modified_context, size_t streams_num, bool has_table_virtual_column,
         bool concat_streams = false);
 
     void convertingSourceStream(const Block & header, const Context & context, ASTPtr & query,
diff --git a/dbms/tests/queries/0_stateless/01083_expressions_in_engine_arguments.reference b/dbms/tests/queries/0_stateless/01083_expressions_in_engine_arguments.reference
index c96ca993fd9..5b376a0654f 100644
--- a/dbms/tests/queries/0_stateless/01083_expressions_in_engine_arguments.reference
+++ b/dbms/tests/queries/0_stateless/01083_expressions_in_engine_arguments.reference
@@ -3,10 +3,9 @@ CREATE TABLE test_01083.buffer (`n` Int8) ENGINE = Buffer(\'test_01083\', \'file
 CREATE TABLE test_01083.merge (`n` Int8) ENGINE = Merge(\'test_01083\', \'distributed\')
 CREATE TABLE test_01083.merge_tf AS merge(\'test_01083\', \'.*\')
 CREATE TABLE test_01083.distributed (`n` Int8) ENGINE = Distributed(\'test_shard_localhost\', \'test_01083\', \'file\')
-CREATE TABLE test_01083.distributed_tf AS cluster(\'test_shard_localhost\', \'test_01083\', \'file\')
-CREATE TABLE test_01083.url (`n` UInt64, `_path` String) ENGINE = URL(\'https://localhost:8443/?query=select+n,+_path+from+test_01083.file+format+CSV\', \'CSV\')
-CREATE TABLE test_01083.rich_syntax AS remote(\'localhos{x|y|t}\', cluster(\'test_shard_localhost\', remote(\'127.0.0.{1..4}\', \'test_01083\', \'url\')))
-1
-1
-1
-1
+CREATE TABLE test_01083.distributed_tf AS cluster(\'test_shard_localhost\', \'test_01083\', \'buffer\')
+CREATE TABLE test_01083.url (`n` UInt64, `col` String) ENGINE = URL(\'https://localhost:8443/?query=select+n,+_table+from+test_01083.merge+format+CSV\', \'CSV\')
+CREATE TABLE test_01083.rich_syntax AS remote(\'localhos{x|y|t}\', cluster(\'test_shard_localhost\', remote(\'127.0.0.{1..4}\', \'test_01083\', \'view\')))
+CREATE VIEW test_01083.view (`n` Int64) AS SELECT toInt64(n) AS n FROM (SELECT toString(n) AS n FROM test_01083.merge WHERE _table != \'qwerty\' ORDER BY _table ASC) UNION ALL SELECT * FROM test_01083.file
+CREATE DICTIONARY test_01083.dict (`n` UInt64, `col` String DEFAULT \'42\') PRIMARY KEY n SOURCE(CLICKHOUSE(HOST \'localhost\' PORT 9440 SECURE 1 USER \'default\' TABLE \'url\' DB \'test_01083\')) LIFETIME(MIN 0 MAX 1) LAYOUT(CACHE(SIZE_IN_CELLS 1))
+16
diff --git a/dbms/tests/queries/0_stateless/01083_expressions_in_engine_arguments.sql b/dbms/tests/queries/0_stateless/01083_expressions_in_engine_arguments.sql
index 3c4ac17f790..8e5f5a148a5 100644
--- a/dbms/tests/queries/0_stateless/01083_expressions_in_engine_arguments.sql
+++ b/dbms/tests/queries/0_stateless/01083_expressions_in_engine_arguments.sql
@@ -7,24 +7,32 @@ CREATE TABLE buffer (n Int8) ENGINE = Buffer(currentDatabase(), file, 16, 10, 20
 CREATE TABLE merge (n Int8) ENGINE = Merge('', lower('DISTRIBUTED'));
 CREATE TABLE merge_tf as merge(currentDatabase(), '.*');
 CREATE TABLE distributed (n Int8) ENGINE = Distributed(test_shard_localhost, currentDatabase(), 'fi' || 'le');
-CREATE TABLE distributed_tf as cluster('test' || '_' || 'shard_localhost', '', 'fi' || 'le');
+CREATE TABLE distributed_tf as cluster('test' || '_' || 'shard_localhost', '', 'buf' || 'fer');
 
-INSERT INTO file VALUES (1);
-CREATE TABLE url (n UInt64, _path String) ENGINE=URL
+INSERT INTO buffer VALUES (1);
+DETACH TABLE buffer; -- trigger flushing
+ATTACH TABLE buffer;
+
+CREATE TABLE url (n UInt64, col String) ENGINE=URL
 (
     replace
     (
-        'https://localhost:8443/?query=' || 'select n, _path from ' || currentDatabase() || '.file format CSV', ' ', '+' -- replace `file` with `merge` here after #9246 is fixed
+        'https://localhost:8443/?query=' || 'select n, _table from ' || currentDatabase() || '.merge format CSV', ' ', '+'
    ),
     CSV
 );
 
+CREATE VIEW view AS SELECT toInt64(n) as n FROM (SELECT toString(n) as n from merge WHERE _table != 'qwerty' ORDER BY _table) UNION ALL SELECT * FROM file;
+
 -- The following line is needed just to disable checking stderr for emptiness
 SELECT nonexistentsomething; -- { serverError 47 }
 
-CREATE DICTIONARY dict (n UInt64, _path String DEFAULT '42') PRIMARY KEY n
+CREATE DICTIONARY dict (n UInt64, col String DEFAULT '42') PRIMARY KEY n
 SOURCE(CLICKHOUSE(HOST 'localhost' PORT 9440 SECURE 1 USER 'default' TABLE 'url' DB 'test_01083')) LIFETIME(1) LAYOUT(CACHE(SIZE_IN_CELLS 1));
 
+-- dict --> url --> merge |-> distributed -> file (1)
+--                        |-> distributed_tf -> buffer -> file (1)
+
 -- TODO make fuzz test from this
 CREATE TABLE rich_syntax as remote
 (
@@ -37,15 +45,16 @@ CREATE TABLE rich_syntax as remote
             '127.0.0.{1..4}',
             if
             (
-                toString(40 + 2.0) NOT IN ('hello', dictGetString(currentDatabase() || '.dict', '_path', toUInt64('0001'))),
+                toString(40 + 2) NOT IN ('hello', dictGetString(currentDatabase() || '.dict', 'col', toUInt64('0001'))),
                 currentDatabase(),
                 'FAIL'
             ),
-            extract('123url456', '[a-z]+')
+            extract('123view456', '[a-z]+')
         )
     )
 );
+
 SHOW CREATE file;
 SHOW CREATE buffer;
 SHOW CREATE merge;
@@ -54,7 +63,14 @@ SHOW CREATE distributed;
 SHOW CREATE distributed_tf;
 SHOW CREATE url;
 SHOW CREATE rich_syntax;
+SHOW CREATE view;
+SHOW CREATE dict;
 
-SELECT n from rich_syntax;
+INSERT INTO buffer VALUES (1);
+-- remote(localhost) --> cluster(test_shard_localhost) |-> remote(127.0.0.1) --> view |-> subquery --> merge |-> distributed --> file (1)
+--                                                     |                       |                     |-> distributed_tf -> buffer (1) -> file (1)
+--                                                     |                       |-> file (1)
+--                                                     |-> remote(127.0.0.2) --> ...
+SELECT sum(n) from rich_syntax;
 
 DROP DATABASE test_01083;