mirror of
https://github.com/ClickHouse/ClickHouse.git
synced 2024-09-21 09:10:48 +00:00
Merge
This commit is contained in:
parent
395d741212
commit
91609727a8
@ -1,180 +0,0 @@
|
||||
#pragma once
|
||||
|
||||
#include <Poco/SharedPtr.h>
|
||||
#include <DB/Storages/StoragePtr.h>
|
||||
#include <DB/Columns/ColumnConst.h>
|
||||
|
||||
#include <vector>
|
||||
#include <map>
|
||||
#include <list>
|
||||
#include <algorithm>
|
||||
#include <boost/iterator/iterator_concepts.hpp>
|
||||
|
||||
#include <DB/Core/ColumnWithNameAndType.h>
|
||||
#include <DB/Core/NamesAndTypes.h>
|
||||
#include <DB/Core/Exception.h>
|
||||
#include <DB/Core/ErrorCodes.h>
|
||||
#include <DB/Core/Names.h>
|
||||
|
||||
namespace DB
|
||||
{
|
||||
|
||||
using Poco::SharedPtr;
|
||||
|
||||
|
||||
/** Виртуальные столбцы - столбцы, предоставляемые Storage-ом, независимо от определения таблицы.
|
||||
* То есть, такие столбцы не указываются в CREATE TABLE, но доступны для SELECT-а.
|
||||
*/
|
||||
|
||||
|
||||
/// Интерфейс для одного виртуального столбца
|
||||
class IVirtualColumn
|
||||
{
|
||||
public:
|
||||
virtual String getName() const = 0;
|
||||
virtual void setName(const String &s) = 0;
|
||||
virtual NameAndTypePair getNameAndType() const = 0;
|
||||
virtual ColumnPtr getColumnPtr(StoragePtr storage, size_t s) const = 0;
|
||||
virtual ColumnWithNameAndType getColumn(StoragePtr storage, size_t s) const = 0;
|
||||
};
|
||||
|
||||
/// Шаблон для одного виртуального столбца типа T
|
||||
template <typename T>
|
||||
class VirtualColumn : public IVirtualColumn
|
||||
{
|
||||
public:
|
||||
typedef T Type;
|
||||
typedef Type (*extractor_func)(StoragePtr);
|
||||
|
||||
private:
|
||||
String name; /// Имя столбца
|
||||
extractor_func extractor; /// Функция, которая получает значение столбца по указателю на таблицу
|
||||
DataTypePtr data_type; /// Тип данных
|
||||
|
||||
public:
|
||||
VirtualColumn(const String & desired_name, extractor_func extractor, DataTypePtr data_type) : name(desired_name), extractor(extractor), data_type(data_type) { }
|
||||
|
||||
String getName() const { return name; }
|
||||
void setName(const String &s) { name = s; }
|
||||
|
||||
NameAndTypePair getNameAndType() const { return make_pair(name, data_type); }
|
||||
|
||||
/// Создает константный столбец размера s, вычисляя значение extractor на storage
|
||||
ColumnPtr getColumnPtr(StoragePtr storage, size_t s) const
|
||||
{
|
||||
return new ColumnConst<Type> (s, extractor(storage), data_type);
|
||||
}
|
||||
|
||||
/// Возвращает столбец, готовый для вставки в блок
|
||||
ColumnWithNameAndType getColumn(StoragePtr storage, size_t s) const
|
||||
{
|
||||
return ColumnWithNameAndType (getColumnPtr(storage, s), data_type, name);
|
||||
}
|
||||
};
|
||||
|
||||
/// Структура со всеми виртуальными столбцами для таблицы
|
||||
class VirtualColumnList
|
||||
{
|
||||
private:
|
||||
std::vector< SharedPtr<IVirtualColumn> > columns; /// Список всех виртуальных столбцов
|
||||
|
||||
public:
|
||||
/// Добавить столбец
|
||||
void addColumn(SharedPtr<IVirtualColumn> a)
|
||||
{
|
||||
columns.push_back(a);
|
||||
}
|
||||
|
||||
NamesAndTypesList getColumnsList() const
|
||||
{
|
||||
NamesAndTypesList res;
|
||||
for (size_t i = 0; i < columns.size(); ++i)
|
||||
{
|
||||
res.push_back(columns[i]->getNameAndType());
|
||||
}
|
||||
return res;
|
||||
}
|
||||
|
||||
/// Вычислить суффиксы для имен столбцов, зная уже присутствующие в таблице столбцы
|
||||
void calculateNames(const NamesAndTypesList &already_in_storage)
|
||||
{
|
||||
int id = 0;
|
||||
while (true)
|
||||
{
|
||||
String suf;
|
||||
{
|
||||
int now = id;
|
||||
while (now > 0)
|
||||
{
|
||||
char cur = '0' + now % 10;
|
||||
suf = cur + suf;
|
||||
now /= 10;
|
||||
}
|
||||
}
|
||||
bool done = true;
|
||||
|
||||
for (size_t i = 0; i < columns.size(); ++i)
|
||||
{
|
||||
String temp_name = columns[i]->getName() + suf;
|
||||
for (NamesAndTypesList::const_iterator j = already_in_storage.begin(); j != already_in_storage.end(); ++j)
|
||||
if (j->first == temp_name)
|
||||
{
|
||||
done = false;
|
||||
break;
|
||||
}
|
||||
if (!done) break;
|
||||
}
|
||||
|
||||
if (done)
|
||||
{
|
||||
for (size_t i = 0; i < columns.size(); ++i)
|
||||
columns[i]->setName(columns[i]->getName() + suf);
|
||||
break;
|
||||
}
|
||||
id ++;
|
||||
}
|
||||
}
|
||||
|
||||
/// Разбить множество имен столбцов на два: виртуальные и невиртуальные
|
||||
void splitNames(const Names & column_names, Names ¬virt, VirtualColumnList &virt) const
|
||||
{
|
||||
notvirt.clear();
|
||||
for (Names::const_iterator i = column_names.begin(); i != column_names.end(); ++i)
|
||||
{
|
||||
bool is_virt = false;
|
||||
size_t id = -1;
|
||||
for (size_t j = 0; j < columns.size(); ++j)
|
||||
if (columns[j]->getName() == *i)
|
||||
{
|
||||
is_virt = true;
|
||||
id = j;
|
||||
break;
|
||||
}
|
||||
if (is_virt)
|
||||
virt.addColumn(columns[id]);
|
||||
else
|
||||
notvirt.push_back(*i);
|
||||
}
|
||||
}
|
||||
|
||||
/// Пополнить блок всеми столбцами.
|
||||
void populate(Block &res, StoragePtr storage)
|
||||
{
|
||||
int rows = res.rows();
|
||||
for (size_t i = 0; i < columns.size(); ++i)
|
||||
{
|
||||
res.insert(columns[i]->getColumn(storage, rows));
|
||||
}
|
||||
}
|
||||
};
|
||||
|
||||
/// Static класс со всеми функциями экстракторами.
|
||||
class Extractors {
|
||||
public:
|
||||
static String nameExtractor(StoragePtr a)
|
||||
{
|
||||
return a->getTableName();
|
||||
}
|
||||
};
|
||||
|
||||
}
|
@ -1,52 +0,0 @@
|
||||
#pragma once
|
||||
|
||||
#include <Poco/SharedPtr.h>
|
||||
|
||||
#include <DB/DataStreams/IProfilingBlockInputStream.h>
|
||||
#include <DB/Columns/ColumnConst.h>
|
||||
#include <DB/Core/VirtualColumnsList.h>
|
||||
|
||||
|
||||
namespace DB
|
||||
{
|
||||
|
||||
/** Добавляет в блок виртуальные столбцы.
|
||||
* Получает на вход VirtualColumnList и StoragePtr, относительно которого будут вычисляться значения столбцов.
|
||||
*/
|
||||
class AddingVirtualColumnsBlockInputStream : public IProfilingBlockInputStream
|
||||
{
|
||||
public:
|
||||
AddingVirtualColumnsBlockInputStream(
|
||||
BlockInputStreamPtr input_,
|
||||
VirtualColumnList virtual_columns_,
|
||||
StoragePtr storage_)
|
||||
: virtual_columns(virtual_columns_), storage(storage_)
|
||||
{
|
||||
children.push_back(input_);
|
||||
}
|
||||
|
||||
String getName() const { return "AddingVirtualColumnsBlockInputStream"; }
|
||||
|
||||
String getID() const
|
||||
{
|
||||
std::stringstream res;
|
||||
res << "AddingVirtualColumnsBlockInputStream(" << children.back()->getID() << ")";
|
||||
return res.str();
|
||||
}
|
||||
|
||||
protected:
|
||||
Block readImpl()
|
||||
{
|
||||
Block res = children.back()->read();
|
||||
if (!res)
|
||||
return res;
|
||||
virtual_columns.populate(res, storage);
|
||||
return res;
|
||||
}
|
||||
|
||||
private:
|
||||
VirtualColumnList virtual_columns;
|
||||
StoragePtr storage;
|
||||
};
|
||||
|
||||
}
|
@ -20,20 +20,16 @@ class ExpressionAnalyzer : private boost::noncopyable
|
||||
public:
|
||||
ExpressionAnalyzer(const ASTPtr & ast_, const Context & context_, size_t subquery_depth_ = 0)
|
||||
: ast(ast_), context(context_), settings(context.getSettings()),
|
||||
subquery_depth(subquery_depth_), columns(context.getColumns()), real_columns(context.getColumns()), storage(getTable())
|
||||
subquery_depth(subquery_depth_), columns(context.getColumns()), storage(getTable())
|
||||
{
|
||||
if (storage)
|
||||
columns = storage->getFullColumnsList();
|
||||
init();
|
||||
}
|
||||
|
||||
/// columns - список известных столбцов (которых можно достать из таблицы).
|
||||
ExpressionAnalyzer(const ASTPtr & ast_, const Context & context_, const NamesAndTypesList & columns_, size_t subquery_depth_ = 0)
|
||||
: ast(ast_), context(context_), settings(context.getSettings()),
|
||||
subquery_depth(subquery_depth_), columns(columns_), real_columns(context.getColumns()), storage(getTable())
|
||||
subquery_depth(subquery_depth_), columns(columns_), storage(getTable())
|
||||
{
|
||||
if (storage)
|
||||
columns = storage->getFullColumnsList();
|
||||
init();
|
||||
}
|
||||
|
||||
@ -96,10 +92,8 @@ private:
|
||||
/// Столбцы, которые упоминаются в выражении, но не были заданы в конструкторе.
|
||||
NameSet unknown_required_columns;
|
||||
|
||||
/// Исходные столбцы, все
|
||||
/// Исходные столбцы.
|
||||
NamesAndTypesList columns;
|
||||
/// Исходные столбцы, только не виртуальные
|
||||
NamesAndTypesList real_columns;
|
||||
/// Столбцы после ARRAY JOIN. Если нет ARRAY JOIN, совпадает с columns.
|
||||
NamesAndTypesList columns_after_array_join;
|
||||
/// Столбцы после агрегации. Если нет агрегации, совпадает с columns_after_array_join.
|
||||
|
@ -43,11 +43,6 @@ public:
|
||||
*/
|
||||
virtual const NamesAndTypesList & getColumnsList() const = 0;
|
||||
|
||||
/** Получить список имён и типов столбцов таблицы, включая виртуальные столбцы.
|
||||
* По умолчанию, возвращает getColumnsList().
|
||||
*/
|
||||
virtual NamesAndTypesList getFullColumnsList() const;
|
||||
|
||||
const DataTypePtr getDataTypeByName(const String & column_name) const;
|
||||
|
||||
/** То же самое, но в виде блока-образца.
|
||||
|
@ -4,7 +4,7 @@
|
||||
|
||||
#include <DB/Interpreters/Context.h>
|
||||
#include <DB/Storages/IStorage.h>
|
||||
#include <DB/Core/VirtualColumnsList.h>
|
||||
|
||||
|
||||
namespace DB
|
||||
{
|
||||
@ -30,7 +30,6 @@ public:
|
||||
bool supportsSampling() const { return true; }
|
||||
|
||||
const NamesAndTypesList & getColumnsList() const { return *columns; }
|
||||
NamesAndTypesList getFullColumnsList() const;
|
||||
|
||||
BlockInputStreams read(
|
||||
const Names & column_names,
|
||||
@ -54,7 +53,6 @@ private:
|
||||
String source_database;
|
||||
OptimizedRegularExpression table_name_regexp;
|
||||
const Context & context;
|
||||
Poco::SharedPtr<VirtualColumnList> virtual_columns;
|
||||
|
||||
StorageMerge(
|
||||
const std::string & name_,
|
||||
|
@ -439,7 +439,7 @@ void ExpressionAnalyzer::normalizeTreeImpl(ASTPtr & ast, MapOfASTs & finished_as
|
||||
if (ASTAsterisk * asterisk = dynamic_cast<ASTAsterisk *>(&*asts[i]))
|
||||
{
|
||||
ASTs all_columns;
|
||||
for (NamesAndTypesList::const_iterator it = real_columns.begin(); it != real_columns.end(); ++it)
|
||||
for (NamesAndTypesList::const_iterator it = columns.begin(); it != columns.end(); ++it)
|
||||
all_columns.push_back(new ASTIdentifier(asterisk->range, it->first));
|
||||
asts.erase(asts.begin() + i);
|
||||
asts.insert(asts.begin() + i, all_columns.begin(), all_columns.end());
|
||||
|
@ -15,11 +15,6 @@
|
||||
namespace DB
|
||||
{
|
||||
|
||||
NamesAndTypesList IStorage::getFullColumnsList() const
|
||||
{
|
||||
return getColumnsList();
|
||||
}
|
||||
|
||||
|
||||
const DataTypePtr IStorage::getDataTypeByName(const String & column_name) const
|
||||
{
|
||||
|
@ -1,8 +1,6 @@
|
||||
#include <DB/DataStreams/narrowBlockInputStreams.h>
|
||||
#include <DB/Storages/StorageMerge.h>
|
||||
#include <DB/DataTypes/DataTypeString.h>
|
||||
|
||||
#include <DB/DataStreams/AddingVirtualColumnsBlockInputStream.h>
|
||||
|
||||
namespace DB
|
||||
{
|
||||
@ -15,10 +13,6 @@ StorageMerge::StorageMerge(
|
||||
const Context & context_)
|
||||
: name(name_), columns(columns_), source_database(source_database_), table_name_regexp(table_name_regexp_), context(context_)
|
||||
{
|
||||
/// Создаем виртуальные столбцы и инициализруем их
|
||||
virtual_columns = new VirtualColumnList;
|
||||
virtual_columns->addColumn(new VirtualColumn<String>("_table", *Extractors::nameExtractor, new DataTypeString));
|
||||
virtual_columns->calculateNames(*columns);
|
||||
}
|
||||
|
||||
StoragePtr StorageMerge::create(
|
||||
@ -31,13 +25,6 @@ StoragePtr StorageMerge::create(
|
||||
return (new StorageMerge(name_, columns_, source_database_, table_name_regexp_, context_))->thisPtr();
|
||||
}
|
||||
|
||||
NamesAndTypesList StorageMerge::getFullColumnsList() const
|
||||
{
|
||||
NamesAndTypesList res = getColumnsList();
|
||||
NamesAndTypesList virt = virtual_columns->getColumnsList();
|
||||
res.splice(res.end(), virt);
|
||||
return res;
|
||||
}
|
||||
|
||||
BlockInputStreams StorageMerge::read(
|
||||
const Names & column_names,
|
||||
@ -49,10 +36,6 @@ BlockInputStreams StorageMerge::read(
|
||||
{
|
||||
BlockInputStreams res;
|
||||
|
||||
Names notvirt;
|
||||
VirtualColumnList virt;
|
||||
virtual_columns->splitNames(column_names, notvirt, virt);
|
||||
|
||||
typedef std::vector<StoragePtr> SelectedTables;
|
||||
SelectedTables selected_tables;
|
||||
|
||||
@ -74,7 +57,7 @@ BlockInputStreams StorageMerge::read(
|
||||
for (SelectedTables::iterator it = selected_tables.begin(); it != selected_tables.end(); ++it)
|
||||
{
|
||||
BlockInputStreams source_streams = (*it)->read(
|
||||
notvirt,
|
||||
column_names,
|
||||
query,
|
||||
settings,
|
||||
tmp_processed_stage,
|
||||
@ -82,9 +65,7 @@ BlockInputStreams StorageMerge::read(
|
||||
selected_tables.size() > threads ? 1 : (threads / selected_tables.size()));
|
||||
|
||||
for (BlockInputStreams::iterator jt = source_streams.begin(); jt != source_streams.end(); ++jt)
|
||||
{
|
||||
res.push_back(new AddingVirtualColumnsBlockInputStream(*jt, virt, *it));
|
||||
}
|
||||
res.push_back(*jt);
|
||||
|
||||
if (tmp_processed_stage < processed_stage)
|
||||
processed_stage = tmp_processed_stage;
|
||||
|
Loading…
Reference in New Issue
Block a user