Buildable getSampleBlock in StorageInMemoryMetadata

alesapin 2020-06-16 18:51:29 +03:00
parent 824d6667d9
commit 1ddeb3d149
54 changed files with 328 additions and 188 deletions
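The pattern repeated across all 54 files: callers take an immutable StorageMetadataPtr snapshot once and read the sample block from it, instead of asking the IStorage object. A minimal sketch of that pattern, with a hypothetical helper (exampleCaller is not part of the diff; the two calls it makes are):

void exampleCaller(const StoragePtr & table)
{
    /// Take the snapshot once (shown e.g. in InterpreterAlterQuery below).
    auto metadata_snapshot = table->getInMemoryMetadataPtr();

    /// Was: table->getSampleBlock(); the block now comes from the snapshot.
    Block header = metadata_snapshot->getSampleBlock();

    /// The snapshot is then threaded through constructors
    /// (DistributedBlockOutputStream, MergedBlockOutputStream, ...),
    /// each of which stores it as a StorageMetadataPtr member.
}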

View File

@ -333,7 +333,8 @@ void RemoteQueryExecutor::sendExternalTables()
data->table_name = table.first;
if (pipes.empty())
data->pipe = std::make_unique<Pipe>(std::make_shared<SourceFromSingleChunk>(cur->getSampleBlock(), Chunk()));
data->pipe = std::make_unique<Pipe>(
std::make_shared<SourceFromSingleChunk>(metadata_snapshot->getSampleBlock(), Chunk()));
else if (pipes.size() == 1)
data->pipe = std::make_unique<Pipe>(std::move(pipes.front()));
else

View File

@ -43,6 +43,7 @@ BlockIO InterpreterAlterQuery::execute()
context.checkAccess(getRequiredAccess());
auto table_id = context.resolveStorageID(alter, Context::ResolveOrdinary);
StoragePtr table = DatabaseCatalog::instance().getTable(table_id, context);
auto metadata_snapshot = table->getInMemoryMetadataPtr();
/// Add default database to table identifiers that we can encounter in e.g. default expressions,
/// mutation expression, etc.
@ -91,7 +92,7 @@ BlockIO InterpreterAlterQuery::execute()
if (!partition_commands.empty())
{
table->alterPartition(query_ptr, partition_commands, context);
table->alterPartition(query_ptr, metadata_snapshot, partition_commands, context);
}
if (!live_view_commands.empty())

View File

@ -88,7 +88,7 @@ Block InterpreterInsertQuery::getSampleBlock(
return table_sample_non_materialized;
}
Block table_sample = table->getSampleBlock();
Block table_sample = metadata_snapshot->getSampleBlock();
/// Form the block based on the column names from the query
Block res;
for (const auto & identifier : query.columns->children)

View File

@ -326,8 +326,9 @@ static std::tuple<ASTPtr, BlockIO> executeQueryImpl(
{
StoragePtr storage = context.executeTableFunction(input_function);
auto & input_storage = dynamic_cast<StorageInput &>(*storage);
BlockInputStreamPtr input_stream = std::make_shared<InputStreamFromASTInsertQuery>(ast, istr,
input_storage.getSampleBlock(), context, input_function);
auto input_metadata_snapshot = input_storage.getInMemoryMetadataPtr();
BlockInputStreamPtr input_stream = std::make_shared<InputStreamFromASTInsertQuery>(
ast, istr, input_metadata_snapshot->getSampleBlock(), context, input_function);
input_storage.setInputStream(input_stream);
}
}

View File

@ -223,7 +223,7 @@ void TCPHandler::runImpl()
}
/// Send block to the client - input storage structure.
state.input_header = input_storage->getSampleBlock();
state.input_header = input_storage->getInMemoryMetadataPtr()->getSampleBlock();
sendData(state.input_header);
});

View File

@ -83,18 +83,29 @@ static void writeBlockConvert(const BlockOutputStreamPtr & out, const Block & bl
DistributedBlockOutputStream::DistributedBlockOutputStream(
const Context & context_, StorageDistributed & storage_, const ASTPtr & query_ast_, const ClusterPtr & cluster_,
bool insert_sync_, UInt64 insert_timeout_)
: context(context_), storage(storage_), query_ast(query_ast_), query_string(queryToString(query_ast_)),
cluster(cluster_), insert_sync(insert_sync_),
insert_timeout(insert_timeout_), log(&Poco::Logger::get("DistributedBlockOutputStream"))
const Context & context_,
StorageDistributed & storage_,
const StorageMetadataPtr & metadata_snapshot_,
const ASTPtr & query_ast_,
const ClusterPtr & cluster_,
bool insert_sync_,
UInt64 insert_timeout_)
: context(context_)
, storage(storage_)
, metadata_snapshot(metadata_snapshot_)
, query_ast(query_ast_)
, query_string(queryToString(query_ast_))
, cluster(cluster_)
, insert_sync(insert_sync_)
, insert_timeout(insert_timeout_)
, log(&Poco::Logger::get("DistributedBlockOutputStream"))
{
}
Block DistributedBlockOutputStream::getHeader() const
{
return storage.getSampleBlock();
return metadata_snapshot->getSampleBlock();
}
@ -109,7 +120,7 @@ void DistributedBlockOutputStream::write(const Block & block)
/* They are added by the AddingDefaultBlockOutputStream, and we will get
* different number of columns eventually */
for (const auto & col : storage.getColumns().getMaterialized())
for (const auto & col : metadata_snapshot->getColumns().getMaterialized())
{
if (ordinary_block.has(col.name))
{

View File

@ -2,6 +2,7 @@
#include <Parsers/formatAST.h>
#include <DataStreams/IBlockOutputStream.h>
#include <Storages/StorageInMemoryMetadata.h>
#include <Core/Block.h>
#include <Common/PODArray.h>
#include <Common/Throttler.h>
@ -36,8 +37,14 @@ class StorageDistributed;
class DistributedBlockOutputStream : public IBlockOutputStream
{
public:
DistributedBlockOutputStream(const Context & context_, StorageDistributed & storage_, const ASTPtr & query_ast_,
const ClusterPtr & cluster_, bool insert_sync_, UInt64 insert_timeout_);
DistributedBlockOutputStream(
const Context & context_,
StorageDistributed & storage_,
const StorageMetadataPtr & metadata_snapshot_,
const ASTPtr & query_ast_,
const ClusterPtr & cluster_,
bool insert_sync_,
UInt64 insert_timeout_);
Block getHeader() const override;
void write(const Block & block) override;
@ -79,6 +86,7 @@ private:
private:
const Context & context;
StorageDistributed & storage;
StorageMetadataPtr metadata_snapshot;
ASTPtr query_ast;
String query_string;
ClusterPtr cluster;

View File

@ -52,16 +52,6 @@ const ConstraintsDescription & IStorage::getConstraints() const
return metadata->constraints;
}
Block IStorage::getSampleBlock() const
{
Block res;
for (const auto & column : getColumns().getAllPhysical())
res.insert({column.type->createColumn(), column.type, column.name});
return res;
}
namespace
{
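The deleted body does not disappear: per the commit title, it presumably moves into StorageInMemoryMetadata essentially unchanged (the destination hunk is not shown in this view). A sketch of the assumed new home, reusing the removed code and the getColumns() accessor that other hunks already call on the snapshot:

Block StorageInMemoryMetadata::getSampleBlock() const
{
    Block res;
    /// Ordinary + materialized columns, as in the removed IStorage version.
    for (const auto & column : getColumns().getAllPhysical())
        res.insert({column.type->createColumn(), column.type, column.name});
    return res;
}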

View File

@ -158,8 +158,6 @@ public: /// thread-unsafe part. lockStructure must be acquired
StorageMetadataPtr getInMemoryMetadataPtr() const { return metadata; }
void setInMemoryMetadata(const StorageInMemoryMetadata & metadata_) { metadata = std::make_shared<StorageInMemoryMetadata>(metadata_); }
Block getSampleBlock() const; /// ordinary + materialized.
/// Verify that all the requested names are in the table and are set correctly:
/// list of names is not empty and the names do not repeat.
void check(const Names & column_names, bool include_virtuals = false) const;
@ -361,7 +359,7 @@ public:
/** ALTER tables with regard to its partitions.
* Should handle locks for each command on its own.
*/
virtual void alterPartition(const ASTPtr & /* query */, const PartitionCommands & /* commands */, const Context & /* context */)
virtual void alterPartition(const ASTPtr & /* query */, const StorageMetadataPtr & /* metadata_snapshot */, const PartitionCommands & /* commands */, const Context & /* context */)
{
throw Exception("Partition operations are not supported by storage " + getName(), ErrorCodes::NOT_IMPLEMENTED);
}

View File

@ -5,10 +5,11 @@
namespace DB
{
IMergedBlockOutputStream::IMergedBlockOutputStream(
const MergeTreeDataPartPtr & data_part)
const MergeTreeDataPartPtr & data_part,
const StorageMetadataPtr & metadata_snapshot_)
: storage(data_part->storage)
, metadata_snapshot(metadata_snapshot_)
, volume(data_part->volume)
, part_path(data_part->getFullRelativePath())
{

View File

@ -13,7 +13,8 @@ class IMergedBlockOutputStream : public IBlockOutputStream
{
public:
IMergedBlockOutputStream(
const MergeTreeDataPartPtr & data_part);
const MergeTreeDataPartPtr & data_part,
const StorageMetadataPtr & metadata_snapshot_);
using WrittenOffsetColumns = std::set<std::string>;
@ -36,6 +37,7 @@ protected:
protected:
const MergeTreeData & storage;
StorageMetadataPtr metadata_snapshot;
VolumePtr volume;
String part_path;

View File

@ -8,7 +8,7 @@ namespace DB
Block MergeTreeBlockOutputStream::getHeader() const
{
return storage.getSampleBlock();
return metadata_snapshot->getSampleBlock();
}
@ -21,7 +21,7 @@ void MergeTreeBlockOutputStream::write(const Block & block)
{
Stopwatch watch;
MergeTreeData::MutableDataPartPtr part = storage.writer.writeTempPart(current_block);
MergeTreeData::MutableDataPartPtr part = storage.writer.writeTempPart(current_block, metadata_snapshot);
storage.renameTempPartAndAdd(part, &storage.increment);
PartLog::addNewPart(storage.global_context, part, watch.elapsed());

View File

@ -1,6 +1,7 @@
#pragma once
#include <DataStreams/IBlockOutputStream.h>
#include <Storages/StorageInMemoryMetadata.h>
namespace DB
@ -13,14 +14,22 @@ class StorageMergeTree;
class MergeTreeBlockOutputStream : public IBlockOutputStream
{
public:
MergeTreeBlockOutputStream(StorageMergeTree & storage_, size_t max_parts_per_block_)
: storage(storage_), max_parts_per_block(max_parts_per_block_) {}
MergeTreeBlockOutputStream(
StorageMergeTree & storage_,
const StorageMetadataPtr metadata_snapshot_,
size_t max_parts_per_block_)
: storage(storage_)
, metadata_snapshot(metadata_snapshot_)
, max_parts_per_block(max_parts_per_block_)
{
}
Block getHeader() const override;
void write(const Block & block) override;
private:
StorageMergeTree & storage;
StorageMetadataPtr metadata_snapshot;
size_t max_parts_per_block;
};

View File

@ -808,6 +808,7 @@ MergeTreeData::MutableDataPartPtr MergeTreeDataMergerMutator::mergePartsToTempor
const auto & index_factory = MergeTreeIndexFactory::instance();
MergedBlockOutputStream to{
new_data_part,
metadata_snapshot,
merging_columns,
index_factory.getMany(data.getSecondaryIndices()),
compression_codec,
@ -912,6 +913,7 @@ MergeTreeData::MutableDataPartPtr MergeTreeDataMergerMutator::mergePartsToTempor
MergedColumnOnlyOutputStream column_to(
new_data_part,
metadata_snapshot,
column_gathered_stream.getHeader(),
compression_codec,
/// we don't need to recalc indices here
@ -1085,6 +1087,7 @@ MergeTreeData::MutableDataPartPtr MergeTreeDataMergerMutator::mutatePartToTempor
auto part_indices = getIndicesForNewDataPart(data.getSecondaryIndices(), for_file_renames);
mutateAllPartColumns(
new_data_part,
metadata_snapshot,
part_indices,
in,
time_of_mutation,
@ -1137,6 +1140,7 @@ MergeTreeData::MutableDataPartPtr MergeTreeDataMergerMutator::mutatePartToTempor
{
mutateSomePartColumns(
source_part,
metadata_snapshot,
indices_to_recalc,
updated_header,
new_data_part,
@ -1582,6 +1586,7 @@ bool MergeTreeDataMergerMutator::shouldExecuteTTL(const StorageMetadataPtr & met
void MergeTreeDataMergerMutator::mutateAllPartColumns(
MergeTreeData::MutableDataPartPtr new_data_part,
const StorageMetadataPtr & metadata_snapshot,
const MergeTreeIndices & skip_indices,
BlockInputStreamPtr mutating_stream,
time_t time_of_mutation,
@ -1603,6 +1608,7 @@ void MergeTreeDataMergerMutator::mutateAllPartColumns(
MergedBlockOutputStream out{
new_data_part,
metadata_snapshot,
new_data_part->getColumns(),
skip_indices,
compression_codec};
@ -1629,6 +1635,7 @@ void MergeTreeDataMergerMutator::mutateAllPartColumns(
void MergeTreeDataMergerMutator::mutateSomePartColumns(
const MergeTreeDataPartPtr & source_part,
const StorageMetadataPtr & metadata_snapshot,
const std::set<MergeTreeIndexPtr> & indices_to_recalc,
const Block & mutation_header,
MergeTreeData::MutableDataPartPtr new_data_part,
@ -1647,6 +1654,7 @@ void MergeTreeDataMergerMutator::mutateSomePartColumns(
IMergedBlockOutputStream::WrittenOffsetColumns unused_written_offsets;
MergedColumnOnlyOutputStream out(
new_data_part,
metadata_snapshot,
mutation_header,
compression_codec,
std::vector<MergeTreeIndexPtr>(indices_to_recalc.begin(), indices_to_recalc.end()),

View File

@ -182,6 +182,7 @@ private:
/// Override all columns of new part using mutating_stream
void mutateAllPartColumns(
MergeTreeData::MutableDataPartPtr new_data_part,
const StorageMetadataPtr & metadata_snapshot,
const MergeTreeIndices & skip_indices,
BlockInputStreamPtr mutating_stream,
time_t time_of_mutation,
@ -192,6 +193,7 @@ private:
/// Mutate some columns of source part with mutation_stream
void mutateSomePartColumns(
const MergeTreeDataPartPtr & source_part,
const StorageMetadataPtr & metadata_snapshot,
const std::set<MergeTreeIndexPtr> & indices_to_recalc,
const Block & mutation_header,
MergeTreeData::MutableDataPartPtr new_data_part,

View File

@ -192,7 +192,7 @@ BlocksWithPartition MergeTreeDataWriter::splitBlockIntoParts(const Block & block
return result;
}
MergeTreeData::MutableDataPartPtr MergeTreeDataWriter::writeTempPart(BlockWithPartition & block_with_partition)
MergeTreeData::MutableDataPartPtr MergeTreeDataWriter::writeTempPart(BlockWithPartition & block_with_partition, const StorageMetadataPtr & metadata_snapshot)
{
Block & block = block_with_partition.block;
@ -302,7 +302,7 @@ MergeTreeData::MutableDataPartPtr MergeTreeDataWriter::writeTempPart(BlockWithPa
auto compression_codec = data.global_context.chooseCompressionCodec(0, 0);
const auto & index_factory = MergeTreeIndexFactory::instance();
MergedBlockOutputStream out(new_data_part, columns, index_factory.getMany(data.getSecondaryIndices()), compression_codec);
MergedBlockOutputStream out(new_data_part, metadata_snapshot, columns, index_factory.getMany(data.getSecondaryIndices()), compression_codec);
out.writePrefix();
out.writeWithPermutation(block, perm_ptr);

View File

@ -45,7 +45,7 @@ public:
/** All rows must correspond to same partition.
* Returns part with unique name starting with 'tmp_', yet not added to MergeTreeData.
*/
MergeTreeData::MutableDataPartPtr writeTempPart(BlockWithPartition & block);
MergeTreeData::MutableDataPartPtr writeTempPart(BlockWithPartition & block, const StorageMetadataPtr & metadata_snapshot);
private:
MergeTreeData & data;

View File

@ -197,7 +197,7 @@ std::vector<size_t> MergeTreeReadPool::fillPerPartInfo(
RangesInDataParts & parts, const bool check_columns)
{
std::vector<size_t> per_part_sum_marks;
Block sample_block = data.getSampleBlock();
Block sample_block = metadata_snapshot->getSampleBlock();
for (const auto i : ext::range(0, parts.size()))
{

View File

@ -115,7 +115,7 @@ try
auto size_predictor = (preferred_block_size_bytes == 0)
? nullptr
: std::make_unique<MergeTreeBlockSizePredictor>(data_part, ordered_names, data_part->storage.getSampleBlock());
: std::make_unique<MergeTreeBlockSizePredictor>(data_part, ordered_names, metadata_snapshot->getSampleBlock());
task = std::make_unique<MergeTreeReadTask>(
data_part, mark_ranges_for_task, part_index_in_query, ordered_names, column_name_set,

View File

@ -72,7 +72,7 @@ try
auto size_predictor = (preferred_block_size_bytes == 0)
? nullptr
: std::make_unique<MergeTreeBlockSizePredictor>(data_part, ordered_names, data_part->storage.getSampleBlock());
: std::make_unique<MergeTreeBlockSizePredictor>(data_part, ordered_names, metadata_snapshot->getSampleBlock());
/// will be used to distinguish between PREWHERE and WHERE columns when applying filter
const auto & column_names = task_columns.columns.getNames();

View File

@ -15,12 +15,18 @@ namespace ErrorCodes
MergedBlockOutputStream::MergedBlockOutputStream(
const MergeTreeDataPartPtr & data_part,
const StorageMetadataPtr & metadata_snapshot_,
const NamesAndTypesList & columns_list_,
const MergeTreeIndices & skip_indices,
CompressionCodecPtr default_codec,
bool blocks_are_granules_size)
: MergedBlockOutputStream(
data_part, columns_list_, skip_indices, default_codec, {},
data_part,
metadata_snapshot_,
columns_list_,
skip_indices,
default_codec,
{},
data_part->storage.global_context.getSettings().min_bytes_to_use_direct_io,
blocks_are_granules_size)
{
@ -28,13 +34,14 @@ MergedBlockOutputStream::MergedBlockOutputStream(
MergedBlockOutputStream::MergedBlockOutputStream(
const MergeTreeDataPartPtr & data_part,
const StorageMetadataPtr & metadata_snapshot_,
const NamesAndTypesList & columns_list_,
const MergeTreeIndices & skip_indices,
CompressionCodecPtr default_codec,
const MergeTreeData::DataPart::ColumnToSize & merged_column_to_size,
size_t aio_threshold,
bool blocks_are_granules_size)
: IMergedBlockOutputStream(data_part)
: IMergedBlockOutputStream(data_part, metadata_snapshot_)
, columns_list(columns_list_)
{
MergeTreeWriterSettings writer_settings(data_part->storage.global_context.getSettings(),

View File

@ -15,6 +15,7 @@ class MergedBlockOutputStream final : public IMergedBlockOutputStream
public:
MergedBlockOutputStream(
const MergeTreeDataPartPtr & data_part,
const StorageMetadataPtr & metadata_snapshot_,
const NamesAndTypesList & columns_list_,
const MergeTreeIndices & skip_indices,
CompressionCodecPtr default_codec,
@ -22,6 +23,7 @@ public:
MergedBlockOutputStream(
const MergeTreeDataPartPtr & data_part,
const StorageMetadataPtr & metadata_snapshot_,
const NamesAndTypesList & columns_list_,
const MergeTreeIndices & skip_indices,
CompressionCodecPtr default_codec,
@ -29,7 +31,7 @@ public:
size_t aio_threshold,
bool blocks_are_granules_size = false);
Block getHeader() const override { return storage.getSampleBlock(); }
Block getHeader() const override { return metadata_snapshot->getSampleBlock(); }
/// If the data is pre-sorted.
void write(const Block & block) override;

View File

@ -10,13 +10,15 @@ namespace ErrorCodes
MergedColumnOnlyOutputStream::MergedColumnOnlyOutputStream(
const MergeTreeDataPartPtr & data_part,
const StorageMetadataPtr & metadata_snapshot_,
const Block & header_,
CompressionCodecPtr default_codec,
const std::vector<MergeTreeIndexPtr> & indices_to_recalc,
WrittenOffsetColumns * offset_columns_,
const MergeTreeIndexGranularity & index_granularity,
const MergeTreeIndexGranularityInfo * index_granularity_info)
: IMergedBlockOutputStream(data_part), header(header_)
: IMergedBlockOutputStream(data_part, metadata_snapshot_)
, header(header_)
{
const auto & global_settings = data_part->storage.global_context.getSettings();
MergeTreeWriterSettings writer_settings(

View File

@ -15,6 +15,7 @@ public:
/// if you want to serialize elements of Nested data structure in different instances of MergedColumnOnlyOutputStream.
MergedColumnOnlyOutputStream(
const MergeTreeDataPartPtr & data_part,
const StorageMetadataPtr & metadata_snapshot_,
const Block & header_,
CompressionCodecPtr default_codec_,
const std::vector<MergeTreeIndexPtr> & indices_to_recalc_,

View File

@ -31,9 +31,19 @@ namespace ErrorCodes
ReplicatedMergeTreeBlockOutputStream::ReplicatedMergeTreeBlockOutputStream(
StorageReplicatedMergeTree & storage_, size_t quorum_, size_t quorum_timeout_ms_, size_t max_parts_per_block_, bool deduplicate_)
: storage(storage_), quorum(quorum_), quorum_timeout_ms(quorum_timeout_ms_), max_parts_per_block(max_parts_per_block_), deduplicate(deduplicate_),
log(&Poco::Logger::get(storage.getLogName() + " (Replicated OutputStream)"))
StorageReplicatedMergeTree & storage_,
const StorageMetadataPtr & metadata_snapshot_,
size_t quorum_,
size_t quorum_timeout_ms_,
size_t max_parts_per_block_,
bool deduplicate_)
: storage(storage_)
, metadata_snapshot(metadata_snapshot_)
, quorum(quorum_)
, quorum_timeout_ms(quorum_timeout_ms_)
, max_parts_per_block(max_parts_per_block_)
, deduplicate(deduplicate_)
, log(&Poco::Logger::get(storage.getLogName() + " (Replicated OutputStream)"))
{
/// The quorum value `1` has the same meaning as if it is disabled.
if (quorum == 1)
@ -43,7 +53,7 @@ ReplicatedMergeTreeBlockOutputStream::ReplicatedMergeTreeBlockOutputStream(
Block ReplicatedMergeTreeBlockOutputStream::getHeader() const
{
return storage.getSampleBlock();
return metadata_snapshot->getSampleBlock();
}
@ -128,7 +138,7 @@ void ReplicatedMergeTreeBlockOutputStream::write(const Block & block)
/// Write part to the filesystem under temporary name. Calculate a checksum.
MergeTreeData::MutableDataPartPtr part = storage.writer.writeTempPart(current_block);
MergeTreeData::MutableDataPartPtr part = storage.writer.writeTempPart(current_block, metadata_snapshot);
String block_id;

View File

@ -22,8 +22,12 @@ class StorageReplicatedMergeTree;
class ReplicatedMergeTreeBlockOutputStream : public IBlockOutputStream
{
public:
ReplicatedMergeTreeBlockOutputStream(StorageReplicatedMergeTree & storage_,
size_t quorum_, size_t quorum_timeout_ms_, size_t max_parts_per_block_,
ReplicatedMergeTreeBlockOutputStream(
StorageReplicatedMergeTree & storage_,
const StorageMetadataPtr & metadata_snapshot_,
size_t quorum_,
size_t quorum_timeout_ms_,
size_t max_parts_per_block_,
bool deduplicate_);
Block getHeader() const override;
@ -55,6 +59,7 @@ private:
void commitPart(zkutil::ZooKeeperPtr & zookeeper, MergeTreeData::MutableDataPartPtr & part, const String & block_id);
StorageReplicatedMergeTree & storage;
StorageMetadataPtr metadata_snapshot;
size_t quorum;
size_t quorum_timeout_ms;
size_t max_parts_per_block;

View File

@ -167,10 +167,10 @@ Pipes StorageBuffer::read(
auto destination_lock = destination->lockStructureForShare(
false, context.getCurrentQueryId(), context.getSettingsRef().lock_acquire_timeout);
const bool dst_has_same_structure = std::all_of(column_names.begin(), column_names.end(), [this, destination](const String& column_name)
const bool dst_has_same_structure = std::all_of(column_names.begin(), column_names.end(), [metadata_snapshot, destination](const String& column_name)
{
const auto & dest_columns = destination->getColumns();
const auto & our_columns = getColumns();
const auto & our_columns = metadata_snapshot->getColumns();
return dest_columns.hasPhysical(column_name) &&
dest_columns.get(column_name).type->equals(*our_columns.get(column_name).type);
});
@ -188,7 +188,7 @@ Pipes StorageBuffer::read(
else
{
/// There is a struct mismatch and we need to convert read blocks from the destination table.
const Block header = getSampleBlock();
const Block header = metadata_snapshot->getSampleBlock();
Names columns_intersection = column_names;
Block header_after_adding_defaults = header;
const auto & dest_columns = destination->getColumns();
@ -326,9 +326,14 @@ static void appendBlock(const Block & from, Block & to)
class BufferBlockOutputStream : public IBlockOutputStream
{
public:
explicit BufferBlockOutputStream(StorageBuffer & storage_) : storage(storage_) {}
explicit BufferBlockOutputStream(
StorageBuffer & storage_,
const StorageMetadataPtr & metadata_snapshot_)
: storage(storage_)
, metadata_snapshot(metadata_snapshot_)
{}
Block getHeader() const override { return storage.getSampleBlock(); }
Block getHeader() const override { return metadata_snapshot->getSampleBlock(); }
void write(const Block & block) override
{
@ -404,6 +409,7 @@ public:
}
private:
StorageBuffer & storage;
StorageMetadataPtr metadata_snapshot;
void insertIntoBuffer(const Block & block, StorageBuffer::Buffer & buffer)
{
@ -434,9 +440,9 @@ private:
};
BlockOutputStreamPtr StorageBuffer::write(const ASTPtr & /*query*/, const StorageMetadataPtr & /*metadata_snapshot*/, const Context & /*context*/)
BlockOutputStreamPtr StorageBuffer::write(const ASTPtr & /*query*/, const StorageMetadataPtr & metadata_snapshot, const Context & /*context*/)
{
return std::make_shared<BufferBlockOutputStream>(*this);
return std::make_shared<BufferBlockOutputStream>(*this, metadata_snapshot);
}
@ -654,8 +660,8 @@ void StorageBuffer::writeBlockToDestination(const Block & block, StoragePtr tabl
/** We will insert columns that are the intersection set of columns of the buffer table and the subordinate table.
* This will support some of the cases (but not all) when the table structure does not match.
*/
Block structure_of_destination_table
= allow_materialized ? table->getSampleBlock() : destination_metadata_snapshot->getSampleBlockNonMaterialized();
Block structure_of_destination_table = allow_materialized ? destination_metadata_snapshot->getSampleBlock()
: destination_metadata_snapshot->getSampleBlockNonMaterialized();
Block block_to_write;
for (size_t i : ext::range(0, structure_of_destination_table.columns()))
{

View File

@ -536,7 +536,7 @@ BlockOutputStreamPtr StorageDistributed::write(const ASTPtr &, const StorageMeta
/// DistributedBlockOutputStream will not own cluster, but will own ConnectionPools of the cluster
return std::make_shared<DistributedBlockOutputStream>(
context, *this, createInsertToRemoteTableQuery(remote_database, remote_table, metadata_snapshot->getSampleBlockNonMaterialized()), cluster,
context, *this, metadata_snapshot, createInsertToRemoteTableQuery(remote_database, remote_table, metadata_snapshot->getSampleBlockNonMaterialized()), cluster,
insert_sync, timeout);
}

View File

@ -214,9 +214,9 @@ public:
using FilesInfoPtr = std::shared_ptr<FilesInfo>;
static Block getHeader(StorageFile & storage, bool need_path_column, bool need_file_column)
static Block getHeader(const StorageMetadataPtr & metadata_snapshot, bool need_path_column, bool need_file_column)
{
auto header = storage.getSampleBlock();
auto header = metadata_snapshot->getSampleBlock();
/// Note: AddingDefaultsBlockInputStream doesn't change header.
@ -230,12 +230,14 @@ public:
StorageFileSource(
std::shared_ptr<StorageFile> storage_,
const StorageMetadataPtr & metadata_snapshot_,
const Context & context_,
UInt64 max_block_size_,
FilesInfoPtr files_info_,
ColumnDefaults column_defaults_)
: SourceWithProgress(getHeader(*storage_, files_info_->need_path_column, files_info_->need_file_column))
: SourceWithProgress(getHeader(metadata_snapshot_, files_info_->need_path_column, files_info_->need_file_column))
, storage(std::move(storage_))
, metadata_snapshot(metadata_snapshot_)
, files_info(std::move(files_info_))
, column_defaults(std::move(column_defaults_))
, context(context_)
@ -310,7 +312,7 @@ public:
read_buf = wrapReadBufferWithCompressionMethod(std::move(nested_buffer), method);
reader = FormatFactory::instance().getInput(
storage->format_name, *read_buf, storage->getSampleBlock(), context, max_block_size);
storage->format_name, *read_buf, metadata_snapshot->getSampleBlock(), context, max_block_size);
if (!column_defaults.empty())
reader = std::make_shared<AddingDefaultsBlockInputStream>(reader, column_defaults, context);
@ -357,6 +359,7 @@ public:
private:
std::shared_ptr<StorageFile> storage;
StorageMetadataPtr metadata_snapshot;
FilesInfoPtr files_info;
String current_path;
Block sample_block;
@ -377,7 +380,7 @@ private:
Pipes StorageFile::read(
const Names & column_names,
const StorageMetadataPtr & /*metadata_snapshot*/,
const StorageMetadataPtr & metadata_snapshot,
const SelectQueryInfo & /*query_info*/,
const Context & context,
QueryProcessingStage::Enum /*processed_stage*/,
@ -414,7 +417,7 @@ Pipes StorageFile::read(
for (size_t i = 0; i < num_streams; ++i)
pipes.emplace_back(std::make_shared<StorageFileSource>(
this_ptr, context, max_block_size, files_info, getColumns().getDefaults()));
this_ptr, metadata_snapshot, context, max_block_size, files_info, getColumns().getDefaults()));
return pipes;
}
@ -423,10 +426,14 @@ Pipes StorageFile::read(
class StorageFileBlockOutputStream : public IBlockOutputStream
{
public:
explicit StorageFileBlockOutputStream(StorageFile & storage_,
explicit StorageFileBlockOutputStream(
StorageFile & storage_,
const StorageMetadataPtr & metadata_snapshot_,
const CompressionMethod compression_method,
const Context & context)
: storage(storage_), lock(storage.rwlock)
: storage(storage_)
, metadata_snapshot(metadata_snapshot_)
, lock(storage.rwlock)
{
if (storage.use_table_fd)
{
@ -446,10 +453,10 @@ public:
compression_method, 3);
}
writer = FormatFactory::instance().getOutput(storage.format_name, *write_buf, storage.getSampleBlock(), context);
writer = FormatFactory::instance().getOutput(storage.format_name, *write_buf, metadata_snapshot->getSampleBlock(), context);
}
Block getHeader() const override { return storage.getSampleBlock(); }
Block getHeader() const override { return metadata_snapshot->getSampleBlock(); }
void write(const Block & block) override
{
@ -473,6 +480,7 @@ public:
private:
StorageFile & storage;
StorageMetadataPtr metadata_snapshot;
std::unique_lock<std::shared_mutex> lock;
std::unique_ptr<WriteBuffer> write_buf;
BlockOutputStreamPtr writer;
@ -480,13 +488,13 @@ private:
BlockOutputStreamPtr StorageFile::write(
const ASTPtr & /*query*/,
const StorageMetadataPtr & /*metadata_snapshot*/,
const StorageMetadataPtr & metadata_snapshot,
const Context & context)
{
if (format_name == "Distributed")
throw Exception("Method write is not implemented for Distributed format", ErrorCodes::NOT_IMPLEMENTED);
return std::make_shared<StorageFileBlockOutputStream>(*this,
return std::make_shared<StorageFileBlockOutputStream>(*this, metadata_snapshot,
chooseCompressionMethod(paths[0], compression_method), context);
}

View File

@ -264,7 +264,7 @@ Strings LSWithRegexpMatching(const String & path_for_ls, const HDFSFSPtr & fs, c
Pipes StorageHDFS::read(
const Names & column_names,
const StorageMetadataPtr & /*metadata_snapshot*/,
const StorageMetadataPtr & metadata_snapshot,
const SelectQueryInfo & /*query_info*/,
const Context & context_,
QueryProcessingStage::Enum /*processed_stage*/,
@ -296,16 +296,16 @@ Pipes StorageHDFS::read(
for (size_t i = 0; i < num_streams; ++i)
pipes.emplace_back(std::make_shared<HDFSSource>(
sources_info, uri_without_path, format_name, compression_method, getSampleBlock(), context_, max_block_size));
sources_info, uri_without_path, format_name, compression_method, metadata_snapshot->getSampleBlock(), context_, max_block_size));
return pipes;
}
BlockOutputStreamPtr StorageHDFS::write(const ASTPtr & /*query*/, const StorageMetadataPtr & /*metadata_snapshot*/, const Context & /*context*/)
BlockOutputStreamPtr StorageHDFS::write(const ASTPtr & /*query*/, const StorageMetadataPtr & metadata_snapshot, const Context & /*context*/)
{
return std::make_shared<HDFSBlockOutputStream>(uri,
format_name,
getSampleBlock(),
metadata_snapshot->getSampleBlock(),
context,
chooseCompressionMethod(uri, compression_method));
}

View File

@ -60,7 +60,7 @@ void StorageInput::setInputStream(BlockInputStreamPtr input_stream_)
Pipes StorageInput::read(
const Names & /*column_names*/,
const StorageMetadataPtr & /*metadata_snapshot*/,
const StorageMetadataPtr & metadata_snapshot,
const SelectQueryInfo & /*query_info*/,
const Context & context,
QueryProcessingStage::Enum /*processed_stage*/,
@ -74,7 +74,7 @@ Pipes StorageInput::read(
{
/// Send structure to the client.
query_context.initializeInput(shared_from_this());
pipes.emplace_back(std::make_shared<StorageInputSource>(query_context, getSampleBlock()));
pipes.emplace_back(std::make_shared<StorageInputSource>(query_context, metadata_snapshot->getSampleBlock()));
return pipes;
}

View File

@ -53,29 +53,33 @@ StorageJoin::StorageJoin(
, strictness(strictness_)
, overwrite(overwrite_)
{
auto metadata_snapshot = getInMemoryMetadataPtr();
for (const auto & key : key_names)
if (!getColumns().hasPhysical(key))
if (!metadata_snapshot->getColumns().hasPhysical(key))
throw Exception{"Key column (" + key + ") does not exist in table declaration.", ErrorCodes::NO_SUCH_COLUMN_IN_TABLE};
table_join = std::make_shared<TableJoin>(limits, use_nulls, kind, strictness, key_names);
join = std::make_shared<HashJoin>(table_join, getSampleBlock().sortColumns(), overwrite);
join = std::make_shared<HashJoin>(table_join, metadata_snapshot->getSampleBlock().sortColumns(), overwrite);
restore();
}
void StorageJoin::truncate(const ASTPtr &, const Context &, TableStructureWriteLockHolder &)
{
/// TODO(alesap) FIXME
auto metadata_snapshot = getInMemoryMetadataPtr();
Poco::File(path).remove(true);
Poco::File(path).createDirectories();
Poco::File(path + "tmp/").createDirectories();
increment = 0;
join = std::make_shared<HashJoin>(table_join, getSampleBlock().sortColumns(), overwrite);
join = std::make_shared<HashJoin>(table_join, metadata_snapshot->getSampleBlock().sortColumns(), overwrite);
}
HashJoinPtr StorageJoin::getJoin(std::shared_ptr<TableJoin> analyzed_join) const
{
auto metadata_snapshot = getInMemoryMetadataPtr();
if (!analyzed_join->sameStrictnessAndKind(strictness, kind))
throw Exception("Table " + getStorageID().getNameForLogs() + " has incompatible type of JOIN.", ErrorCodes::INCOMPATIBLE_TYPE_OF_JOIN);
@ -89,7 +93,7 @@ HashJoinPtr StorageJoin::getJoin(std::shared_ptr<TableJoin> analyzed_join) const
/// Some HACK to remove wrong names qualifiers: table.column -> column.
analyzed_join->setRightKeys(key_names);
HashJoinPtr join_clone = std::make_shared<HashJoin>(analyzed_join, getSampleBlock().sortColumns());
HashJoinPtr join_clone = std::make_shared<HashJoin>(analyzed_join, metadata_snapshot->getSampleBlock().sortColumns());
join_clone->reuseJoinedData(*join);
return join_clone;
}

View File

@ -114,10 +114,12 @@ private:
class LogBlockOutputStream final : public IBlockOutputStream
{
public:
explicit LogBlockOutputStream(StorageLog & storage_)
: storage(storage_),
lock(storage.rwlock),
marks_stream(storage.disk->writeFile(storage.marks_file_path, 4096, WriteMode::Rewrite))
explicit LogBlockOutputStream(StorageLog & storage_, const StorageMetadataPtr & metadata_snapshot_)
: storage(storage_)
, metadata_snapshot(metadata_snapshot_)
, lock(storage.rwlock)
, marks_stream(
storage.disk->writeFile(storage.marks_file_path, 4096, WriteMode::Rewrite))
{
}
@ -133,12 +135,13 @@ public:
}
}
Block getHeader() const override { return storage.getSampleBlock(); }
Block getHeader() const override { return metadata_snapshot->getSampleBlock(); }
void write(const Block & block) override;
void writeSuffix() override;
private:
StorageLog & storage;
StorageMetadataPtr metadata_snapshot;
std::unique_lock<std::shared_mutex> lock;
bool done = false;
@ -621,10 +624,10 @@ Pipes StorageLog::read(
return pipes;
}
BlockOutputStreamPtr StorageLog::write(const ASTPtr & /*query*/, const StorageMetadataPtr & /*metadata_snapshot*/, const Context & /*context*/)
BlockOutputStreamPtr StorageLog::write(const ASTPtr & /*query*/, const StorageMetadataPtr & metadata_snapshot, const Context & /*context*/)
{
loadMarks();
return std::make_shared<LogBlockOutputStream>(*this);
return std::make_shared<LogBlockOutputStream>(*this, metadata_snapshot);
}
CheckResults StorageLog::checkData(const ASTPtr & /* query */, const Context & /* context */)

View File

@ -244,10 +244,11 @@ void StorageMaterializedView::checkAlterIsPossible(const AlterCommands & command
}
}
void StorageMaterializedView::alterPartition(const ASTPtr & query, const PartitionCommands &commands, const Context &context)
void StorageMaterializedView::alterPartition(
const ASTPtr & query, const StorageMetadataPtr & metadata_snapshot, const PartitionCommands & commands, const Context & context)
{
checkStatementCanBeForwarded();
getTargetTable()->alterPartition(query, commands, context);
getTargetTable()->alterPartition(query, metadata_snapshot, commands, context);
}
void StorageMaterializedView::mutate(const MutationCommands & commands, const Context & context)

View File

@ -43,7 +43,7 @@ public:
void checkAlterIsPossible(const AlterCommands & commands, const Settings & settings) const override;
void alterPartition(const ASTPtr & query, const PartitionCommands & commands, const Context & context) override;
void alterPartition(const ASTPtr & query, const StorageMetadataPtr & metadata_snapshot, const PartitionCommands & commands, const Context & context) override;
void mutate(const MutationCommands & commands, const Context & context) override;

View File

@ -168,7 +168,9 @@ Pipes StorageMerge::read(
if (selected_tables.empty())
/// FIXME: do we support sampling in this case?
return createSources(
query_info, processed_stage, max_block_size, header, {}, real_column_names, modified_context, 0, has_table_virtual_column);
metadata_snapshot, query_info, processed_stage,
max_block_size, header, {}, real_column_names,
modified_context, 0, has_table_virtual_column);
size_t tables_count = selected_tables.size();
Float64 num_streams_multiplier = std::min(unsigned(tables_count), std::max(1U, unsigned(context.getSettingsRef().max_streams_multiplier_for_merge_tables)));
@ -207,7 +209,8 @@ Pipes StorageMerge::read(
throw Exception("Illegal SAMPLE: table doesn't support sampling", ErrorCodes::SAMPLING_NOT_SUPPORTED);
auto source_pipes = createSources(
query_info, processed_stage, max_block_size, header, table, real_column_names, modified_context,
metadata_snapshot, query_info, processed_stage,
max_block_size, header, table, real_column_names, modified_context,
current_streams, has_table_virtual_column);
for (auto & pipe : source_pipes)
@ -220,10 +223,17 @@ Pipes StorageMerge::read(
return narrowPipes(std::move(res), num_streams);
}
Pipes StorageMerge::createSources(const SelectQueryInfo & query_info, const QueryProcessingStage::Enum & processed_stage,
const UInt64 max_block_size, const Block & header, const StorageWithLockAndName & storage_with_lock,
Pipes StorageMerge::createSources(
const StorageMetadataPtr & metadata_snapshot,
const SelectQueryInfo & query_info,
const QueryProcessingStage::Enum & processed_stage,
const UInt64 max_block_size,
const Block & header,
const StorageWithLockAndName & storage_with_lock,
Names & real_column_names,
const std::shared_ptr<Context> & modified_context, size_t streams_num, bool has_table_virtual_column,
const std::shared_ptr<Context> & modified_context,
size_t streams_num,
bool has_table_virtual_column,
bool concat_streams)
{
const auto & [storage, struct_lock, table_name] = storage_with_lock;
@ -244,7 +254,6 @@ Pipes StorageMerge::createSources(const SelectQueryInfo & query_info, const Quer
return pipes;
}
auto metadata_snapshot = storage->getInMemoryMetadataPtr();
auto storage_stage = storage->getQueryProcessingStage(*modified_context, QueryProcessingStage::Complete, query_info.query);
if (processed_stage <= storage_stage)
{
@ -295,7 +304,7 @@ Pipes StorageMerge::createSources(const SelectQueryInfo & query_info, const Quer
/// Subordinary tables could have different but convertible types, like numeric types of different width.
/// We must return streams with structure equals to structure of Merge table.
convertingSourceStream(header, *modified_context, modified_query_info.query, pipe, processed_stage);
convertingSourceStream(header, metadata_snapshot, *modified_context, modified_query_info.query, pipe, processed_stage);
pipe.addTableLock(struct_lock);
pipe.addInterpreterContext(modified_context);
@ -430,8 +439,13 @@ Block StorageMerge::getQueryHeader(
throw Exception("Logical Error: unknown processed stage.", ErrorCodes::LOGICAL_ERROR);
}
void StorageMerge::convertingSourceStream(const Block & header, const Context & context, ASTPtr & query,
Pipe & pipe, QueryProcessingStage::Enum processed_stage)
void StorageMerge::convertingSourceStream(
const Block & header,
const StorageMetadataPtr & metadata_snapshot,
const Context & context,
ASTPtr & query,
Pipe & pipe,
QueryProcessingStage::Enum processed_stage)
{
Block before_block_header = pipe.getHeader();
pipe.addSimpleTransform(std::make_shared<ConvertingTransform>(before_block_header, header, ConvertingTransform::MatchColumnsMode::Name));
@ -450,7 +464,7 @@ void StorageMerge::convertingSourceStream(const Block & header, const Context &
/// So we need to throw exception.
if (!header_column.type->equals(*before_column.type.get()) && processed_stage > QueryProcessingStage::FetchColumns)
{
NamesAndTypesList source_columns = getSampleBlock().getNamesAndTypesList();
NamesAndTypesList source_columns = metadata_snapshot->getSampleBlock().getNamesAndTypesList();
auto virtual_column = *getVirtuals().tryGetByName("_table");
source_columns.emplace_back(NameAndTypePair{virtual_column.name, virtual_column.type});
auto syntax_result = SyntaxAnalyzer(context).analyze(where_expression, source_columns);

View File

@ -82,13 +82,21 @@ protected:
QueryProcessingStage::Enum processed_stage);
Pipes createSources(
const SelectQueryInfo & query_info, const QueryProcessingStage::Enum & processed_stage,
const UInt64 max_block_size, const Block & header, const StorageWithLockAndName & storage_with_lock,
const StorageMetadataPtr & metadata_snapshot,
const SelectQueryInfo & query_info,
const QueryProcessingStage::Enum & processed_stage,
const UInt64 max_block_size,
const Block & header,
const StorageWithLockAndName & storage_with_lock,
Names & real_column_names,
const std::shared_ptr<Context> & modified_context, size_t streams_num, bool has_table_virtual_column,
const std::shared_ptr<Context> & modified_context,
size_t streams_num,
bool has_table_virtual_column,
bool concat_streams = false);
void convertingSourceStream(const Block & header, const Context & context, ASTPtr & query,
void convertingSourceStream(
const Block & header, const StorageMetadataPtr & metadata_snapshot,
const Context & context, ASTPtr & query,
Pipe & pipe, QueryProcessingStage::Enum processed_stage);
};

View File

@ -198,9 +198,9 @@ std::optional<UInt64> StorageMergeTree::totalBytes() const
return getTotalActiveSizeInBytes();
}
BlockOutputStreamPtr StorageMergeTree::write(const ASTPtr & /*query*/, const StorageMetadataPtr & /*metadata_snapshot*/, const Context & context)
BlockOutputStreamPtr StorageMergeTree::write(const ASTPtr & /*query*/, const StorageMetadataPtr & metadata_snapshot, const Context & context)
{
return std::make_shared<MergeTreeBlockOutputStream>(*this, context.getSettingsRef().max_partitions_per_insert_block);
return std::make_shared<MergeTreeBlockOutputStream>(*this, metadata_snapshot, context.getSettingsRef().max_partitions_per_insert_block);
}
void StorageMergeTree::checkTableCanBeDropped() const
@ -1017,7 +1017,8 @@ bool StorageMergeTree::optimize(
return true;
}
void StorageMergeTree::alterPartition(const ASTPtr & query, const PartitionCommands & commands, const Context & context)
void StorageMergeTree::alterPartition(
const ASTPtr & query, const StorageMetadataPtr & metadata_snapshot, const PartitionCommands & commands, const Context & context)
{
for (const PartitionCommand & command : commands)
{
@ -1085,7 +1086,7 @@ void StorageMergeTree::alterPartition(const ASTPtr & query, const PartitionComma
break;
default:
IStorage::alterPartition(query, commands, context); // should throw an exception.
IStorage::alterPartition(query, metadata_snapshot, commands, context); // should throw an exception.
}
}
}
@ -1126,7 +1127,8 @@ void StorageMergeTree::dropPartition(const ASTPtr & partition, bool detach, cons
}
void StorageMergeTree::attachPartition(const ASTPtr & partition, bool attach_part, const Context & context)
void StorageMergeTree::attachPartition(
const ASTPtr & partition, bool attach_part, const Context & context)
{
// TODO: should get some locks to prevent race with 'alter … modify column'

View File

@ -55,7 +55,11 @@ public:
*/
bool optimize(const ASTPtr & query, const ASTPtr & partition, bool final, bool deduplicate, const Context & context) override;
void alterPartition(const ASTPtr & query, const PartitionCommands & commands, const Context & context) override;
void alterPartition(
const ASTPtr & query,
const StorageMetadataPtr & /* metadata_snapshot */,
const PartitionCommands & commands,
const Context & context) override;
void mutate(const MutationCommands & commands, const Context & context) override;

View File

@ -65,7 +65,7 @@ StorageMySQL::StorageMySQL(
Pipes StorageMySQL::read(
const Names & column_names_,
const StorageMetadataPtr & /*metadata_snapshot*/,
const StorageMetadataPtr & metadata_snapshot,
const SelectQueryInfo & query_info_,
const Context & context_,
QueryProcessingStage::Enum /*processed_stage*/,
@ -74,12 +74,17 @@ Pipes StorageMySQL::read(
{
check(column_names_);
String query = transformQueryForExternalDatabase(
query_info_, getColumns().getOrdinary(), IdentifierQuotingStyle::BackticksMySQL, remote_database_name, remote_table_name, context_);
query_info_,
metadata_snapshot->getColumns().getOrdinary(),
IdentifierQuotingStyle::BackticksMySQL,
remote_database_name,
remote_table_name,
context_);
Block sample_block;
for (const String & column_name : column_names_)
{
auto column_data = getColumns().getPhysical(column_name);
auto column_data = metadata_snapshot->getColumns().getPhysical(column_name);
sample_block.insert({ column_data.type, column_data.name });
}
@ -95,12 +100,15 @@ Pipes StorageMySQL::read(
class StorageMySQLBlockOutputStream : public IBlockOutputStream
{
public:
explicit StorageMySQLBlockOutputStream(const StorageMySQL & storage_,
explicit StorageMySQLBlockOutputStream(
const StorageMySQL & storage_,
const StorageMetadataPtr & metadata_snapshot_,
const std::string & remote_database_name_,
const std::string & remote_table_name_,
const mysqlxx::PoolWithFailover::Entry & entry_,
const size_t & mysql_max_rows_to_insert)
: storage{storage_}
, metadata_snapshot{metadata_snapshot_}
, remote_database_name{remote_database_name_}
, remote_table_name{remote_table_name_}
, entry{entry_}
@ -108,7 +116,7 @@ public:
{
}
Block getHeader() const override { return storage.getSampleBlock(); }
Block getHeader() const override { return metadata_snapshot->getSampleBlock(); }
void write(const Block & block) override
{
@ -136,7 +144,7 @@ public:
sqlbuf << backQuoteMySQL(remote_database_name) << "." << backQuoteMySQL(remote_table_name);
sqlbuf << " (" << dumpNamesWithBackQuote(block) << ") VALUES ";
auto writer = FormatFactory::instance().getOutput("Values", sqlbuf, storage.getSampleBlock(), storage.global_context);
auto writer = FormatFactory::instance().getOutput("Values", sqlbuf, metadata_snapshot->getSampleBlock(), storage.global_context);
writer->write(block);
if (!storage.on_duplicate_clause.empty())
@ -192,6 +200,7 @@ public:
private:
const StorageMySQL & storage;
StorageMetadataPtr metadata_snapshot;
std::string remote_database_name;
std::string remote_table_name;
mysqlxx::PoolWithFailover::Entry entry;
@ -199,9 +208,9 @@ private:
};
BlockOutputStreamPtr StorageMySQL::write(const ASTPtr & /*query*/, const StorageMetadataPtr & /*metadata_snapshot*/, const Context & context)
BlockOutputStreamPtr StorageMySQL::write(const ASTPtr & /*query*/, const StorageMetadataPtr & metadata_snapshot, const Context & context)
{
return std::make_shared<StorageMySQLBlockOutputStream>(*this, remote_database_name, remote_table_name, pool.get(), context.getSettingsRef().mysql_max_rows_to_insert);
return std::make_shared<StorageMySQLBlockOutputStream>(*this, metadata_snapshot, remote_database_name, remote_table_name, pool.get(), context.getSettingsRef().mysql_max_rows_to_insert);
}
void registerStorageMySQL(StorageFactory & factory)

View File

@ -3449,7 +3449,7 @@ void StorageReplicatedMergeTree::assertNotReadonly() const
}
BlockOutputStreamPtr StorageReplicatedMergeTree::write(const ASTPtr & /*query*/, const StorageMetadataPtr & /*metadata_snapshot*/, const Context & context)
BlockOutputStreamPtr StorageReplicatedMergeTree::write(const ASTPtr & /*query*/, const StorageMetadataPtr & metadata_snapshot, const Context & context)
{
const auto storage_settings_ptr = getSettings();
assertNotReadonly();
@ -3457,8 +3457,7 @@ BlockOutputStreamPtr StorageReplicatedMergeTree::write(const ASTPtr & /*query*/,
const Settings & query_settings = context.getSettingsRef();
bool deduplicate = storage_settings_ptr->replicated_deduplication_window != 0 && query_settings.insert_deduplicate;
return std::make_shared<ReplicatedMergeTreeBlockOutputStream>(*this,
query_settings.insert_quorum, query_settings.insert_quorum_timeout.totalMilliseconds(), query_settings.max_partitions_per_insert_block, deduplicate);
return std::make_shared<ReplicatedMergeTreeBlockOutputStream>(*this, metadata_snapshot, query_settings.insert_quorum, query_settings.insert_quorum_timeout.totalMilliseconds(), query_settings.max_partitions_per_insert_block, deduplicate);
}
@ -3830,7 +3829,11 @@ void StorageReplicatedMergeTree::alter(
}
}
void StorageReplicatedMergeTree::alterPartition(const ASTPtr & query, const PartitionCommands & commands, const Context & query_context)
void StorageReplicatedMergeTree::alterPartition(
const ASTPtr & query,
const StorageMetadataPtr & metadata_snapshot,
const PartitionCommands & commands,
const Context & query_context)
{
for (const PartitionCommand & command : commands)
{
@ -3846,7 +3849,7 @@ void StorageReplicatedMergeTree::alterPartition(const ASTPtr & query, const Part
break;
case PartitionCommand::ATTACH_PARTITION:
attachPartition(command.partition, command.part, query_context);
attachPartition(command.partition, metadata_snapshot, command.part, query_context);
break;
case PartitionCommand::MOVE_PARTITION:
{
@ -4014,7 +4017,7 @@ void StorageReplicatedMergeTree::truncate(const ASTPtr & query, const Context &
}
void StorageReplicatedMergeTree::attachPartition(const ASTPtr & partition, bool attach_part, const Context & query_context)
void StorageReplicatedMergeTree::attachPartition(const ASTPtr & partition, const StorageMetadataPtr & metadata_snapshot, bool attach_part, const Context & query_context)
{
// TODO: should get some locks to prevent race with 'alter … modify column'
@ -4023,7 +4026,7 @@ void StorageReplicatedMergeTree::attachPartition(const ASTPtr & partition, bool
PartsTemporaryRename renamed_parts(*this, "detached/");
MutableDataPartsVector loaded_parts = tryLoadPartsToAttach(partition, attach_part, query_context, renamed_parts);
ReplicatedMergeTreeBlockOutputStream output(*this, 0, 0, 0, false); /// TODO Allow to use quorum here.
ReplicatedMergeTreeBlockOutputStream output(*this, metadata_snapshot, 0, 0, 0, false); /// TODO Allow to use quorum here.
for (size_t i = 0; i < loaded_parts.size(); ++i)
{
String old_name = loaded_parts[i]->name;

View File

@ -105,7 +105,11 @@ public:
void alter(const AlterCommands & params, const Context & query_context, TableStructureWriteLockHolder & table_lock_holder) override;
void alterPartition(const ASTPtr & query, const PartitionCommands & commands, const Context & query_context) override;
void alterPartition(
const ASTPtr & query,
const StorageMetadataPtr & metadata_snapshot,
const PartitionCommands & commands,
const Context & query_context) override;
void mutate(const MutationCommands & commands, const Context & context) override;
void waitMutation(const String & znode_name, size_t mutations_sync) const;
@ -527,7 +531,7 @@ private:
// Partition helpers
void dropPartition(const ASTPtr & query, const ASTPtr & partition, bool detach, const Context & query_context);
void attachPartition(const ASTPtr & partition, bool part, const Context & query_context);
void attachPartition(const ASTPtr & partition, const StorageMetadataPtr & metadata_snapshot, bool part, const Context & query_context);
void replacePartitionFrom(const StoragePtr & source_table, const ASTPtr & partition, bool replace, const Context & query_context);
void movePartitionToTable(const StoragePtr & dest_table, const ASTPtr & partition, const Context & query_context);
void fetchPartition(const ASTPtr & partition, const String & from, const Context & query_context);

View File

@ -32,15 +32,18 @@ namespace ErrorCodes
class SetOrJoinBlockOutputStream : public IBlockOutputStream
{
public:
SetOrJoinBlockOutputStream(StorageSetOrJoinBase & table_,
const String & backup_path_, const String & backup_tmp_path_, const String & backup_file_name_);
SetOrJoinBlockOutputStream(
StorageSetOrJoinBase & table_, const StorageMetadataPtr & metadata_snapshot_,
const String & backup_path_, const String & backup_tmp_path_,
const String & backup_file_name_);
Block getHeader() const override { return table.getSampleBlock(); }
Block getHeader() const override { return metadata_snapshot->getSampleBlock(); }
void write(const Block & block) override;
void writeSuffix() override;
private:
StorageSetOrJoinBase & table;
StorageMetadataPtr metadata_snapshot;
String backup_path;
String backup_tmp_path;
String backup_file_name;
@ -50,14 +53,20 @@ private:
};
SetOrJoinBlockOutputStream::SetOrJoinBlockOutputStream(StorageSetOrJoinBase & table_,
const String & backup_path_, const String & backup_tmp_path_, const String & backup_file_name_)
: table(table_),
backup_path(backup_path_), backup_tmp_path(backup_tmp_path_),
backup_file_name(backup_file_name_),
backup_buf(backup_tmp_path + backup_file_name),
compressed_backup_buf(backup_buf),
backup_stream(compressed_backup_buf, 0, table.getSampleBlock())
SetOrJoinBlockOutputStream::SetOrJoinBlockOutputStream(
StorageSetOrJoinBase & table_,
const StorageMetadataPtr & metadata_snapshot_,
const String & backup_path_,
const String & backup_tmp_path_,
const String & backup_file_name_)
: table(table_)
, metadata_snapshot(metadata_snapshot_)
, backup_path(backup_path_)
, backup_tmp_path(backup_tmp_path_)
, backup_file_name(backup_file_name_)
, backup_buf(backup_tmp_path + backup_file_name)
, compressed_backup_buf(backup_buf)
, backup_stream(compressed_backup_buf, 0, metadata_snapshot->getSampleBlock())
{
}
@ -81,10 +90,10 @@ void SetOrJoinBlockOutputStream::writeSuffix()
}
BlockOutputStreamPtr StorageSetOrJoinBase::write(const ASTPtr & /*query*/, const StorageMetadataPtr & /*metadata_snapshot*/, const Context & /*context*/)
BlockOutputStreamPtr StorageSetOrJoinBase::write(const ASTPtr & /*query*/, const StorageMetadataPtr & metadata_snapshot, const Context & /*context*/)
{
UInt64 id = ++increment;
return std::make_shared<SetOrJoinBlockOutputStream>(*this, path, path + "tmp/", toString(id) + ".bin");
return std::make_shared<SetOrJoinBlockOutputStream>(*this, metadata_snapshot, path, path + "tmp/", toString(id) + ".bin");
}
@ -119,7 +128,8 @@ StorageSet::StorageSet(
: StorageSetOrJoinBase{relative_path_, table_id_, columns_, constraints_, context_},
set(std::make_shared<Set>(SizeLimits(), false, true))
{
Block header = getSampleBlock();
Block header = getInMemoryMetadataPtr()->getSampleBlock();
header = header.sortColumns();
set->setHeader(header);
@ -134,11 +144,12 @@ size_t StorageSet::getSize() const { return set->getTotalRowCount(); }
void StorageSet::truncate(const ASTPtr &, const Context &, TableStructureWriteLockHolder &)
{
auto metadata_snapshot = getInMemoryMetadataPtr();
Poco::File(path).remove(true);
Poco::File(path).createDirectories();
Poco::File(path + "tmp/").createDirectories();
Block header = getSampleBlock();
Block header = metadata_snapshot->getSampleBlock();
header = header.sortColumns();
increment = 0;

View File

@ -155,15 +155,17 @@ private:
class StripeLogBlockOutputStream final : public IBlockOutputStream
{
public:
explicit StripeLogBlockOutputStream(StorageStripeLog & storage_)
: storage(storage_), lock(storage.rwlock),
data_out_file(storage.table_path + "data.bin"),
data_out_compressed(storage.disk->writeFile(data_out_file, DBMS_DEFAULT_BUFFER_SIZE, WriteMode::Append)),
data_out(*data_out_compressed, CompressionCodecFactory::instance().getDefaultCodec(), storage.max_compress_block_size),
index_out_file(storage.table_path + "index.mrk"),
index_out_compressed(storage.disk->writeFile(index_out_file, DBMS_DEFAULT_BUFFER_SIZE, WriteMode::Append)),
index_out(*index_out_compressed),
block_out(data_out, 0, storage.getSampleBlock(), false, &index_out, storage.disk->getFileSize(data_out_file))
explicit StripeLogBlockOutputStream(StorageStripeLog & storage_, const StorageMetadataPtr & metadata_snapshot_)
: storage(storage_)
, metadata_snapshot(metadata_snapshot_)
, lock(storage.rwlock)
, data_out_file(storage.table_path + "data.bin")
, data_out_compressed(storage.disk->writeFile(data_out_file, DBMS_DEFAULT_BUFFER_SIZE, WriteMode::Append))
, data_out(*data_out_compressed, CompressionCodecFactory::instance().getDefaultCodec(), storage.max_compress_block_size)
, index_out_file(storage.table_path + "index.mrk")
, index_out_compressed(storage.disk->writeFile(index_out_file, DBMS_DEFAULT_BUFFER_SIZE, WriteMode::Append))
, index_out(*index_out_compressed)
, block_out(data_out, 0, metadata_snapshot->getSampleBlock(), false, &index_out, storage.disk->getFileSize(data_out_file))
{
}
@ -179,7 +181,7 @@ public:
}
}
Block getHeader() const override { return storage.getSampleBlock(); }
Block getHeader() const override { return metadata_snapshot->getSampleBlock(); }
void write(const Block & block) override
{
@ -205,6 +207,7 @@ public:
private:
StorageStripeLog & storage;
StorageMetadataPtr metadata_snapshot;
std::unique_lock<std::shared_mutex> lock;
String data_out_file;
@ -311,9 +314,9 @@ Pipes StorageStripeLog::read(
}
BlockOutputStreamPtr StorageStripeLog::write(const ASTPtr & /*query*/, const StorageMetadataPtr & /*metadata_snapshot*/, const Context & /*context*/)
BlockOutputStreamPtr StorageStripeLog::write(const ASTPtr & /*query*/, const StorageMetadataPtr & metadata_snapshot, const Context & /*context*/)
{
return std::make_shared<StripeLogBlockOutputStream>(*this);
return std::make_shared<StripeLogBlockOutputStream>(*this, metadata_snapshot);
}

View File

@ -109,8 +109,8 @@ private:
class TinyLogBlockOutputStream final : public IBlockOutputStream
{
public:
explicit TinyLogBlockOutputStream(StorageTinyLog & storage_)
: storage(storage_), lock(storage_.rwlock)
explicit TinyLogBlockOutputStream(StorageTinyLog & storage_, const StorageMetadataPtr & metadata_snapshot_)
: storage(storage_), metadata_snapshot(metadata_snapshot_), lock(storage_.rwlock)
{
}
@ -126,13 +126,14 @@ public:
}
}
Block getHeader() const override { return storage.getSampleBlock(); }
Block getHeader() const override { return metadata_snapshot->getSampleBlock(); }
void write(const Block & block) override;
void writeSuffix() override;
private:
StorageTinyLog & storage;
StorageMetadataPtr metadata_snapshot;
std::unique_lock<std::shared_mutex> lock;
bool done = false;
@ -394,7 +395,7 @@ void StorageTinyLog::rename(const String & new_path_to_table_data, const Storage
Pipes StorageTinyLog::read(
const Names & column_names,
- const StorageMetadataPtr & /*metadata_snapshot*/,
+ const StorageMetadataPtr & metadata_snapshot,
const SelectQueryInfo & /*query_info*/,
const Context & context,
QueryProcessingStage::Enum /*processed_stage*/,
@ -408,15 +409,15 @@ Pipes StorageTinyLog::read(
// When reading, we lock the entire storage, because we only have one file
// per column and can't modify it concurrently.
pipes.emplace_back(std::make_shared<TinyLogSource>(
- max_block_size, Nested::collect(getColumns().getAllPhysical().addTypes(column_names)), *this, context.getSettingsRef().max_read_buffer_size));
+ max_block_size, Nested::collect(metadata_snapshot->getColumns().getAllPhysical().addTypes(column_names)), *this, context.getSettingsRef().max_read_buffer_size));
return pipes;
}
- BlockOutputStreamPtr StorageTinyLog::write(const ASTPtr & /*query*/, const StorageMetadataPtr & /*metadata_snapshot*/, const Context & /*context*/)
+ BlockOutputStreamPtr StorageTinyLog::write(const ASTPtr & /*query*/, const StorageMetadataPtr & metadata_snapshot, const Context & /*context*/)
{
- return std::make_shared<TinyLogBlockOutputStream>(*this);
+ return std::make_shared<TinyLogBlockOutputStream>(*this, metadata_snapshot);
}

View File

@ -186,10 +186,10 @@ Pipes IStorageURLBase::read(
return pipes;
}
- BlockOutputStreamPtr IStorageURLBase::write(const ASTPtr & /*query*/, const StorageMetadataPtr & /*metadata_snapshot*/, const Context & /*context*/)
+ BlockOutputStreamPtr IStorageURLBase::write(const ASTPtr & /*query*/, const StorageMetadataPtr & metadata_snapshot, const Context & /*context*/)
{
return std::make_shared<StorageURLBlockOutputStream>(
- uri, format_name, getSampleBlock(), context_global,
+ uri, format_name, metadata_snapshot->getSampleBlock(), context_global,
ConnectionTimeouts::getHTTPTimeouts(context_global),
chooseCompressionMethod(uri.toString(), compression_method));
}

View File

@ -30,7 +30,7 @@ public:
Pipes read(
const Names & column_names,
- const StorageMetadataPtr & /*metadata_snapshot*/,
+ const StorageMetadataPtr & metadata_snapshot,
const SelectQueryInfo & query_info,
const Context & context,
QueryProcessingStage::Enum /*processed_stage*/,
@ -39,7 +39,7 @@ public:
{
check(column_names);
- Block sample_block = getSampleBlock();
+ Block sample_block = metadata_snapshot->getSampleBlock();
MutableColumns res_columns = sample_block.cloneEmptyColumns();
fillData(res_columns, context, query_info);

View File

@ -242,7 +242,7 @@ private:
Pipes StorageSystemColumns::read(
const Names & column_names,
- const StorageMetadataPtr & /*metadata_*/,
+ const StorageMetadataPtr & metadata_snapshot,
const SelectQueryInfo & query_info,
const Context & context,
QueryProcessingStage::Enum /*processed_stage*/,
@ -255,7 +255,7 @@ Pipes StorageSystemColumns::read(
NameSet names_set(column_names.begin(), column_names.end());
- Block sample_block = getSampleBlock();
+ Block sample_block = metadata_snapshot->getSampleBlock();
Block header;
std::vector<UInt8> columns_mask(sample_block.columns());

View File

@ -47,7 +47,7 @@ protected:
Pipes read(
const Names & /* column_names */,
- const StorageMetadataPtr & /*metadata_snapshot*/,
+ const StorageMetadataPtr & metadata_snapshot,
const SelectQueryInfo & query_info,
const Context & context,
QueryProcessingStage::Enum /*processed_stage*/,
@ -57,7 +57,7 @@ protected:
StoragesInfoStream stream(query_info, context);
/// Create the result.
- Block block = getSampleBlock();
+ Block block = metadata_snapshot->getSampleBlock();
MutableColumns new_columns = block.cloneEmptyColumns();
while (StoragesInfo info = stream.next())

View File

@ -28,7 +28,7 @@ StorageSystemDisks::StorageSystemDisks(const std::string & name_)
Pipes StorageSystemDisks::read(
const Names & column_names,
- const StorageMetadataPtr & /*metadata_snapshot*/,
+ const StorageMetadataPtr & metadata_snapshot,
const SelectQueryInfo & /*query_info*/,
const Context & context,
QueryProcessingStage::Enum /*processed_stage*/,
@ -63,7 +63,7 @@ Pipes StorageSystemDisks::read(
Chunk chunk(std::move(res_columns), num_rows);
Pipes pipes;
- pipes.emplace_back(std::make_shared<SourceFromSingleChunk>(getSampleBlock(), std::move(chunk)));
+ pipes.emplace_back(std::make_shared<SourceFromSingleChunk>(metadata_snapshot->getSampleBlock(), std::move(chunk)));
return pipes;
}

View File

@ -225,7 +225,7 @@ StoragesInfo StoragesInfoStream::next()
Pipes StorageSystemPartsBase::read(
const Names & column_names,
- const StorageMetadataPtr & /*metadata_*/,
+ const StorageMetadataPtr & metadata_snapshot,
const SelectQueryInfo & query_info,
const Context & context,
QueryProcessingStage::Enum /*processed_stage*/,
@ -238,7 +238,7 @@ Pipes StorageSystemPartsBase::read(
/// Create the result.
- MutableColumns res_columns = getSampleBlock().cloneEmptyColumns();
+ MutableColumns res_columns = metadata_snapshot->getSampleBlock().cloneEmptyColumns();
if (has_state_column)
res_columns.push_back(ColumnString::create());
@ -247,7 +247,7 @@ Pipes StorageSystemPartsBase::read(
processNextStorage(res_columns, info, has_state_column);
}
- Block header = getSampleBlock();
+ Block header = metadata_snapshot->getSampleBlock();
if (has_state_column)
header.insert(ColumnWithTypeAndName(std::make_shared<DataTypeString>(), "_state"));

View File

@ -59,7 +59,7 @@ StorageSystemReplicas::StorageSystemReplicas(const std::string & name_)
Pipes StorageSystemReplicas::read(
const Names & column_names,
- const StorageMetadataPtr & /*metadata_snapshot*/,
+ const StorageMetadataPtr & metadata_snapshot,
const SelectQueryInfo & query_info,
const Context & context,
QueryProcessingStage::Enum /*processed_stage*/,
@ -146,7 +146,7 @@ Pipes StorageSystemReplicas::read(
col_engine = filtered_block.getByName("engine").column;
}
- MutableColumns res_columns = getSampleBlock().cloneEmptyColumns();
+ MutableColumns res_columns = metadata_snapshot->getSampleBlock().cloneEmptyColumns();
for (size_t i = 0, size = col_database->size(); i < size; ++i)
{
@ -187,7 +187,7 @@ Pipes StorageSystemReplicas::read(
res_columns[col_num++]->insert(status.zookeeper_exception);
}
- Block header = getSampleBlock();
+ Block header = metadata_snapshot->getSampleBlock();
Columns fin_columns;
fin_columns.reserve(res_columns.size());
@ -203,7 +203,7 @@ Pipes StorageSystemReplicas::read(
Chunk chunk(std::move(fin_columns), num_rows);
Pipes pipes;
- pipes.emplace_back(std::make_shared<SourceFromSingleChunk>(getSampleBlock(), std::move(chunk)));
+ pipes.emplace_back(std::make_shared<SourceFromSingleChunk>(metadata_snapshot->getSampleBlock(), std::move(chunk)));
return pipes;
}

View File

@ -32,7 +32,7 @@ StorageSystemStoragePolicies::StorageSystemStoragePolicies(const std::string & n
Pipes StorageSystemStoragePolicies::read(
const Names & column_names,
- const StorageMetadataPtr & /*metadata_snapshot*/,
+ const StorageMetadataPtr & metadata_snapshot,
const SelectQueryInfo & /*query_info*/,
const Context & context,
QueryProcessingStage::Enum /*processed_stage*/,
@ -78,7 +78,7 @@ Pipes StorageSystemStoragePolicies::read(
Chunk chunk(std::move(res_columns), num_rows);
Pipes pipes;
- pipes.emplace_back(std::make_shared<SourceFromSingleChunk>(getSampleBlock(), std::move(chunk)));
+ pipes.emplace_back(std::make_shared<SourceFromSingleChunk>(metadata_snapshot->getSampleBlock(), std::move(chunk)));
return pipes;
}

View File

@ -448,7 +448,7 @@ private:
Pipes StorageSystemTables::read(
const Names & column_names,
- const StorageMetadataPtr & /*metadata_*/,
+ const StorageMetadataPtr & metadata_snapshot,
const SelectQueryInfo & query_info,
const Context & context,
QueryProcessingStage::Enum /*processed_stage*/,
@ -461,7 +461,7 @@ Pipes StorageSystemTables::read(
NameSet names_set(column_names.begin(), column_names.end());
- Block sample_block = getSampleBlock();
+ Block sample_block = metadata_snapshot->getSampleBlock();
Block res_block;
std::vector<UInt8> columns_mask(sample_block.columns());