refactor write to StorageDistributed. [#METR-12221]

This commit is contained in:
Andrey Mironov 2014-08-13 13:20:15 +04:00
parent e73bfe7b8a
commit 2db71666a3
4 changed files with 46 additions and 36 deletions

View File

@ -9,20 +9,14 @@ namespace DB
class DistributedBlockOutputStream : public IBlockOutputStream
{
public:
DistributedBlockOutputStream(
StorageDistributed & storage, Cluster & cluster,
const ExpressionActionsPtr & sharding_key_expr,
const std::string & key_column_name
)
: storage(storage), cluster(cluster)
, sharding_key_expr(sharding_key_expr)
, key_column_name(key_column_name)
DistributedBlockOutputStream(StorageDistributed & storage, Cluster & cluster, const ASTPtr & query)
: storage(storage), cluster(cluster), query(query)
{
}
virtual void write(const Block & block) override
{
if (sharding_key_expr && cluster.shard_info_vec.size() > 1)
if (storage.getShardingKeyExpr() && cluster.shard_info_vec.size() > 1)
splitWrite(block, block);
else
writeImpl(block);
@ -31,9 +25,9 @@ public:
private:
void splitWrite(const Block & block, Block block_with_key)
{
sharding_key_expr->execute(block_with_key);
storage.getShardingKeyExpr()->execute(block_with_key);
const auto & key_column = block_with_key.getByName(key_column_name).column;
const auto & key_column = block_with_key.getByName(storage.getShardingKeyColumnName()).column;
const auto total_weight = cluster.slot_to_shard.size();
/// seems like cloned blocks have the same underlying container
Blocks target_blocks(cluster.shard_info_vec.size(), block.cloneEmpty());
@ -62,8 +56,7 @@ private:
StorageDistributed & storage;
Cluster & cluster;
ExpressionActionsPtr sharding_key_expr;
std::string key_column_name;
ASTPtr query;
};
}

View File

@ -26,7 +26,8 @@ public:
const String & remote_table_, /// Имя таблицы на удалённых серверах.
const String & cluster_name,
Context & context_,
const ASTPtr & sharding_key_ = nullptr);
const ASTPtr & sharding_key_,
const String & data_path_);
static StoragePtr create(
const std::string & name_, /// Имя таблицы.
@ -34,8 +35,7 @@ public:
const String & remote_database_, /// БД на удалённых серверах.
const String & remote_table_, /// Имя таблицы на удалённых серверах.
SharedPtr<Cluster> & owned_cluster_,
Context & context_,
const ASTPtr & sharding_key_ = nullptr);
Context & context_);
std::string getName() const { return "Distributed"; }
std::string getTableName() const { return name; }
@ -67,7 +67,9 @@ public:
/// the structure of the sub-tables is not checked
void alter(const AlterCommands & params, const String & database_name, const String & table_name, Context & context);
const ASTPtr & getShardingKey() const { return sharding_key; }
const ExpressionActionsPtr & getShardingKeyExpr() const { return sharding_key_expr; }
const String & getShardingKeyColumnName() const { return sharding_key_column_name; }
const String & getPath() const { return path; }
private:
StorageDistributed(
@ -77,7 +79,8 @@ private:
const String & remote_table_,
Cluster & cluster_,
const Context & context_,
const ASTPtr & sharding_key_ = nullptr);
const ASTPtr & sharding_key_ = nullptr,
const String & data_path_ = String{});
/// Creates a copy of the query, changing the database and table names.
ASTPtr rewriteQuery(ASTPtr query);
@ -99,8 +102,10 @@ private:
/// Connections to the remote servers.
Cluster & cluster;
ASTPtr sharding_key;
ExpressionActionsPtr sharding_key_expr;
String sharding_key_column_name;
bool write_enabled;
String path;
};
}

View File

@ -10,6 +10,8 @@
#include <Poco/Net/NetworkInterface.h>
#include <DB/Client/ConnectionPool.h>
#include <DB/Common/escapeForFileName.h>
#include <DB/Interpreters/InterpreterSelectQuery.h>
#include <DB/Interpreters/InterpreterAlterQuery.h>
#include <boost/bind.hpp>
@ -25,16 +27,22 @@ StorageDistributed::StorageDistributed(
const String & remote_table_,
Cluster & cluster_,
const Context & context_,
const ASTPtr & sharding_key_)
const ASTPtr & sharding_key_,
const String & data_path_)
: name(name_), columns(columns_),
remote_database(remote_database_), remote_table(remote_table_),
context(context_),
cluster(cluster_),
sharding_key(sharding_key_),
write_enabled(cluster.getLocalNodesNum() + cluster.pools.size() < 2 || sharding_key_)
context(context_), cluster(cluster_),
sharding_key_expr(sharding_key_ ? ExpressionAnalyzer(sharding_key_, context, *columns).getActions(false) : nullptr),
sharding_key_column_name(sharding_key_ ? sharding_key_->getColumnName() : String{}),
write_enabled(cluster.getLocalNodesNum() + cluster.pools.size() < 2 || sharding_key_),
path(data_path_ + escapeForFileName(name) + '/')
{
std::cout << "table `" << name << "` in " << path << std::endl;
for (auto & shard_info : cluster.shard_info_vec) {
std::cout << "shard '" << shard_info.dir_name << "' with weight " << shard_info.weight << std::endl;
std::cout
<< "\twill write to " << path + shard_info.dir_name
<< " with weight " << shard_info.weight
<< std::endl;
}
}
@ -45,10 +53,16 @@ StoragePtr StorageDistributed::create(
const String & remote_table_,
const String & cluster_name,
Context & context_,
const ASTPtr & sharding_key_)
const ASTPtr & sharding_key_,
const String & data_path_)
{
context_.initClusters();
return (new StorageDistributed(name_, columns_, remote_database_, remote_table_, context_.getCluster(cluster_name), context_, sharding_key_))->thisPtr();
return (new StorageDistributed{
name_, columns_, remote_database_, remote_table_,
context_.getCluster(cluster_name), context_,
sharding_key_, data_path_
})->thisPtr();
}
@ -58,10 +72,12 @@ StoragePtr StorageDistributed::create(
const String & remote_database_,
const String & remote_table_,
SharedPtr<Cluster> & owned_cluster_,
Context & context_,
const ASTPtr & sharding_key_)
Context & context_)
{
auto res = new StorageDistributed(name_, columns_, remote_database_, remote_table_, *owned_cluster_, context_, sharding_key_);
auto res = new StorageDistributed{
name_, columns_, remote_database_,
remote_table_, *owned_cluster_, context_
};
/// Take ownership of the cluster object.
res->owned_cluster = owned_cluster_;
@ -151,11 +167,7 @@ BlockOutputStreamPtr StorageDistributed::write(ASTPtr query)
ErrorCodes::NOT_IMPLEMENTED
};
return new DistributedBlockOutputStream{
*this, this->cluster,
sharding_key ? ExpressionAnalyzer(sharding_key, context, *columns).getActions(false) : nullptr,
sharding_key ? sharding_key->getColumnName() : std::string{}
};
return new DistributedBlockOutputStream{*this, this->cluster, query};
}
void StorageDistributed::alter(const AlterCommands & params, const String & database_name, const String & table_name, Context & context)

View File

@ -188,7 +188,7 @@ StoragePtr StorageFactory::get(
const auto & sharding_key = args.size() == 4 ? args[3] : nullptr;
return StorageDistributed::create(
table_name, columns, remote_database, remote_table, cluster_name, context, sharding_key
table_name, columns, remote_database, remote_table, cluster_name, context, sharding_key, data_path
);
}
else if (endsWith(name, "MergeTree"))