dbms: pass proper context into Storage::read(). [#METR-14179]

Andrey Mironov 2014-12-12 17:30:35 +03:00
parent f92720b41f
commit fc9472ba47
56 changed files with 245 additions and 106 deletions
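The heart of this change: IStorage::read() gains a const Context & parameter, placed between the query AST and the settings, so storages can use the query's own context instead of a globally stored one (e.g. data.context). A before/after sketch of the virtual signature, paraphrased from the hunks below:

// Before: no per-query context is available inside the storage.
virtual BlockInputStreams read(
    const Names & column_names,
    ASTPtr query,
    const Settings & settings,
    QueryProcessingStage::Enum & processed_stage,
    size_t max_block_size = DEFAULT_BLOCK_SIZE,
    unsigned threads = 1);

// After: the caller's context is threaded through explicitly.
virtual BlockInputStreams read(
    const Names & column_names,
    ASTPtr query,
    const Context & context,
    const Settings & settings,
    QueryProcessingStage::Enum & processed_stage,
    size_t max_block_size = DEFAULT_BLOCK_SIZE,
    unsigned threads = 1);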

View File

@ -29,24 +29,28 @@ private:
public:
/// Takes a ready connection.
RemoteBlockInputStream(Connection & connection_, const String & query_, const Settings * settings_,
- const Tables & external_tables_ = Tables(), QueryProcessingStage::Enum stage_ = QueryProcessingStage::Complete)
- : connection(&connection_), query(query_), external_tables(external_tables_), stage(stage_)
+ const Tables & external_tables_ = Tables(), QueryProcessingStage::Enum stage_ = QueryProcessingStage::Complete,
+ const Context & context = Context{})
+ : connection(&connection_), query(query_), external_tables(external_tables_), stage(stage_), context(context)
{
init(settings_);
}
/// Takes a ready connection. Takes ownership of the connection from the pool.
RemoteBlockInputStream(ConnectionPool::Entry & pool_entry_, const String & query_, const Settings * settings_,
- const Tables & external_tables_ = Tables(), QueryProcessingStage::Enum stage_ = QueryProcessingStage::Complete)
- : pool_entry(pool_entry_), connection(&*pool_entry_), query(query_), external_tables(external_tables_), stage(stage_)
+ const Tables & external_tables_ = Tables(), QueryProcessingStage::Enum stage_ = QueryProcessingStage::Complete,
+ const Context & context = Context{})
+ : pool_entry(pool_entry_), connection(&*pool_entry_), query(query_),
+ external_tables(external_tables_), stage(stage_), context(context)
{
init(settings_);
}
/// Takes a pool from which a connection will need to be obtained.
RemoteBlockInputStream(IConnectionPool * pool_, const String & query_, const Settings * settings_,
- const Tables & external_tables_ = Tables(), QueryProcessingStage::Enum stage_ = QueryProcessingStage::Complete)
- : pool(pool_), query(query_), external_tables(external_tables_), stage(stage_)
+ const Tables & external_tables_ = Tables(), QueryProcessingStage::Enum stage_ = QueryProcessingStage::Complete,
+ const Context & context = Context{})
+ : pool(pool_), query(query_), external_tables(external_tables_), stage(stage_), context(context)
{
init(settings_);
}
@ -103,7 +107,8 @@ protected:
{
StoragePtr cur = table.second;
QueryProcessingStage::Enum stage = QueryProcessingStage::Complete;
- DB::BlockInputStreams input = cur->read(cur->getColumnNamesList(), ASTPtr(), settings, stage, DEFAULT_BLOCK_SIZE, 1);
+ DB::BlockInputStreams input = cur->read(cur->getColumnNamesList(), ASTPtr(), context, settings,
+ stage, DEFAULT_BLOCK_SIZE, 1);
if (input.size() == 0)
res.push_back(std::make_pair(new OneBlockInputStream(cur->getSampleBlock()), table.first));
else
@ -246,6 +251,7 @@ private:
/// Temporary tables that need to be sent to the remote servers.
Tables external_tables;
QueryProcessingStage::Enum stage;
Context context;
/// Whether the query has been sent (this is done before receiving the first block).
bool sent_query = false;
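For callers, the context is an optional trailing constructor argument defaulting to Context{}. A hypothetical call site (pool, query, settings, and context are assumed to already be in scope):

// Sketch only; mirrors the constructor form used elsewhere in this commit.
BlockInputStreamPtr stream{
    new RemoteBlockInputStream{
        pool, query, &settings,
        Tables(), QueryProcessingStage::Complete, context
    }
};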

View File

@ -162,6 +162,7 @@ public:
virtual BlockInputStreams read(
const Names & column_names,
ASTPtr query,
const Context & context,
const Settings & settings,
QueryProcessingStage::Enum & processed_stage,
size_t max_block_size = DEFAULT_BLOCK_SIZE,

View File

@ -20,6 +20,7 @@ public:
BlockInputStreams read(
const Names & column_names,
ASTPtr query,
const Context & context,
const Settings & settings,
QueryProcessingStage::Enum & processed_stage,
size_t max_block_size = DEFAULT_BLOCK_SIZE,

View File

@ -61,6 +61,7 @@ public:
BlockInputStreams read(
const Names & column_names,
ASTPtr query,
const Context & context,
const Settings & settings,
QueryProcessingStage::Enum & processed_stage,
size_t max_block_size = DEFAULT_BLOCK_SIZE,

View File

@ -40,6 +40,7 @@ public:
BlockInputStreams read(
const Names & column_names,
ASTPtr query,
const Context & context,
const Settings & settings,
QueryProcessingStage::Enum & processed_stage,
size_t max_block_size = DEFAULT_BLOCK_SIZE,

View File

@ -25,6 +25,7 @@ public:
BlockInputStreams read(
const Names & column_names,
ASTPtr query,
const Context & context,
const Settings & settings,
QueryProcessingStage::Enum & processed_stage,
size_t max_block_size = DEFAULT_BLOCK_SIZE,

View File

@ -37,6 +37,7 @@ public:
BlockInputStreams read(
const Names & column_names,
ASTPtr query,
const Context & context,
const Settings & settings,
QueryProcessingStage::Enum & processed_stage,
size_t max_block_size = DEFAULT_BLOCK_SIZE,
@ -46,6 +47,7 @@ public:
const std::string & chunk_name,
const Names & column_names,
ASTPtr query,
const Context & context,
const Settings & settings,
QueryProcessingStage::Enum & processed_stage,
size_t max_block_size = DEFAULT_BLOCK_SIZE,

View File

@ -60,6 +60,7 @@ public:
BlockInputStreams read(
const Names & column_names,
ASTPtr query,
const Context & context,
const Settings & settings,
QueryProcessingStage::Enum & processed_stage,
size_t max_block_size = DEFAULT_BLOCK_SIZE,

View File

@ -162,6 +162,7 @@ public:
virtual BlockInputStreams read(
const Names & column_names,
ASTPtr query,
const Context & context,
const Settings & settings,
QueryProcessingStage::Enum & processed_stage,
size_t max_block_size = DEFAULT_BLOCK_SIZE,
@ -227,6 +228,7 @@ protected:
size_t to_mark,
const Names & column_names,
ASTPtr query,
const Context & context,
const Settings & settings,
QueryProcessingStage::Enum & processed_stage,
size_t max_block_size = DEFAULT_BLOCK_SIZE,

View File

@ -37,6 +37,7 @@ public:
BlockInputStreams read(
const Names & column_names,
ASTPtr query,
const Context & context,
const Settings & settings,
QueryProcessingStage::Enum & processed_stage,
size_t max_block_size = DEFAULT_BLOCK_SIZE,

View File

@ -83,6 +83,7 @@ public:
BlockInputStreams read(
const Names & column_names,
ASTPtr query,
const Context & context,
const Settings & settings,
QueryProcessingStage::Enum & processed_stage,
size_t max_block_size = DEFAULT_BLOCK_SIZE,

View File

@ -49,6 +49,7 @@ public:
BlockInputStreams read(
const Names & column_names,
ASTPtr query,
const Context & context,
const Settings & settings,
QueryProcessingStage::Enum & processed_stage,
size_t max_block_size = DEFAULT_BLOCK_SIZE,

View File

@ -70,6 +70,7 @@ public:
BlockInputStreams read(
const Names & column_names,
ASTPtr query,
const Context & context,
const Settings & settings,
QueryProcessingStage::Enum & processed_stage,
size_t max_block_size = DEFAULT_BLOCK_SIZE,

View File

@ -33,10 +33,11 @@ public:
BlockInputStreams read(
const Names & column_names,
ASTPtr query,
const Context & context,
const Settings & settings,
QueryProcessingStage::Enum & processed_stage,
- size_t max_block_size = DEFAULT_BLOCK_SIZE,
- unsigned threads = 1) override
+ const size_t max_block_size = DEFAULT_BLOCK_SIZE,
+ const unsigned threads = 1) override
{
return { new NullBlockInputStream };
}

View File

@ -74,6 +74,7 @@ public:
BlockInputStreams read(
const Names & column_names,
ASTPtr query,
const Context & context,
const Settings & settings,
QueryProcessingStage::Enum & processed_stage,
size_t max_block_size = DEFAULT_BLOCK_SIZE,

View File

@ -27,6 +27,7 @@ public:
BlockInputStreams read(
const Names & column_names,
ASTPtr query,
const Context & context,
const Settings & settings,
QueryProcessingStage::Enum & processed_stage,
size_t max_block_size = DEFAULT_BLOCK_SIZE,

View File

@ -26,6 +26,7 @@ public:
BlockInputStreams read(
const Names & column_names,
ASTPtr query,
const Context & context,
const Settings & settings,
QueryProcessingStage::Enum & processed_stage,
size_t max_block_size = DEFAULT_BLOCK_SIZE,

View File

@ -17,6 +17,7 @@ public:
BlockInputStreams read(
const Names & column_names,
ASTPtr query,
const Context & context,
const Settings & settings,
QueryProcessingStage::Enum & processed_stage,
size_t max_block_size = DEFAULT_BLOCK_SIZE,

View File

@ -28,6 +28,7 @@ public:
BlockInputStreams read(
const Names & column_names,
ASTPtr query,
const Context & context,
const Settings & settings,
QueryProcessingStage::Enum & processed_stage,
size_t max_block_size = DEFAULT_BLOCK_SIZE,

View File

@ -25,6 +25,7 @@ public:
BlockInputStreams read(
const Names & column_names,
ASTPtr query,
const Context & context,
const Settings & settings,
QueryProcessingStage::Enum & processed_stage,
size_t max_block_size = DEFAULT_BLOCK_SIZE,

View File

@ -27,6 +27,7 @@ public:
BlockInputStreams read(
const Names & column_names,
ASTPtr query,
const Context & context,
const Settings & settings,
QueryProcessingStage::Enum & processed_stage,
size_t max_block_size = DEFAULT_BLOCK_SIZE,

View File

@ -27,6 +27,7 @@ public:
BlockInputStreams read(
const Names & column_names,
ASTPtr query,
const Context & context,
const Settings & settings,
QueryProcessingStage::Enum & processed_stage,
size_t max_block_size = DEFAULT_BLOCK_SIZE,

View File

@ -27,6 +27,7 @@ public:
BlockInputStreams read(
const Names & column_names,
ASTPtr query,
const Context & context,
const Settings & settings,
QueryProcessingStage::Enum & processed_stage,
size_t max_block_size = DEFAULT_BLOCK_SIZE,

View File

@ -27,6 +27,7 @@ public:
BlockInputStreams read(
const Names & column_names,
ASTPtr query,
const Context & context,
const Settings & settings,
QueryProcessingStage::Enum & processed_stage,
size_t max_block_size = DEFAULT_BLOCK_SIZE,

View File

@ -27,6 +27,7 @@ public:
BlockInputStreams read(
const Names & column_names,
ASTPtr query,
const Context & context,
const Settings & settings,
QueryProcessingStage::Enum & processed_stage,
size_t max_block_size = DEFAULT_BLOCK_SIZE,

View File

@ -27,6 +27,7 @@ public:
BlockInputStreams read(
const Names & column_names,
ASTPtr query,
const Context & context,
const Settings & settings,
QueryProcessingStage::Enum & processed_stage,
size_t max_block_size = DEFAULT_BLOCK_SIZE,

View File

@ -129,6 +129,7 @@ public:
BlockInputStreams read(
const Names & column_names,
ASTPtr query,
const Context & context,
const Settings & settings,
QueryProcessingStage::Enum & processed_stage,
size_t max_block_size = DEFAULT_BLOCK_SIZE,

View File

@ -33,6 +33,7 @@ public:
BlockInputStreams read(
const Names & column_names,
ASTPtr query,
const Context & context,
const Settings & settings,
QueryProcessingStage::Enum & processed_stage,
size_t max_block_size = DEFAULT_BLOCK_SIZE,

View File

@ -90,7 +90,12 @@ private:
NamesAndTypesList res;
/// Send the query to the first available shard
- BlockInputStreamPtr input = new RemoteBlockInputStream(&*cluster.pools.front(), query, &settings, Tables(), QueryProcessingStage::Complete);
+ BlockInputStreamPtr input{
+ new RemoteBlockInputStream{
+ cluster.pools.front().get(), query, &settings,
+ Tables(), QueryProcessingStage::Complete, context
+ }
+ };
input->readPrefix();
while (true)

View File

@ -32,6 +32,8 @@
#include <DB/Parsers/formatAST.h>
#include <statdaemons/ext/range.hpp>
namespace DB
{
@ -589,6 +591,9 @@ static SharedPtr<InterpreterSelectQuery> interpretSubquery(
if (!parse_res)
throw Exception("Error in parsing SELECT query while creating set or join for table " + table->name + ".",
ErrorCodes::LOGICAL_ERROR);
/// @note it may be more appropriate to manually replace ASTAsterisk with table's columns
ExpressionAnalyzer{query, context, subquery_depth};
}
else
query = subquery->children.at(0);
@ -1737,26 +1742,32 @@ void ExpressionAnalyzer::collectJoinedColumns(NameSet & joined_columns, NamesAnd
return;
auto & node = typeid_cast<ASTJoin &>(*select_query->join);
- auto & keys = typeid_cast<ASTExpressionList &>(*node.using_expr_list);
- auto & table = node.table->children.at(0); /// TODO: support identifiers.
- size_t num_join_keys = keys.children.size();
- for (size_t i = 0; i < num_join_keys; ++i)
+ Block nested_result_sample;
+ if (const auto identifier = typeid_cast<const ASTIdentifier *>(node.table.get()))
{
- if (!join_key_names_left_set.insert(keys.children[i]->getColumnName()).second)
+ const auto & table = context.getTable("", identifier->name);
+ nested_result_sample = table->getSampleBlockNonMaterialized();
+ }
+ else if (typeid_cast<const ASTSubquery *>(node.table.get()))
+ {
+ const auto & table = node.table->children.at(0);
+ nested_result_sample = ExpressionAnalyzer(table, context, subquery_depth + 1).getSelectSampleBlock();
+ }
+ auto & keys = typeid_cast<ASTExpressionList &>(*node.using_expr_list);
+ for (const auto & key : keys.children)
+ {
+ if (!join_key_names_left_set.insert(key->getColumnName()).second)
throw Exception("Duplicate column in USING list", ErrorCodes::DUPLICATE_COLUMN);
- if (!join_key_names_right_set.insert(keys.children[i]->getAliasOrColumnName()).second)
+ if (!join_key_names_right_set.insert(key->getAliasOrColumnName()).second)
throw Exception("Duplicate column in USING list", ErrorCodes::DUPLICATE_COLUMN);
}
- Block nested_result_sample = ExpressionAnalyzer(table, context, subquery_depth + 1).getSelectSampleBlock();
- size_t nested_result_columns = nested_result_sample.columns();
- for (size_t i = 0; i < nested_result_columns; ++i)
+ for (const auto i : ext::range(0, nested_result_sample.columns()))
{
- auto col = nested_result_sample.getByPosition(i);
+ const auto & col = nested_result_sample.getByPosition(i);
if (!join_key_names_right_set.count(col.name))
{
joined_columns.insert(col.name);

View File

@ -547,7 +547,9 @@ QueryProcessingStage::Enum InterpreterSelectQuery::executeFetchColumns(BlockInpu
if (storage && storage->isRemote())
storage->storeExternalTables(query_analyzer->getExternalTables());
- streams = storage->read(required_columns, query_ptr, settings_for_storage, from_stage, settings.max_block_size, settings.max_threads);
+ streams = storage->read(required_columns, query_ptr,
+ context, settings_for_storage, from_stage,
+ settings.max_block_size, settings.max_threads);
for (auto & stream : streams)
stream->addTableLock(table_lock);
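Every IStorage subclass has to adopt the widened signature in lockstep, as the storage hunks below show. A schematic override (StorageExample is hypothetical, not part of this commit) illustrating the expected parameter order:

class StorageExample : public IStorage
{
public:
    BlockInputStreams read(
        const Names & column_names,
        ASTPtr query,
        const Context & context, /// new parameter, available for lookups such as context.getTable()
        const Settings & settings,
        QueryProcessingStage::Enum & processed_stage,
        size_t max_block_size = DEFAULT_BLOCK_SIZE,
        unsigned threads = 1) override
    {
        check(column_names);
        processed_stage = QueryProcessingStage::FetchColumns;
        return { new OneBlockInputStream(getSampleBlock()) };
    }

    /// Other IStorage overrides (getName, getTableName, getColumnsList, ...) omitted.
};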

View File

@ -38,10 +38,11 @@ static Block getBlockWithVirtualColumns(const MergeTreeData::DataPartsVector & p
BlockInputStreams MergeTreeDataSelectExecutor::read(
const Names & column_names_to_return,
ASTPtr query,
const Context & context,
const Settings & settings,
QueryProcessingStage::Enum & processed_stage,
- size_t max_block_size,
- unsigned threads,
+ const size_t max_block_size,
+ const unsigned threads,
size_t * part_index)
{
size_t part_index_var = 0;
@ -74,15 +75,15 @@ BlockInputStreams MergeTreeDataSelectExecutor::read(
/// If at least one virtual column is requested, try to use the index
if (!virt_column_names.empty())
- VirtualColumnUtils::filterBlockWithQuery(query, virtual_columns_block, data.context);
+ VirtualColumnUtils::filterBlockWithQuery(query, virtual_columns_block, context);
std::multiset<String> values = VirtualColumnUtils::extractSingleValueFromBlock<String>(virtual_columns_block, "_part");
data.check(real_column_names);
processed_stage = QueryProcessingStage::FetchColumns;
- PKCondition key_condition(query, data.context, data.getColumnsList(), data.getSortDescription());
- PKCondition date_condition(query, data.context, data.getColumnsList(), SortDescription(1, SortColumnDescription(data.date_column_name, 1)));
+ PKCondition key_condition(query, context, data.getColumnsList(), data.getSortDescription());
+ PKCondition date_condition(query, context, data.getColumnsList(), SortDescription(1, SortColumnDescription(data.date_column_name, 1)));
/// Select the parts that may contain data satisfying date_condition and that match the condition on _part.
{

View File

@ -90,6 +90,7 @@ private:
BlockInputStreams StorageBuffer::read(
const Names & column_names,
ASTPtr query,
const Context & context,
const Settings & settings,
QueryProcessingStage::Enum & processed_stage,
size_t max_block_size,
@ -101,7 +102,7 @@ BlockInputStreams StorageBuffer::read(
if (!no_destination)
streams_from_dst = context.getTable(destination_database, destination_table)->read(
- column_names, query, settings, processed_stage, max_block_size, threads);
+ column_names, query, context, settings, processed_stage, max_block_size, threads);
BlockInputStreams streams_from_buffers;
streams_from_buffers.reserve(num_shards);

View File

@ -62,10 +62,11 @@ bool StorageChunkMerger::hasColumn(const String & column_name) const
BlockInputStreams StorageChunkMerger::read(
const Names & column_names,
ASTPtr query,
const Context & context,
const Settings & settings,
QueryProcessingStage::Enum & processed_stage,
- size_t max_block_size,
- unsigned threads)
+ const size_t max_block_size,
+ const unsigned threads)
{
/// We will read from the Chunks tables that have at least one ChunkRef matching the regexp, and from any other tables matching the regexp.
Storages selected_tables;
@ -76,16 +77,17 @@ BlockInputStreams StorageChunkMerger::read(
typedef std::set<std::string> StringSet;
StringSet chunks_table_names;
- Databases & databases = context.getDatabases();
+ const Databases & databases = context.getDatabases();
- if (!databases.count(source_database))
+ const auto database_it = databases.find(source_database);
+ if (database_it == std::end(databases))
throw Exception("No database " + source_database, ErrorCodes::UNKNOWN_DATABASE);
- Tables & tables = databases[source_database];
- for (Tables::iterator it = tables.begin(); it != tables.end(); ++it)
+ const Tables & tables = database_it->second;
+ for (const auto & it : tables)
{
- StoragePtr table = it->second;
- if (table_name_regexp.match(it->first) &&
+ const StoragePtr & table = it.second;
+ if (table_name_regexp.match(it.first) &&
!typeid_cast<StorageChunks *>(&*table) &&
!typeid_cast<StorageChunkMerger *>(&*table))
{
@ -93,19 +95,20 @@ BlockInputStreams StorageChunkMerger::read(
{
if (chunk_ref->source_database_name != source_database)
{
- LOG_WARNING(log, "ChunkRef " + it->first + " points to another database, ignoring");
+ LOG_WARNING(log, "ChunkRef " + it.first + " points to another database, ignoring");
continue;
}
if (!chunks_table_names.count(chunk_ref->source_table_name))
{
- if (tables.count(chunk_ref->source_table_name))
+ const auto table_it = tables.find(chunk_ref->source_table_name);
+ if (table_it != std::end(tables))
{
chunks_table_names.insert(chunk_ref->source_table_name);
- selected_tables.push_back(tables[chunk_ref->source_table_name]);
+ selected_tables.push_back(table_it->second);
}
else
{
- LOG_WARNING(log, "ChunkRef " + it->first + " points to non-existing Chunks table, ignoring");
+ LOG_WARNING(log, "ChunkRef " + it.first + " points to non-existing Chunks table, ignoring");
}
}
}
@ -174,6 +177,7 @@ BlockInputStreams StorageChunkMerger::read(
BlockInputStreams source_streams = table->read(
real_column_names,
modified_query_ast,
context,
settings,
tmp_processed_stage,
max_block_size,
@ -465,6 +469,7 @@ bool StorageChunkMerger::mergeChunks(const Storages & chunks)
BlockInputStreams input_streams = src_storage->read(
src_column_names,
select_query_ptr,
context,
settings,
processed_stage,
DEFAULT_MERGE_BLOCK_SIZE);

View File

@ -12,14 +12,17 @@ StoragePtr StorageChunkRef::create(const std::string & name_, const Context & co
}
BlockInputStreams StorageChunkRef::read(
- const Names & column_names,
- ASTPtr query,
- const Settings & settings,
- QueryProcessingStage::Enum & processed_stage,
- size_t max_block_size,
- unsigned threads)
+ const Names & column_names,
+ ASTPtr query,
+ const Context & context,
+ const Settings & settings,
+ QueryProcessingStage::Enum & processed_stage,
+ const size_t max_block_size,
+ const unsigned threads)
{
- return getSource().readFromChunk(name, column_names, query, settings, processed_stage, max_block_size, threads);
+ return getSource().readFromChunk(name, column_names, query,
+ context, settings, processed_stage,
+ max_block_size, threads);
}
ASTPtr StorageChunkRef::getCustomCreateQuery(const Context & context) const

View File

@ -46,10 +46,11 @@ void StorageChunks::removeReference()
BlockInputStreams StorageChunks::read(
const Names & column_names,
ASTPtr query,
const Context & context,
const Settings & settings,
QueryProcessingStage::Enum & processed_stage,
- size_t max_block_size,
- unsigned threads)
+ const size_t max_block_size,
+ const unsigned threads)
{
bool has_virtual_column = false;
@ -59,17 +60,21 @@ BlockInputStreams StorageChunks::read(
/// If there are no virtual columns, simply read the data from the table
if (!has_virtual_column)
- return read(0, std::numeric_limits<size_t>::max(), column_names, query, settings, processed_stage, max_block_size, threads);
+ return read(0, std::numeric_limits<size_t>::max(), column_names,
+ query, context, settings,
+ processed_stage, max_block_size, threads);
Block virtual_columns_block = getBlockWithVirtualColumns();
if (!VirtualColumnUtils::filterBlockWithQuery(query, virtual_columns_block, context))
- return read(0, std::numeric_limits<size_t>::max(), column_names, query, settings, processed_stage, max_block_size, threads);
+ return read(0, std::numeric_limits<size_t>::max(), column_names,
+ query, context, settings,
+ processed_stage, max_block_size, threads);
std::multiset<String> values = VirtualColumnUtils::extractSingleValueFromBlock<String>(virtual_columns_block, _table_column_name);
BlockInputStreams res;
for (const auto & it : values)
{
- BlockInputStreams temp = readFromChunk(it, column_names, query, settings, processed_stage, max_block_size, threads);
+ BlockInputStreams temp = readFromChunk(it, column_names, query, context, settings, processed_stage, max_block_size, threads);
res.insert(res.end(), temp.begin(), temp.end());
}
return res;
@ -92,10 +97,11 @@ BlockInputStreams StorageChunks::readFromChunk(
const std::string & chunk_name,
const Names & column_names,
ASTPtr query,
const Context & context,
const Settings & settings,
QueryProcessingStage::Enum & processed_stage,
- size_t max_block_size,
- unsigned threads)
+ const size_t max_block_size,
+ const unsigned threads)
{
size_t mark1;
size_t mark2;
@ -110,7 +116,7 @@ BlockInputStreams StorageChunks::readFromChunk(
mark2 = index + 1 == chunk_num_to_marks.size() ? marksCount() : chunk_num_to_marks[index + 1];
}
- return read(mark1, mark2, column_names, query, settings, processed_stage, max_block_size, threads);
+ return read(mark1, mark2, column_names, query, context, settings, processed_stage, max_block_size, threads);
}
BlockOutputStreamPtr StorageChunks::writeToNewChunk(

View File

@ -140,10 +140,11 @@ StoragePtr StorageDistributed::create(
BlockInputStreams StorageDistributed::read(
const Names & column_names,
ASTPtr query,
const Context & context,
const Settings & settings,
QueryProcessingStage::Enum & processed_stage,
- size_t max_block_size,
- unsigned threads)
+ const size_t max_block_size,
+ const unsigned threads)
{
Settings new_settings = settings;
new_settings.queue_max_wait_ms = Cluster::saturate(new_settings.queue_max_wait_ms, settings.limits.max_execution_time);
@ -163,7 +164,8 @@ BlockInputStreams StorageDistributed::read(
for (auto & conn_pool : cluster.pools)
res.emplace_back(new RemoteBlockInputStream{
conn_pool, modified_query, &new_settings,
- external_tables, processed_stage});
+ external_tables, processed_stage, context
+ });
/// Add the queries to the local ClickHouse as well.
if (cluster.getLocalNodesNum() > 0)

View File

@ -620,6 +620,7 @@ BlockInputStreams StorageLog::read(
size_t to_mark,
const Names & column_names,
ASTPtr query,
const Context & context,
const Settings & settings,
QueryProcessingStage::Enum & processed_stage,
size_t max_block_size,
@ -693,12 +694,15 @@ BlockInputStreams StorageLog::read(
BlockInputStreams StorageLog::read(
const Names & column_names,
ASTPtr query,
const Context & context,
const Settings & settings,
QueryProcessingStage::Enum & processed_stage,
- size_t max_block_size,
- unsigned threads)
+ const size_t max_block_size,
+ const unsigned threads)
{
- return read(0, std::numeric_limits<size_t>::max(), column_names, query, settings, processed_stage, max_block_size, threads);
+ return read(0, std::numeric_limits<size_t>::max(), column_names,
+ query, context, settings, processed_stage,
+ max_block_size, threads);
}

View File

@ -99,12 +99,13 @@ bool StorageMaterializedView::hasColumn(const String & column_name) const
BlockInputStreams StorageMaterializedView::read(
const Names & column_names,
ASTPtr query,
const Context & context,
const Settings & settings,
QueryProcessingStage::Enum & processed_stage,
- size_t max_block_size,
- unsigned threads)
+ const size_t max_block_size,
+ const unsigned threads)
{
- return data->read(column_names, query, settings, processed_stage, max_block_size, threads);
+ return data->read(column_names, query, context, settings, processed_stage, max_block_size, threads);
}
BlockOutputStreamPtr StorageMaterializedView::write(ASTPtr query)

View File

@ -99,6 +99,7 @@ StoragePtr StorageMemory::create(
BlockInputStreams StorageMemory::read(
const Names & column_names,
ASTPtr query,
const Context & context,
const Settings & settings,
QueryProcessingStage::Enum & processed_stage,
size_t max_block_size,

View File

@ -79,10 +79,11 @@ bool StorageMerge::hasColumn(const String & column_name) const
BlockInputStreams StorageMerge::read(
const Names & column_names,
ASTPtr query,
const Context & context,
const Settings & settings,
QueryProcessingStage::Enum & processed_stage,
- size_t max_block_size,
- unsigned threads)
+ const size_t max_block_size,
+ const unsigned threads)
{
BlockInputStreams res;
@ -143,6 +144,7 @@ BlockInputStreams StorageMerge::read(
BlockInputStreams source_streams = table->read(
real_column_names,
modified_query_ast,
context,
settings,
tmp_processed_stage,
max_block_size,

View File

@ -90,12 +90,13 @@ StorageMergeTree::~StorageMergeTree()
BlockInputStreams StorageMergeTree::read(
const Names & column_names,
ASTPtr query,
const Context & context,
const Settings & settings,
QueryProcessingStage::Enum & processed_stage,
- size_t max_block_size,
- unsigned threads)
+ const size_t max_block_size,
+ const unsigned threads)
{
- return reader.read(column_names, query, settings, processed_stage, max_block_size, threads);
+ return reader.read(column_names, query, context, settings, processed_stage, max_block_size, threads);
}
BlockOutputStreamPtr StorageMergeTree::write(ASTPtr query)

View File

@ -1990,12 +1990,13 @@ StorageReplicatedMergeTree::~StorageReplicatedMergeTree()
BlockInputStreams StorageReplicatedMergeTree::read(
- const Names & column_names,
- ASTPtr query,
- const Settings & settings,
- QueryProcessingStage::Enum & processed_stage,
- size_t max_block_size,
- unsigned threads)
+ const Names & column_names,
+ ASTPtr query,
+ const Context & context,
+ const Settings & settings,
+ QueryProcessingStage::Enum & processed_stage,
+ const size_t max_block_size,
+ const unsigned threads)
{
Names virt_column_names;
Names real_column_names;
@ -2024,8 +2025,9 @@ BlockInputStreams StorageReplicatedMergeTree::read(
if (unreplicated_reader && values.count(0))
{
- res = unreplicated_reader->read(
- real_column_names, query, settings, processed_stage, max_block_size, threads, &part_index);
+ res = unreplicated_reader->read(real_column_names, query,
+ context, settings, processed_stage,
+ max_block_size, threads, &part_index);
for (auto & virtual_column : virt_column_names)
{
@ -2039,7 +2041,7 @@ BlockInputStreams StorageReplicatedMergeTree::read(
if (values.count(1))
{
- auto res2 = reader.read(real_column_names, query, settings, processed_stage, max_block_size, threads, &part_index);
+ auto res2 = reader.read(real_column_names, query, context, settings, processed_stage, max_block_size, threads, &part_index);
for (auto & virtual_column : virt_column_names)
{

View File

@ -21,8 +21,13 @@ StoragePtr StorageSystemDatabases::create(const std::string & name_, const Conte
BlockInputStreams StorageSystemDatabases::read(
- const Names & column_names, ASTPtr query, const Settings & settings,
- QueryProcessingStage::Enum & processed_stage, size_t max_block_size, unsigned threads)
+ const Names & column_names,
+ ASTPtr query,
+ const Context & context,
+ const Settings & settings,
+ QueryProcessingStage::Enum & processed_stage,
+ const size_t max_block_size,
+ const unsigned threads)
{
check(column_names);
processed_stage = QueryProcessingStage::FetchColumns;

View File

@ -24,8 +24,13 @@ StoragePtr StorageSystemEvents::create(const std::string & name_)
BlockInputStreams StorageSystemEvents::read(
- const Names & column_names, ASTPtr query, const Settings & settings,
- QueryProcessingStage::Enum & processed_stage, size_t max_block_size, unsigned threads)
+ const Names & column_names,
+ ASTPtr query,
+ const Context & context,
+ const Settings & settings,
+ QueryProcessingStage::Enum & processed_stage,
+ const size_t max_block_size,
+ const unsigned threads)
{
check(column_names);
processed_stage = QueryProcessingStage::FetchColumns;

View File

@ -33,8 +33,13 @@ StoragePtr StorageSystemMerges::create(const std::string & name, const Context &
}
BlockInputStreams StorageSystemMerges::read(
- const Names & column_names, ASTPtr query, const Settings & settings,
- QueryProcessingStage::Enum & processed_stage, const size_t max_block_size, const unsigned)
+ const Names & column_names,
+ ASTPtr query,
+ const Context & context,
+ const Settings & settings,
+ QueryProcessingStage::Enum & processed_stage,
+ const size_t max_block_size,
+ const unsigned)
{
check(column_names);
processed_stage = QueryProcessingStage::FetchColumns;

View File

@ -66,8 +66,13 @@ StoragePtr StorageSystemNumbers::create(const std::string & name_, bool multithr
BlockInputStreams StorageSystemNumbers::read(
- const Names & column_names, ASTPtr query, const Settings & settings,
- QueryProcessingStage::Enum & processed_stage, size_t max_block_size, unsigned threads)
+ const Names & column_names,
+ ASTPtr query,
+ const Context & context,
+ const Settings & settings,
+ QueryProcessingStage::Enum & processed_stage,
+ const size_t max_block_size,
+ unsigned threads)
{
check(column_names);
processed_stage = QueryProcessingStage::FetchColumns;

View File

@ -24,8 +24,13 @@ StoragePtr StorageSystemOne::create(const std::string & name_)
BlockInputStreams StorageSystemOne::read(
- const Names & column_names, ASTPtr query, const Settings & settings,
- QueryProcessingStage::Enum & processed_stage, size_t max_block_size, unsigned threads)
+ const Names & column_names,
+ ASTPtr query,
+ const Context & context,
+ const Settings & settings,
+ QueryProcessingStage::Enum & processed_stage,
+ const size_t max_block_size,
+ const unsigned threads)
{
check(column_names);
processed_stage = QueryProcessingStage::FetchColumns;

View File

@ -38,8 +38,13 @@ StoragePtr StorageSystemParts::create(const std::string & name_, const Context &
BlockInputStreams StorageSystemParts::read(
- const Names & column_names, ASTPtr query, const Settings & settings,
- QueryProcessingStage::Enum & processed_stage, size_t max_block_size, unsigned threads)
+ const Names & column_names,
+ ASTPtr query,
+ const Context & context,
+ const Settings & settings,
+ QueryProcessingStage::Enum & processed_stage,
+ const size_t max_block_size,
+ const unsigned threads)
{
check(column_names);
processed_stage = QueryProcessingStage::FetchColumns;

View File

@ -32,8 +32,13 @@ StoragePtr StorageSystemProcesses::create(const std::string & name_, const Conte
BlockInputStreams StorageSystemProcesses::read(
- const Names & column_names, ASTPtr query, const Settings & settings,
- QueryProcessingStage::Enum & processed_stage, size_t max_block_size, unsigned threads)
+ const Names & column_names,
+ ASTPtr query,
+ const Context & context,
+ const Settings & settings,
+ QueryProcessingStage::Enum & processed_stage,
+ const size_t max_block_size,
+ const unsigned threads)
{
check(column_names);
processed_stage = QueryProcessingStage::FetchColumns;

View File

@ -46,8 +46,13 @@ StoragePtr StorageSystemReplicas::create(const std::string & name_, const Contex
BlockInputStreams StorageSystemReplicas::read(
- const Names & column_names, ASTPtr query, const Settings & settings,
- QueryProcessingStage::Enum & processed_stage, size_t max_block_size, unsigned threads)
+ const Names & column_names,
+ ASTPtr query,
+ const Context & context,
+ const Settings & settings,
+ QueryProcessingStage::Enum & processed_stage,
+ const size_t max_block_size,
+ const unsigned threads)
{
check(column_names);
processed_stage = QueryProcessingStage::FetchColumns;
@ -59,7 +64,7 @@ BlockInputStreams StorageSystemReplicas::read(
for (const auto & db : context.getDatabases())
for (const auto & table : db.second)
- if (typeid_cast<const StorageReplicatedMergeTree *>(&*table.second))
+ if (typeid_cast<const StorageReplicatedMergeTree *>(table.second.get()))
replicated_tables[db.first][table.first] = table.second;
}

View File

@ -26,8 +26,13 @@ StoragePtr StorageSystemSettings::create(const std::string & name_, const Contex
BlockInputStreams StorageSystemSettings::read(
- const Names & column_names, ASTPtr query, const Settings & settings,
- QueryProcessingStage::Enum & processed_stage, size_t max_block_size, unsigned threads)
+ const Names & column_names,
+ ASTPtr query,
+ const Context & context,
+ const Settings & settings,
+ QueryProcessingStage::Enum & processed_stage,
+ const size_t max_block_size,
+ const unsigned threads)
{
check(column_names);
processed_stage = QueryProcessingStage::FetchColumns;

View File

@ -24,8 +24,13 @@ StoragePtr StorageSystemTables::create(const std::string & name_, const Context
BlockInputStreams StorageSystemTables::read(
- const Names & column_names, ASTPtr query, const Settings & settings,
- QueryProcessingStage::Enum & processed_stage, size_t max_block_size, unsigned threads)
+ const Names & column_names,
+ ASTPtr query,
+ const Context & context,
+ const Settings & settings,
+ QueryProcessingStage::Enum & processed_stage,
+ const size_t max_block_size,
+ const unsigned threads)
{
check(column_names);
processed_stage = QueryProcessingStage::FetchColumns;

View File

@ -103,8 +103,13 @@ static String extractPath(const ASTPtr & query)
BlockInputStreams StorageSystemZooKeeper::read(
- const Names & column_names, ASTPtr query, const Settings & settings,
- QueryProcessingStage::Enum & processed_stage, size_t max_block_size, unsigned threads)
+ const Names & column_names,
+ ASTPtr query,
+ const Context & context,
+ const Settings & settings,
+ QueryProcessingStage::Enum & processed_stage,
+ const size_t max_block_size,
+ const unsigned threads)
{
check(column_names);
processed_stage = QueryProcessingStage::FetchColumns;

View File

@ -406,10 +406,11 @@ void StorageTinyLog::rename(const String & new_path_to_db, const String & new_da
BlockInputStreams StorageTinyLog::read(
const Names & column_names,
ASTPtr query,
const Context & context,
const Settings & settings,
QueryProcessingStage::Enum & processed_stage,
- size_t max_block_size,
- unsigned threads)
+ const size_t max_block_size,
+ const unsigned threads)
{
check(column_names);
processed_stage = QueryProcessingStage::FetchColumns;

View File

@ -74,10 +74,11 @@ StorageView::StorageView(
BlockInputStreams StorageView::read(
const Names & column_names,
ASTPtr query,
const Context & context,
const Settings & settings,
QueryProcessingStage::Enum & processed_stage,
- size_t max_block_size,
- unsigned threads)
+ const size_t max_block_size,
+ const unsigned threads)
{
ASTPtr inner_query_clone = getInnerQuery();
ASTSelectQuery & inner_select = static_cast<ASTSelectQuery &>(*inner_query_clone);