dynamic subcolumns: add snapshot for storage

Anton Popov 2021-07-09 06:15:41 +03:00
parent 693685f2c2
commit 3ed7f5a6cc
140 changed files with 707 additions and 564 deletions
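The new Storages/StorageSnapshot.h itself does not appear in the hunks below, so here is a standalone, simplified model of the pattern this commit introduces, reconstructed from its call sites (storage_snapshot->metadata, storage_snapshot->parts, getColumns, getSampleBlockForColumns, check). All type names in the sketch are stand-ins, not ClickHouse's.

#include <map>
#include <memory>
#include <set>
#include <stdexcept>
#include <string>
#include <vector>

struct Metadata { std::map<std::string, std::string> columns; };   // name -> type
struct Part { std::map<std::string, std::string> object_types; };  // types per part

// Everything a query needs is captured once, at query start, instead of being
// re-read from mutable storage state at every call site.
struct StorageSnapshot
{
    std::shared_ptr<const Metadata> metadata;         // frozen table metadata
    std::map<std::string, std::string> object_types;  // merged Object column types
    std::vector<std::shared_ptr<const Part>> parts;   // parts pinned for this query

    // Mirrors the storage_snapshot->check(column_names) calls in the diff:
    // names must be non-empty, unique and present in the snapshot.
    void check(const std::vector<std::string> & names) const
    {
        if (names.empty())
            throw std::invalid_argument("Empty list of columns queried");
        std::set<std::string> unique_names;
        for (const auto & name : names)
        {
            if (metadata->columns.find(name) == metadata->columns.end())
                throw std::invalid_argument("No column " + name + " in table");
            if (!unique_names.insert(name).second)
                throw std::invalid_argument("Column " + name + " queried more than once");
        }
    }
};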

View File

@ -479,12 +479,13 @@ void RemoteQueryExecutor::sendExternalTables()
{
SelectQueryInfo query_info;
auto metadata_snapshot = cur->getInMemoryMetadataPtr();
auto storage_snapshot = cur->getStorageSnapshot(metadata_snapshot);
QueryProcessingStage::Enum read_from_table_stage = cur->getQueryProcessingStage(
context, QueryProcessingStage::Complete, metadata_snapshot, query_info);
context, QueryProcessingStage::Complete, storage_snapshot, query_info);
Pipe pipe = cur->read(
metadata_snapshot->getColumns().getNamesOfPhysical(),
metadata_snapshot, query_info, context,
storage_snapshot, query_info, context,
read_from_table_stage, DEFAULT_BLOCK_SIZE, 1);
if (pipe.empty())

View File

@ -190,6 +190,45 @@ DataTypePtr getLeastCommonTypeForObject(const DataTypes & types)
return std::make_shared<DataTypeTuple>(tuple_types, tuple_names);
}
NameSet getNamesOfObjectColumns(const NamesAndTypesList & columns_list)
{
NameSet res;
for (const auto & [name, type] : columns_list)
if (isObject(type))
res.insert(name);
return res;
}
NamesAndTypesList extendObjectColumns(const NamesAndTypesList & columns_list, const NameToTypeMap & object_types, bool with_subcolumns)
{
NamesAndTypesList result_columns;
for (const auto & column : columns_list)
{
auto it = object_types.find(column.name);
if (it != object_types.end())
{
const auto & object_type = it->second;
result_columns.emplace_back(column.name, object_type);
if (with_subcolumns)
{
for (const auto & subcolumn : object_type->getSubcolumnNames())
{
result_columns.emplace_back(column.name, subcolumn,
object_type, object_type->getSubcolumnType(subcolumn));
}
}
}
else
{
result_columns.push_back(column);
}
}
return result_columns;
}
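For illustration, a minimal standalone sketch of the expansion that extendObjectColumns performs, with std types standing in for NamesAndTypesList and DataTypePtr: columns named in object_types get their concrete inferred type, optionally followed by one entry per subcolumn; everything else passes through unchanged.

#include <iostream>
#include <map>
#include <string>
#include <utility>
#include <vector>

// Stand-in for DataTypePtr: a concrete type plus its subcolumn names/types.
struct ObjectType
{
    std::string name;
    std::vector<std::pair<std::string, std::string>> subcolumns; // {sub name, sub type}
};

using Column = std::pair<std::string, std::string>; // {column name, type name}

std::vector<Column> extendObjectColumns(
    const std::vector<Column> & columns,
    const std::map<std::string, ObjectType> & object_types,
    bool with_subcolumns)
{
    std::vector<Column> result;
    for (const auto & column : columns)
    {
        auto it = object_types.find(column.first);
        if (it == object_types.end())
        {
            result.push_back(column); // not an Object column: keep as-is
            continue;
        }
        result.emplace_back(column.first, it->second.name); // concrete inferred type
        if (with_subcolumns)
            for (const auto & [sub_name, sub_type] : it->second.subcolumns)
                result.emplace_back(column.first + "." + sub_name, sub_type);
    }
    return result;
}

int main()
{
    std::map<std::string, ObjectType> object_types =
        {{"data", {"Tuple(k1 UInt8, k2 String)", {{"k1", "UInt8"}, {"k2", "String"}}}}};
    // Prints id, the concrete Tuple type for "data", then data.k1 and data.k2.
    for (const auto & [name, type] : extendObjectColumns(
             {{"id", "UInt64"}, {"data", "Object('json')"}}, object_types, true))
        std::cout << name << " : " << type << '\n';
}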
void finalizeObjectColumns(MutableColumns & columns)
{
for (auto & column : columns)

View File

@ -7,6 +7,8 @@
namespace DB
{
using NameToTypeMap = std::unordered_map<String, DataTypePtr>;
size_t getNumberOfDimensions(const IDataType & type);
size_t getNumberOfDimensions(const IColumn & column);
DataTypePtr getBaseTypeOfArray(const DataTypePtr & type);
@ -15,6 +17,9 @@ DataTypePtr createArrayOfType(DataTypePtr type, size_t dimension);
DataTypePtr getDataTypeByColumn(const IColumn & column);
void convertObjectsToTuples(NamesAndTypesList & columns_list, Block & block, const NamesAndTypesList & extended_storage_columns);
DataTypePtr getLeastCommonTypeForObject(const DataTypes & types);
NameSet getNamesOfObjectColumns(const NamesAndTypesList & columns_list);
NamesAndTypesList extendObjectColumns(const NamesAndTypesList & columns_list, const NameToTypeMap & object_types, bool with_subcolumns);
void finalizeObjectColumns(MutableColumns & columns);
}

View File

@ -32,6 +32,7 @@ BlockIO InterpreterOptimizeQuery::execute()
auto table_id = getContext()->resolveStorageID(ast, Context::ResolveOrdinary);
StoragePtr table = DatabaseCatalog::instance().getTable(table_id, getContext());
auto metadata_snapshot = table->getInMemoryMetadataPtr();
auto storage_snapshot = table->getStorageSnapshot(metadata_snapshot);
// Empty list of names means we deduplicate by all columns, but user can explicitly state which columns to use.
Names column_names;
@ -46,7 +47,7 @@ BlockIO InterpreterOptimizeQuery::execute()
column_names.emplace_back(col->getColumnName());
}
table->check(metadata_snapshot, column_names);
storage_snapshot->check(column_names);
Names required_columns;
{
required_columns = metadata_snapshot->getColumnsRequiredForSortingKey();

View File

@ -139,7 +139,7 @@ String InterpreterSelectQuery::generateFilterActions(ActionsDAGPtr & actions, co
table_expr->children.push_back(table_expr->database_and_table_name);
/// Using separate expression analyzer to prevent any possible alias injection
auto syntax_result = TreeRewriter(context).analyzeSelect(query_ast, TreeRewriterResult({}, storage, metadata_snapshot));
auto syntax_result = TreeRewriter(context).analyzeSelect(query_ast, TreeRewriterResult({}, storage, storage_snapshot));
SelectQueryExpressionAnalyzer analyzer(query_ast, syntax_result, context, metadata_snapshot);
actions = analyzer.simpleSelectActions();
@ -328,6 +328,8 @@ InterpreterSelectQuery::InterpreterSelectQuery(
table_id = storage->getStorageID();
if (!metadata_snapshot)
metadata_snapshot = storage->getInMemoryMetadataPtr();
storage_snapshot = storage->getStorageSnapshot(metadata_snapshot);
}
if (has_input || !joined_tables.resolveTables())
@ -384,7 +386,7 @@ InterpreterSelectQuery::InterpreterSelectQuery(
syntax_analyzer_result = TreeRewriter(context).analyzeSelect(
query_ptr,
TreeRewriterResult(source_header.getNamesAndTypesList(), storage, metadata_snapshot),
TreeRewriterResult(source_header.getNamesAndTypesList(), storage, storage_snapshot),
options, joined_tables.tablesWithColumns(), required_result_column_names, table_join);
query_info.syntax_analyzer_result = syntax_analyzer_result;
@ -499,7 +501,7 @@ InterpreterSelectQuery::InterpreterSelectQuery(
}
}
source_header = storage->getSampleBlockForColumns(metadata_snapshot, required_columns);
source_header = storage_snapshot->getSampleBlockForColumns(required_columns);
}
/// Calculate structure of the result.
@ -613,7 +615,7 @@ Block InterpreterSelectQuery::getSampleBlockImpl()
if (storage && !options.only_analyze)
{
from_stage = storage->getQueryProcessingStage(context, options.to_stage, metadata_snapshot, query_info);
from_stage = storage->getQueryProcessingStage(context, options.to_stage, storage_snapshot, query_info);
/// TODO how can we make IN index work if we cache parts before selecting a projection?
/// XXX Used for IN set index analysis. Is this a proper way?
@ -1678,7 +1680,7 @@ void InterpreterSelectQuery::addPrewhereAliasActions()
}
auto syntax_result
= TreeRewriter(context).analyze(required_columns_all_expr, required_columns_after_prewhere, storage, metadata_snapshot);
= TreeRewriter(context).analyze(required_columns_all_expr, required_columns_after_prewhere, storage, storage_snapshot);
alias_actions = ExpressionAnalyzer(required_columns_all_expr, syntax_result, context).getActionsDAG(true);
/// The set of required columns could be added as a result of adding an action to calculate ALIAS.
@ -1948,7 +1950,7 @@ void InterpreterSelectQuery::executeFetchColumns(QueryProcessingStage::Enum proc
if (!options.ignore_quota && (options.to_stage == QueryProcessingStage::Complete))
quota = context->getQuota();
storage->read(query_plan, required_columns, metadata_snapshot, query_info, context, processing_stage, max_block_size, max_streams);
storage->read(query_plan, required_columns, storage_snapshot, query_info, context, processing_stage, max_block_size, max_streams);
if (context->hasQueryContext() && !options.is_internal)
{
@ -1963,8 +1965,9 @@ void InterpreterSelectQuery::executeFetchColumns(QueryProcessingStage::Enum proc
/// Create step which reads from empty source if storage has no data.
if (!query_plan.isInitialized())
{
const auto & metadata = query_info.projection ? query_info.projection->desc->metadata : metadata_snapshot;
auto header = storage->getSampleBlockForColumns(metadata, required_columns);
/// TODO: fix.
// const auto & metadata = query_info.projection ? query_info.projection->desc->metadata : metadata_snapshot;
auto header = storage_snapshot->getSampleBlockForColumns(required_columns);
addEmptySourceToQueryPlan(query_plan, header, query_info, context);
}

View File

@ -203,6 +203,7 @@ private:
Poco::Logger * log;
StorageMetadataPtr metadata_snapshot;
StorageSnapshotPtr storage_snapshot;
};
}

View File

@ -732,7 +732,7 @@ ASTPtr MutationsInterpreter::prepareInterpreterSelectQuery(std::vector<Stage> &
for (const String & column : stage.output_columns)
all_asts->children.push_back(std::make_shared<ASTIdentifier>(column));
auto syntax_result = TreeRewriter(context).analyze(all_asts, all_columns, storage, metadata_snapshot);
auto syntax_result = TreeRewriter(context).analyze(all_asts, all_columns, storage, storage->getStorageSnapshot(metadata_snapshot));
if (context->hasQueryContext())
for (const auto & it : syntax_result->getScalars())
context->getQueryContext()->addScalar(it.first, it.second);

View File

@ -607,9 +607,8 @@ void TreeOptimizer::apply(ASTPtr & query, TreeRewriterResult & result,
if (!select_query)
throw Exception("Select analyze for not select asts.", ErrorCodes::LOGICAL_ERROR);
if (settings.optimize_functions_to_subcolumns && result.storage
&& result.storage->supportsSubcolumns() && result.metadata_snapshot)
optimizeFunctionsToSubcolumns(query, result.metadata_snapshot);
if (settings.optimize_functions_to_subcolumns && result.storage_snapshot && result.storage->supportsSubcolumns())
optimizeFunctionsToSubcolumns(query, result.storage_snapshot->metadata);
optimizeIf(query, result.aliases, settings.optimize_if_chain_to_multiif);
@ -668,7 +667,7 @@ void TreeOptimizer::apply(ASTPtr & query, TreeRewriterResult & result,
/// Replace monotonous functions with its argument
if (settings.optimize_monotonous_functions_in_order_by)
optimizeMonotonousFunctionsInOrderBy(select_query, context, tables_with_columns,
result.metadata_snapshot ? result.metadata_snapshot->getSortingKeyColumns() : Names{});
result.storage_snapshot ? result.storage_snapshot->metadata->getSortingKeyColumns() : Names{});
/// Remove duplicate items from ORDER BY.
/// Execute it after all order by optimizations,

View File

@ -617,10 +617,10 @@ std::vector<const ASTFunction *> getWindowFunctions(ASTPtr & query, const ASTSel
TreeRewriterResult::TreeRewriterResult(
const NamesAndTypesList & source_columns_,
ConstStoragePtr storage_,
const StorageMetadataPtr & metadata_snapshot_,
const StorageSnapshotPtr & storage_snapshot_,
bool add_special)
: storage(storage_)
, metadata_snapshot(metadata_snapshot_)
, storage_snapshot(storage_snapshot_)
, source_columns(source_columns_)
{
collectSourceColumns(add_special);
@ -638,7 +638,7 @@ void TreeRewriterResult::collectSourceColumns(bool add_special)
if (storage->supportsSubcolumns())
options.withSubcolumns();
auto columns_from_storage = storage->getColumns(metadata_snapshot, options);
auto columns_from_storage = storage_snapshot->getColumns(options);
if (source_columns.empty())
source_columns.swap(columns_from_storage);
@ -745,9 +745,9 @@ void TreeRewriterResult::collectUsedColumns(const ASTPtr & query, bool is_select
/// If we have no information about columns sizes, choose a column of minimum size of its data type.
required.insert(ExpressionActions::getSmallestColumn(source_columns));
}
else if (is_select && metadata_snapshot && !columns_context.has_array_join)
else if (is_select && storage_snapshot && !columns_context.has_array_join)
{
const auto & partition_desc = metadata_snapshot->getPartitionKey();
const auto & partition_desc = storage_snapshot->metadata->getPartitionKey();
if (partition_desc.expression)
{
auto partition_source_columns = partition_desc.expression->getRequiredColumns();
@ -939,9 +939,9 @@ TreeRewriterResultPtr TreeRewriter::analyzeSelect(
result.required_source_columns_before_expanding_alias_columns = result.required_source_columns.getNames();
/// rewrite filters for select query, must go after getArrayJoinedColumns
if (settings.optimize_respect_aliases && result.metadata_snapshot)
if (settings.optimize_respect_aliases && result.storage_snapshot)
{
replaceAliasColumnsInQuery(query, result.metadata_snapshot->getColumns(), result.array_join_result_to_source, getContext());
replaceAliasColumnsInQuery(query, result.storage_snapshot->metadata->getColumns(), result.array_join_result_to_source, getContext());
result.collectUsedColumns(query, true);
}
@ -960,7 +960,7 @@ TreeRewriterResultPtr TreeRewriter::analyze(
ASTPtr & query,
const NamesAndTypesList & source_columns,
ConstStoragePtr storage,
const StorageMetadataPtr & metadata_snapshot,
const StorageSnapshotPtr & storage_snapshot,
bool allow_aggregations,
bool allow_self_aliases) const
{
@ -969,7 +969,7 @@ TreeRewriterResultPtr TreeRewriter::analyze(
const auto & settings = getContext()->getSettingsRef();
TreeRewriterResult result(source_columns, storage, metadata_snapshot, false);
TreeRewriterResult result(source_columns, storage, storage_snapshot, false);
normalize(query, result.aliases, result.source_columns_set, false, settings, allow_self_aliases);

View File

@ -19,11 +19,13 @@ struct SelectQueryOptions;
using Scalars = std::map<String, Block>;
struct StorageInMemoryMetadata;
using StorageMetadataPtr = std::shared_ptr<const StorageInMemoryMetadata>;
struct StorageSnapshot;
using StorageSnapshotPtr = std::shared_ptr<const StorageSnapshot>;
struct TreeRewriterResult
{
ConstStoragePtr storage;
StorageMetadataPtr metadata_snapshot;
StorageSnapshotPtr storage_snapshot;
std::shared_ptr<TableJoin> analyzed_join;
const ASTTablesInSelectQueryElement * ast_join = nullptr;
@ -76,7 +78,7 @@ struct TreeRewriterResult
TreeRewriterResult(
const NamesAndTypesList & source_columns_,
ConstStoragePtr storage_ = {},
const StorageMetadataPtr & metadata_snapshot_ = {},
const StorageSnapshotPtr & storage_snapshot_ = {},
bool add_special = true);
void collectSourceColumns(bool add_special);
@ -109,7 +111,7 @@ public:
ASTPtr & query,
const NamesAndTypesList & source_columns_,
ConstStoragePtr storage = {},
const StorageMetadataPtr & metadata_snapshot = {},
const StorageSnapshotPtr & storage_snapshot = {},
bool allow_aggregations = false,
bool allow_self_aliases = true) const;

View File

@ -80,9 +80,8 @@ bool removeJoin(ASTSelectQuery & select, TreeRewriterResult & rewriter_result, C
}
Block getHeaderForProcessingStage(
const IStorage & storage,
const Names & column_names,
const StorageMetadataPtr & metadata_snapshot,
const StorageSnapshotPtr & storage_snapshot,
const SelectQueryInfo & query_info,
ContextPtr context,
QueryProcessingStage::Enum processed_stage)
@ -91,7 +90,7 @@ Block getHeaderForProcessingStage(
{
case QueryProcessingStage::FetchColumns:
{
Block header = storage.getSampleBlockForColumns(metadata_snapshot, column_names);
Block header = storage_snapshot->getSampleBlockForColumns(column_names);
if (query_info.prewhere_info)
{
@ -122,7 +121,7 @@ Block getHeaderForProcessingStage(
removeJoin(*query->as<ASTSelectQuery>(), new_rewriter_result, context);
auto stream = std::make_shared<OneBlockInputStream>(
storage.getSampleBlockForColumns(metadata_snapshot, column_names));
storage_snapshot->getSampleBlockForColumns(column_names));
return InterpreterSelectQuery(query, context, stream, SelectQueryOptions(processed_stage).analyze()).getSampleBlock();
}
}

View File

@ -10,8 +10,8 @@ namespace DB
{
class IStorage;
struct StorageInMemoryMetadata;
using StorageMetadataPtr = std::shared_ptr<const StorageInMemoryMetadata>;
struct StorageSnapshot;
using StorageSnapshotPtr = std::shared_ptr<const StorageSnapshot>;
struct SelectQueryInfo;
struct TreeRewriterResult;
class ASTSelectQuery;
@ -20,9 +20,8 @@ bool hasJoin(const ASTSelectQuery & select);
bool removeJoin(ASTSelectQuery & select, TreeRewriterResult & rewriter_result, ContextPtr context);
Block getHeaderForProcessingStage(
const IStorage & storage,
const Names & column_names,
const StorageMetadataPtr & metadata_snapshot,
const StorageSnapshotPtr & storage_snapshot,
const SelectQueryInfo & query_info,
ContextPtr context,
QueryProcessingStage::Enum processed_stage);

View File

@ -74,6 +74,7 @@ ReadFromMergeTree::ReadFromMergeTree(
Names virt_column_names_,
const MergeTreeData & data_,
const SelectQueryInfo & query_info_,
StorageSnapshotPtr storage_snapshot_,
StorageMetadataPtr metadata_snapshot_,
StorageMetadataPtr metadata_snapshot_base_,
ContextPtr context_,
@ -83,7 +84,7 @@ ReadFromMergeTree::ReadFromMergeTree(
std::shared_ptr<PartitionIdToMaxBlock> max_block_numbers_to_read_,
Poco::Logger * log_)
: ISourceStep(DataStream{.header = MergeTreeBaseSelectProcessor::transformHeader(
data_.getSampleBlockForColumns(metadata_snapshot_, real_column_names_),
storage_snapshot_->getSampleBlockForColumns(real_column_names_),
getPrewhereInfo(query_info_),
data_.getPartitionValueType(),
virt_column_names_)})
@ -95,6 +96,7 @@ ReadFromMergeTree::ReadFromMergeTree(
, query_info(query_info_)
, prewhere_info(getPrewhereInfo(query_info))
, actions_settings(ExpressionActionsSettings::fromContext(context_))
, storage_snapshot(std::move(storage_snapshot_))
, metadata_snapshot(std::move(metadata_snapshot_))
, metadata_snapshot_base(std::move(metadata_snapshot_base_))
, context(std::move(context_))
@ -141,7 +143,7 @@ Pipe ReadFromMergeTree::readFromPool(
min_marks_for_concurrent_read,
std::move(parts_with_range),
data,
metadata_snapshot,
storage_snapshot,
prewhere_info,
true,
required_columns,
@ -157,7 +159,7 @@ Pipe ReadFromMergeTree::readFromPool(
auto source = std::make_shared<MergeTreeThreadSelectBlockInputProcessor>(
i, pool, min_marks_for_concurrent_read, max_block_size,
settings.preferred_block_size_bytes, settings.preferred_max_column_in_block_size_bytes,
data, metadata_snapshot, use_uncompressed_cache,
data, storage_snapshot, use_uncompressed_cache,
prewhere_info, actions_settings, reader_settings, virt_column_names);
if (i == 0)
@ -179,7 +181,7 @@ ProcessorPtr ReadFromMergeTree::createSource(
bool use_uncompressed_cache)
{
return std::make_shared<TSource>(
data, metadata_snapshot, part.data_part, max_block_size, preferred_block_size_bytes,
data, storage_snapshot, part.data_part, max_block_size, preferred_block_size_bytes,
preferred_max_column_in_block_size_bytes, required_columns, part.ranges, use_uncompressed_cache,
prewhere_info, actions_settings, true, reader_settings, virt_column_names, part.part_index_in_query);
}
@ -784,7 +786,7 @@ ReadFromMergeTree::AnalysisResult ReadFromMergeTree::selectRangesToRead(MergeTre
result.column_names_to_read.push_back(ExpressionActions::getSmallestColumn(available_real_columns));
}
data.check(metadata_snapshot, result.column_names_to_read);
storage_snapshot->check(result.column_names_to_read);
// Build and check if primary key is used when necessary
const auto & primary_key = metadata_snapshot->getPrimaryKey();

View File

@ -60,6 +60,7 @@ public:
Names virt_column_names_,
const MergeTreeData & data_,
const SelectQueryInfo & query_info_,
StorageSnapshotPtr storage_snapshot_,
StorageMetadataPtr metadata_snapshot_,
StorageMetadataPtr metadata_snapshot_base_,
ContextPtr context_,
@ -92,6 +93,7 @@ private:
PrewhereInfoPtr prewhere_info;
ExpressionActionsSettings actions_settings;
StorageSnapshotPtr storage_snapshot;
StorageMetadataPtr metadata_snapshot;
StorageMetadataPtr metadata_snapshot_base;

View File

@ -274,7 +274,7 @@ Strings LSWithRegexpMatching(const String & path_for_ls, const HDFSFSPtr & fs, c
Pipe StorageHDFS::read(
const Names & column_names,
const StorageMetadataPtr & metadata_snapshot,
const StorageSnapshotPtr & storage_snapshot,
SelectQueryInfo & /*query_info*/,
ContextPtr context_,
QueryProcessingStage::Enum /*processed_stage*/,
@ -309,7 +309,7 @@ Pipe StorageHDFS::read(
for (size_t i = 0; i < num_streams; ++i)
pipes.emplace_back(std::make_shared<HDFSSource>(
sources_info, uri_without_path, format_name, compression_method, metadata_snapshot->getSampleBlock(), context_, max_block_size));
sources_info, uri_without_path, format_name, compression_method, storage_snapshot->metadata->getSampleBlock(), context_, max_block_size));
return Pipe::unitePipes(std::move(pipes));
}

View File

@ -25,7 +25,7 @@ public:
Pipe read(
const Names & column_names,
const StorageMetadataPtr & /*metadata_snapshot*/,
const StorageSnapshotPtr & storage_snapshot,
SelectQueryInfo & query_info,
ContextPtr context,
QueryProcessingStage::Enum processed_stage,

View File

@ -90,7 +90,7 @@ TableExclusiveLockHolder IStorage::lockExclusively(const String & query_id, cons
Pipe IStorage::read(
const Names & /*column_names*/,
const StorageMetadataPtr & /*metadata_snapshot*/,
const StorageSnapshotPtr & /*storage_snapshot*/,
SelectQueryInfo & /*query_info*/,
ContextPtr /*context*/,
QueryProcessingStage::Enum /*processed_stage*/,
@ -103,18 +103,19 @@ Pipe IStorage::read(
void IStorage::read(
QueryPlan & query_plan,
const Names & column_names,
const StorageMetadataPtr & metadata_snapshot,
const StorageSnapshotPtr & storage_snapshot,
SelectQueryInfo & query_info,
ContextPtr context,
QueryProcessingStage::Enum processed_stage,
size_t max_block_size,
unsigned num_streams)
{
auto pipe = read(column_names, metadata_snapshot, query_info, context, processed_stage, max_block_size, num_streams);
auto pipe = read(column_names, storage_snapshot, query_info, context, processed_stage, max_block_size, num_streams);
if (pipe.empty())
{
const auto & metadata_for_query = query_info.projection ? query_info.projection->desc->metadata : metadata_snapshot;
auto header = getSampleBlockForColumns(metadata_for_query, column_names);
/// TODO: fix
// const auto & metadata_for_query = query_info.projection ? query_info.projection->desc->metadata : storage_snapshot->metadata;
auto header = storage_snapshot->getSampleBlockForColumns(column_names);
InterpreterSelectQuery::addEmptySourceToQueryPlan(query_plan, header, query_info, context);
}
else
@ -206,90 +207,11 @@ NameDependencies IStorage::getDependentViewsByColumn(ContextPtr context) const
return name_deps;
}
NamesAndTypesList IStorage::getColumns(const StorageMetadataPtr & metadata_snapshot, const GetColumnsOptions & options) const
StorageSnapshotPtr IStorage::getStorageSnapshot(const StorageMetadataPtr & metadata_snapshot) const
{
auto all_columns = metadata_snapshot->getColumns().get(options);
if (options.with_virtuals)
{
/// Virtual columns must be appended after ordinary,
/// because user can override them.
auto virtuals = getVirtuals();
if (!virtuals.empty())
{
NameSet column_names;
for (const auto & column : all_columns)
column_names.insert(column.name);
for (auto && column : virtuals)
if (!column_names.count(column.name))
all_columns.push_back(std::move(column));
}
}
if (options.with_extended_objects)
all_columns = extendObjectColumns(all_columns, options.with_subcolumns);
return all_columns;
return std::make_shared<StorageSnapshot>(*this, metadata_snapshot);
}
Block IStorage::getSampleBlockForColumns(const StorageMetadataPtr & metadata_snapshot, const Names & column_names) const
{
Block res;
auto all_columns = getColumns(
metadata_snapshot,
GetColumnsOptions(GetColumnsOptions::All)
.withSubcolumns().withVirtuals().withExtendedObjects());
std::unordered_map<String, DataTypePtr> columns_map;
columns_map.reserve(all_columns.size());
for (const auto & elem : all_columns)
columns_map.emplace(elem.name, elem.type);
for (const auto & name : column_names)
{
auto it = columns_map.find(name);
if (it != columns_map.end())
res.insert({it->second->createColumn(), it->second, it->first});
else
throw Exception(ErrorCodes::NOT_FOUND_COLUMN_IN_BLOCK,
"Column {} not found in table {}", backQuote(name), getStorageID().getNameForLogs());
}
return res;
}
void IStorage::check(const StorageMetadataPtr & metadata_snapshot, const Names & column_names) const
{
auto available_columns = getColumns(
metadata_snapshot,
GetColumnsOptions(GetColumnsOptions::AllPhysical)
.withSubcolumns().withVirtuals().withExtendedObjects()).getNames();
if (column_names.empty())
throw Exception(ErrorCodes::EMPTY_LIST_OF_COLUMNS_QUERIED,
"Empty list of columns queried. There are columns: {}",
boost::algorithm::join(available_columns, ","));
std::unordered_set<std::string_view> columns_set(available_columns.begin(), available_columns.end());
std::unordered_set<std::string_view> unique_names;
for (const auto & name : column_names)
{
if (columns_set.end() == columns_set.find(name))
throw Exception(ErrorCodes::NO_SUCH_COLUMN_IN_TABLE,
"There is no column with name {} in table {}. There are columns: ",
backQuote(name), getStorageID().getNameForLogs(), boost::algorithm::join(available_columns, ","));
if (unique_names.end() != unique_names.find(name))
throw Exception(ErrorCodes::COLUMN_QUERIED_MORE_THAN_ONCE, "Column {} queried more than once", name);
unique_names.insert(name);
}
}
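The bodies removed above (getColumns, getSampleBlockForColumns, check) presumably reappear nearly verbatim on StorageSnapshot, with the metadata argument dropped since the snapshot already carries it. The relocated getSampleBlockForColumns would then plausibly read as follows; the actual StorageSnapshot.cpp is not part of this diff, and the storage member used for error reporting is an assumption.

Block StorageSnapshot::getSampleBlockForColumns(const Names & column_names) const
{
    Block res;
    /// Same lookup as the removed IStorage version, but the column list now
    /// comes from the snapshot itself rather than a metadata argument.
    auto all_columns = getColumns(GetColumnsOptions(GetColumnsOptions::All)
        .withSubcolumns().withVirtuals().withExtendedObjects());

    std::unordered_map<String, DataTypePtr> columns_map;
    columns_map.reserve(all_columns.size());
    for (const auto & elem : all_columns)
        columns_map.emplace(elem.name, elem.type);

    for (const auto & name : column_names)
    {
        auto it = columns_map.find(name);
        if (it == columns_map.end())
            throw Exception(ErrorCodes::NOT_FOUND_COLUMN_IN_BLOCK,
                "Column {} not found in table {}",
                backQuote(name), storage.getStorageID().getNameForLogs());
        res.insert({it->second->createColumn(), it->second, it->first});
    }
    return res;
}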
std::string PrewhereInfo::dump() const
{
WriteBufferFromOwnString ss;

View File

@ -14,6 +14,7 @@
#include <Storages/SelectQueryDescription.h>
#include <Storages/StorageInMemoryMetadata.h>
#include <Storages/TableLockHolder.h>
#include <Storages/StorageSnapshot.h>
#include <Common/ActionLock.h>
#include <Common/Exception.h>
#include <Common/RWLock.h>
@ -160,16 +161,6 @@ public:
/// used without any locks.
StorageMetadataPtr getInMemoryMetadataPtr() const { return metadata.get(); }
NamesAndTypesList getColumns(const StorageMetadataPtr & metadata_snapshot, const GetColumnsOptions & options) const;
/// Block with ordinary + materialized + aliases + virtuals + subcolumns.
/// Throws an exception with an explanatory message if a requested column is absent.
Block getSampleBlockForColumns(const StorageMetadataPtr & metadata_snapshot, const Names & column_names) const;
/// Verify that all the requested names are in the table and are set correctly:
/// list of names is not empty and the names do not repeat.
void check(const StorageMetadataPtr & metadata_snapshot, const Names & column_names) const;
/// Update storage metadata. Used in ALTER or initialization of Storage.
/// Metadata object is multiversion, so this method can be called without
/// any locks.
@ -246,8 +237,7 @@ public:
* QueryProcessingStage::Enum required for Distributed over Distributed,
* since it cannot return Complete for intermediate queries never.
*/
virtual QueryProcessingStage::Enum
getQueryProcessingStage(ContextPtr, QueryProcessingStage::Enum, const StorageMetadataPtr &, SelectQueryInfo &) const
virtual QueryProcessingStage::Enum getQueryProcessingStage(ContextPtr, QueryProcessingStage::Enum, const StorageSnapshotPtr &, SelectQueryInfo &) const
{
return QueryProcessingStage::FetchColumns;
}
@ -304,7 +294,7 @@ public:
*/
virtual Pipe read(
const Names & /*column_names*/,
const StorageMetadataPtr & /*metadata_snapshot*/,
const StorageSnapshotPtr & /*storage_snapshot*/,
SelectQueryInfo & /*query_info*/,
ContextPtr /*context*/,
QueryProcessingStage::Enum /*processed_stage*/,
@ -316,7 +306,7 @@ public:
virtual void read(
QueryPlan & query_plan,
const Names & /*column_names*/,
const StorageMetadataPtr & /*metadata_snapshot*/,
const StorageSnapshotPtr & /*storage_snapshot*/,
SelectQueryInfo & /*query_info*/,
ContextPtr /*context*/,
QueryProcessingStage::Enum /*processed_stage*/,
@ -564,6 +554,8 @@ public:
virtual NamesAndTypesList extendObjectColumns(const NamesAndTypesList & columns_list, bool /*with_subcolumns*/) const { return columns_list; }
virtual StorageSnapshotPtr getStorageSnapshot(const StorageMetadataPtr & metadata_snapshot) const;
private:
/// Lock required for alter queries (lockForAlter). Always taken for write
/// (actually can be replaced with std::mutex, but for consistency we use

View File

@ -21,21 +21,21 @@ const auto MAX_FAILED_POLL_ATTEMPTS = 10;
KafkaBlockInputStream::KafkaBlockInputStream(
StorageKafka & storage_,
const StorageMetadataPtr & metadata_snapshot_,
const StorageSnapshotPtr & storage_snapshot_,
const std::shared_ptr<Context> & context_,
const Names & columns,
Poco::Logger * log_,
size_t max_block_size_,
bool commit_in_suffix_)
: storage(storage_)
, metadata_snapshot(metadata_snapshot_)
, storage_snapshot(storage_snapshot_)
, context(context_)
, column_names(columns)
, log(log_)
, max_block_size(max_block_size_)
, commit_in_suffix(commit_in_suffix_)
, non_virtual_header(metadata_snapshot->getSampleBlockNonMaterialized())
, virtual_header(storage.getSampleBlockForColumns(metadata_snapshot, storage.getVirtualColumnNames()))
, non_virtual_header(storage_snapshot->metadata->getSampleBlockNonMaterialized())
, virtual_header(storage_snapshot->getSampleBlockForColumns(storage.getVirtualColumnNames()))
, handle_error_mode(storage.getHandleKafkaErrorMode())
{
}
@ -53,7 +53,7 @@ KafkaBlockInputStream::~KafkaBlockInputStream()
Block KafkaBlockInputStream::getHeader() const
{
return storage.getSampleBlockForColumns(metadata_snapshot, column_names);
return storage_snapshot->getSampleBlockForColumns(column_names);
}
void KafkaBlockInputStream::readPrefixImpl()

View File

@ -18,7 +18,7 @@ class KafkaBlockInputStream : public IBlockInputStream
public:
KafkaBlockInputStream(
StorageKafka & storage_,
const StorageMetadataPtr & metadata_snapshot_,
const StorageSnapshotPtr & storage_snapshot_,
const std::shared_ptr<Context> & context_,
const Names & columns,
Poco::Logger * log_,
@ -38,7 +38,7 @@ public:
private:
StorageKafka & storage;
StorageMetadataPtr metadata_snapshot;
StorageSnapshotPtr storage_snapshot;
ContextPtr context;
Names column_names;
Poco::Logger * log;

View File

@ -258,7 +258,7 @@ String StorageKafka::getDefaultClientId(const StorageID & table_id_)
Pipe StorageKafka::read(
const Names & column_names,
const StorageMetadataPtr & metadata_snapshot,
const StorageSnapshotPtr & storage_snapshot,
SelectQueryInfo & /* query_info */,
ContextPtr local_context,
QueryProcessingStage::Enum /* processed_stage */,
@ -281,7 +281,8 @@ Pipe StorageKafka::read(
/// TODO: probably that leads to awful performance.
/// FIXME: seems that doesn't help with extra reading and committing unprocessed messages.
/// TODO: rewrite KafkaBlockInputStream to KafkaSource. Now it is used in other place.
pipes.emplace_back(std::make_shared<SourceFromInputStream>(std::make_shared<KafkaBlockInputStream>(*this, metadata_snapshot, modified_context, column_names, log, 1)));
pipes.emplace_back(std::make_shared<SourceFromInputStream>(std::make_shared<KafkaBlockInputStream>(
*this, storage_snapshot, modified_context, column_names, log, 1)));
}
LOG_DEBUG(log, "Starting reading {} streams", pipes.size());
@ -592,7 +593,8 @@ bool StorageKafka::streamToViews()
auto table = DatabaseCatalog::instance().getTable(table_id, getContext());
if (!table)
throw Exception("Engine table " + table_id.getNameForLogs() + " doesn't exist.", ErrorCodes::LOGICAL_ERROR);
auto metadata_snapshot = getInMemoryMetadataPtr();
auto storage_snapshot = getStorageSnapshot(getInMemoryMetadataPtr());
// Create an INSERT query for streaming data
auto insert = std::make_shared<ASTInsertQuery>();
@ -616,7 +618,8 @@ bool StorageKafka::streamToViews()
streams.reserve(stream_count);
for (size_t i = 0; i < stream_count; ++i)
{
auto stream = std::make_shared<KafkaBlockInputStream>(*this, metadata_snapshot, kafka_context, block_io.out->getHeader().getNames(), log, block_size, false);
auto stream = std::make_shared<KafkaBlockInputStream>(
*this, storage_snapshot, kafka_context, block_io.out->getHeader().getNames(), log, block_size, false);
streams.emplace_back(stream);
// Limit read batch to maximum block size to allow DDL

View File

@ -43,7 +43,7 @@ public:
Pipe read(
const Names & column_names,
const StorageMetadataPtr & /*metadata_snapshot*/,
const StorageSnapshotPtr & storage_snapshot,
SelectQueryInfo & query_info,
ContextPtr context,
QueryProcessingStage::Enum processed_stage,

View File

@ -34,14 +34,14 @@ public:
bool supportsFinal() const override { return true; }
QueryProcessingStage::Enum
getQueryProcessingStage(ContextPtr, QueryProcessingStage::Enum, const StorageMetadataPtr &, SelectQueryInfo &) const override
getQueryProcessingStage(ContextPtr, QueryProcessingStage::Enum, const StorageSnapshotPtr &, SelectQueryInfo &) const override
{
return to_stage;
}
Pipe read(
const Names & /*column_names*/,
const StorageMetadataPtr & /*metadata_snapshot*/,
const StorageSnapshotPtr & /*storage_snapshot*/,
SelectQueryInfo & /*query_info*/,
ContextPtr /*context*/,
QueryProcessingStage::Enum /*processed_stage*/,

View File

@ -497,7 +497,7 @@ void StorageLiveView::refresh(bool grab_lock)
Pipe StorageLiveView::read(
const Names & /*column_names*/,
const StorageMetadataPtr & /*metadata_snapshot*/,
const StorageSnapshotPtr & /*storage_snapshot*/,
SelectQueryInfo & /*query_info*/,
ContextPtr /*context*/,
QueryProcessingStage::Enum /*processed_stage*/,

View File

@ -146,7 +146,7 @@ public:
Pipe read(
const Names & column_names,
const StorageMetadataPtr & /*metadata_snapshot*/,
const StorageSnapshotPtr & storage_snapshot,
SelectQueryInfo & query_info,
ContextPtr context,
QueryProcessingStage::Enum processed_stage,

View File

@ -25,7 +25,7 @@ namespace ErrorCodes
MergeTreeBaseSelectProcessor::MergeTreeBaseSelectProcessor(
Block header,
const MergeTreeData & storage_,
const StorageMetadataPtr & metadata_snapshot_,
const StorageSnapshotPtr & storage_snapshot_,
const PrewhereInfoPtr & prewhere_info_,
ExpressionActionsSettings actions_settings,
UInt64 max_block_size_rows_,
@ -36,7 +36,7 @@ MergeTreeBaseSelectProcessor::MergeTreeBaseSelectProcessor(
const Names & virt_column_names_)
: SourceWithProgress(transformHeader(std::move(header), prewhere_info_, storage_.getPartitionValueType(), virt_column_names_))
, storage(storage_)
, metadata_snapshot(metadata_snapshot_)
, storage_snapshot(storage_snapshot_)
, prewhere_info(prewhere_info_)
, max_block_size_rows(max_block_size_rows_)
, preferred_block_size_bytes(preferred_block_size_bytes_)

View File

@ -22,7 +22,7 @@ public:
MergeTreeBaseSelectProcessor(
Block header,
const MergeTreeData & storage_,
const StorageMetadataPtr & metadata_snapshot_,
const StorageSnapshotPtr & storage_snapshot_,
const PrewhereInfoPtr & prewhere_info_,
ExpressionActionsSettings actions_settings,
UInt64 max_block_size_rows_,
@ -57,7 +57,7 @@ protected:
protected:
const MergeTreeData & storage;
StorageMetadataPtr metadata_snapshot;
StorageSnapshotPtr storage_snapshot;
PrewhereInfoPtr prewhere_info;
std::unique_ptr<PrewhereExprInfo> prewhere_actions;

View File

@ -265,7 +265,7 @@ void MergeTreeBlockSizePredictor::update(const Block & sample_block, const Colum
MergeTreeReadTaskColumns getReadTaskColumns(
const MergeTreeData & storage,
const StorageMetadataPtr & metadata_snapshot,
const StorageSnapshotPtr & storage_snapshot,
const MergeTreeData::DataPartPtr & data_part,
const Names & required_columns,
const PrewhereInfoPtr & prewhere_info,
@ -275,7 +275,7 @@ MergeTreeReadTaskColumns getReadTaskColumns(
Names pre_column_names;
/// inject columns required for defaults evaluation
bool should_reorder = !injectRequiredColumns(storage, metadata_snapshot, data_part, column_names).empty();
bool should_reorder = !injectRequiredColumns(storage, storage_snapshot->metadata, data_part, column_names).empty();
if (prewhere_info)
{
@ -300,7 +300,7 @@ MergeTreeReadTaskColumns getReadTaskColumns(
if (pre_column_names.empty())
pre_column_names.push_back(column_names[0]);
const auto injected_pre_columns = injectRequiredColumns(storage, metadata_snapshot, data_part, pre_column_names);
const auto injected_pre_columns = injectRequiredColumns(storage, storage_snapshot->metadata, data_part, pre_column_names);
if (!injected_pre_columns.empty())
should_reorder = true;
@ -318,7 +318,7 @@ MergeTreeReadTaskColumns getReadTaskColumns(
if (check_columns)
{
auto all_columns = storage.getColumns(metadata_snapshot,
auto all_columns = storage_snapshot->getColumns(
GetColumnsOptions(GetColumnsOptions::All).withSubcolumns().withExtendedObjects());
result.pre_columns = all_columns.addTypes(pre_column_names);

View File

@ -75,7 +75,7 @@ struct MergeTreeReadTaskColumns
MergeTreeReadTaskColumns getReadTaskColumns(
const MergeTreeData & storage,
const StorageMetadataPtr & metadata_snapshot,
const StorageSnapshotPtr & storage_snapshot,
const MergeTreeData::DataPartPtr & data_part,
const Names & required_columns,
const PrewhereInfoPtr & prewhere_info,

View File

@ -3861,7 +3861,7 @@ using PartitionIdToMaxBlock = std::unordered_map<String, Int64>;
static void selectBestProjection(
const MergeTreeDataSelectExecutor & reader,
const StorageMetadataPtr & metadata_snapshot,
const StorageSnapshotPtr & storage_snapshot,
const SelectQueryInfo & query_info,
const Names & required_columns,
ProjectionCandidate & candidate,
@ -3890,7 +3890,8 @@ static void selectBestProjection(
auto sum_marks = reader.estimateNumMarksToRead(
projection_parts,
candidate.required_columns,
metadata_snapshot,
storage_snapshot,
storage_snapshot->metadata,
candidate.desc->metadata,
query_info, // TODO syntax_analysis_result set in index
query_context,
@ -3908,8 +3909,9 @@ static void selectBestProjection(
sum_marks += reader.estimateNumMarksToRead(
normal_parts,
required_columns,
metadata_snapshot,
metadata_snapshot,
storage_snapshot,
storage_snapshot->metadata,
storage_snapshot->metadata,
query_info, // TODO syntax_analysis_result set in index
query_context,
settings.max_threads,
@ -3926,8 +3928,9 @@ static void selectBestProjection(
bool MergeTreeData::getQueryProcessingStageWithAggregateProjection(
ContextPtr query_context, const StorageMetadataPtr & metadata_snapshot, SelectQueryInfo & query_info) const
ContextPtr query_context, const StorageSnapshotPtr & storage_snapshot, SelectQueryInfo & query_info) const
{
const auto & metadata_snapshot = storage_snapshot->metadata;
const auto & settings = query_context->getSettingsRef();
if (!settings.allow_experimental_projection_optimization || query_info.ignore_projections)
return false;
@ -4160,7 +4163,7 @@ bool MergeTreeData::getQueryProcessingStageWithAggregateProjection(
{
selectBestProjection(
reader,
metadata_snapshot,
storage_snapshot,
query_info,
analysis_result.required_columns,
candidate,
@ -4181,6 +4184,7 @@ bool MergeTreeData::getQueryProcessingStageWithAggregateProjection(
min_sum_marks = reader.estimateNumMarksToRead(
parts,
analysis_result.required_columns,
storage_snapshot,
metadata_snapshot,
metadata_snapshot,
query_info, // TODO syntax_analysis_result set in index
@ -4198,7 +4202,7 @@ bool MergeTreeData::getQueryProcessingStageWithAggregateProjection(
{
selectBestProjection(
reader,
metadata_snapshot,
storage_snapshot,
query_info,
analysis_result.required_columns,
candidate,
@ -4232,12 +4236,12 @@ bool MergeTreeData::getQueryProcessingStageWithAggregateProjection(
QueryProcessingStage::Enum MergeTreeData::getQueryProcessingStage(
ContextPtr query_context,
QueryProcessingStage::Enum to_stage,
const StorageMetadataPtr & metadata_snapshot,
const StorageSnapshotPtr & storage_snapshot,
SelectQueryInfo & query_info) const
{
if (to_stage >= QueryProcessingStage::Enum::WithMergeableState)
{
if (getQueryProcessingStageWithAggregateProjection(query_context, metadata_snapshot, query_info))
if (getQueryProcessingStageWithAggregateProjection(query_context, storage_snapshot, query_info))
{
if (query_info.projection->desc->type == ProjectionDescription::Type::Aggregate)
return QueryProcessingStage::Enum::WithMergeableState;
@ -5101,27 +5105,13 @@ ReservationPtr MergeTreeData::balancedReservation(
return reserved_space;
}
static NameSet getNamesOfObjectColumns(const NamesAndTypesList & columns_list)
{
NameSet res;
for (const auto & [name, type] : columns_list)
if (isObject(type))
res.insert(name);
return res;
}
static NamesAndTypesList extendObjectColumnsImpl(
const MergeTreeData::DataPartsVector & parts,
const NamesAndTypesList & columns_list,
const NameSet & requested_to_extend,
bool with_subcolumns)
StorageSnapshot::NameToTypeMap MergeTreeData::getObjectTypes(const DataPartsVector & parts, const NameSet & object_names)
{
std::unordered_map<String, DataTypes> types_in_parts;
if (parts.empty())
{
for (const auto & name : requested_to_extend)
for (const auto & name : object_names)
types_in_parts[name].push_back(std::make_shared<DataTypeTuple>(
DataTypes{std::make_shared<DataTypeUInt8>()},
Names{ColumnObject::COLUMN_NAME_DUMMY}));
@ -5133,57 +5123,27 @@ static NamesAndTypesList extendObjectColumnsImpl(
const auto & part_columns = part->getColumns();
for (const auto & [name, type] : part_columns)
{
if (requested_to_extend.count(name))
if (object_names.count(name))
types_in_parts[name].push_back(type);
}
}
}
NamesAndTypesList result_columns;
for (const auto & column : columns_list)
{
auto it = types_in_parts.find(column.name);
if (it != types_in_parts.end())
{
auto expanded_type = getLeastCommonTypeForObject(it->second);
result_columns.emplace_back(column.name, expanded_type);
StorageSnapshot::NameToTypeMap object_types;
for (const auto & [name, types] : types_in_parts)
object_types.emplace(name, getLeastCommonTypeForObject(types));
if (with_subcolumns)
{
for (const auto & subcolumn : expanded_type->getSubcolumnNames())
{
result_columns.emplace_back(column.name, subcolumn,
expanded_type, expanded_type->getSubcolumnType(subcolumn));
}
}
}
else
{
result_columns.push_back(column);
}
}
return result_columns;
return object_types;
}
NamesAndTypesList MergeTreeData::extendObjectColumns(const DataPartsVector & parts, const NamesAndTypesList & columns_list, bool with_subcolumns)
StorageSnapshotPtr MergeTreeData::getStorageSnapshot(const StorageMetadataPtr & metadata_snapshot) const
{
/// Firstly fast check without locking parts, if there are any Object columns.
NameSet requested_to_extend = getNamesOfObjectColumns(columns_list);
if (requested_to_extend.empty())
return columns_list;
auto parts = getDataPartsVector();
auto object_types = getObjectTypes(
parts,
getNamesOfObjectColumns(metadata_snapshot->getColumns().getAll()));
return extendObjectColumnsImpl(parts, columns_list, requested_to_extend, with_subcolumns);
}
NamesAndTypesList MergeTreeData::extendObjectColumns(const NamesAndTypesList & columns_list, bool with_subcolumns) const
{
/// Firstly fast check without locking parts, if there are any Object columns.
NameSet requested_to_extend = getNamesOfObjectColumns(columns_list);
if (requested_to_extend.empty())
return columns_list;
return extendObjectColumnsImpl(getDataPartsVector(), columns_list, requested_to_extend, with_subcolumns);
return std::make_shared<StorageSnapshot>(*this, metadata_snapshot, object_types, parts);
}
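Taken together, the intended flow now looks like this. A hypothetical call site assembled from calls that do appear elsewhere in this diff; no single file contains exactly this snippet, and the object_types member name is inferred from the constructor above.

/// Take the snapshot once, at the start of query processing.
auto metadata_snapshot = table->getInMemoryMetadataPtr();
auto storage_snapshot = table->getStorageSnapshot(metadata_snapshot);

/// For MergeTree the snapshot has already pinned a parts vector and merged the
/// Object column types across exactly those parts:
///   storage_snapshot->parts        -- getDataPartsVector() at snapshot time
///   storage_snapshot->object_types -- getObjectTypes(parts, object names)

/// Every later step works off the snapshot, not the live table, so validation,
/// header construction and reading all see one consistent set of parts.
storage_snapshot->check(column_names);
Block header = storage_snapshot->getSampleBlockForColumns(column_names);
storage->read(query_plan, column_names, storage_snapshot,
              query_info, context, processed_stage, max_block_size, num_streams);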
CurrentlySubmergingEmergingTagger::~CurrentlySubmergingEmergingTagger()

View File

@ -363,12 +363,12 @@ public:
BrokenPartCallback broken_part_callback_ = [](const String &){});
bool getQueryProcessingStageWithAggregateProjection(
ContextPtr query_context, const StorageMetadataPtr & metadata_snapshot, SelectQueryInfo & query_info) const;
ContextPtr query_context, const StorageSnapshotPtr & storage_snapshot, SelectQueryInfo & query_info) const;
QueryProcessingStage::Enum getQueryProcessingStage(
ContextPtr query_context,
QueryProcessingStage::Enum to_stage,
const StorageMetadataPtr & metadata_snapshot,
const StorageSnapshotPtr & storage_snapshot,
SelectQueryInfo & info) const override;
ReservationPtr reserveSpace(UInt64 expected_size, VolumePtr & volume) const;
@ -394,6 +394,8 @@ public:
bool mayBenefitFromIndexForIn(const ASTPtr & left_in_operand, ContextPtr, const StorageMetadataPtr & metadata_snapshot) const override;
StorageSnapshotPtr getStorageSnapshot(const StorageMetadataPtr & metadata_snapshot) const override;
/// Load the set of data parts from disk. Call once - immediately after the object is created.
void loadDataParts(bool skip_sanity_checks);
@ -635,8 +637,7 @@ public:
return column_sizes;
}
NamesAndTypesList extendObjectColumns(const NamesAndTypesList & columns_list, bool with_subcolumns) const override;
static NamesAndTypesList extendObjectColumns(const DataPartsVector & parts, const NamesAndTypesList & columns_list, bool with_subcolumns);
static StorageSnapshot::NameToTypeMap getObjectTypes(const DataPartsVector & parts, const NameSet & object_names);
/// For ATTACH/DETACH/DROP PARTITION.
String getPartitionIDFromQuery(const ASTPtr & ast, ContextPtr context) const;

View File

@ -15,6 +15,7 @@
#include <DataStreams/MaterializingBlockInputStream.h>
#include <DataStreams/ColumnGathererStream.h>
#include <DataStreams/SquashingBlockInputStream.h>
#include <DataTypes/ObjectUtils.h>
#include <Processors/Merges/MergingSortedTransform.h>
#include <Processors/Merges/CollapsingSortedTransform.h>
#include <Processors/Merges/SummingSortedTransform.h>
@ -718,7 +719,11 @@ MergeTreeData::MutableDataPartPtr MergeTreeDataMergerMutator::mergePartsToTempor
Names all_column_names = metadata_snapshot->getColumns().getNamesOfPhysical();
NamesAndTypesList storage_columns = metadata_snapshot->getColumns().getAllPhysical();
storage_columns = MergeTreeData::extendObjectColumns(parts, storage_columns, false);
auto object_types = MergeTreeData::getObjectTypes(parts, getNamesOfObjectColumns(storage_columns));
auto storage_snapshot = std::make_shared<StorageSnapshot>(data, metadata_snapshot, object_types, parts);
storage_columns = extendObjectColumns(storage_columns, object_types, false);
const auto data_settings = data.getSettings();
NamesAndTypesList gathering_columns;
@ -858,8 +863,9 @@ MergeTreeData::MutableDataPartPtr MergeTreeDataMergerMutator::mergePartsToTempor
for (const auto & part : parts)
{
/// TODO: Fix
auto input = std::make_unique<MergeTreeSequentialSource>(
data, metadata_snapshot, part, merging_column_names, read_with_direct_io, true);
data, storage_snapshot, part, merging_column_names, read_with_direct_io, true);
input->setProgressCallback(
MergeProgressCallback(merge_entry, watch_prev_elapsed, horizontal_stage_progress));
@ -1061,7 +1067,7 @@ MergeTreeData::MutableDataPartPtr MergeTreeDataMergerMutator::mergePartsToTempor
for (size_t part_num = 0; part_num < parts.size(); ++part_num)
{
auto column_part_source = std::make_shared<MergeTreeSequentialSource>(
data, metadata_snapshot, parts[part_num], column_names, read_with_direct_io, true);
data, storage_snapshot, parts[part_num], column_names, read_with_direct_io, true);
column_part_source->setProgressCallback(
MergeProgressCallback(merge_entry, watch_prev_elapsed, column_progress));

View File

@ -124,7 +124,7 @@ static RelativeSize convertAbsoluteSampleSizeToRelative(const ASTPtr & node, siz
QueryPlanPtr MergeTreeDataSelectExecutor::read(
const Names & column_names_to_return,
const StorageMetadataPtr & metadata_snapshot,
const StorageSnapshotPtr & storage_snapshot,
const SelectQueryInfo & query_info,
ContextPtr context,
const UInt64 max_block_size,
@ -133,12 +133,15 @@ QueryPlanPtr MergeTreeDataSelectExecutor::read(
std::shared_ptr<PartitionIdToMaxBlock> max_block_numbers_to_read) const
{
const auto & settings = context->getSettingsRef();
auto parts = data.getDataPartsVector();
const auto & parts = storage_snapshot->parts;
const auto & metadata_snapshot = storage_snapshot->metadata;
if (!query_info.projection)
{
auto plan = readFromParts(
parts,
column_names_to_return,
storage_snapshot,
metadata_snapshot,
metadata_snapshot,
query_info,
@ -185,6 +188,7 @@ QueryPlanPtr MergeTreeDataSelectExecutor::read(
auto plan = readFromParts(
projection_parts,
query_info.projection->required_columns,
storage_snapshot,
metadata_snapshot,
query_info.projection->desc->metadata,
query_info,
@ -1086,6 +1090,7 @@ static void selectColumnNames(
size_t MergeTreeDataSelectExecutor::estimateNumMarksToRead(
MergeTreeData::DataPartsVector parts,
const Names & column_names_to_return,
const StorageSnapshotPtr & storage_snapshot,
const StorageMetadataPtr & metadata_snapshot_base,
const StorageMetadataPtr & metadata_snapshot,
const SelectQueryInfo & query_info,
@ -1116,7 +1121,7 @@ size_t MergeTreeDataSelectExecutor::estimateNumMarksToRead(
real_column_names.push_back(ExpressionActions::getSmallestColumn(available_real_columns));
}
data.check(metadata_snapshot, real_column_names);
storage_snapshot->check(real_column_names);
const auto & primary_key = metadata_snapshot->getPrimaryKey();
Names primary_key_columns = primary_key.column_names;
@ -1166,6 +1171,7 @@ size_t MergeTreeDataSelectExecutor::estimateNumMarksToRead(
QueryPlanPtr MergeTreeDataSelectExecutor::readFromParts(
MergeTreeData::DataPartsVector parts,
const Names & column_names_to_return,
const StorageSnapshotPtr & storage_snapshot,
const StorageMetadataPtr & metadata_snapshot_base,
const StorageMetadataPtr & metadata_snapshot,
const SelectQueryInfo & query_info,
@ -1192,6 +1198,7 @@ QueryPlanPtr MergeTreeDataSelectExecutor::readFromParts(
virt_column_names,
data,
query_info,
storage_snapshot,
metadata_snapshot,
metadata_snapshot_base,
context,

View File

@ -37,7 +37,7 @@ public:
QueryPlanPtr read(
const Names & column_names,
const StorageMetadataPtr & metadata_snapshot,
const StorageSnapshotPtr & storage_snapshot,
const SelectQueryInfo & query_info,
ContextPtr context,
UInt64 max_block_size,
@ -49,6 +49,7 @@ public:
QueryPlanPtr readFromParts(
MergeTreeData::DataPartsVector parts,
const Names & column_names,
const StorageSnapshotPtr & storage_snapshot,
const StorageMetadataPtr & metadata_snapshot_base,
const StorageMetadataPtr & metadata_snapshot,
const SelectQueryInfo & query_info,
@ -63,7 +64,8 @@ public:
size_t estimateNumMarksToRead(
MergeTreeData::DataPartsVector parts,
const Names & column_names,
const StorageMetadataPtr & metadata_snapshot_base,
const StorageSnapshotPtr & storage_snapshot,
const StorageMetadataPtr & storage_snapshot_base,
const StorageMetadataPtr & metadata_snapshot,
const SelectQueryInfo & query_info,
ContextPtr context,

View File

@ -273,7 +273,8 @@ MergeTreeData::MutableDataPartPtr MergeTreeDataWriter::writeTempPart(
{
Block & block = block_with_partition.block;
auto columns = metadata_snapshot->getColumns().getAllPhysical().filter(block.getNames());
auto extended_storage_columns = data.getColumns(metadata_snapshot,
auto storage_snapshot = data.getStorageSnapshot(metadata_snapshot);
auto extended_storage_columns = storage_snapshot->getColumns(
GetColumnsOptions(GetColumnsOptions::AllPhysical).withExtendedObjects());
convertObjectsToTuples(columns, block, extended_storage_columns);

View File

@ -23,7 +23,7 @@ MergeTreeReadPool::MergeTreeReadPool(
const size_t min_marks_for_concurrent_read_,
RangesInDataParts && parts_,
const MergeTreeData & data_,
const StorageMetadataPtr & metadata_snapshot_,
const StorageSnapshotPtr & storage_snapshot_,
const PrewhereInfoPtr & prewhere_info_,
const bool check_columns_,
const Names & column_names_,
@ -33,7 +33,7 @@ MergeTreeReadPool::MergeTreeReadPool(
: backoff_settings{backoff_settings_}
, backoff_state{threads_}
, data{data_}
, metadata_snapshot{metadata_snapshot_}
, storage_snapshot{storage_snapshot_}
, column_names{column_names_}
, do_not_steal_tasks{do_not_steal_tasks_}
, predict_block_size_bytes{preferred_block_size_bytes_ > 0}
@ -168,7 +168,7 @@ MarkRanges MergeTreeReadPool::getRestMarks(const IMergeTreeDataPart & part, cons
Block MergeTreeReadPool::getHeader() const
{
return data.getSampleBlockForColumns(metadata_snapshot, column_names);
return storage_snapshot->getSampleBlockForColumns(column_names);
}
void MergeTreeReadPool::profileFeedback(const ReadBufferFromFileBase::ProfileInfo info)
@ -215,7 +215,7 @@ std::vector<size_t> MergeTreeReadPool::fillPerPartInfo(
const RangesInDataParts & parts, const bool check_columns)
{
std::vector<size_t> per_part_sum_marks;
Block sample_block = metadata_snapshot->getSampleBlock();
Block sample_block = storage_snapshot->metadata->getSampleBlock();
for (const auto i : collections::range(0, parts.size()))
{
@ -229,7 +229,7 @@ std::vector<size_t> MergeTreeReadPool::fillPerPartInfo(
per_part_sum_marks.push_back(sum_marks);
auto [required_columns, required_pre_columns, should_reorder] =
getReadTaskColumns(data, metadata_snapshot, part.data_part, column_names, prewhere_info, check_columns);
getReadTaskColumns(data, storage_snapshot, part.data_part, column_names, prewhere_info, check_columns);
/// will be used to distinguish between PREWHERE and WHERE columns when applying filter
const auto & required_column_names = required_columns.getNames();

View File

@ -71,7 +71,7 @@ private:
public:
MergeTreeReadPool(
const size_t threads_, const size_t sum_marks_, const size_t min_marks_for_concurrent_read_,
RangesInDataParts && parts_, const MergeTreeData & data_, const StorageMetadataPtr & metadata_snapshot_,
RangesInDataParts && parts_, const MergeTreeData & data_, const StorageSnapshotPtr & storage_snapshot_,
const PrewhereInfoPtr & prewhere_info_,
const bool check_columns_, const Names & column_names_,
const BackoffSettings & backoff_settings_, size_t preferred_block_size_bytes_,
@ -99,7 +99,7 @@ private:
const RangesInDataParts & parts, const size_t min_marks_for_concurrent_read);
const MergeTreeData & data;
StorageMetadataPtr metadata_snapshot;
StorageSnapshotPtr storage_snapshot;
const Names column_names;
bool do_not_steal_tasks;
bool predict_block_size_bytes;

View File

@ -14,7 +14,7 @@ namespace ErrorCodes
MergeTreeReverseSelectProcessor::MergeTreeReverseSelectProcessor(
const MergeTreeData & storage_,
const StorageMetadataPtr & metadata_snapshot_,
const StorageSnapshotPtr & storage_snapshot_,
const MergeTreeData::DataPartPtr & owned_data_part_,
UInt64 max_block_size_rows_,
size_t preferred_block_size_bytes_,
@ -31,8 +31,8 @@ MergeTreeReverseSelectProcessor::MergeTreeReverseSelectProcessor(
bool quiet)
:
MergeTreeBaseSelectProcessor{
storage_.getSampleBlockForColumns(metadata_snapshot_, required_columns_),
storage_, metadata_snapshot_, prewhere_info_, std::move(actions_settings), max_block_size_rows_,
storage_snapshot_->getSampleBlockForColumns(required_columns_),
storage_, storage_snapshot_, prewhere_info_, std::move(actions_settings), max_block_size_rows_,
preferred_block_size_bytes_, preferred_max_column_in_block_size_bytes_,
reader_settings_, use_uncompressed_cache_, virt_column_names_},
required_columns{std::move(required_columns_)},
@ -56,7 +56,7 @@ MergeTreeReverseSelectProcessor::MergeTreeReverseSelectProcessor(
ordered_names = header_without_virtual_columns.getNames();
task_columns = getReadTaskColumns(storage, metadata_snapshot, data_part, required_columns, prewhere_info, check_columns);
task_columns = getReadTaskColumns(storage, storage_snapshot_, data_part, required_columns, prewhere_info, check_columns);
/// will be used to distinguish between PREWHERE and WHERE columns when applying filter
const auto & column_names = task_columns.columns.getNames();
@ -67,12 +67,12 @@ MergeTreeReverseSelectProcessor::MergeTreeReverseSelectProcessor(
owned_mark_cache = storage.getContext()->getMarkCache();
reader = data_part->getReader(task_columns.columns, metadata_snapshot,
reader = data_part->getReader(task_columns.columns, storage_snapshot_->metadata,
all_mark_ranges, owned_uncompressed_cache.get(),
owned_mark_cache.get(), reader_settings);
if (prewhere_info)
pre_reader = data_part->getReader(task_columns.pre_columns, metadata_snapshot, all_mark_ranges,
pre_reader = data_part->getReader(task_columns.pre_columns, storage_snapshot_->metadata, all_mark_ranges,
owned_uncompressed_cache.get(), owned_mark_cache.get(), reader_settings);
}
@ -96,7 +96,7 @@ try
auto size_predictor = (preferred_block_size_bytes == 0)
? nullptr
: std::make_unique<MergeTreeBlockSizePredictor>(data_part, ordered_names, metadata_snapshot->getSampleBlock());
: std::make_unique<MergeTreeBlockSizePredictor>(data_part, ordered_names, storage_snapshot->metadata->getSampleBlock());
task = std::make_unique<MergeTreeReadTask>(
data_part, mark_ranges_for_task, part_index_in_query, ordered_names, column_name_set,

View File

@ -18,7 +18,7 @@ class MergeTreeReverseSelectProcessor : public MergeTreeBaseSelectProcessor
public:
MergeTreeReverseSelectProcessor(
const MergeTreeData & storage,
const StorageMetadataPtr & metadata_snapshot,
const StorageSnapshotPtr & storage_snapshot_,
const MergeTreeData::DataPartPtr & owned_data_part,
UInt64 max_block_size_rows,
size_t preferred_block_size_bytes,

View File

@ -14,7 +14,7 @@ namespace ErrorCodes
MergeTreeSelectProcessor::MergeTreeSelectProcessor(
const MergeTreeData & storage_,
const StorageMetadataPtr & metadata_snapshot_,
const StorageSnapshotPtr & storage_snapshot_,
const MergeTreeData::DataPartPtr & owned_data_part_,
UInt64 max_block_size_rows_,
size_t preferred_block_size_bytes_,
@ -31,8 +31,8 @@ MergeTreeSelectProcessor::MergeTreeSelectProcessor(
bool quiet)
:
MergeTreeBaseSelectProcessor{
storage_.getSampleBlockForColumns(metadata_snapshot_, required_columns_),
storage_, metadata_snapshot_, prewhere_info_, std::move(actions_settings), max_block_size_rows_,
storage_snapshot_->getSampleBlockForColumns(required_columns_),
storage_, storage_snapshot_, prewhere_info_, std::move(actions_settings), max_block_size_rows_,
preferred_block_size_bytes_, preferred_max_column_in_block_size_bytes_,
reader_settings_, use_uncompressed_cache_, virt_column_names_},
required_columns{std::move(required_columns_)},
@ -69,9 +69,10 @@ try
is_first_task = false;
task_columns = getReadTaskColumns(
storage, metadata_snapshot, data_part,
storage, storage_snapshot, data_part,
required_columns, prewhere_info, check_columns);
const auto & metadata_snapshot = storage_snapshot->metadata;
auto size_predictor = (preferred_block_size_bytes == 0)
? nullptr
: std::make_unique<MergeTreeBlockSizePredictor>(data_part, ordered_names, metadata_snapshot->getSampleBlock());

View File

@ -18,7 +18,7 @@ class MergeTreeSelectProcessor : public MergeTreeBaseSelectProcessor
public:
MergeTreeSelectProcessor(
const MergeTreeData & storage,
const StorageMetadataPtr & metadata_snapshot,
const StorageSnapshotPtr & storage_snapshot_,
const MergeTreeData::DataPartPtr & owned_data_part,
UInt64 max_block_size_rows,
size_t preferred_block_size_bytes,

View File

@ -11,15 +11,15 @@ namespace ErrorCodes
MergeTreeSequentialSource::MergeTreeSequentialSource(
const MergeTreeData & storage_,
const StorageMetadataPtr & metadata_snapshot_,
const StorageSnapshotPtr & storage_snapshot_,
MergeTreeData::DataPartPtr data_part_,
Names columns_to_read_,
bool read_with_direct_io_,
bool take_column_types_from_storage,
bool quiet)
: SourceWithProgress(storage_.getSampleBlockForColumns(metadata_snapshot_, columns_to_read_))
: SourceWithProgress(storage_snapshot_->getSampleBlockForColumns(columns_to_read_))
, storage(storage_)
, metadata_snapshot(metadata_snapshot_)
, storage_snapshot(storage_snapshot_)
, data_part(std::move(data_part_))
, columns_to_read(std::move(columns_to_read_))
, read_with_direct_io(read_with_direct_io_)
@ -39,12 +39,13 @@ MergeTreeSequentialSource::MergeTreeSequentialSource(
addTotalRowsApprox(data_part->rows_count);
/// Add columns because we don't want to read empty blocks
injectRequiredColumns(storage, metadata_snapshot, data_part, columns_to_read);
injectRequiredColumns(storage, storage_snapshot->metadata, data_part, columns_to_read);
NamesAndTypesList columns_for_reader;
if (take_column_types_from_storage)
{
auto physical_columns = storage.getColumns(metadata_snapshot,
auto physical_columns = storage_snapshot->getColumns(
GetColumnsOptions(GetColumnsOptions::AllPhysical).withExtendedObjects());
columns_for_reader = physical_columns.addTypes(columns_to_read);
}
else
@ -61,7 +62,7 @@ MergeTreeSequentialSource::MergeTreeSequentialSource(
.save_marks_in_cache = false
};
reader = data_part->getReader(columns_for_reader, metadata_snapshot,
reader = data_part->getReader(columns_for_reader, storage_snapshot->metadata,
MarkRanges{MarkRange(0, data_part->getMarksCount())},
/* uncompressed_cache = */ nullptr, mark_cache.get(), reader_settings);
}

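The getColumns call above is the first use in this part of the commit of the builder-style GetColumnsOptions. A short sketch of how the options compose; the All + subcolumns + virtuals chain is taken from StorageSnapshot::getSampleBlockForColumns at the end of this commit, and treating the two chains as freely combinable is an assumption:

/// Physical columns only, with Object columns expanded to their
/// concrete tuple representation (the call used above):
auto physical = storage_snapshot->getColumns(
    GetColumnsOptions(GetColumnsOptions::AllPhysical).withExtendedObjects());

/// The widest view: all column kinds plus subcolumns and virtuals:
auto all = storage_snapshot->getColumns(
    GetColumnsOptions(GetColumnsOptions::All)
        .withSubcolumns().withVirtuals().withExtendedObjects());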
View File

@ -14,7 +14,7 @@ class MergeTreeSequentialSource : public SourceWithProgress
public:
MergeTreeSequentialSource(
const MergeTreeData & storage_,
const StorageMetadataPtr & metadata_snapshot_,
const StorageSnapshotPtr & storage_snapshot_,
MergeTreeData::DataPartPtr data_part_,
Names columns_to_read_,
bool read_with_direct_io_,
@ -35,7 +35,7 @@ protected:
private:
const MergeTreeData & storage;
StorageMetadataPtr metadata_snapshot;
StorageSnapshotPtr storage_snapshot;
/// Data part will not be removed if the pointer owns it
MergeTreeData::DataPartPtr data_part;

View File

@ -16,7 +16,7 @@ MergeTreeThreadSelectBlockInputProcessor::MergeTreeThreadSelectBlockInputProcess
size_t preferred_block_size_bytes_,
size_t preferred_max_column_in_block_size_bytes_,
const MergeTreeData & storage_,
const StorageMetadataPtr & metadata_snapshot_,
const StorageSnapshotPtr & storage_snapshot_,
const bool use_uncompressed_cache_,
const PrewhereInfoPtr & prewhere_info_,
ExpressionActionsSettings actions_settings,
@ -24,7 +24,7 @@ MergeTreeThreadSelectBlockInputProcessor::MergeTreeThreadSelectBlockInputProcess
const Names & virt_column_names_)
:
MergeTreeBaseSelectProcessor{
pool_->getHeader(), storage_, metadata_snapshot_, prewhere_info_, std::move(actions_settings), max_block_size_rows_,
pool_->getHeader(), storage_, storage_snapshot_, prewhere_info_, std::move(actions_settings), max_block_size_rows_,
preferred_block_size_bytes_, preferred_max_column_in_block_size_bytes_,
reader_settings_, use_uncompressed_cache_, virt_column_names_},
thread{thread_},
@ -65,6 +65,7 @@ bool MergeTreeThreadSelectBlockInputProcessor::getNewTask()
/// Allows the pool to reduce the number of threads in case of too slow reads.
auto profile_callback = [this](ReadBufferFromFileBase::ProfileInfo info_) { pool->profileFeedback(info_); };
const auto & metadata_snapshot = storage_snapshot->metadata;
if (!reader)
{

View File

@ -22,7 +22,7 @@ public:
size_t preferred_block_size_bytes_,
size_t preferred_max_column_in_block_size_bytes_,
const MergeTreeData & storage_,
const StorageMetadataPtr & metadata_snapshot_,
const StorageSnapshotPtr & storage_snapshot_,
const bool use_uncompressed_cache_,
const PrewhereInfoPtr & prewhere_info_,
ExpressionActionsSettings actions_settings,

View File

@ -24,7 +24,7 @@ public:
Pipe read(
const Names & column_names,
const StorageMetadataPtr & metadata_snapshot,
const StorageSnapshotPtr & storage_snapshot,
SelectQueryInfo & query_info,
ContextPtr context,
QueryProcessingStage::Enum /*processed_stage*/,
@ -35,6 +35,7 @@ public:
QueryPlan query_plan = std::move(*MergeTreeDataSelectExecutor(storage).readFromParts(
{},
column_names,
storage_snapshot,
storage_snapshot->metadata,
storage_snapshot->metadata,
query_info,

View File

@ -24,7 +24,7 @@ public:
Pipe read(
const Names & column_names,
const StorageMetadataPtr & metadata_snapshot,
const StorageSnapshotPtr & storage_snapshot,
SelectQueryInfo & query_info,
ContextPtr context,
QueryProcessingStage::Enum /*processed_stage*/,
@ -36,8 +36,9 @@ public:
.readFromParts(
parts,
column_names,
metadata_snapshot,
metadata_snapshot,
storage_snapshot,
storage_snapshot->metadata,
storage_snapshot->metadata,
query_info,
context,
max_block_size,

View File

@ -258,7 +258,7 @@ NamesAndTypesList StorageMaterializedPostgreSQL::getVirtuals() const
Pipe StorageMaterializedPostgreSQL::read(
const Names & column_names,
const StorageMetadataPtr & metadata_snapshot,
const StorageSnapshotPtr & /*storage_snapshot*/,
SelectQueryInfo & query_info,
ContextPtr context_,
QueryProcessingStage::Enum processed_stage,
@ -267,7 +267,7 @@ Pipe StorageMaterializedPostgreSQL::read(
{
auto materialized_table_lock = lockForShare(String(), context_->getSettingsRef().lock_acquire_timeout);
auto nested_table = getNested();
return readFinalFromNestedStorage(nested_table, column_names, metadata_snapshot,
return readFinalFromNestedStorage(nested_table, column_names,
query_info, context_, processed_stage, max_block_size, num_streams);
}

View File

@ -84,7 +84,7 @@ public:
Pipe read(
const Names & column_names,
const StorageMetadataPtr & metadata_snapshot,
const StorageSnapshotPtr & storage_snapshot,
SelectQueryInfo & query_info,
ContextPtr context_,
QueryProcessingStage::Enum processed_stage,

View File

@ -15,20 +15,20 @@ namespace DB
RabbitMQBlockInputStream::RabbitMQBlockInputStream(
StorageRabbitMQ & storage_,
const StorageMetadataPtr & metadata_snapshot_,
const StorageSnapshotPtr & storage_snapshot_,
ContextPtr context_,
const Names & columns,
size_t max_block_size_,
bool ack_in_suffix_)
: storage(storage_)
, metadata_snapshot(metadata_snapshot_)
, storage_snapshot(storage_snapshot_)
, context(context_)
, column_names(columns)
, max_block_size(max_block_size_)
, ack_in_suffix(ack_in_suffix_)
, non_virtual_header(metadata_snapshot->getSampleBlockNonMaterialized())
, non_virtual_header(storage_snapshot->metadata->getSampleBlockNonMaterialized())
, sample_block(non_virtual_header)
, virtual_header(storage.getSampleBlockForColumns(metadata_snapshot,
, virtual_header(storage_snapshot->getSampleBlockForColumns(
{"_exchange_name", "_channel_id", "_delivery_tag", "_redelivered", "_message_id", "_timestamp"}))
{
for (const auto & column : virtual_header)

View File

@ -14,7 +14,7 @@ class RabbitMQBlockInputStream : public IBlockInputStream
public:
RabbitMQBlockInputStream(
StorageRabbitMQ & storage_,
const StorageMetadataPtr & metadata_snapshot_,
const StorageSnapshotPtr & storage_snapshot_,
ContextPtr context_,
const Names & columns,
size_t max_block_size_,
@ -37,7 +37,7 @@ public:
private:
StorageRabbitMQ & storage;
StorageMetadataPtr metadata_snapshot;
StorageSnapshotPtr storage_snapshot;
ContextPtr context;
Names column_names;
const size_t max_block_size;

View File

@ -595,7 +595,7 @@ void StorageRabbitMQ::unbindExchange()
Pipe StorageRabbitMQ::read(
const Names & column_names,
const StorageMetadataPtr & metadata_snapshot,
const StorageSnapshotPtr & storage_snapshot,
SelectQueryInfo & /* query_info */,
ContextPtr local_context,
QueryProcessingStage::Enum /* processed_stage */,
@ -608,7 +608,7 @@ Pipe StorageRabbitMQ::read(
if (num_created_consumers == 0)
return {};
auto sample_block = getSampleBlockForColumns(metadata_snapshot, column_names);
auto sample_block = storage_snapshot->getSampleBlockForColumns(column_names);
auto modified_context = addSettings(local_context);
auto block_size = getMaxBlockSize();
@ -625,7 +625,7 @@ Pipe StorageRabbitMQ::read(
for (size_t i = 0; i < num_created_consumers; ++i)
{
auto rabbit_stream = std::make_shared<RabbitMQBlockInputStream>(
*this, metadata_snapshot, modified_context, column_names, block_size);
*this, storage_snapshot, modified_context, column_names, block_size);
auto converting_stream = std::make_shared<ConvertingBlockInputStream>(
rabbit_stream, sample_block, ConvertingBlockInputStream::MatchColumnsMode::Name);
@ -903,9 +903,9 @@ bool StorageRabbitMQ::streamToViews()
InterpreterInsertQuery interpreter(insert, rabbitmq_context, false, true, true);
auto block_io = interpreter.execute();
auto metadata_snapshot = getInMemoryMetadataPtr();
auto storage_snapshot = getStorageSnapshot(getInMemoryMetadataPtr());
auto column_names = block_io.out->getHeader().getNames();
auto sample_block = getSampleBlockForColumns(metadata_snapshot, column_names);
auto sample_block = storage_snapshot->getSampleBlockForColumns(column_names);
auto block_size = getMaxBlockSize();
@ -916,7 +916,7 @@ bool StorageRabbitMQ::streamToViews()
for (size_t i = 0; i < num_created_consumers; ++i)
{
auto stream = std::make_shared<RabbitMQBlockInputStream>(
*this, metadata_snapshot, rabbitmq_context, column_names, block_size, false);
*this, storage_snapshot, rabbitmq_context, column_names, block_size, false);
streams.emplace_back(stream);
// Limit read batch to maximum block size to allow DDL

View File

@ -43,7 +43,7 @@ public:
/// Always return virtual columns in addition to required columns
Pipe read(
const Names & column_names,
const StorageMetadataPtr & metadata_snapshot,
const StorageSnapshotPtr & storage_snapshot,
SelectQueryInfo & query_info,
ContextPtr context,
QueryProcessingStage::Enum processed_stage,

View File

@ -19,7 +19,6 @@ namespace DB
Pipe readFinalFromNestedStorage(
StoragePtr nested_storage,
const Names & column_names,
const StorageMetadataPtr & /*metadata_snapshot*/,
SelectQueryInfo & query_info,
ContextPtr context,
QueryProcessingStage::Enum processed_stage,
@ -28,7 +27,7 @@ Pipe readFinalFromNestedStorage(
{
NameSet column_names_set = NameSet(column_names.begin(), column_names.end());
auto lock = nested_storage->lockForShare(context->getCurrentQueryId(), context->getSettingsRef().lock_acquire_timeout);
const StorageMetadataPtr & nested_metadata = nested_storage->getInMemoryMetadataPtr();
const auto & nested_metadata = nested_storage->getInMemoryMetadataPtr();
Block nested_header = nested_metadata->getSampleBlock();
ColumnWithTypeAndName & sign_column = nested_header.getByPosition(nested_header.columns() - 2);
@ -64,7 +63,8 @@ Pipe readFinalFromNestedStorage(
expressions->children.emplace_back(std::make_shared<ASTIdentifier>(column_name));
}
Pipe pipe = nested_storage->read(require_columns_name, nested_metadata, query_info, context, processed_stage, max_block_size, num_streams);
auto nested_snapshot = nested_storage->getStorageSnapshot(nested_metadata);
Pipe pipe = nested_storage->read(require_columns_name, nested_snapshot, query_info, context, processed_stage, max_block_size, num_streams);
pipe.addTableLock(lock);
if (!expressions->children.empty() && !pipe.empty())

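With the metadata parameter dropped, readFinalFromNestedStorage derives both the metadata and the snapshot from the nested storage itself. A hedged caller-side sketch matching the StorageMaterializedPostgreSQL and StorageMaterializeMySQL call sites in this commit:

/// Sketch: callers now pass only the column names.
Pipe pipe = readFinalFromNestedStorage(
    getNested(), column_names,
    query_info, context, processed_stage, max_block_size, num_streams);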
View File

@ -16,7 +16,6 @@ namespace DB
Pipe readFinalFromNestedStorage(
StoragePtr nested_storage,
const Names & column_names,
const StorageMetadataPtr & /*metadata_snapshot*/,
SelectQueryInfo & query_info,
ContextPtr context,
QueryProcessingStage::Enum processed_stage,

View File

@ -285,24 +285,24 @@ void StorageEmbeddedRocksDB::initDb()
Pipe StorageEmbeddedRocksDB::read(
const Names & column_names,
const StorageMetadataPtr & metadata_snapshot,
const StorageSnapshotPtr & storage_snapshot,
SelectQueryInfo & query_info,
ContextPtr /*context*/,
QueryProcessingStage::Enum /*processed_stage*/,
size_t max_block_size,
unsigned num_streams)
{
check(metadata_snapshot, column_names);
storage_snapshot->check(column_names);
FieldVectorPtr keys;
bool all_scan = false;
auto primary_key_data_type = metadata_snapshot->getSampleBlock().getByName(primary_key).type;
auto primary_key_data_type = storage_snapshot->metadata->getSampleBlock().getByName(primary_key).type;
std::tie(keys, all_scan) = getFilterKeys(primary_key, primary_key_data_type, query_info);
if (all_scan)
{
auto reader = std::make_shared<EmbeddedRocksDBBlockInputStream>(
*this, metadata_snapshot, max_block_size);
*this, storage_snapshot->metadata, max_block_size);
return Pipe(std::make_shared<SourceFromInputStream>(reader));
}
else
@ -327,7 +327,7 @@ Pipe StorageEmbeddedRocksDB::read(
size_t end = num_keys * (thread_idx + 1) / num_threads;
pipes.emplace_back(std::make_shared<EmbeddedRocksDBSource>(
*this, metadata_snapshot, keys, keys->begin() + begin, keys->begin() + end, max_block_size));
*this, storage_snapshot->metadata, keys, keys->begin() + begin, keys->begin() + end, max_block_size));
}
return Pipe::unitePipes(std::move(pipes));
}

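StorageEmbeddedRocksDB shows the replacement applied across every storage in this commit: name validation moves onto the snapshot, while anything that needs types still goes through the snapshot's metadata. Condensed into one place (illustrative, not an additional change):

/// before: check(metadata_snapshot, column_names);
storage_snapshot->check(column_names);
auto key_type = storage_snapshot->metadata->getSampleBlock().getByName(primary_key).type;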
View File

@ -27,7 +27,7 @@ public:
Pipe read(
const Names & column_names,
const StorageMetadataPtr & metadata_snapshot,
const StorageSnapshotPtr & storage_snapshot,
SelectQueryInfo & query_info,
ContextPtr context,
QueryProcessingStage::Enum processed_stage,

View File

@ -134,9 +134,9 @@ StorageBuffer::StorageBuffer(
class BufferSource : public SourceWithProgress
{
public:
BufferSource(const Names & column_names_, StorageBuffer::Buffer & buffer_, const StorageBuffer & storage, const StorageMetadataPtr & metadata_snapshot)
: SourceWithProgress(storage.getSampleBlockForColumns(metadata_snapshot, column_names_))
, column_names_and_types(metadata_snapshot->getColumns().getAllWithSubcolumns().addTypes(column_names_))
BufferSource(const Names & column_names_, StorageBuffer::Buffer & buffer_, const StorageSnapshotPtr & storage_snapshot)
: SourceWithProgress(storage_snapshot->getSampleBlockForColumns(column_names_))
, column_names_and_types(storage_snapshot->metadata->getColumns().getAllWithSubcolumns().addTypes(column_names_))
, buffer(buffer_) {}
String getName() const override { return "Buffer"; }
@ -183,7 +183,7 @@ private:
QueryProcessingStage::Enum StorageBuffer::getQueryProcessingStage(
ContextPtr local_context,
QueryProcessingStage::Enum to_stage,
const StorageMetadataPtr &,
const StorageSnapshotPtr &,
SelectQueryInfo & query_info) const
{
if (destination_id)
@ -193,7 +193,8 @@ QueryProcessingStage::Enum StorageBuffer::getQueryProcessingStage(
if (destination.get() == this)
throw Exception("Destination table is myself. Read will cause infinite loop.", ErrorCodes::INFINITE_LOOP);
return destination->getQueryProcessingStage(local_context, to_stage, destination->getInMemoryMetadataPtr(), query_info);
const auto & destination_metadata = destination->getInMemoryMetadataPtr();
return destination->getQueryProcessingStage(local_context, to_stage, destination->getStorageSnapshot(destination_metadata), query_info);
}
return QueryProcessingStage::FetchColumns;
@ -202,7 +203,7 @@ QueryProcessingStage::Enum StorageBuffer::getQueryProcessingStage(
Pipe StorageBuffer::read(
const Names & column_names,
const StorageMetadataPtr & metadata_snapshot,
const StorageSnapshotPtr & storage_snapshot,
SelectQueryInfo & query_info,
ContextPtr local_context,
QueryProcessingStage::Enum processed_stage,
@ -210,7 +211,7 @@ Pipe StorageBuffer::read(
const unsigned num_streams)
{
QueryPlan plan;
read(plan, column_names, metadata_snapshot, query_info, local_context, processed_stage, max_block_size, num_streams);
read(plan, column_names, storage_snapshot, query_info, local_context, processed_stage, max_block_size, num_streams);
return plan.convertToPipe(
QueryPlanOptimizationSettings::fromContext(local_context),
BuildQueryPipelineSettings::fromContext(local_context));
@ -219,13 +220,15 @@ Pipe StorageBuffer::read(
void StorageBuffer::read(
QueryPlan & query_plan,
const Names & column_names,
const StorageMetadataPtr & metadata_snapshot,
const StorageSnapshotPtr & storage_snapshot,
SelectQueryInfo & query_info,
ContextPtr local_context,
QueryProcessingStage::Enum processed_stage,
size_t max_block_size,
unsigned num_streams)
{
const auto & metadata_snapshot = storage_snapshot->metadata;
if (destination_id)
{
auto destination = DatabaseCatalog::instance().getTable(destination_id, local_context);
@ -236,6 +239,7 @@ void StorageBuffer::read(
auto destination_lock = destination->lockForShare(local_context->getCurrentQueryId(), local_context->getSettingsRef().lock_acquire_timeout);
auto destination_metadata_snapshot = destination->getInMemoryMetadataPtr();
auto destination_snapshot = destination->getStorageSnapshot(destination_metadata_snapshot);
const bool dst_has_same_structure = std::all_of(column_names.begin(), column_names.end(), [metadata_snapshot, destination_metadata_snapshot](const String& column_name)
{
@ -252,7 +256,7 @@ void StorageBuffer::read(
/// The destination table has the same structure for the requested columns, so we can simply read blocks from there.
destination->read(
query_plan, column_names, destination_metadata_snapshot, query_info,
query_plan, column_names, destination_snapshot, query_info,
local_context, processed_stage, max_block_size, num_streams);
}
else
@ -287,7 +291,7 @@ void StorageBuffer::read(
else
{
destination->read(
query_plan, columns_intersection, destination_metadata_snapshot, query_info,
query_plan, columns_intersection, destination_snapshot, query_info,
local_context, processed_stage, max_block_size, num_streams);
if (query_plan.isInitialized())
@ -344,7 +348,7 @@ void StorageBuffer::read(
Pipes pipes_from_buffers;
pipes_from_buffers.reserve(num_shards);
for (auto & buf : buffers)
pipes_from_buffers.emplace_back(std::make_shared<BufferSource>(column_names, buf, *this, metadata_snapshot));
pipes_from_buffers.emplace_back(std::make_shared<BufferSource>(column_names, buf, storage_snapshot));
pipe_from_buffers = Pipe::unitePipes(std::move(pipes_from_buffers));
}

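Storages that delegate to another table (Buffer here, StorageMaterializedView and the PostgreSQL proxy below) now build a snapshot of the destination before forwarding. The shape of the pattern, with `inner` as an illustrative StoragePtr for the destination:

/// Hedged sketch; `inner` stands in for the destination table.
auto inner_metadata = inner->getInMemoryMetadataPtr();
auto inner_snapshot = inner->getStorageSnapshot(inner_metadata);
return inner->getQueryProcessingStage(local_context, to_stage, inner_snapshot, query_info);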
View File

@ -59,11 +59,11 @@ public:
std::string getName() const override { return "Buffer"; }
QueryProcessingStage::Enum
getQueryProcessingStage(ContextPtr, QueryProcessingStage::Enum, const StorageMetadataPtr &, SelectQueryInfo &) const override;
getQueryProcessingStage(ContextPtr, QueryProcessingStage::Enum, const StorageSnapshotPtr &, SelectQueryInfo &) const override;
Pipe read(
const Names & column_names,
const StorageMetadataPtr & /*metadata_snapshot*/,
const StorageSnapshotPtr & storage_snapshot,
SelectQueryInfo & query_info,
ContextPtr context,
QueryProcessingStage::Enum processed_stage,
@ -73,7 +73,7 @@ public:
void read(
QueryPlan & query_plan,
const Names & column_names,
const StorageMetadataPtr & metadata_snapshot,
const StorageSnapshotPtr & storage_snapshot,
SelectQueryInfo & query_info,
ContextPtr context,
QueryProcessingStage::Enum processed_stage,

View File

@ -160,7 +160,7 @@ void StorageDictionary::checkTableCanBeDetached() const
Pipe StorageDictionary::read(
const Names & column_names,
const StorageMetadataPtr & /*metadata_snapshot*/,
const StorageSnapshotPtr & /*storage_snapshot*/,
SelectQueryInfo & /*query_info*/,
ContextPtr local_context,
QueryProcessingStage::Enum /*processed_stage*/,

View File

@ -26,7 +26,7 @@ public:
Pipe read(
const Names & column_names,
const StorageMetadataPtr & /*metadata_snapshot*/,
const StorageSnapshotPtr & storage_snapshot,
SelectQueryInfo & query_info,
ContextPtr context,
QueryProcessingStage::Enum processed_stage,

View File

@ -276,7 +276,7 @@ void replaceConstantExpressions(
ConstStoragePtr storage,
const StorageMetadataPtr & metadata_snapshot)
{
auto syntax_result = TreeRewriter(context).analyze(node, columns, storage, metadata_snapshot);
auto syntax_result = TreeRewriter(context).analyze(node, columns, storage, storage->getStorageSnapshot(metadata_snapshot));
Block block_with_constants = KeyCondition::getBlockWithConstants(node, syntax_result, context);
InDepthNodeVisitor<ReplacingConstantExpressionsMatcher, true> visitor(block_with_constants);
@ -485,7 +485,7 @@ StorageDistributed::StorageDistributed(
QueryProcessingStage::Enum StorageDistributed::getQueryProcessingStage(
ContextPtr local_context,
QueryProcessingStage::Enum to_stage,
const StorageMetadataPtr & metadata_snapshot,
const StorageSnapshotPtr & storage_snapshot,
SelectQueryInfo & query_info) const
{
const auto & settings = local_context->getSettingsRef();
@ -497,7 +497,7 @@ QueryProcessingStage::Enum StorageDistributed::getQueryProcessingStage(
/// (Anyway it will be calculated in the read())
if (getClusterQueriedNodes(settings, cluster) > 1 && settings.optimize_skip_unused_shards)
{
ClusterPtr optimized_cluster = getOptimizedCluster(local_context, metadata_snapshot, query_info.query);
ClusterPtr optimized_cluster = getOptimizedCluster(local_context, storage_snapshot->metadata, query_info.query);
if (optimized_cluster)
{
LOG_DEBUG(log, "Skipping irrelevant shards - the query will be sent to the following shards of the cluster (shard numbers): {}",
@ -561,7 +561,7 @@ QueryProcessingStage::Enum StorageDistributed::getQueryProcessingStage(
Pipe StorageDistributed::read(
const Names & column_names,
const StorageMetadataPtr & metadata_snapshot,
const StorageSnapshotPtr & storage_snapshot,
SelectQueryInfo & query_info,
ContextPtr local_context,
QueryProcessingStage::Enum processed_stage,
@ -569,7 +569,7 @@ Pipe StorageDistributed::read(
const unsigned num_streams)
{
QueryPlan plan;
read(plan, column_names, metadata_snapshot, query_info, local_context, processed_stage, max_block_size, num_streams);
read(plan, column_names, storage_snapshot, query_info, local_context, processed_stage, max_block_size, num_streams);
return plan.convertToPipe(
QueryPlanOptimizationSettings::fromContext(local_context),
BuildQueryPipelineSettings::fromContext(local_context));
@ -578,7 +578,7 @@ Pipe StorageDistributed::read(
void StorageDistributed::read(
QueryPlan & query_plan,
const Names & column_names,
const StorageMetadataPtr & metadata_snapshot,
const StorageSnapshotPtr & storage_snapshot,
SelectQueryInfo & query_info,
ContextPtr local_context,
QueryProcessingStage::Enum processed_stage,
@ -605,7 +605,7 @@ void StorageDistributed::read(
const Scalars & scalars = local_context->hasQueryContext() ? local_context->getQueryContext()->getScalars() : Scalars{};
bool has_virtual_shard_num_column = std::find(column_names.begin(), column_names.end(), "_shard_num") != column_names.end();
if (has_virtual_shard_num_column && !isVirtualColumn("_shard_num", metadata_snapshot))
if (has_virtual_shard_num_column && !isVirtualColumn("_shard_num", storage_snapshot->metadata))
has_virtual_shard_num_column = false;
ClusterProxy::SelectStreamFactory select_stream_factory = remote_table_function_ptr

View File

@ -57,11 +57,11 @@ public:
bool isRemote() const override { return true; }
QueryProcessingStage::Enum
getQueryProcessingStage(ContextPtr, QueryProcessingStage::Enum, const StorageMetadataPtr &, SelectQueryInfo &) const override;
getQueryProcessingStage(ContextPtr, QueryProcessingStage::Enum, const StorageSnapshotPtr &, SelectQueryInfo &) const override;
Pipe read(
const Names & column_names,
const StorageMetadataPtr & /*metadata_snapshot*/,
const StorageSnapshotPtr & storage_snapshot,
SelectQueryInfo & query_info,
ContextPtr context,
QueryProcessingStage::Enum processed_stage,
@ -71,7 +71,7 @@ public:
void read(
QueryPlan & query_plan,
const Names & column_names,
const StorageMetadataPtr & metadata_snapshot,
const StorageSnapshotPtr & storage_snapshot,
SelectQueryInfo & query_info,
ContextPtr context,
QueryProcessingStage::Enum processed_stage,

View File

@ -177,7 +177,7 @@ StorageExternalDistributed::StorageExternalDistributed(
Pipe StorageExternalDistributed::read(
const Names & column_names,
const StorageMetadataPtr & metadata_snapshot,
const StorageSnapshotPtr & storage_snapshot,
SelectQueryInfo & query_info,
ContextPtr context,
QueryProcessingStage::Enum processed_stage,
@ -189,7 +189,7 @@ Pipe StorageExternalDistributed::read(
{
pipes.emplace_back(shard->read(
column_names,
metadata_snapshot,
storage_snapshot,
query_info,
context,
processed_stage,

View File

@ -32,7 +32,7 @@ public:
Pipe read(
const Names & column_names,
const StorageMetadataPtr & /*metadata_snapshot*/,
const StorageSnapshotPtr & storage_snapshot,
SelectQueryInfo & query_info,
ContextPtr context,
QueryProcessingStage::Enum processed_stage,

View File

@ -271,26 +271,26 @@ public:
static Block getBlockForSource(
const StorageFilePtr & storage,
const StorageMetadataPtr & metadata_snapshot,
const StorageSnapshotPtr & storage_snapshot,
const ColumnsDescription & columns_description,
const FilesInfoPtr & files_info)
{
if (storage->isColumnOriented())
return storage->getSampleBlockForColumns(metadata_snapshot, columns_description.getNamesOfPhysical());
return storage_snapshot->getSampleBlockForColumns(columns_description.getNamesOfPhysical());
else
return getHeader(metadata_snapshot, files_info->need_path_column, files_info->need_file_column);
return getHeader(storage_snapshot->metadata, files_info->need_path_column, files_info->need_file_column);
}
StorageFileSource(
std::shared_ptr<StorageFile> storage_,
const StorageMetadataPtr & metadata_snapshot_,
const StorageSnapshotPtr & storage_snapshot_,
ContextPtr context_,
UInt64 max_block_size_,
FilesInfoPtr files_info_,
ColumnsDescription columns_description_)
: SourceWithProgress(getBlockForSource(storage_, metadata_snapshot_, columns_description_, files_info_))
: SourceWithProgress(getBlockForSource(storage_, storage_snapshot_, columns_description_, files_info_))
, storage(std::move(storage_))
, metadata_snapshot(metadata_snapshot_)
, storage_snapshot(storage_snapshot_)
, files_info(std::move(files_info_))
, columns_description(std::move(columns_description_))
, context(context_)
@ -379,8 +379,8 @@ public:
auto get_block_for_format = [&]() -> Block
{
if (storage->isColumnOriented())
return storage->getSampleBlockForColumns(metadata_snapshot, columns_description.getNamesOfPhysical());
return metadata_snapshot->getSampleBlock();
return storage_snapshot->getSampleBlockForColumns(columns_description.getNamesOfPhysical());
return storage_snapshot->metadata->getSampleBlock();
};
auto format = FormatFactory::instance().getInput(
@ -434,7 +434,7 @@ public:
private:
std::shared_ptr<StorageFile> storage;
StorageMetadataPtr metadata_snapshot;
StorageSnapshotPtr storage_snapshot;
FilesInfoPtr files_info;
String current_path;
Block sample_block;
@ -454,7 +454,7 @@ private:
Pipe StorageFile::read(
const Names & column_names,
const StorageMetadataPtr & metadata_snapshot,
const StorageSnapshotPtr & storage_snapshot,
SelectQueryInfo & /*query_info*/,
ContextPtr context,
QueryProcessingStage::Enum /*processed_stage*/,
@ -469,7 +469,7 @@ Pipe StorageFile::read(
if (paths.size() == 1 && !fs::exists(paths[0]))
{
if (context->getSettingsRef().engine_file_empty_if_not_exists)
return Pipe(std::make_shared<NullSource>(getSampleBlockForColumns(metadata_snapshot, column_names)));
return Pipe(std::make_shared<NullSource>(storage_snapshot->getSampleBlockForColumns(column_names)));
else
throw Exception("File " + paths[0] + " doesn't exist", ErrorCodes::FILE_DOESNT_EXIST);
}
@ -505,12 +505,12 @@ Pipe StorageFile::read(
{
if (isColumnOriented())
return ColumnsDescription{
getSampleBlockForColumns(metadata_snapshot, column_names).getNamesAndTypesList()};
storage_snapshot->getSampleBlockForColumns(column_names).getNamesAndTypesList()};
else
return metadata_snapshot->getColumns();
return storage_snapshot->metadata->getColumns();
};
pipes.emplace_back(std::make_shared<StorageFileSource>(
this_ptr, metadata_snapshot, context, max_block_size, files_info, get_columns_for_format()));
this_ptr, storage_snapshot, context, max_block_size, files_info, get_columns_for_format()));
}
return Pipe::unitePipes(std::move(pipes));

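For column-oriented formats StorageFile narrows the source header to the requested columns through the snapshot; otherwise it keeps the full metadata sample block. The choice, condensed into one expression (a sketch, not an additional change):

Block header = storage->isColumnOriented()
    ? storage_snapshot->getSampleBlockForColumns(columns_description.getNamesOfPhysical())
    : storage_snapshot->metadata->getSampleBlock();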
View File

@ -22,7 +22,7 @@ public:
Pipe read(
const Names & column_names,
const StorageMetadataPtr & /*metadata_snapshot*/,
const StorageSnapshotPtr & storage_snapshot,
SelectQueryInfo & query_info,
ContextPtr context,
QueryProcessingStage::Enum processed_stage,

View File

@ -479,19 +479,19 @@ void registerStorageGenerateRandom(StorageFactory & factory)
Pipe StorageGenerateRandom::read(
const Names & column_names,
const StorageMetadataPtr & metadata_snapshot,
const StorageSnapshotPtr & storage_snapshot,
SelectQueryInfo & /*query_info*/,
ContextPtr context,
QueryProcessingStage::Enum /*processed_stage*/,
size_t max_block_size,
unsigned num_streams)
{
check(metadata_snapshot, column_names);
storage_snapshot->check(column_names);
Pipes pipes;
pipes.reserve(num_streams);
const ColumnsDescription & our_columns = metadata_snapshot->getColumns();
const ColumnsDescription & our_columns = storage_snapshot->metadata->getColumns();
Block block_header;
for (const auto & name : column_names)
{

View File

@ -17,7 +17,7 @@ public:
Pipe read(
const Names & column_names,
const StorageMetadataPtr & /*metadata_snapshot*/,
const StorageSnapshotPtr & storage_snapshot,
SelectQueryInfo & query_info,
ContextPtr context,
QueryProcessingStage::Enum processed_stage,

View File

@ -54,7 +54,7 @@ void StorageInput::setInputStream(BlockInputStreamPtr input_stream_)
Pipe StorageInput::read(
const Names & /*column_names*/,
const StorageMetadataPtr & metadata_snapshot,
const StorageSnapshotPtr & storage_snapshot,
SelectQueryInfo & /*query_info*/,
ContextPtr context,
QueryProcessingStage::Enum /*processed_stage*/,
@ -68,7 +68,7 @@ Pipe StorageInput::read(
{
/// Send structure to the client.
query_context->initializeInput(shared_from_this());
return Pipe(std::make_shared<StorageInputSource>(query_context, metadata_snapshot->getSampleBlock()));
return Pipe(std::make_shared<StorageInputSource>(query_context, storage_snapshot->metadata->getSampleBlock()));
}
if (!input_stream)

View File

@ -19,7 +19,7 @@ public:
Pipe read(
const Names & column_names,
const StorageMetadataPtr & /*metadata_snapshot*/,
const StorageSnapshotPtr & storage_snapshot,
SelectQueryInfo & query_info,
ContextPtr context,
QueryProcessingStage::Enum processed_stage,

View File

@ -567,16 +567,16 @@ private:
// TODO: multiple stream read and index read
Pipe StorageJoin::read(
const Names & column_names,
const StorageMetadataPtr & metadata_snapshot,
const StorageSnapshotPtr & storage_snapshot,
SelectQueryInfo & /*query_info*/,
ContextPtr /*context*/,
QueryProcessingStage::Enum /*processed_stage*/,
size_t max_block_size,
unsigned /*num_streams*/)
{
check(metadata_snapshot, column_names);
storage_snapshot->check(column_names);
Block source_sample_block = getSampleBlockForColumns(metadata_snapshot, column_names);
Block source_sample_block = storage_snapshot->getSampleBlockForColumns(column_names);
return Pipe(std::make_shared<JoinSource>(join, rwlock, max_block_size, source_sample_block));
}

View File

@ -49,7 +49,7 @@ public:
Pipe read(
const Names & column_names,
const StorageMetadataPtr & /*metadata_snapshot*/,
const StorageSnapshotPtr & storage_snapshot,
SelectQueryInfo & query_info,
ContextPtr context,
QueryProcessingStage::Enum processed_stage,

View File

@ -648,19 +648,19 @@ static std::chrono::seconds getLockTimeout(ContextPtr context)
Pipe StorageLog::read(
const Names & column_names,
const StorageMetadataPtr & metadata_snapshot,
const StorageSnapshotPtr & storage_snapshot,
SelectQueryInfo & /*query_info*/,
ContextPtr context,
QueryProcessingStage::Enum /*processed_stage*/,
size_t max_block_size,
unsigned num_streams)
{
check(metadata_snapshot, column_names);
storage_snapshot->check(column_names);
auto lock_timeout = getLockTimeout(context);
loadMarks(lock_timeout);
auto all_columns = metadata_snapshot->getColumns().getAllWithSubcolumns().addTypes(column_names);
auto all_columns = storage_snapshot->metadata->getColumns().getAllWithSubcolumns().addTypes(column_names);
all_columns = Nested::convertToSubcolumns(all_columns);
std::shared_lock lock(rwlock, lock_timeout);
@ -669,7 +669,7 @@ Pipe StorageLog::read(
Pipes pipes;
const Marks & marks = getMarksWithRealRowCount(metadata_snapshot);
const Marks & marks = getMarksWithRealRowCount(storage_snapshot->metadata);
size_t marks_size = marks.size();
if (num_streams > marks_size)

View File

@ -27,7 +27,7 @@ public:
Pipe read(
const Names & column_names,
const StorageMetadataPtr & metadata_snapshot,
const StorageSnapshotPtr & storage_snapshot,
SelectQueryInfo & query_info,
ContextPtr context,
QueryProcessingStage::Enum processed_stage,

View File

@ -38,7 +38,7 @@ StorageMaterializeMySQL::StorageMaterializeMySQL(const StoragePtr & nested_stora
Pipe StorageMaterializeMySQL::read(
const Names & column_names,
const StorageMetadataPtr & metadata_snapshot,
const StorageSnapshotPtr & /*storage_snapshot*/,
SelectQueryInfo & query_info,
ContextPtr context,
QueryProcessingStage::Enum processed_stage,
@ -47,7 +47,7 @@ Pipe StorageMaterializeMySQL::read(
{
/// If the background synchronization thread has exception.
rethrowSyncExceptionIfNeed(database);
return readFinalFromNestedStorage(nested_storage, column_names, metadata_snapshot,
return readFinalFromNestedStorage(nested_storage, column_names,
query_info, context, processed_stage, max_block_size, num_streams);
}

View File

@ -25,7 +25,7 @@ public:
StorageMaterializeMySQL(const StoragePtr & nested_storage_, const IDatabase * database_);
Pipe read(
const Names & column_names, const StorageMetadataPtr & metadata_snapshot, SelectQueryInfo & query_info,
const Names & column_names, const StorageSnapshotPtr & storage_snapshot, SelectQueryInfo & query_info,
ContextPtr context, QueryProcessingStage::Enum processed_stage, size_t max_block_size, unsigned num_streams) override;
BlockOutputStreamPtr write(const ASTPtr &, const StorageMetadataPtr &, ContextPtr) override { throwNotAllowed(); }

View File

@ -131,15 +131,16 @@ StorageMaterializedView::StorageMaterializedView(
QueryProcessingStage::Enum StorageMaterializedView::getQueryProcessingStage(
ContextPtr local_context,
QueryProcessingStage::Enum to_stage,
const StorageMetadataPtr &,
const StorageSnapshotPtr &,
SelectQueryInfo & query_info) const
{
return getTargetTable()->getQueryProcessingStage(local_context, to_stage, getTargetTable()->getInMemoryMetadataPtr(), query_info);
auto target_metadata = getTargetTable()->getInMemoryMetadataPtr();
return getTargetTable()->getQueryProcessingStage(local_context, to_stage, getTargetTable()->getStorageSnapshot(target_metadata), query_info);
}
Pipe StorageMaterializedView::read(
const Names & column_names,
const StorageMetadataPtr & metadata_snapshot,
const StorageSnapshotPtr & storage_snapshot,
SelectQueryInfo & query_info,
ContextPtr local_context,
QueryProcessingStage::Enum processed_stage,
@ -147,7 +148,7 @@ Pipe StorageMaterializedView::read(
const unsigned num_streams)
{
QueryPlan plan;
read(plan, column_names, metadata_snapshot, query_info, local_context, processed_stage, max_block_size, num_streams);
read(plan, column_names, storage_snapshot, query_info, local_context, processed_stage, max_block_size, num_streams);
return plan.convertToPipe(
QueryPlanOptimizationSettings::fromContext(local_context),
BuildQueryPipelineSettings::fromContext(local_context));
@ -156,7 +157,7 @@ Pipe StorageMaterializedView::read(
void StorageMaterializedView::read(
QueryPlan & query_plan,
const Names & column_names,
const StorageMetadataPtr & metadata_snapshot,
const StorageSnapshotPtr & storage_snapshot,
SelectQueryInfo & query_info,
ContextPtr local_context,
QueryProcessingStage::Enum processed_stage,
@ -166,15 +167,16 @@ void StorageMaterializedView::read(
auto storage = getTargetTable();
auto lock = storage->lockForShare(local_context->getCurrentQueryId(), local_context->getSettingsRef().lock_acquire_timeout);
auto target_metadata_snapshot = storage->getInMemoryMetadataPtr();
auto target_storage_snapshot = storage->getStorageSnapshot(target_metadata_snapshot);
if (query_info.order_optimizer)
query_info.input_order_info = query_info.order_optimizer->getInputOrder(target_metadata_snapshot, local_context);
storage->read(query_plan, column_names, target_metadata_snapshot, query_info, local_context, processed_stage, max_block_size, num_streams);
storage->read(query_plan, column_names, target_storage_snapshot, query_info, local_context, processed_stage, max_block_size, num_streams);
if (query_plan.isInitialized())
{
auto mv_header = getHeaderForProcessingStage(*this, column_names, metadata_snapshot, query_info, local_context, processed_stage);
auto mv_header = getHeaderForProcessingStage(column_names, storage_snapshot, query_info, local_context, processed_stage);
auto target_header = query_plan.getCurrentDataStream().header;
/// No need to convert columns that do not exist in the MV

View File

@ -67,7 +67,7 @@ public:
void shutdown() override;
QueryProcessingStage::Enum
getQueryProcessingStage(ContextPtr, QueryProcessingStage::Enum, const StorageMetadataPtr &, SelectQueryInfo &) const override;
getQueryProcessingStage(ContextPtr, QueryProcessingStage::Enum, const StorageSnapshotPtr &, SelectQueryInfo &) const override;
StoragePtr getTargetTable() const;
StoragePtr tryGetTargetTable() const;
@ -76,7 +76,7 @@ public:
Pipe read(
const Names & column_names,
const StorageMetadataPtr & /*metadata_snapshot*/,
const StorageSnapshotPtr & storage_snapshot,
SelectQueryInfo & query_info,
ContextPtr context,
QueryProcessingStage::Enum processed_stage,
@ -86,7 +86,7 @@ public:
void read(
QueryPlan & query_plan,
const Names & column_names,
const StorageMetadataPtr & metadata_snapshot,
const StorageSnapshotPtr & storage_snapshot,
SelectQueryInfo & query_info,
ContextPtr context,
QueryProcessingStage::Enum processed_stage,

View File

@ -29,13 +29,12 @@ public:
MemorySource(
Names column_names_,
const StorageMemory & storage,
const StorageMetadataPtr & metadata_snapshot,
const StorageSnapshotPtr & storage_snapshot,
std::shared_ptr<const Blocks> data_,
std::shared_ptr<std::atomic<size_t>> parallel_execution_index_,
InitializerFunc initializer_func_ = {})
: SourceWithProgress(storage.getSampleBlockForColumns(metadata_snapshot, column_names_))
, column_names_and_types(metadata_snapshot->getColumns().getAllWithSubcolumns().addTypes(std::move(column_names_)))
: SourceWithProgress(storage_snapshot->getSampleBlockForColumns(column_names_))
, column_names_and_types(storage_snapshot->metadata->getColumns().getAllWithSubcolumns().addTypes(std::move(column_names_)))
, data(data_)
, parallel_execution_index(parallel_execution_index_)
, initializer_func(std::move(initializer_func_))
@ -178,14 +177,14 @@ StorageMemory::StorageMemory(
Pipe StorageMemory::read(
const Names & column_names,
const StorageMetadataPtr & metadata_snapshot,
const StorageSnapshotPtr & storage_snapshot,
SelectQueryInfo & /*query_info*/,
ContextPtr /*context*/,
QueryProcessingStage::Enum /*processed_stage*/,
size_t /*max_block_size*/,
unsigned num_streams)
{
check(metadata_snapshot, column_names);
storage_snapshot->check(column_names);
if (delay_read_for_global_subqueries)
{
@ -199,8 +198,7 @@ Pipe StorageMemory::read(
return Pipe(std::make_shared<MemorySource>(
column_names,
*this,
metadata_snapshot,
storage_snapshot,
nullptr /* data */,
nullptr /* parallel execution index */,
[this](std::shared_ptr<const Blocks> & data_to_initialize)
@ -221,7 +219,7 @@ Pipe StorageMemory::read(
for (size_t stream = 0; stream < num_streams; ++stream)
{
pipes.emplace_back(std::make_shared<MemorySource>(column_names, *this, metadata_snapshot, current_data, parallel_execution_index));
pipes.emplace_back(std::make_shared<MemorySource>(column_names, storage_snapshot, current_data, parallel_execution_index));
}
return Pipe::unitePipes(std::move(pipes));

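MemorySource resolves the requested names against the full column list including subcolumns, so a subcolumn name resolves alongside ordinary columns; StorageLog and BufferSource above use the same expression. The resolution step in isolation (illustrative):

auto columns_with_types = storage_snapshot->metadata->getColumns()
    .getAllWithSubcolumns()
    .addTypes(column_names);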
View File

@ -32,7 +32,7 @@ public:
Pipe read(
const Names & column_names,
const StorageMetadataPtr & /*metadata_snapshot*/,
const StorageSnapshotPtr & storage_snapshot,
SelectQueryInfo & query_info,
ContextPtr context,
QueryProcessingStage::Enum processed_stage,

View File

@ -139,7 +139,7 @@ bool StorageMerge::mayBenefitFromIndexForIn(const ASTPtr & left_in_operand, Cont
QueryProcessingStage::Enum StorageMerge::getQueryProcessingStage(
ContextPtr local_context,
QueryProcessingStage::Enum to_stage,
const StorageMetadataPtr &,
const StorageSnapshotPtr &,
SelectQueryInfo & query_info) const
{
/// In case of JOIN the first stage (which includes JOIN)
@ -168,7 +168,8 @@ QueryProcessingStage::Enum StorageMerge::getQueryProcessingStage(
++selected_table_size;
stage_in_source_tables = std::max(
stage_in_source_tables,
table->getQueryProcessingStage(local_context, to_stage, table->getInMemoryMetadataPtr(), query_info));
table->getQueryProcessingStage(local_context, to_stage,
table->getStorageSnapshot(table->getInMemoryMetadataPtr()), query_info));
}
iterator->next();
@ -181,7 +182,7 @@ QueryProcessingStage::Enum StorageMerge::getQueryProcessingStage(
Pipe StorageMerge::read(
const Names & column_names,
const StorageMetadataPtr & metadata_snapshot,
const StorageSnapshotPtr & storage_snapshot,
SelectQueryInfo & query_info,
ContextPtr local_context,
QueryProcessingStage::Enum processed_stage,
@ -197,9 +198,9 @@ Pipe StorageMerge::read(
for (const auto & column_name : column_names)
{
if (column_name == "_database" && isVirtualColumn(column_name, metadata_snapshot))
if (column_name == "_database" && isVirtualColumn(column_name, storage_snapshot->metadata))
has_database_virtual_column = true;
else if (column_name == "_table" && isVirtualColumn(column_name, metadata_snapshot))
else if (column_name == "_table" && isVirtualColumn(column_name, storage_snapshot->metadata))
has_table_virtual_column = true;
else
real_column_names.push_back(column_name);
@ -212,7 +213,7 @@ Pipe StorageMerge::read(
modified_context->setSetting("optimize_move_to_prewhere", Field{false});
/// What will be result structure depending on query processed stage in source tables?
Block header = getHeaderForProcessingStage(*this, column_names, metadata_snapshot, query_info, local_context, processed_stage);
Block header = getHeaderForProcessingStage(column_names, storage_snapshot, query_info, local_context, processed_stage);
/** First we make list of selected tables to find out its size.
* This is necessary to correctly pass the recommended number of threads to each table.
@ -284,7 +285,7 @@ Pipe StorageMerge::read(
if (processed_stage == QueryProcessingStage::FetchColumns && !storage_columns.getAliases().empty())
{
auto syntax_result = TreeRewriter(local_context).analyzeSelect(query_info.query, TreeRewriterResult({}, storage, storage_metadata_snapshot));
auto syntax_result = TreeRewriter(local_context).analyzeSelect(query_info.query, TreeRewriterResult({}, storage, storage->getStorageSnapshot(storage_metadata_snapshot)));
ASTPtr required_columns_expr_list = std::make_shared<ASTExpressionList>();
ASTPtr column_expr;
@ -314,13 +315,13 @@ Pipe StorageMerge::read(
}
syntax_result = TreeRewriter(local_context).analyze(required_columns_expr_list, storage_columns.getAllPhysical(),
storage, storage_metadata_snapshot);
storage, storage->getStorageSnapshot(storage_metadata_snapshot));
auto alias_actions = ExpressionAnalyzer(required_columns_expr_list, syntax_result, local_context).getActionsDAG(true);
required_columns = alias_actions->getRequiredColumns().getNames();
}
auto source_pipe = createSources(
storage_metadata_snapshot,
storage_snapshot,
query_info,
processed_stage,
max_block_size,
@ -347,7 +348,7 @@ Pipe StorageMerge::read(
}
Pipe StorageMerge::createSources(
const StorageMetadataPtr & metadata_snapshot,
const StorageSnapshotPtr & storage_snapshot,
SelectQueryInfo & query_info,
const QueryProcessingStage::Enum & processed_stage,
const UInt64 max_block_size,
@ -389,16 +390,16 @@ Pipe StorageMerge::createSources(
}
auto storage_stage
= storage->getQueryProcessingStage(modified_context, QueryProcessingStage::Complete, metadata_snapshot, modified_query_info);
= storage->getQueryProcessingStage(modified_context, QueryProcessingStage::Complete, storage_snapshot, modified_query_info);
if (processed_stage <= storage_stage)
{
/// If there are only virtual columns in query, you must request at least one other column.
if (real_column_names.empty())
real_column_names.push_back(ExpressionActions::getSmallestColumn(metadata_snapshot->getColumns().getAllPhysical()));
real_column_names.push_back(ExpressionActions::getSmallestColumn(storage_snapshot->metadata->getColumns().getAllPhysical()));
pipe = storage->read(
real_column_names,
metadata_snapshot,
storage_snapshot,
modified_query_info,
modified_context,
processed_stage,
@ -470,7 +471,7 @@ Pipe StorageMerge::createSources(
/// Subordinate tables could have different but convertible types, like numeric types of different width.
/// We must return streams whose structure equals the structure of the Merge table.
convertingSourceStream(header, metadata_snapshot, aliases, modified_context, modified_query_info.query, pipe, processed_stage);
convertingSourceStream(header, storage_snapshot->metadata, aliases, modified_context, modified_query_info.query, pipe, processed_stage);
pipe.addTableLock(struct_lock);
pipe.addStorageHolder(storage);

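StorageMerge computes the processing stage across all selected underlying tables, now building one snapshot per table. The per-table step from the loop above, condensed (a sketch; `table` iterates over the selected tables):

stage_in_source_tables = std::max(
    stage_in_source_tables,
    table->getQueryProcessingStage(local_context, to_stage,
        table->getStorageSnapshot(table->getInMemoryMetadataPtr()), query_info));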
View File

@ -28,11 +28,11 @@ public:
bool supportsSubcolumns() const override { return true; }
QueryProcessingStage::Enum
getQueryProcessingStage(ContextPtr, QueryProcessingStage::Enum, const StorageMetadataPtr &, SelectQueryInfo &) const override;
getQueryProcessingStage(ContextPtr, QueryProcessingStage::Enum, const StorageSnapshotPtr &, SelectQueryInfo &) const override;
Pipe read(
const Names & column_names,
const StorageMetadataPtr & /*metadata_snapshot*/,
const StorageSnapshotPtr & storage_snapshot,
SelectQueryInfo & query_info,
ContextPtr context,
QueryProcessingStage::Enum processed_stage,
@ -108,7 +108,7 @@ protected:
using Aliases = std::vector<AliasData>;
Pipe createSources(
const StorageMetadataPtr & metadata_snapshot,
const StorageSnapshotPtr & storage_snapshot,
SelectQueryInfo & query_info,
const QueryProcessingStage::Enum & processed_stage,
UInt64 max_block_size,

View File

@ -180,20 +180,20 @@ StorageMergeTree::~StorageMergeTree()
void StorageMergeTree::read(
QueryPlan & query_plan,
const Names & column_names,
const StorageMetadataPtr & metadata_snapshot,
const StorageSnapshotPtr & storage_snapshot,
SelectQueryInfo & query_info,
ContextPtr local_context,
QueryProcessingStage::Enum processed_stage,
size_t max_block_size,
unsigned num_streams)
{
if (auto plan = reader.read(column_names, metadata_snapshot, query_info, local_context, max_block_size, num_streams, processed_stage))
if (auto plan = reader.read(column_names, storage_snapshot, query_info, local_context, max_block_size, num_streams, processed_stage))
query_plan = std::move(*plan);
}
Pipe StorageMergeTree::read(
const Names & column_names,
const StorageMetadataPtr & metadata_snapshot,
const StorageSnapshotPtr & storage_snapshot,
SelectQueryInfo & query_info,
ContextPtr local_context,
QueryProcessingStage::Enum processed_stage,
@ -201,7 +201,7 @@ Pipe StorageMergeTree::read(
const unsigned num_streams)
{
QueryPlan plan;
read(plan, column_names, metadata_snapshot, query_info, local_context, processed_stage, max_block_size, num_streams);
read(plan, column_names, storage_snapshot, query_info, local_context, processed_stage, max_block_size, num_streams);
return plan.convertToPipe(
QueryPlanOptimizationSettings::fromContext(local_context),
BuildQueryPipelineSettings::fromContext(local_context));

View File

@ -40,7 +40,7 @@ public:
Pipe read(
const Names & column_names,
const StorageMetadataPtr & /*metadata_snapshot*/,
const StorageSnapshotPtr & storage_snapshot,
SelectQueryInfo & query_info,
ContextPtr context,
QueryProcessingStage::Enum processed_stage,
@ -50,7 +50,7 @@ public:
void read(
QueryPlan & query_plan,
const Names & column_names,
const StorageMetadataPtr & /*metadata_snapshot*/,
const StorageSnapshotPtr & storage_snapshot,
SelectQueryInfo & query_info,
ContextPtr context,
QueryProcessingStage::Enum processed_stage,

View File

@ -74,7 +74,7 @@ void StorageMongoDB::connectIfNotConnected()
Pipe StorageMongoDB::read(
const Names & column_names,
const StorageMetadataPtr & metadata_snapshot,
const StorageSnapshotPtr & storage_snapshot,
SelectQueryInfo & /*query_info*/,
ContextPtr /*context*/,
QueryProcessingStage::Enum /*processed_stage*/,
@ -83,12 +83,12 @@ Pipe StorageMongoDB::read(
{
connectIfNotConnected();
check(metadata_snapshot, column_names);
storage_snapshot->check(column_names);
Block sample_block;
for (const String & column_name : column_names)
{
auto column_data = metadata_snapshot->getColumns().getPhysical(column_name);
auto column_data = storage_snapshot->metadata->getColumns().getPhysical(column_name);
sample_block.insert({ column_data.type, column_data.name });
}

View File

@ -34,7 +34,7 @@ public:
Pipe read(
const Names & column_names,
const StorageMetadataPtr & metadata_snapshot,
const StorageSnapshotPtr & storage_snapshot,
SelectQueryInfo & query_info,
ContextPtr context,
QueryProcessingStage::Enum processed_stage,

View File

@ -72,17 +72,17 @@ StorageMySQL::StorageMySQL(
Pipe StorageMySQL::read(
const Names & column_names_,
const StorageMetadataPtr & metadata_snapshot,
const StorageSnapshotPtr & storage_snapshot,
SelectQueryInfo & query_info_,
ContextPtr context_,
QueryProcessingStage::Enum /*processed_stage*/,
size_t /*max_block_size*/,
unsigned)
{
check(metadata_snapshot, column_names_);
storage_snapshot->check(column_names_);
String query = transformQueryForExternalDatabase(
query_info_,
metadata_snapshot->getColumns().getOrdinary(),
storage_snapshot->metadata->getColumns().getOrdinary(),
IdentifierQuotingStyle::BackticksMySQL,
remote_database_name,
remote_table_name,
@ -91,7 +91,7 @@ Pipe StorageMySQL::read(
Block sample_block;
for (const String & column_name : column_names_)
{
auto column_data = metadata_snapshot->getColumns().getPhysical(column_name);
auto column_data = storage_snapshot->metadata->getColumns().getPhysical(column_name);
WhichDataType which(column_data.type);
/// Convert enum to string.

View File

@ -41,7 +41,7 @@ public:
Pipe read(
const Names & column_names,
const StorageMetadataPtr & /*metadata_snapshot*/,
const StorageSnapshotPtr & storage_snapshot,
SelectQueryInfo & query_info,
ContextPtr context,
QueryProcessingStage::Enum processed_stage,

View File

@ -23,7 +23,7 @@ public:
Pipe read(
const Names & column_names,
const StorageMetadataPtr & metadata_snapshot,
const StorageSnapshotPtr & storage_snapshot,
SelectQueryInfo &,
ContextPtr /*context*/,
QueryProcessingStage::Enum /*processing_stage*/,
@ -31,7 +31,7 @@ public:
unsigned) override
{
return Pipe(
std::make_shared<NullSource>(getSampleBlockForColumns(metadata_snapshot, column_names)));
std::make_shared<NullSource>(storage_snapshot->getSampleBlockForColumns(column_names)));
}
bool supportsParallelInsert() const override { return true; }

View File

@ -64,25 +64,25 @@ StoragePostgreSQL::StoragePostgreSQL(
Pipe StoragePostgreSQL::read(
const Names & column_names_,
const StorageMetadataPtr & metadata_snapshot,
const StorageSnapshotPtr & storage_snapshot,
SelectQueryInfo & query_info_,
ContextPtr context_,
QueryProcessingStage::Enum /*processed_stage*/,
size_t max_block_size_,
unsigned)
{
check(metadata_snapshot, column_names_);
storage_snapshot->check(column_names_);
/// Connection is already made to the needed database, so it should not be present in the query;
/// remote_table_schema is empty if it is not specified, will access only table_name.
String query = transformQueryForExternalDatabase(
query_info_, metadata_snapshot->getColumns().getOrdinary(),
query_info_, storage_snapshot->metadata->getColumns().getOrdinary(),
IdentifierQuotingStyle::DoubleQuotes, remote_table_schema, remote_table_name, context_);
Block sample_block;
for (const String & column_name : column_names_)
{
auto column_data = metadata_snapshot->getColumns().getPhysical(column_name);
auto column_data = storage_snapshot->metadata->getColumns().getPhysical(column_name);
WhichDataType which(column_data.type);
if (which.isEnum())
column_data.type = std::make_shared<DataTypeString>();

View File

@ -34,7 +34,7 @@ public:
Pipe read(
const Names & column_names,
const StorageMetadataPtr & /*metadata_snapshot*/,
const StorageSnapshotPtr & storage_snapshot,
SelectQueryInfo & query_info,
ContextPtr context,
QueryProcessingStage::Enum processed_stage,

View File

@ -34,10 +34,11 @@ public:
QueryProcessingStage::Enum getQueryProcessingStage(
ContextPtr context,
QueryProcessingStage::Enum to_stage,
const StorageMetadataPtr &,
const StorageSnapshotPtr &,
SelectQueryInfo & info) const override
{
return getNested()->getQueryProcessingStage(context, to_stage, getNested()->getInMemoryMetadataPtr(), info);
const auto & nested_metadata = getNested()->getInMemoryMetadataPtr();
return getNested()->getQueryProcessingStage(context, to_stage, getNested()->getStorageSnapshot(nested_metadata), info);
}
BlockInputStreams watch(
@ -53,14 +54,14 @@ public:
Pipe read(
const Names & column_names,
const StorageMetadataPtr & metadata_snapshot,
const StorageSnapshotPtr & storage_snapshot,
SelectQueryInfo & query_info,
ContextPtr context,
QueryProcessingStage::Enum processed_stage,
size_t max_block_size,
unsigned num_streams) override
{
return getNested()->read(column_names, metadata_snapshot, query_info, context, processed_stage, max_block_size, num_streams);
return getNested()->read(column_names, storage_snapshot, query_info, context, processed_stage, max_block_size, num_streams);
}
BlockOutputStreamPtr write(const ASTPtr & query, const StorageMetadataPtr & metadata_snapshot, ContextPtr context) override

View File

@ -4447,7 +4447,7 @@ ReplicatedMergeTreeQuorumAddedParts::PartitionIdToMaxBlock StorageReplicatedMerg
void StorageReplicatedMergeTree::read(
QueryPlan & query_plan,
const Names & column_names,
const StorageMetadataPtr & metadata_snapshot,
const StorageSnapshotPtr & storage_snapshot,
SelectQueryInfo & query_info,
ContextPtr local_context,
QueryProcessingStage::Enum processed_stage,
@ -4463,18 +4463,18 @@ void StorageReplicatedMergeTree::read(
{
auto max_added_blocks = std::make_shared<ReplicatedMergeTreeQuorumAddedParts::PartitionIdToMaxBlock>(getMaxAddedBlocks());
if (auto plan = reader.read(
column_names, metadata_snapshot, query_info, local_context, max_block_size, num_streams, processed_stage, std::move(max_added_blocks)))
column_names, storage_snapshot, query_info, local_context, max_block_size, num_streams, processed_stage, std::move(max_added_blocks)))
query_plan = std::move(*plan);
return;
}
if (auto plan = reader.read(column_names, metadata_snapshot, query_info, local_context, max_block_size, num_streams, processed_stage))
if (auto plan = reader.read(column_names, storage_snapshot, query_info, local_context, max_block_size, num_streams, processed_stage))
query_plan = std::move(*plan);
}
Pipe StorageReplicatedMergeTree::read(
const Names & column_names,
const StorageMetadataPtr & metadata_snapshot,
const StorageSnapshotPtr & storage_snapshot,
SelectQueryInfo & query_info,
ContextPtr local_context,
QueryProcessingStage::Enum processed_stage,
@ -4482,7 +4482,7 @@ Pipe StorageReplicatedMergeTree::read(
const unsigned num_streams)
{
QueryPlan plan;
read(plan, column_names, metadata_snapshot, query_info, local_context, processed_stage, max_block_size, num_streams);
read(plan, column_names, storage_snapshot, query_info, local_context, processed_stage, max_block_size, num_streams);
return plan.convertToPipe(
QueryPlanOptimizationSettings::fromContext(local_context),
BuildQueryPipelineSettings::fromContext(local_context));

View File

@ -95,7 +95,7 @@ public:
Pipe read(
const Names & column_names,
const StorageMetadataPtr & /*metadata_snapshot*/,
const StorageSnapshotPtr & storage_snapshot,
SelectQueryInfo & query_info,
ContextPtr context,
QueryProcessingStage::Enum processed_stage,
@ -105,7 +105,7 @@ public:
void read(
QueryPlan & query_plan,
const Names & column_names,
const StorageMetadataPtr & /*metadata_snapshot*/,
const StorageSnapshotPtr & storage_snapshot,
SelectQueryInfo & query_info,
ContextPtr context,
QueryProcessingStage::Enum processed_stage,

View File

@ -360,7 +360,7 @@ StorageS3::StorageS3(
Pipe StorageS3::read(
const Names & column_names,
const StorageMetadataPtr & metadata_snapshot,
const StorageSnapshotPtr & storage_snapshot,
SelectQueryInfo & /*query_info*/,
ContextPtr local_context,
QueryProcessingStage::Enum /*processed_stage*/,
@ -405,9 +405,9 @@ Pipe StorageS3::read(
need_file_column,
format_name,
getName(),
metadata_snapshot->getSampleBlock(),
storage_snapshot->metadata->getSampleBlock(),
local_context,
metadata_snapshot->getColumns(),
storage_snapshot->metadata->getColumns(),
max_block_size,
max_single_read_retries,
compression_method,

View File

@ -121,7 +121,7 @@ public:
Pipe read(
const Names & column_names,
const StorageMetadataPtr & /*metadata_snapshot*/,
const StorageSnapshotPtr & storage_snapshot,
SelectQueryInfo & query_info,
ContextPtr context,
QueryProcessingStage::Enum processed_stage,

View File

@ -80,7 +80,7 @@ StorageS3Cluster::StorageS3Cluster(
/// This code executes on the initiator.
Pipe StorageS3Cluster::read(
const Names & column_names,
const StorageMetadataPtr & metadata_snapshot,
const StorageSnapshotPtr & storage_snapshot,
SelectQueryInfo & query_info,
ContextPtr context,
QueryProcessingStage::Enum processed_stage,
@ -133,12 +133,12 @@ Pipe StorageS3Cluster::read(
}
}
check(metadata_snapshot, column_names);
storage_snapshot->check(column_names);
return Pipe::unitePipes(std::move(pipes));
}
QueryProcessingStage::Enum StorageS3Cluster::getQueryProcessingStage(
ContextPtr context, QueryProcessingStage::Enum to_stage, const StorageMetadataPtr &, SelectQueryInfo &) const
ContextPtr context, QueryProcessingStage::Enum to_stage, const StorageSnapshotPtr &, SelectQueryInfo &) const
{
/// The initiator executes the query on remote nodes.
if (context->getClientInfo().query_kind == ClientInfo::QueryKind::INITIAL_QUERY)

View File

@ -27,11 +27,11 @@ class StorageS3Cluster : public shared_ptr_helper<StorageS3Cluster>, public ISto
public:
std::string getName() const override { return "S3Cluster"; }
Pipe read(const Names &, const StorageMetadataPtr &, SelectQueryInfo &,
Pipe read(const Names &, const StorageSnapshotPtr &, SelectQueryInfo &,
ContextPtr, QueryProcessingStage::Enum, size_t /*max_block_size*/, unsigned /*num_streams*/) override;
QueryProcessingStage::Enum
getQueryProcessingStage(ContextPtr, QueryProcessingStage::Enum, const StorageMetadataPtr &, SelectQueryInfo &) const override;
getQueryProcessingStage(ContextPtr, QueryProcessingStage::Enum, const StorageSnapshotPtr &, SelectQueryInfo &) const override;
NamesAndTypesList getVirtuals() const override;

View File

@ -0,0 +1,95 @@
#include <Storages/StorageSnapshot.h>
#include <Storages/IStorage.h>
#include <DataTypes/ObjectUtils.h>
#include <boost/algorithm/string/join.hpp>
namespace DB
{
namespace ErrorCodes
{
extern const int NOT_FOUND_COLUMN_IN_BLOCK;
extern const int EMPTY_LIST_OF_COLUMNS_QUERIED;
extern const int NO_SUCH_COLUMN_IN_TABLE;
extern const int COLUMN_QUERIED_MORE_THAN_ONCE;
}
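/// Collects the columns selected by `options` from the metadata snapshot,
/// optionally appending virtual columns and expanding Object columns into
/// their dynamic subcolumns.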
NamesAndTypesList StorageSnapshot::getColumns(const GetColumnsOptions & options) const
{
auto all_columns = metadata->getColumns().get(options);
if (options.with_virtuals)
{
/// Virtual columns must be appended after ordinary ones,
/// because the user can override them.
auto virtuals = storage.getVirtuals();
if (!virtuals.empty())
{
NameSet column_names;
for (const auto & column : all_columns)
column_names.insert(column.name);
for (auto && column : virtuals)
if (!column_names.count(column.name))
all_columns.push_back(std::move(column));
}
}
if (options.with_extended_objects)
all_columns = extendObjectColumns(all_columns, object_types, options.with_subcolumns);
return all_columns;
}
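/// Builds an empty sample block (header) for `column_names`, resolving each
/// name against physical columns, subcolumns, virtuals and extended Object
/// subcolumns; throws NOT_FOUND_COLUMN_IN_BLOCK for unknown names.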
Block StorageSnapshot::getSampleBlockForColumns(const Names & column_names) const
{
Block res;
auto all_columns = getColumns(GetColumnsOptions(GetColumnsOptions::All)
.withSubcolumns().withVirtuals().withExtendedObjects());
std::unordered_map<String, DataTypePtr> columns_map;
columns_map.reserve(all_columns.size());
for (const auto & elem : all_columns)
columns_map.emplace(elem.name, elem.type);
for (const auto & name : column_names)
{
auto it = columns_map.find(name);
if (it != columns_map.end())
res.insert({it->second->createColumn(), it->second, it->first});
else
throw Exception(ErrorCodes::NOT_FOUND_COLUMN_IN_BLOCK,
"Column {} not found in table {}", backQuote(name), storage.getStorageID().getNameForLogs());
}
return res;
}
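/// Validates a queried name list: it must be non-empty, every name must
/// resolve to an available column, and no name may appear twice.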
void StorageSnapshot::check(const Names & column_names) const
{
auto available_columns = getColumns(GetColumnsOptions(GetColumnsOptions::AllPhysical)
.withSubcolumns().withVirtuals().withExtendedObjects()).getNames();
if (column_names.empty())
throw Exception(ErrorCodes::EMPTY_LIST_OF_COLUMNS_QUERIED,
"Empty list of columns queried. There are columns: {}",
boost::algorithm::join(available_columns, ","));
std::unordered_set<std::string_view> columns_set(available_columns.begin(), available_columns.end());
std::unordered_set<std::string_view> unique_names;
for (const auto & name : column_names)
{
if (columns_set.end() == columns_set.find(name))
throw Exception(ErrorCodes::NO_SUCH_COLUMN_IN_TABLE,
"There is no column with name {} in table {}. There are columns: ",
backQuote(name), storage.getStorageID().getNameForLogs(), boost::algorithm::join(available_columns, ","));
if (unique_names.end() != unique_names.find(name))
throw Exception(ErrorCodes::COLUMN_QUERIED_MORE_THAN_ONCE, "Column {} queried more than once", name);
unique_names.insert(name);
}
}
}
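Taken together, a hedged sketch of how a caller might drive the new snapshot API end to end; the helper, its table argument and the column names are hypothetical, and getStorageSnapshot is assumed to be the accessor this commit introduces:
#include <Storages/IStorage.h>
#include <Storages/StorageSnapshot.h>

using namespace DB;

/// Sketch only, not part of the commit: validate queried names, then build
/// the empty header block used to start a query pipeline.
static Block makeHeaderForQuery(const StoragePtr & table, const Names & column_names)
{
    auto metadata_snapshot = table->getInMemoryMetadataPtr();
    auto storage_snapshot = table->getStorageSnapshot(metadata_snapshot);  /// assumed accessor

    /// Throws NO_SUCH_COLUMN_IN_TABLE / COLUMN_QUERIED_MORE_THAN_ONCE /
    /// EMPTY_LIST_OF_COLUMNS_QUERIED on invalid input (see check() above).
    storage_snapshot->check(column_names);

    return storage_snapshot->getSampleBlockForColumns(column_names);
}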
