From 28e12c559c9f2f3f990d50cec4889fd0f132fb08 Mon Sep 17 00:00:00 2001 From: Nikolai Kochetov Date: Tue, 6 Oct 2020 09:35:35 +0300 Subject: [PATCH] Ensure resource desrtuction order in ReadFromStorageStep. --- src/Processors/Pipe.h | 2 +- src/Processors/QueryPipeline.h | 2 +- .../QueryPlan/ReadFromStorageStep.cpp | 41 ++++++++----------- .../QueryPlan/ReadFromStorageStep.h | 14 ------- 4 files changed, 19 insertions(+), 40 deletions(-) diff --git a/src/Processors/Pipe.h b/src/Processors/Pipe.h index f674663154d..f5feca7a2db 100644 --- a/src/Processors/Pipe.h +++ b/src/Processors/Pipe.h @@ -101,7 +101,7 @@ public: void setQuota(const std::shared_ptr & quota); /// Do not allow to change the table while the processors of pipe are alive. - void addTableLock(const TableLockHolder & lock) { holder.table_locks.push_back(lock); } + void addTableLock(TableLockHolder lock) { holder.table_locks.emplace_back(std::move(lock)); } /// This methods are from QueryPipeline. Needed to make conversion from pipeline to pipe possible. void addInterpreterContext(std::shared_ptr context) { holder.interpreter_context.emplace_back(std::move(context)); } void addStorageHolder(StoragePtr storage) { holder.storage_holders.emplace_back(std::move(storage)); } diff --git a/src/Processors/QueryPipeline.h b/src/Processors/QueryPipeline.h index 80ae1d591a4..6321928b357 100644 --- a/src/Processors/QueryPipeline.h +++ b/src/Processors/QueryPipeline.h @@ -101,7 +101,7 @@ public: const Block & getHeader() const { return pipe.getHeader(); } - void addTableLock(const TableLockHolder & lock) { pipe.addTableLock(lock); } + void addTableLock(TableLockHolder lock) { pipe.addTableLock(std::move(lock)); } void addInterpreterContext(std::shared_ptr context) { pipe.addInterpreterContext(std::move(context)); } void addStorageHolder(StoragePtr storage) { pipe.addStorageHolder(std::move(storage)); } void addQueryPlan(std::unique_ptr plan) { pipe.addQueryPlan(std::move(plan)); } diff --git a/src/Processors/QueryPlan/ReadFromStorageStep.cpp b/src/Processors/QueryPlan/ReadFromStorageStep.cpp index b085c177ad4..474410148fa 100644 --- a/src/Processors/QueryPlan/ReadFromStorageStep.cpp +++ b/src/Processors/QueryPlan/ReadFromStorageStep.cpp @@ -13,33 +13,26 @@ namespace DB ReadFromStorageStep::ReadFromStorageStep( TableLockHolder table_lock_, - StorageMetadataPtr metadata_snapshot_, - StreamLocalLimits & limits_, - SizeLimits & leaf_limits_, - std::shared_ptr quota_, + StorageMetadataPtr metadata_snapshot, + StreamLocalLimits & limits, + SizeLimits & leaf_limits, + std::shared_ptr quota, StoragePtr storage_, - const Names & required_columns_, - const SelectQueryInfo & query_info_, + const Names & required_columns, + const SelectQueryInfo & query_info, std::shared_ptr context_, - QueryProcessingStage::Enum processing_stage_, - size_t max_block_size_, - size_t max_streams_) - : table_lock(std::move(table_lock_)) - , metadata_snapshot(std::move(metadata_snapshot_)) - , limits(limits_) - , leaf_limits(leaf_limits_) - , quota(std::move(quota_)) - , storage(std::move(storage_)) - , required_columns(required_columns_) - , query_info(query_info_) - , context(std::move(context_)) - , processing_stage(processing_stage_) - , max_block_size(max_block_size_) - , max_streams(max_streams_) + QueryProcessingStage::Enum processing_stage, + size_t max_block_size, + size_t max_streams) { /// Note: we read from storage in constructor of step because we don't know real header before reading. /// It will be fixed when storage return QueryPlanStep itself. + /// Move arguments into stack in order to ensure order of destruction in case of exception. + auto context = std::move(context_); + auto storage = std::move(storage_); + auto table_lock = std::move(table_lock_); + Pipe pipe = storage->read(required_columns, metadata_snapshot, query_info, *context, processing_stage, max_block_size, max_streams); if (pipe.empty()) @@ -83,9 +76,6 @@ ReadFromStorageStep::ReadFromStorageStep( pipeline = std::make_unique(); QueryPipelineProcessorsCollector collector(*pipeline, this); - /// Table lock is stored inside pipeline here. - pipeline->addTableLock(table_lock); - pipe.setLimits(limits); /** @@ -103,8 +93,11 @@ ReadFromStorageStep::ReadFromStorageStep( pipeline->init(std::move(pipe)); + /// Add resources to pipeline. The order is important. + /// Add in reverse order of destruction. Pipeline will be destroyed at the end in case of exception. pipeline->addInterpreterContext(std::move(context)); pipeline->addStorageHolder(std::move(storage)); + pipeline->addTableLock(std::move(table_lock)); processors = collector.detachProcessors(); diff --git a/src/Processors/QueryPlan/ReadFromStorageStep.h b/src/Processors/QueryPlan/ReadFromStorageStep.h index 98cde63a863..93a948116e8 100644 --- a/src/Processors/QueryPlan/ReadFromStorageStep.h +++ b/src/Processors/QueryPlan/ReadFromStorageStep.h @@ -45,20 +45,6 @@ public: void describePipeline(FormatSettings & settings) const override; private: - TableLockHolder table_lock; - StorageMetadataPtr metadata_snapshot; - StreamLocalLimits limits; - SizeLimits leaf_limits; - std::shared_ptr quota; - - StoragePtr storage; - const Names & required_columns; - const SelectQueryInfo & query_info; - std::shared_ptr context; - QueryProcessingStage::Enum processing_stage; - size_t max_block_size; - size_t max_streams; - QueryPipelinePtr pipeline; Processors processors; };