Move getSampleBlockNonMaterialized to StorageInMemoryMetadata

alesapin 2020-06-16 15:48:10 +03:00
parent 0bcd22008a
commit 53cb5210de
14 changed files with 61 additions and 30 deletions
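
For context, a minimal sketch of the call-site change this commit applies everywhere (the helper function and its name are hypothetical; getInMemoryMetadataPtr() and the relocated getSampleBlockNonMaterialized() are taken from the hunks below):

#include <Core/Block.h>
#include <Storages/IStorage.h>
#include <Storages/StorageInMemoryMetadata.h>

namespace DB
{

/// Hypothetical call site: build the header of ordinary (non-materialized) columns.
Block getNonMaterializedHeader(const StoragePtr & table)
{
    /// Before this commit the block was taken from the storage object itself:
    ///     return table->getSampleBlockNonMaterialized();
    /// After it, callers first take an in-memory metadata snapshot and ask that instead:
    auto metadata_snapshot = table->getInMemoryMetadataPtr();
    return metadata_snapshot->getSampleBlockNonMaterialized();
}

}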

View File

@@ -73,9 +73,12 @@ StoragePtr InterpreterInsertQuery::getTable(ASTInsertQuery & query)
return DatabaseCatalog::instance().getTable(query.table_id, context);
}
Block InterpreterInsertQuery::getSampleBlock(const ASTInsertQuery & query, const StoragePtr & table) const
Block InterpreterInsertQuery::getSampleBlock(
const ASTInsertQuery & query,
const StoragePtr & table,
const StorageMetadataPtr & metadata_snapshot) const
{
Block table_sample_non_materialized = table->getSampleBlockNonMaterialized();
Block table_sample_non_materialized = metadata_snapshot->getSampleBlockNonMaterialized();
/// If the query does not include information about columns
if (!query.columns)
{
@@ -119,7 +122,7 @@ BlockIO InterpreterInsertQuery::execute()
true, context.getInitialQueryId(), context.getSettingsRef().lock_acquire_timeout);
auto metadata_snapshot = table->getInMemoryMetadataPtr();
auto query_sample_block = getSampleBlock(query, table);
auto query_sample_block = getSampleBlock(query, table, metadata_snapshot);
if (!query.table_function)
context.checkAccess(AccessType::INSERT, query.table_id, query_sample_block.getNames());

View File

@@ -4,6 +4,7 @@
#include <DataStreams/BlockIO.h>
#include <Interpreters/IInterpreter.h>
#include <Parsers/ASTInsertQuery.h>
#include <Storages/StorageInMemoryMetadata.h>
namespace DB
{
@@ -34,7 +35,7 @@ public:
private:
StoragePtr getTable(ASTInsertQuery & query);
Block getSampleBlock(const ASTInsertQuery & query, const StoragePtr & table) const;
Block getSampleBlock(const ASTInsertQuery & query, const StoragePtr & table, const StorageMetadataPtr & metadata_snapshot) const;
ASTPtr query_ptr;
const Context & context;

View File

@@ -438,8 +438,9 @@ void SystemLog<LogElement>::prepareTable()
if (table)
{
auto metadata_snapshot = table->getInMemoryMetadataPtr();
const Block expected = LogElement::createBlock();
const Block actual = table->getSampleBlockNonMaterialized();
const Block actual = metadata_snapshot->getSampleBlockNonMaterialized();
if (!blocksHaveEqualStructure(actual, expected))
{

View File

@@ -74,15 +74,6 @@ Block IStorage::getSampleBlockWithVirtuals() const
return res;
}
Block IStorage::getSampleBlockNonMaterialized() const
{
Block res;
for (const auto & column : getColumns().getOrdinary())
res.insert({column.type->createColumn(), column.type, column.name});
return res;
}
Block IStorage::getSampleBlockForColumns(const Names & column_names) const
{

View File

@@ -160,7 +160,6 @@ public: /// thread-unsafe part. lockStructure must be acquired
Block getSampleBlock() const; /// ordinary + materialized.
Block getSampleBlockWithVirtuals() const; /// ordinary + materialized + virtuals.
Block getSampleBlockNonMaterialized() const; /// ordinary.
Block getSampleBlockForColumns(const Names & column_names) const; /// ordinary + materialized + aliases + virtuals.
/// Verify that all the requested names are in the table and are set correctly:

View File

@@ -13,14 +13,21 @@ namespace ErrorCodes
extern const int LOGICAL_ERROR;
}
KafkaBlockInputStream::KafkaBlockInputStream(
StorageKafka & storage_, const std::shared_ptr<Context> & context_, const Names & columns, size_t max_block_size_, bool commit_in_suffix_)
StorageKafka & storage_,
const StorageMetadataPtr & metadata_snapshot_,
const std::shared_ptr<Context> & context_,
const Names & columns,
size_t max_block_size_,
bool commit_in_suffix_)
: storage(storage_)
, metadata_snapshot(metadata_snapshot_)
, context(context_)
, column_names(columns)
, max_block_size(max_block_size_)
, commit_in_suffix(commit_in_suffix_)
, non_virtual_header(storage.getSampleBlockNonMaterialized())
, virtual_header(storage.getSampleBlockForColumns({"_topic", "_key", "_offset", "_partition", "_timestamp","_timestamp_ms","_headers.name","_headers.value"}))
, non_virtual_header(metadata_snapshot->getSampleBlockNonMaterialized())
, virtual_header(storage.getSampleBlockForColumns(
{"_topic", "_key", "_offset", "_partition", "_timestamp", "_timestamp_ms", "_headers.name", "_headers.value"}))
{
}

View File

@@ -14,7 +14,12 @@ class KafkaBlockInputStream : public IBlockInputStream
{
public:
KafkaBlockInputStream(
StorageKafka & storage_, const std::shared_ptr<Context> & context_, const Names & columns, size_t max_block_size_, bool commit_in_suffix = true);
StorageKafka & storage_,
const StorageMetadataPtr & metadata_snapshot_,
const std::shared_ptr<Context> & context_,
const Names & columns,
size_t max_block_size_,
bool commit_in_suffix = true);
~KafkaBlockInputStream() override;
String getName() const override { return storage.getName(); }
@@ -29,6 +34,7 @@ public:
private:
StorageKafka & storage;
StorageMetadataPtr metadata_snapshot;
const std::shared_ptr<Context> context;
Names column_names;
UInt64 max_block_size;

View File

@@ -11,13 +11,19 @@ namespace ErrorCodes
extern const int CANNOT_CREATE_IO_BUFFER;
}
KafkaBlockOutputStream::KafkaBlockOutputStream(StorageKafka & storage_, const std::shared_ptr<Context> & context_) : storage(storage_), context(context_)
KafkaBlockOutputStream::KafkaBlockOutputStream(
StorageKafka & storage_,
const StorageMetadataPtr & metadata_snapshot_,
const std::shared_ptr<Context> & context_)
: storage(storage_)
, metadata_snapshot(metadata_snapshot_)
, context(context_)
{
}
Block KafkaBlockOutputStream::getHeader() const
{
return storage.getSampleBlockNonMaterialized();
return metadata_snapshot->getSampleBlockNonMaterialized();
}
void KafkaBlockOutputStream::writePrefix()

View File

@@ -10,7 +10,10 @@ namespace DB
class KafkaBlockOutputStream : public IBlockOutputStream
{
public:
explicit KafkaBlockOutputStream(StorageKafka & storage_, const std::shared_ptr<Context> & context_);
explicit KafkaBlockOutputStream(
StorageKafka & storage_,
const StorageMetadataPtr & metadata_snapshot_,
const std::shared_ptr<Context> & context_);
Block getHeader() const override;
@@ -22,6 +25,7 @@ public:
private:
StorageKafka & storage;
StorageMetadataPtr metadata_snapshot;
const std::shared_ptr<Context> context;
ProducerBufferPtr buffer;
BlockOutputStreamPtr child;

View File

@@ -201,7 +201,7 @@ String StorageKafka::getDefaultClientId(const StorageID & table_id_)
Pipes StorageKafka::read(
const Names & column_names,
const StorageMetadataPtr & /*metadata_snapshot*/,
const StorageMetadataPtr & metadata_snapshot,
const SelectQueryInfo & /* query_info */,
const Context & context,
QueryProcessingStage::Enum /* processed_stage */,
@@ -224,7 +224,7 @@ Pipes StorageKafka::read(
/// TODO: probably that leads to awful performance.
/// FIXME: seems that doesn't help with extra reading and committing unprocessed messages.
/// TODO: rewrite KafkaBlockInputStream to KafkaSource. Now it is used in other place.
pipes.emplace_back(std::make_shared<SourceFromInputStream>(std::make_shared<KafkaBlockInputStream>(*this, modified_context, column_names, 1)));
pipes.emplace_back(std::make_shared<SourceFromInputStream>(std::make_shared<KafkaBlockInputStream>(*this, metadata_snapshot, modified_context, column_names, 1)));
}
LOG_DEBUG(log, "Starting reading {} streams", pipes.size());
@@ -232,14 +232,14 @@ Pipes StorageKafka::read(
}
BlockOutputStreamPtr StorageKafka::write(const ASTPtr &, const StorageMetadataPtr & /*metadata_snapshot*/, const Context & context)
BlockOutputStreamPtr StorageKafka::write(const ASTPtr &, const StorageMetadataPtr & metadata_snapshot, const Context & context)
{
auto modified_context = std::make_shared<Context>(context);
modified_context->applySettingsChanges(settings_adjustments);
if (topics.size() > 1)
throw Exception("Can't write to Kafka table with multiple topics!", ErrorCodes::NOT_IMPLEMENTED);
return std::make_shared<KafkaBlockOutputStream>(*this, modified_context);
return std::make_shared<KafkaBlockOutputStream>(*this, metadata_snapshot, modified_context);
}
@@ -519,6 +519,7 @@ bool StorageKafka::streamToViews()
auto table = DatabaseCatalog::instance().getTable(table_id, global_context);
if (!table)
throw Exception("Engine table " + table_id.getNameForLogs() + " doesn't exist.", ErrorCodes::LOGICAL_ERROR);
auto metadata_snapshot = getInMemoryMetadataPtr();
// Create an INSERT query for streaming data
auto insert = std::make_shared<ASTInsertQuery>();
@@ -538,7 +539,7 @@ bool StorageKafka::streamToViews()
for (size_t i = 0; i < num_created_consumers; ++i)
{
auto stream
= std::make_shared<KafkaBlockInputStream>(*this, kafka_context, block_io.out->getHeader().getNames(), block_size, false);
= std::make_shared<KafkaBlockInputStream>(*this, metadata_snapshot, kafka_context, block_io.out->getHeader().getNames(), block_size, false);
streams.emplace_back(stream);
// Limit read batch to maximum block size to allow DDL

View File

@@ -642,6 +642,7 @@ void StorageBuffer::writeBlockToDestination(const Block & block, StoragePtr tabl
LOG_ERROR(log, "Destination table {} doesn't exist. Block of data is discarded.", destination_id.getNameForLogs());
return;
}
auto destination_metadata_snapshot = table->getInMemoryMetadataPtr();
auto temporarily_disable_memory_tracker = getCurrentMemoryTrackerActionLock();
@@ -651,7 +652,8 @@ void StorageBuffer::writeBlockToDestination(const Block & block, StoragePtr tabl
/** We will insert columns that are the intersection set of columns of the buffer table and the subordinate table.
* This will support some of the cases (but not all) when the table structure does not match.
*/
Block structure_of_destination_table = allow_materialized ? table->getSampleBlock() : table->getSampleBlockNonMaterialized();
Block structure_of_destination_table
= allow_materialized ? table->getSampleBlock() : destination_metadata_snapshot->getSampleBlockNonMaterialized();
Block block_to_write;
for (size_t i : ext::range(0, structure_of_destination_table.columns()))
{

View File

@@ -511,7 +511,7 @@ Pipes StorageDistributed::read(
}
BlockOutputStreamPtr StorageDistributed::write(const ASTPtr &, const StorageMetadataPtr & /*metadata_snapshot*/, const Context & context)
BlockOutputStreamPtr StorageDistributed::write(const ASTPtr &, const StorageMetadataPtr & metadata_snapshot, const Context & context)
{
auto cluster = getCluster();
const auto & settings = context.getSettingsRef();
@@ -536,7 +536,7 @@ BlockOutputStreamPtr StorageDistributed::write(const ASTPtr &, const StorageMeta
/// DistributedBlockOutputStream will not own cluster, but will own ConnectionPools of the cluster
return std::make_shared<DistributedBlockOutputStream>(
context, *this, createInsertToRemoteTableQuery(remote_database, remote_table, getSampleBlockNonMaterialized()), cluster,
context, *this, createInsertToRemoteTableQuery(remote_database, remote_table, metadata_snapshot->getSampleBlockNonMaterialized()), cluster,
insert_sync, timeout);
}

View File

@@ -209,5 +209,13 @@ ColumnDependencies StorageInMemoryMetadata::getColumnDependencies(const NameSet
}
Block StorageInMemoryMetadata::getSampleBlockNonMaterialized() const
{
Block res;
for (const auto & column : getColumns().getOrdinary())
res.insert({column.type->createColumn(), column.type, column.name});
return res;
}
}

View File

@@ -106,6 +106,8 @@ struct StorageInMemoryMetadata
/// Returns columns, which will be needed to calculate dependencies (skip
/// indices, TTL expressions) if we update @updated_columns set of columns.
ColumnDependencies getColumnDependencies(const NameSet & updated_columns) const;
Block getSampleBlockNonMaterialized() const; /// ordinary.
};
using StorageMetadataPtr = std::shared_ptr<StorageInMemoryMetadata>;