Correct merge with master

This commit is contained in:
alesapin 2020-06-24 20:32:57 +03:00
parent 3fc65b3269
commit cb30dbfe28
6 changed files with 32 additions and 13 deletions

View File

@@ -14,13 +14,18 @@ namespace DB
{ {
RabbitMQBlockInputStream::RabbitMQBlockInputStream( RabbitMQBlockInputStream::RabbitMQBlockInputStream(
StorageRabbitMQ & storage_, const Context & context_, const Names & columns, Poco::Logger * log_) StorageRabbitMQ & storage_,
const StorageMetadataPtr & metadata_snapshot_,
const Context & context_,
const Names & columns,
Poco::Logger * log_)
: storage(storage_) : storage(storage_)
, metadata_snapshot(metadata_snapshot_)
, context(context_) , context(context_)
, column_names(columns) , column_names(columns)
, log(log_) , log(log_)
, non_virtual_header(storage.getSampleBlockNonMaterialized()) , non_virtual_header(metadata_snapshot->getSampleBlockNonMaterialized())
, virtual_header(storage.getSampleBlockForColumns({"_exchange"})) , virtual_header(metadata_snapshot->getSampleBlockForColumns({"_exchange"}, storage.getVirtuals(), storage.getStorageID()))
{ {
} }
@@ -36,7 +41,7 @@ RabbitMQBlockInputStream::~RabbitMQBlockInputStream()
Block RabbitMQBlockInputStream::getHeader() const Block RabbitMQBlockInputStream::getHeader() const
{ {
return storage.getSampleBlockForColumns(column_names); return metadata_snapshot->getSampleBlockForColumns(column_names, storage.getVirtuals(), storage.getStorageID());
} }

View File

@@ -14,6 +14,7 @@ class RabbitMQBlockInputStream : public IBlockInputStream
public: public:
RabbitMQBlockInputStream( RabbitMQBlockInputStream(
StorageRabbitMQ & storage_, StorageRabbitMQ & storage_,
const StorageMetadataPtr & metadata_snapshot_,
const Context & context_, const Context & context_,
const Names & columns, const Names & columns,
Poco::Logger * log_); Poco::Logger * log_);
@@ -29,6 +30,7 @@ public:
private: private:
StorageRabbitMQ & storage; StorageRabbitMQ & storage;
StorageMetadataPtr metadata_snapshot;
Context context; Context context;
Names column_names; Names column_names;
Poco::Logger * log; Poco::Logger * log;

View File

@@ -15,14 +15,19 @@ namespace ErrorCodes
RabbitMQBlockOutputStream::RabbitMQBlockOutputStream( RabbitMQBlockOutputStream::RabbitMQBlockOutputStream(
StorageRabbitMQ & storage_, const Context & context_) : storage(storage_), context(context_) StorageRabbitMQ & storage_,
const StorageMetadataPtr & metadata_snapshot_,
const Context & context_)
: storage(storage_)
, metadata_snapshot(metadata_snapshot_)
, context(context_)
{ {
} }
Block RabbitMQBlockOutputStream::getHeader() const Block RabbitMQBlockOutputStream::getHeader() const
{ {
return storage.getSampleBlockNonMaterialized(); return metadata_snapshot->getSampleBlockNonMaterialized();
} }

View File

@@ -12,7 +12,7 @@ class RabbitMQBlockOutputStream : public IBlockOutputStream
{ {
public: public:
explicit RabbitMQBlockOutputStream(StorageRabbitMQ & storage_, const Context & context_); explicit RabbitMQBlockOutputStream(StorageRabbitMQ & storage_, const StorageMetadataPtr & metadata_snapshot_, const Context & context_);
Block getHeader() const override; Block getHeader() const override;
@@ -22,6 +22,7 @@ public:
private: private:
StorageRabbitMQ & storage; StorageRabbitMQ & storage;
StorageMetadataPtr metadata_snapshot;
Context context; Context context;
ProducerBufferPtr buffer; ProducerBufferPtr buffer;
BlockOutputStreamPtr child; BlockOutputStreamPtr child;

View File

@@ -98,7 +98,9 @@ StorageRabbitMQ::StorageRabbitMQ(
} }
rabbitmq_context.makeQueryContext(); rabbitmq_context.makeQueryContext();
setColumns(columns_); StorageInMemoryMetadata storage_metadata;
storage_metadata.setColumns(columns_);
setInMemoryMetadata(storage_metadata);
task = global_context.getSchedulePool().createTask(log->name(), [this]{ threadFunc(); }); task = global_context.getSchedulePool().createTask(log->name(), [this]{ threadFunc(); });
task->deactivate(); task->deactivate();
@@ -115,6 +117,7 @@ StorageRabbitMQ::StorageRabbitMQ(
Pipes StorageRabbitMQ::read( Pipes StorageRabbitMQ::read(
const Names & column_names, const Names & column_names,
const StorageMetadataPtr & metadata_snapshot,
const SelectQueryInfo & /* query_info */, const SelectQueryInfo & /* query_info */,
const Context & context, const Context & context,
QueryProcessingStage::Enum /* processed_stage */, QueryProcessingStage::Enum /* processed_stage */,
@@ -129,8 +132,9 @@ Pipes StorageRabbitMQ::read(
for (size_t i = 0; i < num_created_consumers; ++i) for (size_t i = 0; i < num_created_consumers; ++i)
{ {
pipes.emplace_back(std::make_shared<SourceFromInputStream>(std::make_shared<RabbitMQBlockInputStream>( pipes.emplace_back(
*this, context, column_names, log))); std::make_shared<SourceFromInputStream>(std::make_shared<RabbitMQBlockInputStream>(
*this, metadata_snapshot, context, column_names, log)));
} }
LOG_DEBUG(log, "Starting reading {} streams", pipes.size()); LOG_DEBUG(log, "Starting reading {} streams", pipes.size());
@@ -139,9 +143,9 @@ Pipes StorageRabbitMQ::read(
} }
BlockOutputStreamPtr StorageRabbitMQ::write(const ASTPtr &, const Context & context) BlockOutputStreamPtr StorageRabbitMQ::write(const ASTPtr &, const StorageMetadataPtr & metadata_snapshot, const Context & context)
{ {
return std::make_shared<RabbitMQBlockOutputStream>(*this, context); return std::make_shared<RabbitMQBlockOutputStream>(*this, metadata_snapshot, context);
} }
@@ -316,7 +320,7 @@ bool StorageRabbitMQ::streamToViews()
for (size_t i = 0; i < num_created_consumers; ++i) for (size_t i = 0; i < num_created_consumers; ++i)
{ {
auto stream = std::make_shared<RabbitMQBlockInputStream>(*this, rabbitmq_context, block_io.out->getHeader().getNames(), log); auto stream = std::make_shared<RabbitMQBlockInputStream>(*this, getInMemoryMetadataPtr(), rabbitmq_context, block_io.out->getHeader().getNames(), log);
streams.emplace_back(stream); streams.emplace_back(stream);
// Limit read batch to maximum block size to allow DDL // Limit read batch to maximum block size to allow DDL

View File

@@ -32,6 +32,7 @@ public:
Pipes read( Pipes read(
const Names & column_names, const Names & column_names,
const StorageMetadataPtr & metadata_snapshot,
const SelectQueryInfo & query_info, const SelectQueryInfo & query_info,
const Context & context, const Context & context,
QueryProcessingStage::Enum processed_stage, QueryProcessingStage::Enum processed_stage,
@@ -40,6 +41,7 @@ public:
BlockOutputStreamPtr write( BlockOutputStreamPtr write(
const ASTPtr & query, const ASTPtr & query,
const StorageMetadataPtr & metadata_snapshot,
const Context & context) override; const Context & context) override;
void pushReadBuffer(ConsumerBufferPtr buf); void pushReadBuffer(ConsumerBufferPtr buf);