mirror of
https://github.com/ClickHouse/ClickHouse.git
synced 2024-09-20 16:50:48 +00:00
Merge
This commit is contained in:
parent
1995d3e1ba
commit
b1c6001e40
@ -42,6 +42,8 @@ public:
|
||||
bool supportsPrewhere() const { return true; }
|
||||
|
||||
const NamesAndTypesList & getColumnsList() const { return *columns; }
|
||||
NameAndTypePair getColumn(const String &column_name) const;
|
||||
bool hasColumn(const String &column_name) const;
|
||||
|
||||
bool isRemote() const { return true; }
|
||||
/// Сохранить временные таблицы, чтобы при следующем вызове метода read переслать их на удаленные серверы.
|
||||
|
@ -59,9 +59,6 @@ private:
|
||||
OptimizedRegularExpression table_name_regexp;
|
||||
const Context & context;
|
||||
|
||||
/// Название виртуального столбца, отвечающего за имя таблицы, из которой идет чтение. (Например "_table")
|
||||
String _table_column_name;
|
||||
|
||||
StorageMerge(
|
||||
const std::string & name_,
|
||||
NamesAndTypesListPtr columns_,
|
||||
|
20
dbms/include/DB/Storages/VirtualColumnFactory.h
Normal file
20
dbms/include/DB/Storages/VirtualColumnFactory.h
Normal file
@ -0,0 +1,20 @@
|
||||
#pragma once
|
||||
|
||||
#include <DB/DataTypes/IDataType.h>
|
||||
|
||||
namespace DB
|
||||
{
|
||||
|
||||
/** Знает имена и типы всех возможных виртуальных столбцов.
|
||||
* Нужно для движков, перенаправляющих запрос в другие таблицы, не зная заранее, какие в них есть виртуальные столбцы.
|
||||
*/
|
||||
class VirtualColumnFactory
|
||||
{
|
||||
public:
|
||||
static bool hasColumn(const String & name);
|
||||
static DataTypePtr getType(const String & name);
|
||||
|
||||
static DataTypePtr tryGetType(const String & name);
|
||||
};
|
||||
|
||||
}
|
@ -4,6 +4,7 @@
|
||||
#include <DB/DataStreams/RemoveColumnsBlockInputStream.h>
|
||||
|
||||
#include <DB/Storages/StorageDistributed.h>
|
||||
#include <DB/Storages/VirtualColumnFactory.h>
|
||||
|
||||
#include <Poco/Net/NetworkInterface.h>
|
||||
#include <DB/Client/ConnectionPool.h>
|
||||
@ -140,4 +141,18 @@ void StorageDistributed::alter(const AlterCommands & params, const String & data
|
||||
InterpreterAlterQuery::updateMetadata(database_name, table_name, *columns, context);
|
||||
}
|
||||
|
||||
NameAndTypePair StorageDistributed::getColumn(const String & column_name) const
|
||||
{
|
||||
auto type = VirtualColumnFactory::tryGetType(column_name);
|
||||
if (type)
|
||||
return NameAndTypePair(column_name, type);
|
||||
|
||||
return getRealColumn(column_name);
|
||||
}
|
||||
|
||||
bool StorageDistributed::hasColumn(const String & column_name) const
|
||||
{
|
||||
return VirtualColumnFactory::hasColumn(column_name) || hasRealColumn(column_name);
|
||||
}
|
||||
|
||||
}
|
||||
|
@ -2,6 +2,7 @@
|
||||
#include <DB/Storages/StorageMerge.h>
|
||||
#include <DB/Common/VirtualColumnUtils.h>
|
||||
#include <DB/Interpreters/InterpreterAlterQuery.h>
|
||||
#include <DB/Storages/VirtualColumnFactory.h>
|
||||
|
||||
namespace DB
|
||||
{
|
||||
@ -14,7 +15,6 @@ StorageMerge::StorageMerge(
|
||||
const Context & context_)
|
||||
: name(name_), columns(columns_), source_database(source_database_), table_name_regexp(table_name_regexp_), context(context_)
|
||||
{
|
||||
_table_column_name = "_table" + VirtualColumnUtils::chooseSuffix(getColumnsList(), "_table");
|
||||
}
|
||||
|
||||
StoragePtr StorageMerge::create(
|
||||
@ -27,16 +27,18 @@ StoragePtr StorageMerge::create(
|
||||
return (new StorageMerge(name_, columns_, source_database_, table_name_regexp_, context_))->thisPtr();
|
||||
}
|
||||
|
||||
NameAndTypePair StorageMerge::getColumn(const String &column_name) const
|
||||
NameAndTypePair StorageMerge::getColumn(const String & column_name) const
|
||||
{
|
||||
if (column_name == _table_column_name) return NameAndTypePair(_table_column_name, new DataTypeString);
|
||||
auto type = VirtualColumnFactory::tryGetType(column_name);
|
||||
if (type)
|
||||
return NameAndTypePair(column_name, type);
|
||||
|
||||
return getRealColumn(column_name);
|
||||
}
|
||||
|
||||
bool StorageMerge::hasColumn(const String &column_name) const
|
||||
bool StorageMerge::hasColumn(const String & column_name) const
|
||||
{
|
||||
if (column_name == _table_column_name) return true;
|
||||
return hasRealColumn(column_name);
|
||||
return VirtualColumnFactory::hasColumn(column_name) || hasRealColumn(column_name);
|
||||
}
|
||||
|
||||
BlockInputStreams StorageMerge::read(
|
||||
@ -51,7 +53,7 @@ BlockInputStreams StorageMerge::read(
|
||||
|
||||
Names virt_column_names, real_column_names;
|
||||
for (const auto & it : column_names)
|
||||
if (it != _table_column_name)
|
||||
if (it != "_table")
|
||||
real_column_names.push_back(it);
|
||||
else
|
||||
virt_column_names.push_back(it);
|
||||
@ -85,7 +87,7 @@ BlockInputStreams StorageMerge::read(
|
||||
if (!virt_column_names.empty())
|
||||
VirtualColumnUtils::filterBlockWithQuery(query->clone(), virtual_columns_block, context);
|
||||
|
||||
std::multiset<String> values = VirtualColumnUtils::extractSingleValueFromBlock<String>(virtual_columns_block, _table_column_name);
|
||||
std::multiset<String> values = VirtualColumnUtils::extractSingleValueFromBlock<String>(virtual_columns_block, "_table");
|
||||
|
||||
for (size_t i = 0, size = selected_tables.size(); i < size; ++i)
|
||||
{
|
||||
@ -101,7 +103,7 @@ BlockInputStreams StorageMerge::read(
|
||||
|
||||
/// Подменяем виртуальный столбец на его значение
|
||||
ASTPtr modified_query_ast = query->clone();
|
||||
VirtualColumnUtils::rewriteEntityInAst(modified_query_ast, _table_column_name, table->getTableName());
|
||||
VirtualColumnUtils::rewriteEntityInAst(modified_query_ast, "_table", table->getTableName());
|
||||
|
||||
BlockInputStreams source_streams = table->read(
|
||||
real_column_names,
|
||||
@ -116,10 +118,10 @@ BlockInputStreams StorageMerge::read(
|
||||
|
||||
for (auto & virtual_column : virt_column_names)
|
||||
{
|
||||
if (virtual_column == _table_column_name)
|
||||
if (virtual_column == "_table")
|
||||
{
|
||||
for (auto & stream : source_streams)
|
||||
stream = new AddingConstColumnBlockInputStream<String>(stream, new DataTypeString, table->getTableName(), _table_column_name);
|
||||
stream = new AddingConstColumnBlockInputStream<String>(stream, new DataTypeString, table->getTableName(), "_table");
|
||||
}
|
||||
}
|
||||
|
||||
@ -141,7 +143,7 @@ BlockInputStreams StorageMerge::read(
|
||||
Block StorageMerge::getBlockWithVirtualColumns(const std::vector<StoragePtr> & selected_tables) const
|
||||
{
|
||||
Block res;
|
||||
ColumnWithNameAndType _table(new ColumnString, new DataTypeString, _table_column_name);
|
||||
ColumnWithNameAndType _table(new ColumnString, new DataTypeString, "_table");
|
||||
|
||||
for (StorageVector::const_iterator it = selected_tables.begin(); it != selected_tables.end(); ++it)
|
||||
_table.column->insert((*it)->getTableName());
|
||||
|
31
dbms/src/Storages/VirtualColumnFactory.cpp
Normal file
31
dbms/src/Storages/VirtualColumnFactory.cpp
Normal file
@ -0,0 +1,31 @@
|
||||
#include <DB/Storages/VirtualColumnFactory.h>
|
||||
#include <DB/DataTypes/DataTypeString.h>
|
||||
#include <DB/DataTypes/DataTypesNumberFixed.h>
|
||||
|
||||
|
||||
namespace DB
|
||||
{
|
||||
|
||||
DataTypePtr VirtualColumnFactory::getType(const String & name)
|
||||
{
|
||||
auto res = tryGetType(name);
|
||||
if (!res)
|
||||
throw Exception("There is no column " + name + " in table.", ErrorCodes::NO_SUCH_COLUMN_IN_TABLE);
|
||||
return res;
|
||||
}
|
||||
|
||||
bool VirtualColumnFactory::hasColumn(const String & name)
|
||||
{
|
||||
return !!tryGetType(name);
|
||||
}
|
||||
|
||||
DataTypePtr VirtualColumnFactory::tryGetType(const String & name)
|
||||
{
|
||||
if (name == "_table") return new DataTypeString;
|
||||
if (name == "_part") return new DataTypeString;
|
||||
if (name == "_part_index") return new DataTypeUInt64;
|
||||
if (name == "_replicated") return new DataTypeUInt8;
|
||||
return nullptr;
|
||||
}
|
||||
|
||||
}
|
Loading…
Reference in New Issue
Block a user