From 53cb5210debb5baa10d521d90bd6afb7988245e2 Mon Sep 17 00:00:00 2001
From: alesapin
Date: Tue, 16 Jun 2020 15:48:10 +0300
Subject: [PATCH] Move getSampleBlockNonMaterialized to StorageInMemoryMetadata

---
 src/Interpreters/InterpreterInsertQuery.cpp   |  9 ++++++---
 src/Interpreters/InterpreterInsertQuery.h     |  3 ++-
 src/Interpreters/SystemLog.h                  |  3 ++-
 src/Storages/IStorage.cpp                     |  9 ---------
 src/Storages/IStorage.h                       |  1 -
 src/Storages/Kafka/KafkaBlockInputStream.cpp  | 13 ++++++++++---
 src/Storages/Kafka/KafkaBlockInputStream.h    |  8 +++++++-
 src/Storages/Kafka/KafkaBlockOutputStream.cpp | 10 ++++++++--
 src/Storages/Kafka/KafkaBlockOutputStream.h   |  6 +++++-
 src/Storages/Kafka/StorageKafka.cpp           | 11 ++++++-----
 src/Storages/StorageBuffer.cpp                |  4 +++-
 src/Storages/StorageDistributed.cpp           |  4 ++--
 src/Storages/StorageInMemoryMetadata.cpp      |  8 ++++++++
 src/Storages/StorageInMemoryMetadata.h        |  2 ++
 14 files changed, 61 insertions(+), 30 deletions(-)

diff --git a/src/Interpreters/InterpreterInsertQuery.cpp b/src/Interpreters/InterpreterInsertQuery.cpp
index 1841c82b710..d281dc5ccca 100644
--- a/src/Interpreters/InterpreterInsertQuery.cpp
+++ b/src/Interpreters/InterpreterInsertQuery.cpp
@@ -73,9 +73,12 @@ StoragePtr InterpreterInsertQuery::getTable(ASTInsertQuery & query)
     return DatabaseCatalog::instance().getTable(query.table_id, context);
 }
 
-Block InterpreterInsertQuery::getSampleBlock(const ASTInsertQuery & query, const StoragePtr & table) const
+Block InterpreterInsertQuery::getSampleBlock(
+    const ASTInsertQuery & query,
+    const StoragePtr & table,
+    const StorageMetadataPtr & metadata_snapshot) const
 {
-    Block table_sample_non_materialized = table->getSampleBlockNonMaterialized();
+    Block table_sample_non_materialized = metadata_snapshot->getSampleBlockNonMaterialized();
     /// If the query does not include information about columns
     if (!query.columns)
     {
@@ -119,7 +122,7 @@ BlockIO InterpreterInsertQuery::execute()
         true, context.getInitialQueryId(), context.getSettingsRef().lock_acquire_timeout);
     auto metadata_snapshot = table->getInMemoryMetadataPtr();
 
-    auto query_sample_block = getSampleBlock(query, table);
+    auto query_sample_block = getSampleBlock(query, table, metadata_snapshot);
     if (!query.table_function)
         context.checkAccess(AccessType::INSERT, query.table_id, query_sample_block.getNames());
 
diff --git a/src/Interpreters/InterpreterInsertQuery.h b/src/Interpreters/InterpreterInsertQuery.h
index fef962d24a3..3386b471d26 100644
--- a/src/Interpreters/InterpreterInsertQuery.h
+++ b/src/Interpreters/InterpreterInsertQuery.h
@@ -4,6 +4,7 @@
 #include <DataStreams/BlockIO.h>
 #include <Interpreters/IInterpreter.h>
 #include <Parsers/ASTInsertQuery.h>
+#include <Storages/StorageInMemoryMetadata.h>
 
 namespace DB
 {
@@ -34,7 +35,7 @@ public:
 
 private:
     StoragePtr getTable(ASTInsertQuery & query);
-    Block getSampleBlock(const ASTInsertQuery & query, const StoragePtr & table) const;
+    Block getSampleBlock(const ASTInsertQuery & query, const StoragePtr & table, const StorageMetadataPtr & metadata_snapshot) const;
 
     ASTPtr query_ptr;
     const Context & context;
diff --git a/src/Interpreters/SystemLog.h b/src/Interpreters/SystemLog.h
index e49ce574478..cf163226b93 100644
--- a/src/Interpreters/SystemLog.h
+++ b/src/Interpreters/SystemLog.h
@@ -438,8 +438,9 @@ void SystemLog<LogElement>::prepareTable()
 
     if (table)
     {
+        auto metadata_snapshot = table->getInMemoryMetadataPtr();
         const Block expected = LogElement::createBlock();
-        const Block actual = table->getSampleBlockNonMaterialized();
+        const Block actual = metadata_snapshot->getSampleBlockNonMaterialized();
 
         if (!blocksHaveEqualStructure(actual, expected))
         {
diff --git a/src/Storages/IStorage.cpp b/src/Storages/IStorage.cpp
index 6dae96a3322..e675d51b4b7 100644
--- a/src/Storages/IStorage.cpp
+++ b/src/Storages/IStorage.cpp
@@ -74,15 +74,6 @@ Block IStorage::getSampleBlockWithVirtuals() const
     return res;
 }
 
-Block IStorage::getSampleBlockNonMaterialized() const
-{
-    Block res;
-
-    for (const auto & column : getColumns().getOrdinary())
-        res.insert({column.type->createColumn(), column.type, column.name});
-
-    return res;
-}
 
 Block IStorage::getSampleBlockForColumns(const Names & column_names) const
 {
diff --git a/src/Storages/IStorage.h b/src/Storages/IStorage.h
index d3e65b6a845..42581ebb63b 100644
--- a/src/Storages/IStorage.h
+++ b/src/Storages/IStorage.h
@@ -160,7 +160,6 @@ public: /// thread-unsafe part. lockStructure must be acquired
 
     Block getSampleBlock() const; /// ordinary + materialized.
     Block getSampleBlockWithVirtuals() const; /// ordinary + materialized + virtuals.
-    Block getSampleBlockNonMaterialized() const; /// ordinary.
     Block getSampleBlockForColumns(const Names & column_names) const; /// ordinary + materialized + aliases + virtuals.
 
     /// Verify that all the requested names are in the table and are set correctly:
diff --git a/src/Storages/Kafka/KafkaBlockInputStream.cpp b/src/Storages/Kafka/KafkaBlockInputStream.cpp
index 3edfcc7b9d2..dd2bb68c11a 100644
--- a/src/Storages/Kafka/KafkaBlockInputStream.cpp
+++ b/src/Storages/Kafka/KafkaBlockInputStream.cpp
@@ -13,14 +13,21 @@ namespace ErrorCodes
     extern const int LOGICAL_ERROR;
 }
 
 KafkaBlockInputStream::KafkaBlockInputStream(
-    StorageKafka & storage_, const std::shared_ptr<Context> & context_, const Names & columns, size_t max_block_size_, bool commit_in_suffix_)
+    StorageKafka & storage_,
+    const StorageMetadataPtr & metadata_snapshot_,
+    const std::shared_ptr<Context> & context_,
+    const Names & columns,
+    size_t max_block_size_,
+    bool commit_in_suffix_)
     : storage(storage_)
+    , metadata_snapshot(metadata_snapshot_)
     , context(context_)
     , column_names(columns)
     , max_block_size(max_block_size_)
     , commit_in_suffix(commit_in_suffix_)
-    , non_virtual_header(storage.getSampleBlockNonMaterialized())
-    , virtual_header(storage.getSampleBlockForColumns({"_topic", "_key", "_offset", "_partition", "_timestamp","_timestamp_ms","_headers.name","_headers.value"}))
+    , non_virtual_header(metadata_snapshot->getSampleBlockNonMaterialized())
+    , virtual_header(storage.getSampleBlockForColumns(
+        {"_topic", "_key", "_offset", "_partition", "_timestamp", "_timestamp_ms", "_headers.name", "_headers.value"}))
 {
 }
diff --git a/src/Storages/Kafka/KafkaBlockInputStream.h b/src/Storages/Kafka/KafkaBlockInputStream.h
index 387f5088721..4851050a56e 100644
--- a/src/Storages/Kafka/KafkaBlockInputStream.h
+++ b/src/Storages/Kafka/KafkaBlockInputStream.h
@@ -14,7 +14,12 @@ class KafkaBlockInputStream : public IBlockInputStream
 {
 public:
     KafkaBlockInputStream(
-        StorageKafka & storage_, const std::shared_ptr<Context> & context_, const Names & columns, size_t max_block_size_, bool commit_in_suffix = true);
+        StorageKafka & storage_,
+        const StorageMetadataPtr & metadata_snapshot_,
+        const std::shared_ptr<Context> & context_,
+        const Names & columns,
+        size_t max_block_size_,
+        bool commit_in_suffix = true);
     ~KafkaBlockInputStream() override;
 
     String getName() const override { return storage.getName(); }
@@ -29,6 +34,7 @@ public:
 
 private:
     StorageKafka & storage;
+    StorageMetadataPtr metadata_snapshot;
     const std::shared_ptr<Context> context;
     Names column_names;
     UInt64 max_block_size;
diff --git a/src/Storages/Kafka/KafkaBlockOutputStream.cpp b/src/Storages/Kafka/KafkaBlockOutputStream.cpp
index 17ef5aa104c..60ac714bd52 100644
--- a/src/Storages/Kafka/KafkaBlockOutputStream.cpp
+++ b/src/Storages/Kafka/KafkaBlockOutputStream.cpp
@@ -11,13 +11,19 @@ namespace ErrorCodes
     extern const int CANNOT_CREATE_IO_BUFFER;
 }
 
-KafkaBlockOutputStream::KafkaBlockOutputStream(StorageKafka & storage_, const std::shared_ptr<Context> & context_) : storage(storage_), context(context_)
+KafkaBlockOutputStream::KafkaBlockOutputStream(
+    StorageKafka & storage_,
+    const StorageMetadataPtr & metadata_snapshot_,
+    const std::shared_ptr<Context> & context_)
+    : storage(storage_)
+    , metadata_snapshot(metadata_snapshot_)
+    , context(context_)
 {
 }
 
 Block KafkaBlockOutputStream::getHeader() const
 {
-    return storage.getSampleBlockNonMaterialized();
+    return metadata_snapshot->getSampleBlockNonMaterialized();
 }
 
 void KafkaBlockOutputStream::writePrefix()
diff --git a/src/Storages/Kafka/KafkaBlockOutputStream.h b/src/Storages/Kafka/KafkaBlockOutputStream.h
index 7a973724f1b..1121d2a119e 100644
--- a/src/Storages/Kafka/KafkaBlockOutputStream.h
+++ b/src/Storages/Kafka/KafkaBlockOutputStream.h
@@ -10,7 +10,10 @@ namespace DB
 class KafkaBlockOutputStream : public IBlockOutputStream
 {
 public:
-    explicit KafkaBlockOutputStream(StorageKafka & storage_, const std::shared_ptr<Context> & context_);
+    explicit KafkaBlockOutputStream(
+        StorageKafka & storage_,
+        const StorageMetadataPtr & metadata_snapshot_,
+        const std::shared_ptr<Context> & context_);
 
     Block getHeader() const override;
 
@@ -22,6 +25,7 @@ public:
 
 private:
     StorageKafka & storage;
+    StorageMetadataPtr metadata_snapshot;
     const std::shared_ptr<Context> context;
     ProducerBufferPtr buffer;
     BlockOutputStreamPtr child;
diff --git a/src/Storages/Kafka/StorageKafka.cpp b/src/Storages/Kafka/StorageKafka.cpp
index 190397bc675..b46cf0579ec 100644
--- a/src/Storages/Kafka/StorageKafka.cpp
+++ b/src/Storages/Kafka/StorageKafka.cpp
@@ -201,7 +201,7 @@ String StorageKafka::getDefaultClientId(const StorageID & table_id_)
 
 Pipes StorageKafka::read(
     const Names & column_names,
-    const StorageMetadataPtr & /*metadata_snapshot*/,
+    const StorageMetadataPtr & metadata_snapshot,
     const SelectQueryInfo & /* query_info */,
     const Context & context,
     QueryProcessingStage::Enum /* processed_stage */,
@@ -224,7 +224,7 @@ Pipes StorageKafka::read(
         /// TODO: probably that leads to awful performance.
         /// FIXME: seems that doesn't help with extra reading and committing unprocessed messages.
         /// TODO: rewrite KafkaBlockInputStream to KafkaSource. Now it is used in other place.
-        pipes.emplace_back(std::make_shared<SourceFromInputStream>(std::make_shared<KafkaBlockInputStream>(*this, modified_context, column_names, 1)));
+        pipes.emplace_back(std::make_shared<SourceFromInputStream>(std::make_shared<KafkaBlockInputStream>(*this, metadata_snapshot, modified_context, column_names, 1)));
     }
 
     LOG_DEBUG(log, "Starting reading {} streams", pipes.size());
@@ -232,14 +232,14 @@ Pipes StorageKafka::read(
     return pipes;
 }
 
 
-BlockOutputStreamPtr StorageKafka::write(const ASTPtr &, const StorageMetadataPtr & /*metadata_snapshot*/, const Context & context)
+BlockOutputStreamPtr StorageKafka::write(const ASTPtr &, const StorageMetadataPtr & metadata_snapshot, const Context & context)
 {
     auto modified_context = std::make_shared<Context>(context);
     modified_context->applySettingsChanges(settings_adjustments);
 
     if (topics.size() > 1)
         throw Exception("Can't write to Kafka table with multiple topics!", ErrorCodes::NOT_IMPLEMENTED);
-    return std::make_shared<KafkaBlockOutputStream>(*this, modified_context);
+    return std::make_shared<KafkaBlockOutputStream>(*this, metadata_snapshot, modified_context);
 }
 
@@ -519,6 +519,7 @@ bool StorageKafka::streamToViews()
     auto table = DatabaseCatalog::instance().getTable(table_id, global_context);
     if (!table)
         throw Exception("Engine table " + table_id.getNameForLogs() + " doesn't exist.", ErrorCodes::LOGICAL_ERROR);
+    auto metadata_snapshot = getInMemoryMetadataPtr();
 
     // Create an INSERT query for streaming data
     auto insert = std::make_shared<ASTInsertQuery>();
@@ -538,7 +539,7 @@ bool StorageKafka::streamToViews()
     for (size_t i = 0; i < num_created_consumers; ++i)
     {
         auto stream
-            = std::make_shared<KafkaBlockInputStream>(*this, kafka_context, block_io.out->getHeader().getNames(), block_size, false);
+            = std::make_shared<KafkaBlockInputStream>(*this, metadata_snapshot, kafka_context, block_io.out->getHeader().getNames(), block_size, false);
         streams.emplace_back(stream);
 
         // Limit read batch to maximum block size to allow DDL
diff --git a/src/Storages/StorageBuffer.cpp b/src/Storages/StorageBuffer.cpp
index 3e419921115..4754732159c 100644
--- a/src/Storages/StorageBuffer.cpp
+++ b/src/Storages/StorageBuffer.cpp
@@ -642,6 +642,7 @@ void StorageBuffer::writeBlockToDestination(const Block & block, StoragePtr tabl
         LOG_ERROR(log, "Destination table {} doesn't exist. Block of data is discarded.", destination_id.getNameForLogs());
         return;
     }
+    auto destination_metadata_snapshot = table->getInMemoryMetadataPtr();
 
     auto temporarily_disable_memory_tracker = getCurrentMemoryTrackerActionLock();
 
@@ -651,7 +652,8 @@ void StorageBuffer::writeBlockToDestination(const Block & block, StoragePtr tabl
     /** We will insert columns that are the intersection set of columns of the buffer table and the subordinate table.
       * This will support some of the cases (but not all) when the table structure does not match.
       */
-    Block structure_of_destination_table = allow_materialized ? table->getSampleBlock() : table->getSampleBlockNonMaterialized();
+    Block structure_of_destination_table
+        = allow_materialized ? table->getSampleBlock() : destination_metadata_snapshot->getSampleBlockNonMaterialized();
     Block block_to_write;
     for (size_t i : ext::range(0, structure_of_destination_table.columns()))
     {
diff --git a/src/Storages/StorageDistributed.cpp b/src/Storages/StorageDistributed.cpp
index 719811bbc6b..66066ec3c18 100644
--- a/src/Storages/StorageDistributed.cpp
+++ b/src/Storages/StorageDistributed.cpp
@@ -511,7 +511,7 @@ Pipes StorageDistributed::read(
 }
 
 
-BlockOutputStreamPtr StorageDistributed::write(const ASTPtr &, const StorageMetadataPtr & /*metadata_snapshot*/, const Context & context)
+BlockOutputStreamPtr StorageDistributed::write(const ASTPtr &, const StorageMetadataPtr & metadata_snapshot, const Context & context)
 {
     auto cluster = getCluster();
     const auto & settings = context.getSettingsRef();
@@ -536,7 +536,7 @@ BlockOutputStreamPtr StorageDistributed::write(const ASTPtr &, const StorageMeta
 
     /// DistributedBlockOutputStream will not own cluster, but will own ConnectionPools of the cluster
     return std::make_shared<DistributedBlockOutputStream>(
-        context, *this, createInsertToRemoteTableQuery(remote_database, remote_table, getSampleBlockNonMaterialized()), cluster,
+        context, *this, createInsertToRemoteTableQuery(remote_database, remote_table, metadata_snapshot->getSampleBlockNonMaterialized()), cluster,
         insert_sync, timeout);
 }
 
diff --git a/src/Storages/StorageInMemoryMetadata.cpp b/src/Storages/StorageInMemoryMetadata.cpp
index bf747fb9b5a..2c5b6279e10 100644
--- a/src/Storages/StorageInMemoryMetadata.cpp
+++ b/src/Storages/StorageInMemoryMetadata.cpp
@@ -209,5 +209,13 @@ ColumnDependencies StorageInMemoryMetadata::getColumnDependencies(const NameSet
 
 }
 
 
+Block StorageInMemoryMetadata::getSampleBlockNonMaterialized() const
+{
+    Block res;
+    for (const auto & column : getColumns().getOrdinary())
+        res.insert({column.type->createColumn(), column.type, column.name});
+
+    return res;
+}
 }
diff --git a/src/Storages/StorageInMemoryMetadata.h b/src/Storages/StorageInMemoryMetadata.h
index fb7bcbaa349..d6c00bb35c8 100644
--- a/src/Storages/StorageInMemoryMetadata.h
+++ b/src/Storages/StorageInMemoryMetadata.h
@@ -106,6 +106,8 @@ struct StorageInMemoryMetadata
     /// Returns columns, which will be needed to calculate dependencies (skip
     /// indices, TTL expressions) if we update @updated_columns set of columns.
     ColumnDependencies getColumnDependencies(const NameSet & updated_columns) const;
+
+    Block getSampleBlockNonMaterialized() const; /// ordinary.
 };
 
 using StorageMetadataPtr = std::shared_ptr<const StorageInMemoryMetadata>;
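Usage note, kept outside the diff: a minimal sketch of the calling convention this patch establishes, assuming `table` is a StoragePtr already resolved through DatabaseCatalog. Callers take one immutable metadata snapshot and derive the non-materialized sample block from it, so the header they build cannot drift if the table is altered concurrently:

    // Take a consistent snapshot of the table metadata once...
    auto metadata_snapshot = table->getInMemoryMetadataPtr();
    // ...and build the header from the snapshot rather than from IStorage.
    // getSampleBlockNonMaterialized() returns ordinary columns only
    // (no MATERIALIZED, ALIAS or virtual columns).
    Block header = metadata_snapshot->getSampleBlockNonMaterialized();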