ClickHouse/dbms/src/Storages/StorageMerge.cpp
2013-11-13 09:47:12 +00:00

97 lines
3.0 KiB
C++
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

#include <DB/DataStreams/narrowBlockInputStreams.h>
#include <DB/Storages/StorageMerge.h>
namespace DB
{
StorageMerge::StorageMerge(
const std::string & name_,
NamesAndTypesListPtr columns_,
const String & source_database_,
const String & table_name_regexp_,
const Context & context_)
: name(name_), columns(columns_), source_database(source_database_), table_name_regexp(table_name_regexp_), context(context_)
{
}
StoragePtr StorageMerge::create(
const std::string & name_,
NamesAndTypesListPtr columns_,
const String & source_database_,
const String & table_name_regexp_,
const Context & context_)
{
return (new StorageMerge(name_, columns_, source_database_, table_name_regexp_, context_))->thisPtr();
}
BlockInputStreams StorageMerge::read(
const Names & column_names,
ASTPtr query,
const Settings & settings,
QueryProcessingStage::Enum & processed_stage,
size_t max_block_size,
unsigned threads)
{
BlockInputStreams res;
typedef std::vector<StoragePtr> SelectedTables;
SelectedTables selected_tables;
/// Среди всех стадий, до которых обрабатывается запрос в таблицах-источниках, выберем минимальную.
processed_stage = QueryProcessingStage::Complete;
QueryProcessingStage::Enum tmp_processed_stage = QueryProcessingStage::Complete;
/// Список таблиц могут менять в другом потоке.
{
Poco::ScopedLock<Poco::Mutex> lock(context.getMutex());
context.assertDatabaseExists(source_database);
/** Сначала составим список выбранных таблиц, чтобы узнать его размер.
* Это нужно, чтобы правильно передать в каждую таблицу рекомендацию по количеству потоков.
*/
getSelectedTables(selected_tables);
}
for (SelectedTables::iterator it = selected_tables.begin(); it != selected_tables.end(); ++it)
{
BlockInputStreams source_streams = (*it)->read(
column_names,
query,
settings,
tmp_processed_stage,
max_block_size,
selected_tables.size() > threads ? 1 : (threads / selected_tables.size()));
for (BlockInputStreams::iterator jt = source_streams.begin(); jt != source_streams.end(); ++jt)
res.push_back(*jt);
if (tmp_processed_stage < processed_stage)
processed_stage = tmp_processed_stage;
}
/** Если истчоников слишком много, то склеим их в threads источников.
*/
if (res.size() > threads)
res = narrowBlockInputStreams(res, threads);
return res;
}
void StorageMerge::getSelectedTables(StorageVector & selected_tables)
{
const Tables & tables = context.getDatabases().at(source_database);
for (Tables::const_iterator it = tables.begin(); it != tables.end(); ++it)
if (it->second != this && table_name_regexp.match(it->first))
selected_tables.push_back(it->second);
}
void StorageMerge::alter(const ASTAlterQuery::Parameters & params)
{
alterColumns(params, columns, context);
}
}