review corrections. [#METR-12221]

This commit is contained in:
Andrey Mironov 2014-08-21 16:07:29 +04:00
parent c58098c854
commit c6d4fd941f
6 changed files with 86 additions and 66 deletions

View File

@ -31,6 +31,7 @@ public:
struct ShardInfo
{
/// contains names of directories for asynchronous write to StorageDistributed
std::vector<std::string> dir_names;
int weight;
bool has_local_node;

View File

@ -25,26 +25,33 @@ namespace
const auto user_pw_end = strchr(address.data(), '@');
const auto colon = strchr(address.data(), ':');
if (!user_pw_end || !colon)
throw Exception{"Shard address '" + address + "' does not match to 'user[:password]@host:port' pattern"};
throw Exception{
"Shard address '" + address + "' does not match to 'user[:password]@host:port' pattern",
ErrorCodes::INCORRECT_FILE_NAME
};
const auto has_pw = colon < user_pw_end;
const auto host_end = has_pw ? strchr(user_pw_end + 1, ':') : colon;
if (!host_end)
throw Exception{"Shard address '" + address + "' does not contain port"};
throw Exception{
"Shard address '" + address + "' does not contain port",
ErrorCodes::INCORRECT_FILE_NAME
};
const auto user = unescapeForFileName({address.data(), has_pw ? colon : user_pw_end});
const auto password = has_pw ? unescapeForFileName({colon + 1, user_pw_end}) : std::string{};
const auto host = unescapeForFileName({user_pw_end + 1, host_end});
const auto port = DB::parse<UInt16>(host_end + 1);
const auto port = parse<UInt16>(host_end + 1);
pools.emplace_back(factory(host, port, user, password));
}
/// just to be explicit
return std::move(pools);
return pools;
}
}
/** Implementation for StorageDistributed::DirectoryMonitor nested class.
* This type is not designed for standalone use. */
class StorageDistributed::DirectoryMonitor
{
public:
@ -93,8 +100,8 @@ private:
ConnectionPoolPtr createPool(const std::string & name)
{
const auto pool_factory = [this, &name] (const std::string & host, const UInt16 port, const std::string & user, const std::string & password)
{
const auto pool_factory = [this, &name] (const std::string & host, const UInt16 port,
const std::string & user, const std::string & password) {
return new ConnectionPool{
1, host, port, "",
user, password, storage.context.getDataTypeFactory(),
@ -103,7 +110,7 @@ private:
auto pools = createPoolsForAddresses(name, pool_factory);
return pools.size() == 1 ? pools.front() : new ConnectionPoolWithFailover(pools, DB::LoadBalancing::RANDOM);
return pools.size() == 1 ? pools.front() : new ConnectionPoolWithFailover(pools, LoadBalancing::RANDOM);
}
bool findFiles()
@ -117,7 +124,7 @@ private:
Poco::Path file_path{file_path_str};
if (!it->isDirectory() && 0 == strncmp(file_path.getExtension().data(), "bin", strlen("bin")))
files[DB::parse<UInt64>(file_path.getBaseName())] = file_path_str;
files[parse<UInt64>(file_path.getBaseName())] = file_path_str;
}
if (files.empty())
@ -141,12 +148,12 @@ private:
try
{
DB::ReadBufferFromFile in{file_path};
ReadBufferFromFile in{file_path};
std::string insert_query;
DB::readStringBinary(insert_query, in);
readStringBinary(insert_query, in);
DB::RemoteBlockOutputStream remote{*connection, insert_query};
RemoteBlockOutputStream remote{*connection, insert_query};
remote.writePrefix();
remote.writePrepared(in);
@ -165,11 +172,12 @@ private:
const auto & path = file_path.substr(0, last_path_separator_pos + 1);
const auto & file_name = file_path.substr(last_path_separator_pos + 1);
const auto & broken_path = path + "broken/";
const auto & broken_file_path = broken_path + file_name;
Poco::File{broken_path}.createDirectory();
Poco::File{file_path}.moveTo(broken_path + file_name);
Poco::File{file_path}.renameTo(broken_file_path);
LOG_ERROR(log, "Moved `" << file_path << "` to broken/ directory");
LOG_ERROR(log, "Renamed `" << file_path << "` to `" << broken_file_path << '`');
}
throw;
@ -180,7 +188,8 @@ private:
LOG_TRACE(log, "Finished processing `" << file_path << '`');
}
std::string getLoggerName() const {
std::string getLoggerName() const
{
return storage.name + '.' + storage.getName() + ".DirectoryMonitor";
}

View File

@ -9,12 +9,21 @@
#include <DB/Interpreters/InterpreterInsertQuery.h>
#include <statdaemons/Increment.h>
#include <statdaemons/stdext.h>
#include <iostream>
namespace DB
{
/** Writing is asynchronous: data is first written to the local file system and then sent to the remote servers.
* If a Distributed table uses more than one shard, then for writing to be supported
* an additional ENGINE parameter - the sharding key - must be specified when the table is created.
* The sharding key is an arbitrary expression over the columns, for example rand() or UserID.
* On write, a block of data is split by the remainder of dividing the sharding key by the total weight of the shards,
* and the resulting blocks are written in compressed Native format into separate directories for sending.
* For each destination address (each directory with data to send), StorageDistributed creates a separate thread
* that watches the directory and sends the data. */
class DistributedBlockOutputStream : public IBlockOutputStream
{
public:
@ -23,49 +32,45 @@ public:
{
}
virtual void write(const Block & block) override
void write(const Block & block) override
{
if (storage.getShardingKeyExpr() && storage.cluster.shard_info_vec.size() > 1)
return writeSplit(block, block);
return writeSplit(block);
writeImpl(block);
}
private:
void writeSplit(const Block & block, Block block_with_key)
void writeSplit(const Block & block)
{
auto block_with_key = block;
storage.getShardingKeyExpr()->execute(block_with_key);
const auto & key_column = block_with_key.getByName(storage.getShardingKeyColumnName()).column;
const auto total_weight = storage.cluster.slot_to_shard.size();
/// shard => block mapping
std::unordered_map<size_t, Block> target_blocks;
/// return iterator to target block, creating one if necessary
const auto get_target_block = [&target_blocks, &block] (const size_t idx) {
const auto it = target_blocks.find(idx);
std::vector<std::unique_ptr<Block>> target_blocks(storage.cluster.shard_info_vec.size());
if (it == std::end(target_blocks))
return target_blocks.emplace(idx, block.cloneEmpty()).first;
const auto num_cols = block.columns();
std::vector<const IColumn*> columns(num_cols);
for (size_t i = 0; i < columns.size(); ++i)
columns[i] = block.getByPosition(i).column;
return it;
};
for (size_t row = 0; row < block.rowsInFirstColumn(); ++row)
for (size_t num_rows = block.rowsInFirstColumn(), row = 0; row < num_rows; ++row)
{
const auto target_block_idx = storage.cluster.slot_to_shard[key_column->get64(row) % total_weight];
auto & target_block = get_target_block(target_block_idx)->second;;
auto & target_block = target_blocks[target_block_idx];
if (!target_block)
target_block = stdext::make_unique<Block>(block.cloneEmpty());
for (size_t col = 0; col < block.columns(); ++col)
{
target_block.getByPosition(col).column->insertFrom(
*block.getByPosition(col).column, row
);
}
for (size_t col = 0; col < num_cols; ++col)
target_block->getByPosition(col).column->insertFrom(*columns[col], row);
}
for (const auto & shard_block_pair : target_blocks)
writeImpl(shard_block_pair.second, shard_block_pair.first);
for (size_t i = 0; i < target_blocks.size(); ++i)
if (const auto & target_block = target_blocks[i])
writeImpl(*target_block, i);
}
void writeImpl(const Block & block, const size_t shard_id = 0)
@ -74,6 +79,7 @@ private:
if (shard_info.has_local_node)
writeToLocal(block);
/// dir_names is empty if shard has only local addresses
if (!shard_info.dir_names.empty())
writeToShard(block, shard_info.dir_names);
}
@ -107,7 +113,7 @@ private:
if (Poco::File(path).createDirectory())
storage.requireDirectoryMonitor(dir_name);
const auto & file_name = std::to_string(Increment(path + "increment.txt").get(true)) + ".bin";
const auto & file_name = toString(Increment{path + "increment.txt"}.get(true)) + ".bin";
const auto & block_file_path = path + file_name;
/** on first iteration write block to a temporary directory for subsequent hardlinking to ensure
@ -122,11 +128,11 @@ private:
first_file_tmp_path = block_file_tmp_path;
DB::WriteBufferFromFile out{block_file_tmp_path};
DB::CompressedWriteBuffer compress{out};
DB::NativeBlockOutputStream stream{compress};
WriteBufferFromFile out{block_file_tmp_path};
CompressedWriteBuffer compress{out};
NativeBlockOutputStream stream{compress};
DB::writeStringBinary(query_string, out);
writeStringBinary(query_string, out);
stream.writePrefix();
stream.write(block);
@ -134,7 +140,7 @@ private:
}
if (link(first_file_tmp_path.data(), block_file_path.data()))
throw Exception{"Could not link " + block_file_path + " to " + first_file_tmp_path};
throwFromErrno("Could not link " + block_file_path + " to " + first_file_tmp_path);
}
/** remove the temporary file, enabling the OS to reclaim inode after all threads

View File

@ -62,7 +62,7 @@ public:
size_t max_block_size = DEFAULT_BLOCK_SIZE,
unsigned threads = 1);
virtual BlockOutputStreamPtr write(ASTPtr query) override;
BlockOutputStreamPtr write(ASTPtr query) override;
void drop() override {}
void rename(const String & new_path_to_db, const String & new_database_name, const String & new_table_name) { name = new_table_name; }
@ -70,7 +70,7 @@ public:
/// the structure of the subtables is not checked
void alter(const AlterCommands & params, const String & database_name, const String & table_name, Context & context);
virtual void shutdown() override;
void shutdown() override;
const ExpressionActionsPtr & getShardingKeyExpr() const { return sharding_key_expr; }
const String & getShardingKeyColumnName() const { return sharding_key_column_name; }
@ -93,7 +93,7 @@ private:
void createDirectoryMonitor(const std::string & name);
/// create directory monitors for each existing subdirectory
void createDirectoryMonitors();
/// ensure directory monitor creationg
/// ensure directory monitor creation
void requireDirectoryMonitor(const std::string & name);
String name;

View File

@ -18,18 +18,21 @@
namespace DB
{
namespace {
template <typename ASTType> inline void rewriteImpl(ASTType &, const std::string &, const std::string &) = delete;
namespace
{
template <typename ASTType> void rewriteImpl(ASTType &, const std::string &, const std::string &) = delete;
/// select query has database and table names as AST pointers
template <> inline void rewriteImpl<ASTSelectQuery>(ASTSelectQuery & query, const std::string & database, const std::string & table)
template <> inline void rewriteImpl<ASTSelectQuery>(ASTSelectQuery & query,
const std::string & database, const std::string & table)
{
query.database = new ASTIdentifier{{}, database, ASTIdentifier::Database};
query.table = new ASTIdentifier{{}, table, ASTIdentifier::Table};
}
/// insert query has database and table names as bare strings
template <> inline void rewriteImpl<ASTInsertQuery>(ASTInsertQuery & query, const std::string & database, const std::string & table)
template <> inline void rewriteImpl<ASTInsertQuery>(ASTInsertQuery & query,
const std::string & database, const std::string & table)
{
query.database = database;
query.table = table;
@ -47,8 +50,7 @@ namespace {
/// Replace the table and database names
rewriteImpl(typeid_cast<ASTType &>(*modified_query_ast), database, table);
/// copy elision and RVO will work as intended, but let's be more explicit
return std::move(modified_query_ast);
return modified_query_ast;
}
}
@ -163,13 +165,15 @@ BlockOutputStreamPtr StorageDistributed::write(ASTPtr query)
{
if (!write_enabled)
throw Exception{
"Method write is not supported by storage " + getName() + " with no sharding key provided",
ErrorCodes::NOT_IMPLEMENTED};
"Method write is not supported by storage " + getName() +
" with more than one shard and no sharding key provided",
ErrorCodes::STORAGE_REQUIRES_PARAMETER
};
auto modified_query = rewriteQuery<ASTInsertQuery>(query, remote_database, remote_table);
static_cast<ASTInsertQuery &>(*modified_query).select = nullptr;
return new DistributedBlockOutputStream{*this, modified_query};
return new DistributedBlockOutputStream{
*this,
rewriteQuery<ASTInsertQuery>(query, remote_database, remote_table)
};
}
void StorageDistributed::alter(const AlterCommands & params, const String & database_name, const String & table_name, Context & context)

View File

@ -177,8 +177,9 @@ StoragePtr StorageFactory::get(
ASTs & args = typeid_cast<ASTExpressionList &>(*args_func.at(0)).children;
if (args.size() != 3 && args.size() != 4)
throw Exception("Storage Distributed requires 3 parameters"
" - name of configuration section with list of remote servers, name of remote database, name of remote table.",
throw Exception("Storage Distributed requires 3 or 4 parameters"
" - name of configuration section with list of remote servers, name of remote database, name of remote table,"
" sharding key expression (optional).",
ErrorCodes::NUMBER_OF_ARGUMENTS_DOESNT_MATCH);
String cluster_name = typeid_cast<ASTIdentifier &>(*args[0]).name;
@ -188,8 +189,7 @@ StoragePtr StorageFactory::get(
const auto & sharding_key = args.size() == 4 ? args[3] : nullptr;
return StorageDistributed::create(
table_name, columns, remote_database, remote_table, cluster_name, context, sharding_key, data_path
);
table_name, columns, remote_database, remote_table, cluster_name, context, sharding_key, data_path);
}
else if (endsWith(name, "MergeTree"))
{