diff --git a/src/Storages/RabbitMQ/RabbitMQBlockInputStream.cpp b/src/Storages/RabbitMQ/RabbitMQBlockInputStream.cpp index 6257a60d678..c66009a5eca 100644 --- a/src/Storages/RabbitMQ/RabbitMQBlockInputStream.cpp +++ b/src/Storages/RabbitMQ/RabbitMQBlockInputStream.cpp @@ -14,13 +14,18 @@ namespace DB { RabbitMQBlockInputStream::RabbitMQBlockInputStream( - StorageRabbitMQ & storage_, const Context & context_, const Names & columns, Poco::Logger * log_) + StorageRabbitMQ & storage_, + const StorageMetadataPtr & metadata_snapshot_, + const Context & context_, + const Names & columns, + Poco::Logger * log_) : storage(storage_) + , metadata_snapshot(metadata_snapshot_) , context(context_) , column_names(columns) , log(log_) - , non_virtual_header(storage.getSampleBlockNonMaterialized()) - , virtual_header(storage.getSampleBlockForColumns({"_exchange"})) + , non_virtual_header(metadata_snapshot->getSampleBlockNonMaterialized()) + , virtual_header(metadata_snapshot->getSampleBlockForColumns({"_exchange"}, storage.getVirtuals(), storage.getStorageID())) { } @@ -36,7 +41,7 @@ RabbitMQBlockInputStream::~RabbitMQBlockInputStream() Block RabbitMQBlockInputStream::getHeader() const { - return storage.getSampleBlockForColumns(column_names); + return metadata_snapshot->getSampleBlockForColumns(column_names, storage.getVirtuals(), storage.getStorageID()); } diff --git a/src/Storages/RabbitMQ/RabbitMQBlockInputStream.h b/src/Storages/RabbitMQ/RabbitMQBlockInputStream.h index fbdb40bded8..d171893d3b3 100644 --- a/src/Storages/RabbitMQ/RabbitMQBlockInputStream.h +++ b/src/Storages/RabbitMQ/RabbitMQBlockInputStream.h @@ -14,6 +14,7 @@ class RabbitMQBlockInputStream : public IBlockInputStream public: RabbitMQBlockInputStream( StorageRabbitMQ & storage_, + const StorageMetadataPtr & metadata_snapshot_, const Context & context_, const Names & columns, Poco::Logger * log_); @@ -29,6 +30,7 @@ public: private: StorageRabbitMQ & storage; + StorageMetadataPtr metadata_snapshot; Context context; Names column_names; Poco::Logger * log; diff --git a/src/Storages/RabbitMQ/RabbitMQBlockOutputStream.cpp b/src/Storages/RabbitMQ/RabbitMQBlockOutputStream.cpp index 5dc2c1f8fc4..ddcde7cf24f 100644 --- a/src/Storages/RabbitMQ/RabbitMQBlockOutputStream.cpp +++ b/src/Storages/RabbitMQ/RabbitMQBlockOutputStream.cpp @@ -15,14 +15,19 @@ namespace ErrorCodes RabbitMQBlockOutputStream::RabbitMQBlockOutputStream( - StorageRabbitMQ & storage_, const Context & context_) : storage(storage_), context(context_) + StorageRabbitMQ & storage_, + const StorageMetadataPtr & metadata_snapshot_, + const Context & context_) + : storage(storage_) + , metadata_snapshot(metadata_snapshot_) + , context(context_) { } Block RabbitMQBlockOutputStream::getHeader() const { - return storage.getSampleBlockNonMaterialized(); + return metadata_snapshot->getSampleBlockNonMaterialized(); } diff --git a/src/Storages/RabbitMQ/RabbitMQBlockOutputStream.h b/src/Storages/RabbitMQ/RabbitMQBlockOutputStream.h index 2f7b89a2a30..f8ed79438f4 100644 --- a/src/Storages/RabbitMQ/RabbitMQBlockOutputStream.h +++ b/src/Storages/RabbitMQ/RabbitMQBlockOutputStream.h @@ -12,7 +12,7 @@ class RabbitMQBlockOutputStream : public IBlockOutputStream { public: - explicit RabbitMQBlockOutputStream(StorageRabbitMQ & storage_, const Context & context_); + explicit RabbitMQBlockOutputStream(StorageRabbitMQ & storage_, const StorageMetadataPtr & metadata_snapshot_, const Context & context_); Block getHeader() const override; @@ -22,6 +22,7 @@ public: private: StorageRabbitMQ & storage; + StorageMetadataPtr metadata_snapshot; Context context; ProducerBufferPtr buffer; BlockOutputStreamPtr child; diff --git a/src/Storages/RabbitMQ/StorageRabbitMQ.cpp b/src/Storages/RabbitMQ/StorageRabbitMQ.cpp index 3de8d193302..60a641064a8 100644 --- a/src/Storages/RabbitMQ/StorageRabbitMQ.cpp +++ b/src/Storages/RabbitMQ/StorageRabbitMQ.cpp @@ -98,7 +98,9 @@ StorageRabbitMQ::StorageRabbitMQ( } rabbitmq_context.makeQueryContext(); - setColumns(columns_); + StorageInMemoryMetadata storage_metadata; + storage_metadata.setColumns(columns_); + setInMemoryMetadata(storage_metadata); task = global_context.getSchedulePool().createTask(log->name(), [this]{ threadFunc(); }); task->deactivate(); @@ -115,6 +117,7 @@ StorageRabbitMQ::StorageRabbitMQ( Pipes StorageRabbitMQ::read( const Names & column_names, + const StorageMetadataPtr & metadata_snapshot, const SelectQueryInfo & /* query_info */, const Context & context, QueryProcessingStage::Enum /* processed_stage */, @@ -129,8 +132,9 @@ Pipes StorageRabbitMQ::read( for (size_t i = 0; i < num_created_consumers; ++i) { - pipes.emplace_back(std::make_shared(std::make_shared( - *this, context, column_names, log))); + pipes.emplace_back( + std::make_shared(std::make_shared( + *this, metadata_snapshot, context, column_names, log))); } LOG_DEBUG(log, "Starting reading {} streams", pipes.size()); @@ -139,9 +143,9 @@ Pipes StorageRabbitMQ::read( } -BlockOutputStreamPtr StorageRabbitMQ::write(const ASTPtr &, const Context & context) +BlockOutputStreamPtr StorageRabbitMQ::write(const ASTPtr &, const StorageMetadataPtr & metadata_snapshot, const Context & context) { - return std::make_shared(*this, context); + return std::make_shared(*this, metadata_snapshot, context); } @@ -316,7 +320,7 @@ bool StorageRabbitMQ::streamToViews() for (size_t i = 0; i < num_created_consumers; ++i) { - auto stream = std::make_shared(*this, rabbitmq_context, block_io.out->getHeader().getNames(), log); + auto stream = std::make_shared(*this, getInMemoryMetadataPtr(), rabbitmq_context, block_io.out->getHeader().getNames(), log); streams.emplace_back(stream); // Limit read batch to maximum block size to allow DDL diff --git a/src/Storages/RabbitMQ/StorageRabbitMQ.h b/src/Storages/RabbitMQ/StorageRabbitMQ.h index 79e4d5e4ca2..567951dee6b 100644 --- a/src/Storages/RabbitMQ/StorageRabbitMQ.h +++ b/src/Storages/RabbitMQ/StorageRabbitMQ.h @@ -32,6 +32,7 @@ public: Pipes read( const Names & column_names, + const StorageMetadataPtr & metadata_snapshot, const SelectQueryInfo & query_info, const Context & context, QueryProcessingStage::Enum processed_stage, @@ -40,6 +41,7 @@ public: BlockOutputStreamPtr write( const ASTPtr & query, + const StorageMetadataPtr & metadata_snapshot, const Context & context) override; void pushReadBuffer(ConsumerBufferPtr buf);