diff --git a/dbms/src/DataStreams/BlockIO.cpp b/dbms/src/DataStreams/BlockIO.cpp new file mode 100644 index 00000000000..60a0b415237 --- /dev/null +++ b/dbms/src/DataStreams/BlockIO.cpp @@ -0,0 +1,56 @@ +#include +#include + +namespace DB +{ + +void BlockIO::reset() +{ + /** process_list_entry should be destroyed after in, after out and after pipeline, + * since in, out and pipeline contain pointer to objects inside process_list_entry (query-level MemoryTracker for example), + * which could be used before destroying of in and out. + * + * However, QueryStatus inside process_list_entry holds shared pointers to streams for some reason. + * Streams must be destroyed before storage locks, storages and contexts inside pipeline, + * so releaseQueryStreams() is required. + */ + /// TODO simplify it all + + out.reset(); + in.reset(); + if (process_list_entry) + process_list_entry->get().releaseQueryStreams(); + pipeline = QueryPipeline(); + process_list_entry.reset(); + + /// TODO Do we need also reset callbacks? In which order? +} + +BlockIO & BlockIO::operator= (BlockIO && rhs) +{ + if (this == &rhs) + return *this; + + /// Explicitly reset fields, so everything is destructed in right order + reset(); + + process_list_entry = std::move(rhs.process_list_entry); + in = std::move(rhs.in); + out = std::move(rhs.out); + pipeline = std::move(rhs.pipeline); + + finish_callback = std::move(rhs.finish_callback); + exception_callback = std::move(rhs.exception_callback); + + null_format = std::move(rhs.null_format); + + return *this; +} + +BlockIO::~BlockIO() +{ + reset(); +} + +} + diff --git a/dbms/src/DataStreams/BlockIO.h b/dbms/src/DataStreams/BlockIO.h index 73e25543a4b..08a5f819fd6 100644 --- a/dbms/src/DataStreams/BlockIO.h +++ b/dbms/src/DataStreams/BlockIO.h @@ -15,13 +15,14 @@ class ProcessListEntry; struct BlockIO { BlockIO() = default; - BlockIO(const BlockIO &) = default; - ~BlockIO() = default; + BlockIO(BlockIO &&) = default; + + BlockIO & operator= (BlockIO && rhs); + ~BlockIO(); + + BlockIO(const BlockIO &) = delete; + BlockIO & operator= (const BlockIO & rhs) = delete; - /** process_list_entry should be destroyed after in, after out and after pipeline, - * since in, out and pipeline contain pointer to objects inside process_list_entry (query-level MemoryTracker for example), - * which could be used before destroying of in and out. - */ std::shared_ptr process_list_entry; BlockOutputStreamPtr out; @@ -49,28 +50,8 @@ struct BlockIO exception_callback(); } - BlockIO & operator= (const BlockIO & rhs) - { - if (this == &rhs) - return *this; - - out.reset(); - in.reset(); - pipeline = QueryPipeline(); - process_list_entry.reset(); - - process_list_entry = rhs.process_list_entry; - in = rhs.in; - out = rhs.out; - pipeline = rhs.pipeline; - - finish_callback = rhs.finish_callback; - exception_callback = rhs.exception_callback; - - null_format = rhs.null_format; - - return *this; - } +private: + void reset(); }; } diff --git a/dbms/src/Interpreters/executeQuery.cpp b/dbms/src/Interpreters/executeQuery.cpp index ac67ded4b45..0d3f814d3d9 100644 --- a/dbms/src/Interpreters/executeQuery.cpp +++ b/dbms/src/Interpreters/executeQuery.cpp @@ -557,7 +557,7 @@ static std::tuple executeQueryImpl( throw; } - return std::make_tuple(ast, res); + return std::make_tuple(ast, std::move(res)); } diff --git a/dbms/src/Processors/QueryPipeline.cpp b/dbms/src/Processors/QueryPipeline.cpp index 9f9fc51b0ca..0d74015aad8 100644 --- a/dbms/src/Processors/QueryPipeline.cpp +++ b/dbms/src/Processors/QueryPipeline.cpp @@ -671,4 +671,31 @@ PipelineExecutorPtr QueryPipeline::execute() return std::make_shared(processors, process_list_element); } +QueryPipeline & QueryPipeline::operator= (QueryPipeline && rhs) +{ + /// Reset primitive fields + process_list_element = rhs.process_list_element; + rhs.process_list_element = nullptr; + max_threads = rhs.max_threads; + rhs.max_threads = 0; + output_format = rhs.output_format; + rhs.output_format = nullptr; + has_resize = rhs.has_resize; + rhs.has_resize = false; + extremes_port = rhs.extremes_port; + rhs.extremes_port = nullptr; + totals_having_port = rhs.totals_having_port; + rhs.totals_having_port = nullptr; + + /// Move these fields in destruction order (it's important) + streams = std::move(rhs.streams); + processors = std::move(rhs.processors); + current_header = std::move(rhs.current_header); + table_locks = std::move(rhs.table_locks); + storage_holders = std::move(rhs.storage_holders); + interpreter_context = std::move(rhs.interpreter_context); + + return *this; +} + } diff --git a/dbms/src/Processors/QueryPipeline.h b/dbms/src/Processors/QueryPipeline.h index 78c4b35be4e..c5207188166 100644 --- a/dbms/src/Processors/QueryPipeline.h +++ b/dbms/src/Processors/QueryPipeline.h @@ -23,6 +23,12 @@ class QueryPipeline { public: QueryPipeline() = default; + QueryPipeline(QueryPipeline &&) = default; + ~QueryPipeline() = default; + QueryPipeline(const QueryPipeline &) = delete; + QueryPipeline & operator= (const QueryPipeline & rhs) = delete; + + QueryPipeline & operator= (QueryPipeline && rhs); /// All pipes must have same header. void init(Pipes pipes); @@ -97,6 +103,17 @@ public: Pipe getPipe() &&; private: + /// Destruction order: processors, header, locks, temporary storages, local contexts + + /// Some Streams (or Processors) may implicitly use Context or temporary Storage created by Interpreter. + /// But lifetime of Streams is not nested in lifetime of Interpreters, so we have to store it here, + /// because QueryPipeline is alive until query is finished. + std::vector> interpreter_context; + std::vector storage_holders; + TableStructureReadLocks table_locks; + + /// Common header for each stream. + Block current_header; /// All added processors. Processors processors; @@ -111,17 +128,6 @@ private: /// If resize processor was added to pipeline. bool has_resize = false; - /// Common header for each stream. - Block current_header; - - TableStructureReadLocks table_locks; - - /// Some Streams (or Processors) may implicitly use Context or temporary Storage created by Interpreter. - /// But lifetime of Streams is not nested in lifetime of Interpreters, so we have to store it here, - /// because QueryPipeline is alive until query is finished. - std::vector> interpreter_context; - std::vector storage_holders; - IOutputFormat * output_format = nullptr; size_t max_threads = 0;