storages: Virtual columns for storage distributed + few style fixes [METR-9172]

This commit is contained in:
Sergey Fedorov 2014-01-22 14:24:05 +00:00
parent 14620b3347
commit f8b5613b82
7 changed files with 144 additions and 59 deletions

View File

@ -104,6 +104,16 @@ public:
*/
void disconnect();
const std::string & getHost() const
{
return host;
}
UInt16 getPort() const
{
return port;
}
private:
String host;
UInt16 port;

View File

@ -13,11 +13,11 @@
namespace DB
{
class VirtualColumnUtils
namespace VirtualColumnUtils
{
public:
/// Вычислить минимальный числовый суффикс, который надо добавить к строке, чтобы она не присутствовала в множестве
static String chooseSuffix(const NamesAndTypesList & columns, const String & name)
inline String chooseSuffix(const NamesAndTypesList & columns, const String & name)
{
int id = 0;
String current_suffix;
@ -39,7 +39,7 @@ static String chooseSuffix(const NamesAndTypesList & columns, const String & nam
/// Вычислить минимальный общий числовый суффикс, который надо добавить к каждой строке,
/// чтобы ниодна не присутствовала в множестве.
static String chooseSuffixForSet(const NamesAndTypesList & columns, const std::vector<String> & names)
inline String chooseSuffixForSet(const NamesAndTypesList & columns, const std::vector<String> & names)
{
int id = 0;
String current_suffix;
@ -67,45 +67,19 @@ static String chooseSuffixForSet(const NamesAndTypesList & columns, const std::v
return current_suffix;
}
/// На данный момент не дописана и не используется.
static void rewriteEntityInAst(ASTPtr ast, const String & column_name, const Field &value)
/// Добавляет в селект запрос секцию select clumn_name as value
/// Например select _port as 9000.
inline void rewriteEntityInAst(ASTPtr ast, const String & column_name, const Field & value)
{
{
ASTSelectQuery & select = dynamic_cast<ASTSelectQuery &>(*ast);
ASTExpressionList & node = dynamic_cast<ASTExpressionList &>(*select.select_expression_list);
ASTs & asts = node.children;
ASTLiteral * cur = new ASTLiteral(StringRange(NULL, NULL), value);
cur->alias = column_name;
ASTPtr column_value = cur;
asts.insert(asts.begin(), column_value);
return;
}
if (ASTExpressionList * node = dynamic_cast<ASTExpressionList *>(&*ast))
{
ASTs & asts = node->children;
for (int i = static_cast<int>(asts.size()) - 1; i >= 0; --i)
{
if (ASTIdentifier * child = dynamic_cast<ASTIdentifier *>(&*asts[i]))
{
if (child->kind == ASTIdentifier::Column && child->getColumnName() == column_name)
{
ASTLiteral * cur = new ASTLiteral(StringRange(NULL, NULL), value);
cur->alias = column_name;
ASTPtr column_value = cur;
asts.erase(asts.begin() + i);
asts.insert(asts.begin() + i, column_value);
}
}
}
}
for (auto it : ast->children)
rewriteEntityInAst(it, column_name, value);
ASTSelectQuery & select = dynamic_cast<ASTSelectQuery & >(*ast);
ASTExpressionList & node = dynamic_cast<ASTExpressionList & >(*select.select_expression_list);
ASTs & asts = node.children;
ASTLiteral * cur = new ASTLiteral(StringRange(NULL, NULL), value);
cur->alias = column_name;
ASTPtr column_value = cur;
asts.insert(asts.begin(), column_value);
}
};
}
}

View File

@ -42,7 +42,6 @@ protected:
return res;
ColumnPtr column_ptr = ColumnConst<ColumnType> (res.rows(), value, data_type).convertToFullColumn();
ColumnWithNameAndType column(column_ptr, data_type, column_name);
res.insert(column);
return res;
}

View File

@ -3,6 +3,7 @@
#include <Yandex/logger_useful.h>
#include <DB/DataStreams/IProfilingBlockInputStream.h>
#include <DB/Common/VirtualColumnUtils.h>
#include <DB/Client/ConnectionPool.h>
@ -16,8 +17,8 @@ class RemoteBlockInputStream : public IProfilingBlockInputStream
{
public:
RemoteBlockInputStream(Connection & connection_, const String & query_, const Settings * settings_,
QueryProcessingStage::Enum stage_ = QueryProcessingStage::Complete)
: connection(connection_), query(query_), stage(stage_),
QueryProcessingStage::Enum stage_ = QueryProcessingStage::Complete)
: connection(connection_), query(query_), _host_column(""), _port_column(""), stage(stage_),
sent_query(false), finished(false), was_cancelled(false), got_exception_from_server(false),
log(&Logger::get("RemoteBlockInputStream (" + connection.getServerAddress() + ")"))
{
@ -33,9 +34,24 @@ public:
/// Захватывает владение соединением из пула.
RemoteBlockInputStream(ConnectionPool::Entry pool_entry_, const String & query_, const Settings * settings_,
QueryProcessingStage::Enum stage_ = QueryProcessingStage::Complete)
: pool_entry(pool_entry_), connection(*pool_entry), query(query_), stage(stage_),
sent_query(false), finished(false), was_cancelled(false), got_exception_from_server(false),
log(&Logger::get("RemoteBlockInputStream (" + connection.getServerAddress() + ")"))
: pool_entry(pool_entry_), connection(*pool_entry), query(query_), _host_column(""),
_port_column(""), stage(stage_), sent_query(false), finished(false), was_cancelled(false),
got_exception_from_server(false), log(&Logger::get("RemoteBlockInputStream (" + connection.getServerAddress() + ")"))
{
if (settings_)
{
send_settings = true;
settings = *settings_;
}
else
send_settings = false;
}
RemoteBlockInputStream(ConnectionPool::Entry pool_entry_, const String & query_, const Settings * settings_,
const String & _host_column_, const String & _port_column_, QueryProcessingStage::Enum stage_ = QueryProcessingStage::Complete)
: pool_entry(pool_entry_), connection(*pool_entry), query(query_), _host_column(_host_column_),
_port_column(_port_column_), stage(stage_), sent_query(false), finished(false), was_cancelled(false),
got_exception_from_server(false), log(&Logger::get("RemoteBlockInputStream (" + connection.getServerAddress() + ")"))
{
if (settings_)
{
@ -90,6 +106,22 @@ public:
}
protected:
void populateBlock(Block & res)
{
if (_host_column != "")
{
ColumnPtr column_ptr = ColumnConst<String> (res.rows(), connection.getHost(), new DataTypeString).convertToFullColumn();
ColumnWithNameAndType column(column_ptr, new DataTypeString, _host_column);
res.insert(column);
}
if (_port_column != "")
{
ColumnPtr column_ptr = ColumnConst<UInt16> (res.rows(), connection.getPort(), new DataTypeUInt16).convertToFullColumn();
ColumnWithNameAndType column(column_ptr, new DataTypeUInt16, _port_column);
res.insert(column);
}
}
Block readImpl()
{
if (!sent_query)
@ -107,7 +139,10 @@ protected:
case Protocol::Server::Data:
/// Если блок не пуст и не является заголовочным блоком
if (packet.block && packet.block.rows() > 0)
{
populateBlock(packet.block);
return packet.block;
}
break; /// Если блок пустой - получим другие пакеты до EndOfStream.
case Protocol::Server::Exception:
@ -214,6 +249,10 @@ private:
const String query;
bool send_settings;
Settings settings;
/// Имя столбца, куда записать имя хоста. Пустая строка, если записывать не надо.
String _host_column;
/// Имя столбца, куда записать номер порта. Пустая строка, если записывать не надо.
String _port_column;
QueryProcessingStage::Enum stage;
/// Отправили запрос (это делается перед получением первого блока).

View File

@ -38,6 +38,8 @@ public:
bool supportsPrewhere() const { return true; }
const NamesAndTypesList & getColumnsList() const { return *columns; }
NameAndTypePair getColumn(const String &column_name) const;
bool hasColumn(const String &column_name) const;
bool isRemote() const { return true; }
@ -74,6 +76,9 @@ private:
const DataTypeFactory & data_type_factory;
String sign_column_name;
String _host_column_name;
String _port_column_name;
const Context & context;
Cluster & cluster;
};

View File

@ -5,6 +5,7 @@
#include <DB/Storages/StorageDistributed.h>
#include <Poco/Net/NetworkInterface.h>
#include <DB/Client/ConnectionPool.h>
#include <DB/Interpreters/InterpreterSelectQuery.h>
#include <boost/bind.hpp>
@ -29,6 +30,12 @@ StorageDistributed::StorageDistributed(
context(context_),
cluster(cluster_)
{
std::vector<String> virtual_columns;
virtual_columns.push_back("_host");
virtual_columns.push_back("_port");
String suffix = VirtualColumnUtils::chooseSuffixForSet(getColumnsList(), virtual_columns);
_host_column_name = virtual_columns[0] + suffix;
_port_column_name = virtual_columns[1] + suffix;
}
StoragePtr StorageDistributed::create(
@ -46,6 +53,20 @@ StoragePtr StorageDistributed::create(
return (new StorageDistributed(name_, columns_, remote_database_, remote_table_, context_.getCluster(cluster_name), data_type_factory_, settings, context_, sign_column_name_))->thisPtr();
}
NameAndTypePair StorageDistributed::getColumn(const String &column_name) const
{
if (column_name == _host_column_name) return std::make_pair(_host_column_name, new DataTypeString);
if (column_name == _port_column_name) return std::make_pair(_port_column_name, new DataTypeUInt16);
return getRealColumn(column_name);
}
bool StorageDistributed::hasColumn(const String &column_name) const
{
if (column_name == _host_column_name) return true;
if (column_name == _port_column_name) return true;
return hasRealColumn(column_name);
}
BlockInputStreams StorageDistributed::read(
const Names & column_names,
ASTPtr query,
@ -54,30 +75,67 @@ BlockInputStreams StorageDistributed::read(
size_t max_block_size,
unsigned threads)
{
Names virt_column_names(2, ""), real_column_names;
for (const auto & it : column_names)
if (it == _host_column_name)
virt_column_names[0] = _host_column_name;
else if (it == _port_column_name)
virt_column_names[1] = _port_column_name;
else
real_column_names.push_back(it);
processed_stage = (cluster.pools.size() + cluster.getLocalNodesNum()) == 1
? QueryProcessingStage::Complete
: QueryProcessingStage::WithMergeableState;
/// Заменим в запросе имена БД и таблицы.
ASTPtr modified_query_ast = query->clone();
ASTSelectQuery & select = dynamic_cast<ASTSelectQuery &>(*modified_query_ast);
select.database = new ASTIdentifier(StringRange(), remote_database, ASTIdentifier::Database);
select.table = new ASTIdentifier(StringRange(), remote_table, ASTIdentifier::Table);
/// Установим sign_rewrite = 0, чтобы второй раз не переписывать запрос
Settings new_settings = settings;
new_settings.sign_rewrite = false;
new_settings.queue_max_wait_ms = Cluster::saturation(new_settings.queue_max_wait_ms, settings.limits.max_execution_time);
std::stringstream s;
formatAST(select, s, 0, false, true);
String modified_query = s.str();
BlockInputStreams res;
for (ConnectionPools::iterator it = cluster.pools.begin(); it != cluster.pools.end(); ++it)
res.push_back(new RemoteBlockInputStream((*it)->get(&new_settings), modified_query, &new_settings, processed_stage));
{
/// Заменим в запросе имена БД и таблицы.
ASTPtr modified_query_ast = query->clone();
/// Добавляем в запрос значения хоста и порта
String trash_host = (*it)->get()->getHost();
size_t trash_port = (*it)->get()->getPort();
VirtualColumnUtils::rewriteEntityInAst(modified_query_ast, _host_column_name, trash_host);
VirtualColumnUtils::rewriteEntityInAst(modified_query_ast, _port_column_name, trash_port);
/// Меняем имена таблицы и базы данных
ASTSelectQuery & select = dynamic_cast<ASTSelectQuery &>(*modified_query_ast);
select.database = new ASTIdentifier(StringRange(), remote_database, ASTIdentifier::Database);
select.table = new ASTIdentifier(StringRange(), remote_table, ASTIdentifier::Table);
std::stringstream s;
formatAST(select, s, 0, false, true);
String modified_query = s.str();
res.push_back(new RemoteBlockInputStream((*it)->get(&new_settings), modified_query, &new_settings, virt_column_names[0], virt_column_names[1], processed_stage));
}
/// Заменим в запросе имена БД и таблицы.
ASTPtr modified_query_ast = query->clone();
/// Добавляем в запрос значения хоста и порта
String trash_host = "localhost";
size_t trash_port = 9000;
VirtualColumnUtils::rewriteEntityInAst(modified_query_ast, _host_column_name, trash_host);
VirtualColumnUtils::rewriteEntityInAst(modified_query_ast, _port_column_name, trash_port);
/// Меняем имена таблицы и базы данных
ASTSelectQuery & select = dynamic_cast<ASTSelectQuery &>(*modified_query_ast);
select.database = new ASTIdentifier(StringRange(), remote_database, ASTIdentifier::Database);
select.table = new ASTIdentifier(StringRange(), remote_table, ASTIdentifier::Table);
std::stringstream s;
formatAST(select, s, 0, false, true);
String modified_query = s.str();
/// добавляем запросы к локальному clickhouse
DB::Context new_context = context;

View File

@ -49,7 +49,7 @@ BlockInputStreams StorageMerge::read(
BlockInputStreams res;
Names virt_column_names, real_column_names;
for (auto & it : column_names)
for (const auto & it : column_names)
if (it != _table_column_name)
real_column_names.push_back(it);
else