fix BlockIO destruction order

This commit is contained in:
Alexander Tokmakov 2020-02-27 18:40:11 +03:00
parent df8a90319a
commit 8a91081331
5 changed files with 110 additions and 40 deletions

View File

@ -0,0 +1,56 @@
#include <DataStreams/BlockIO.h>
#include <Interpreters/ProcessList.h>
namespace DB
{
void BlockIO::reset()
{
/** process_list_entry should be destroyed after in, after out and after pipeline,
* since in, out and pipeline contain pointer to objects inside process_list_entry (query-level MemoryTracker for example),
* which could be used before destroying of in and out.
*
* However, QueryStatus inside process_list_entry holds shared pointers to streams for some reason.
* Streams must be destroyed before storage locks, storages and contexts inside pipeline,
* so releaseQueryStreams() is required.
*/
/// TODO simplify it all
out.reset();
in.reset();
if (process_list_entry)
process_list_entry->get().releaseQueryStreams();
pipeline = QueryPipeline();
process_list_entry.reset();
/// TODO Do we need also reset callbacks? In which order?
}
BlockIO & BlockIO::operator= (BlockIO && rhs)
{
if (this == &rhs)
return *this;
/// Explicitly reset fields, so everything is destructed in right order
reset();
process_list_entry = std::move(rhs.process_list_entry);
in = std::move(rhs.in);
out = std::move(rhs.out);
pipeline = std::move(rhs.pipeline);
finish_callback = std::move(rhs.finish_callback);
exception_callback = std::move(rhs.exception_callback);
null_format = std::move(rhs.null_format);
return *this;
}
BlockIO::~BlockIO()
{
reset();
}
}

View File

@ -15,13 +15,14 @@ class ProcessListEntry;
struct BlockIO
{
BlockIO() = default;
BlockIO(const BlockIO &) = default;
~BlockIO() = default;
BlockIO(BlockIO &&) = default;
BlockIO & operator= (BlockIO && rhs);
~BlockIO();
BlockIO(const BlockIO &) = delete;
BlockIO & operator= (const BlockIO & rhs) = delete;
/** process_list_entry should be destroyed after in, after out and after pipeline,
* since in, out and pipeline contain pointer to objects inside process_list_entry (query-level MemoryTracker for example),
* which could be used before destroying of in and out.
*/
std::shared_ptr<ProcessListEntry> process_list_entry;
BlockOutputStreamPtr out;
@ -49,28 +50,8 @@ struct BlockIO
exception_callback();
}
BlockIO & operator= (const BlockIO & rhs)
{
if (this == &rhs)
return *this;
out.reset();
in.reset();
pipeline = QueryPipeline();
process_list_entry.reset();
process_list_entry = rhs.process_list_entry;
in = rhs.in;
out = rhs.out;
pipeline = rhs.pipeline;
finish_callback = rhs.finish_callback;
exception_callback = rhs.exception_callback;
null_format = rhs.null_format;
return *this;
}
private:
void reset();
};
}

View File

@ -557,7 +557,7 @@ static std::tuple<ASTPtr, BlockIO> executeQueryImpl(
throw;
}
return std::make_tuple(ast, res);
return std::make_tuple(ast, std::move(res));
}

View File

@ -671,4 +671,31 @@ PipelineExecutorPtr QueryPipeline::execute()
return std::make_shared<PipelineExecutor>(processors, process_list_element);
}
QueryPipeline & QueryPipeline::operator= (QueryPipeline && rhs)
{
/// Reset primitive fields
process_list_element = rhs.process_list_element;
rhs.process_list_element = nullptr;
max_threads = rhs.max_threads;
rhs.max_threads = 0;
output_format = rhs.output_format;
rhs.output_format = nullptr;
has_resize = rhs.has_resize;
rhs.has_resize = false;
extremes_port = rhs.extremes_port;
rhs.extremes_port = nullptr;
totals_having_port = rhs.totals_having_port;
rhs.totals_having_port = nullptr;
/// Move these fields in destruction order (it's important)
streams = std::move(rhs.streams);
processors = std::move(rhs.processors);
current_header = std::move(rhs.current_header);
table_locks = std::move(rhs.table_locks);
storage_holders = std::move(rhs.storage_holders);
interpreter_context = std::move(rhs.interpreter_context);
return *this;
}
}

View File

@ -23,6 +23,12 @@ class QueryPipeline
{
public:
QueryPipeline() = default;
QueryPipeline(QueryPipeline &&) = default;
~QueryPipeline() = default;
QueryPipeline(const QueryPipeline &) = delete;
QueryPipeline & operator= (const QueryPipeline & rhs) = delete;
QueryPipeline & operator= (QueryPipeline && rhs);
/// All pipes must have same header.
void init(Pipes pipes);
@ -97,6 +103,17 @@ public:
Pipe getPipe() &&;
private:
/// Destruction order: processors, header, locks, temporary storages, local contexts
/// Some Streams (or Processors) may implicitly use Context or temporary Storage created by Interpreter.
/// But lifetime of Streams is not nested in lifetime of Interpreters, so we have to store it here,
/// because QueryPipeline is alive until query is finished.
std::vector<std::shared_ptr<Context>> interpreter_context;
std::vector<StoragePtr> storage_holders;
TableStructureReadLocks table_locks;
/// Common header for each stream.
Block current_header;
/// All added processors.
Processors processors;
@ -111,17 +128,6 @@ private:
/// If resize processor was added to pipeline.
bool has_resize = false;
/// Common header for each stream.
Block current_header;
TableStructureReadLocks table_locks;
/// Some Streams (or Processors) may implicitly use Context or temporary Storage created by Interpreter.
/// But lifetime of Streams is not nested in lifetime of Interpreters, so we have to store it here,
/// because QueryPipeline is alive until query is finished.
std::vector<std::shared_ptr<Context>> interpreter_context;
std::vector<StoragePtr> storage_holders;
IOutputFormat * output_format = nullptr;
size_t max_threads = 0;