Ensure resource desrtuction order in ReadFromStorageStep.

This commit is contained in:
Nikolai Kochetov 2020-10-06 09:35:35 +03:00
parent ce3d18e8c5
commit 28e12c559c
4 changed files with 19 additions and 40 deletions

View File

@ -101,7 +101,7 @@ public:
void setQuota(const std::shared_ptr<const EnabledQuota> & quota); void setQuota(const std::shared_ptr<const EnabledQuota> & quota);
/// Do not allow to change the table while the processors of pipe are alive. /// Do not allow to change the table while the processors of pipe are alive.
void addTableLock(const TableLockHolder & lock) { holder.table_locks.push_back(lock); } void addTableLock(TableLockHolder lock) { holder.table_locks.emplace_back(std::move(lock)); }
/// This methods are from QueryPipeline. Needed to make conversion from pipeline to pipe possible. /// This methods are from QueryPipeline. Needed to make conversion from pipeline to pipe possible.
void addInterpreterContext(std::shared_ptr<Context> context) { holder.interpreter_context.emplace_back(std::move(context)); } void addInterpreterContext(std::shared_ptr<Context> context) { holder.interpreter_context.emplace_back(std::move(context)); }
void addStorageHolder(StoragePtr storage) { holder.storage_holders.emplace_back(std::move(storage)); } void addStorageHolder(StoragePtr storage) { holder.storage_holders.emplace_back(std::move(storage)); }

View File

@ -101,7 +101,7 @@ public:
const Block & getHeader() const { return pipe.getHeader(); } const Block & getHeader() const { return pipe.getHeader(); }
void addTableLock(const TableLockHolder & lock) { pipe.addTableLock(lock); } void addTableLock(TableLockHolder lock) { pipe.addTableLock(std::move(lock)); }
void addInterpreterContext(std::shared_ptr<Context> context) { pipe.addInterpreterContext(std::move(context)); } void addInterpreterContext(std::shared_ptr<Context> context) { pipe.addInterpreterContext(std::move(context)); }
void addStorageHolder(StoragePtr storage) { pipe.addStorageHolder(std::move(storage)); } void addStorageHolder(StoragePtr storage) { pipe.addStorageHolder(std::move(storage)); }
void addQueryPlan(std::unique_ptr<QueryPlan> plan) { pipe.addQueryPlan(std::move(plan)); } void addQueryPlan(std::unique_ptr<QueryPlan> plan) { pipe.addQueryPlan(std::move(plan)); }

View File

@ -13,33 +13,26 @@ namespace DB
ReadFromStorageStep::ReadFromStorageStep( ReadFromStorageStep::ReadFromStorageStep(
TableLockHolder table_lock_, TableLockHolder table_lock_,
StorageMetadataPtr metadata_snapshot_, StorageMetadataPtr metadata_snapshot,
StreamLocalLimits & limits_, StreamLocalLimits & limits,
SizeLimits & leaf_limits_, SizeLimits & leaf_limits,
std::shared_ptr<const EnabledQuota> quota_, std::shared_ptr<const EnabledQuota> quota,
StoragePtr storage_, StoragePtr storage_,
const Names & required_columns_, const Names & required_columns,
const SelectQueryInfo & query_info_, const SelectQueryInfo & query_info,
std::shared_ptr<Context> context_, std::shared_ptr<Context> context_,
QueryProcessingStage::Enum processing_stage_, QueryProcessingStage::Enum processing_stage,
size_t max_block_size_, size_t max_block_size,
size_t max_streams_) size_t max_streams)
: table_lock(std::move(table_lock_))
, metadata_snapshot(std::move(metadata_snapshot_))
, limits(limits_)
, leaf_limits(leaf_limits_)
, quota(std::move(quota_))
, storage(std::move(storage_))
, required_columns(required_columns_)
, query_info(query_info_)
, context(std::move(context_))
, processing_stage(processing_stage_)
, max_block_size(max_block_size_)
, max_streams(max_streams_)
{ {
/// Note: we read from storage in constructor of step because we don't know real header before reading. /// Note: we read from storage in constructor of step because we don't know real header before reading.
/// It will be fixed when storage return QueryPlanStep itself. /// It will be fixed when storage return QueryPlanStep itself.
/// Move arguments into stack in order to ensure order of destruction in case of exception.
auto context = std::move(context_);
auto storage = std::move(storage_);
auto table_lock = std::move(table_lock_);
Pipe pipe = storage->read(required_columns, metadata_snapshot, query_info, *context, processing_stage, max_block_size, max_streams); Pipe pipe = storage->read(required_columns, metadata_snapshot, query_info, *context, processing_stage, max_block_size, max_streams);
if (pipe.empty()) if (pipe.empty())
@ -83,9 +76,6 @@ ReadFromStorageStep::ReadFromStorageStep(
pipeline = std::make_unique<QueryPipeline>(); pipeline = std::make_unique<QueryPipeline>();
QueryPipelineProcessorsCollector collector(*pipeline, this); QueryPipelineProcessorsCollector collector(*pipeline, this);
/// Table lock is stored inside pipeline here.
pipeline->addTableLock(table_lock);
pipe.setLimits(limits); pipe.setLimits(limits);
/** /**
@ -103,8 +93,11 @@ ReadFromStorageStep::ReadFromStorageStep(
pipeline->init(std::move(pipe)); pipeline->init(std::move(pipe));
/// Add resources to pipeline. The order is important.
/// Add in reverse order of destruction. Pipeline will be destroyed at the end in case of exception.
pipeline->addInterpreterContext(std::move(context)); pipeline->addInterpreterContext(std::move(context));
pipeline->addStorageHolder(std::move(storage)); pipeline->addStorageHolder(std::move(storage));
pipeline->addTableLock(std::move(table_lock));
processors = collector.detachProcessors(); processors = collector.detachProcessors();

View File

@ -45,20 +45,6 @@ public:
void describePipeline(FormatSettings & settings) const override; void describePipeline(FormatSettings & settings) const override;
private: private:
TableLockHolder table_lock;
StorageMetadataPtr metadata_snapshot;
StreamLocalLimits limits;
SizeLimits leaf_limits;
std::shared_ptr<const EnabledQuota> quota;
StoragePtr storage;
const Names & required_columns;
const SelectQueryInfo & query_info;
std::shared_ptr<Context> context;
QueryProcessingStage::Enum processing_stage;
size_t max_block_size;
size_t max_streams;
QueryPipelinePtr pipeline; QueryPipelinePtr pipeline;
Processors processors; Processors processors;
}; };