write to local nodes, refactor StorageDistributed, extract DirectoryMonitor. [#METR-12221]

Andrey Mironov 2014-08-15 13:50:05 +04:00
parent ec0cee0afe
commit 269be93cbe
9 changed files with 334 additions and 148 deletions

View File

@@ -64,3 +64,5 @@
#define DBMS_MIN_REVISION_WITH_TOTALS_EXTREMES 35265
#define DBMS_MIN_REVISION_WITH_STRING_QUERY_ID 39002
#define DBMS_MIN_REVISION_WITH_TEMPORARY_TABLES 50264
#define DBMS_DISTRIBUTED_DIRECTORY_MONITOR_SLEEP_TIME_MS 100

View File

@@ -31,8 +31,9 @@ public:
struct ShardInfo
{
std::string dir_name;
std::vector<std::string> dir_names;
int weight;
bool has_local_node;
};
std::vector<ShardInfo> shard_info_vec;
std::vector<size_t> slot_to_shard;
@@ -66,9 +67,9 @@ public:
Address(const String & host_port_, const String & user_, const String & password_);
};
static bool addressIsLocal(const Poco::Net::SocketAddress & address);
private:
static bool isLocal(const Address & address);
// private:
/// Array of shards. Each shard is the addresses of a single server.
typedef std::vector<Address> Addresses;

View File

@@ -70,6 +70,9 @@ struct Settings
* TODO: Currently this is applied only at server startup. It could be made changeable dynamically. */ \
M(SettingUInt64, background_pool_size, DBMS_DEFAULT_BACKGROUND_POOL_SIZE) \
\
/** Sleep time for StorageDistributed DirectoryMonitors when there is no work or an exception has been thrown */ \
M(SettingMilliseconds, distributed_directory_monitor_sleep_time_ms, DBMS_DISTRIBUTED_DIRECTORY_MONITOR_SLEEP_TIME_MS) \
\
M(SettingLoadBalancing, load_balancing, LoadBalancing::RANDOM) \
\
M(SettingTotalsMode, totals_mode, TotalsMode::BEFORE_HAVING) \

View File

@@ -0,0 +1,178 @@
#pragma once
#include <DB/DataStreams/RemoteBlockOutputStream.h>
#include <DB/Common/escapeForFileName.h>
#include <boost/algorithm/string/find_iterator.hpp>
#include <boost/algorithm/string/finder.hpp>
namespace DB
{
namespace
{
template <typename F> ConnectionPools createPoolsForAddresses(const std::string & name, F && f)
{
ConnectionPools pools;
for (auto it = boost::make_split_iterator(name, boost::first_finder(",")); it != decltype(it){}; ++it)
{
const auto & address = boost::copy_range<std::string>(*it);
const auto user_pw_end = strchr(address.data(), '@');
const auto colon = strchr(address.data(), ':');
if (!user_pw_end || !colon)
throw Exception{"Shard address '" + address + "' does not match to 'user[:password]@host:port' pattern"};
const auto has_pw = colon < user_pw_end;
const auto host_end = has_pw ? strchr(user_pw_end + 1, ':') : colon;
if (!host_end)
throw Exception{"Shard address '" + address + "' does not contain port"};
const auto user = unescapeForFileName({address.data(), has_pw ? colon : user_pw_end});
const auto password = has_pw ? unescapeForFileName({colon + 1, user_pw_end}) : std::string{};
const auto host = unescapeForFileName({user_pw_end + 1, host_end});
const auto port = DB::parse<UInt16>(host_end + 1);
pools.emplace_back(f(host, port, user, password));
}
/// just to be explicit
return std::move(pools);
}
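/// Illustrative note (not part of the original code): a directory name such as
/// "user@host1:9000,user:secret@host2:9000" is split on ',' and each
/// 'user[:password]@host:port' entry becomes one connection pool built by the factory f.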
}
class DirectoryMonitor
{
public:
DirectoryMonitor(StorageDistributed & storage, const std::string & name)
: storage(storage), pool{createPool(name)}, path{storage.path + name + '/'}
, sleep_time{storage.context.getSettingsRef().distributed_directory_monitor_sleep_time_ms.totalMilliseconds()}
, log{&Logger::get(getLoggerName())}
{
}
void run()
{
while (!storage.quit.load(std::memory_order_relaxed))
{
auto no_work = true;
auto exception = false;
try
{
no_work = !findFiles();
}
catch (...)
{
exception = true;
tryLogCurrentException(getLoggerName().data());
}
if (no_work || exception)
std::this_thread::sleep_for(sleep_time);
}
}
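/// The loop above keeps scanning the shard directory until StorageDistributed sets `quit`,
/// sleeping for distributed_directory_monitor_sleep_time_ms whenever there was no work
/// or an exception was thrown.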
private:
ConnectionPoolPtr createPool(const std::string & name)
{
const auto pool_factory = [this, &name] (const std::string & host, const UInt16 port, const std::string & user, const std::string & password) {
return new ConnectionPool{
1, host, port, "",
user, password, storage.context.getDataTypeFactory(),
storage.getName() + '_' + name
};
};
auto pools = createPoolsForAddresses(name, pool_factory);
return pools.size() == 1 ? pools.front() : new ConnectionPoolWithFailover(pools, DB::LoadBalancing::RANDOM);
}
bool findFiles()
{
std::map<UInt64, std::string> files;
Poco::DirectoryIterator end;
for (Poco::DirectoryIterator it{path}; it != end; ++it)
{
const auto & file_path_str = it->path();
Poco::Path file_path{file_path_str};
if (!it->isDirectory() && 0 == strncmp(file_path.getExtension().data(), "bin", strlen("bin")))
files[DB::parse<UInt64>(file_path.getBaseName())] = file_path_str;
}
if (files.empty())
return false;
for (const auto & file : files)
{
if (storage.quit.load(std::memory_order_relaxed))
return true;
processFile(file.second);
}
return true;
}
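/// Note added for clarity: pending files are named "<increment>.bin", and collecting them
/// into a map keyed by that number means blocks are forwarded in the order they were written.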
void processFile(const std::string & file_path)
{
LOG_TRACE(log, "Started processing `" << file_path << '`');
auto connection = pool->get();
try
{
DB::ReadBufferFromFile in{file_path};
std::string insert_query;
DB::readStringBinary(insert_query, in);
DB::RemoteBlockOutputStream remote{*connection, insert_query};
remote.writePrefix();
remote.writePrepared(in);
remote.writeSuffix();
}
catch (const Exception & e)
{
const auto code = e.code();
/// mark file as broken if necessary
if (code == ErrorCodes::CHECKSUM_DOESNT_MATCH ||
code == ErrorCodes::TOO_LARGE_SIZE_COMPRESSED ||
code == ErrorCodes::CANNOT_READ_ALL_DATA)
{
const auto last_path_separator_pos = file_path.rfind('/');
const auto & path = file_path.substr(0, last_path_separator_pos + 1);
const auto & file_name = file_path.substr(last_path_separator_pos + 1);
const auto & broken_path = path + "broken/";
Poco::File{broken_path}.createDirectory();
Poco::File{file_path}.moveTo(broken_path + file_name);
LOG_ERROR(log, "Moved `" << file_path << "` to broken/ directory");
}
throw;
}
Poco::File{file_path}.remove();
LOG_TRACE(log, "Finished processing `" << file_path << '`');
}
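/// Files failing with CHECKSUM_DOESNT_MATCH, TOO_LARGE_SIZE_COMPRESSED or CANNOT_READ_ALL_DATA
/// are moved to the broken/ subdirectory and skipped; any other error leaves the file in place,
/// so it is retried on the next pass of run().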
std::string getLoggerName() const
{
return storage.name + '.' + storage.getName() + ".DirectoryMonitor";
}
StorageDistributed & storage;
ConnectionPoolPtr pool;
std::string path;
std::chrono::milliseconds sleep_time;
Logger * log;
};
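/// Usage sketch: StorageDistributed::createDirectoryMonitor() starts a thread running
/// DirectoryMonitor{*this, name}.run() for every shard subdirectory (see StorageDistributed.cpp below).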
}

View File

@@ -1,10 +1,12 @@
#pragma once
#include <DB/Storages/StorageDistributed.h>
#include <DB/Storages/Distributed/queryToString.h>
#include <DB/IO/WriteBufferFromFile.h>
#include <DB/IO/CompressedWriteBuffer.h>
#include <DB/DataStreams/NativeBlockOutputStream.h>
#include <DB/Interpreters/InterpreterInsertQuery.h>
#include <statdaemons/Increment.h>
@@ -16,26 +18,26 @@ namespace DB
class DistributedBlockOutputStream : public IBlockOutputStream
{
public:
DistributedBlockOutputStream(StorageDistributed & storage, Cluster & cluster, const std::string & query_string)
: storage(storage), cluster(cluster), query_string(query_string)
DistributedBlockOutputStream(StorageDistributed & storage, const ASTPtr & query_ast)
: storage(storage), query_ast(query_ast)
{
}
virtual void write(const Block & block) override
{
if (storage.getShardingKeyExpr() && cluster.shard_info_vec.size() > 1)
splitWrite(block, block);
if (storage.getShardingKeyExpr() && storage.cluster.shard_info_vec.size() > 1)
writeSplit(block, block);
else
writeImpl(block);
}
private:
void splitWrite(const Block & block, Block block_with_key)
void writeSplit(const Block & block, Block block_with_key)
{
storage.getShardingKeyExpr()->execute(block_with_key);
const auto & key_column = block_with_key.getByName(storage.getShardingKeyColumnName()).column;
const auto total_weight = cluster.slot_to_shard.size();
const auto total_weight = storage.cluster.slot_to_shard.size();
/// shard => block mapping
std::unordered_map<size_t, Block> target_blocks;
@@ -51,7 +53,7 @@ private:
for (size_t row = 0; row < block.rows(); ++row)
{
const auto target_block_idx = cluster.slot_to_shard[key_column->get64(row) % total_weight];
const auto target_block_idx = storage.cluster.slot_to_shard[key_column->get64(row) % total_weight];
auto & target_block = get_target_block(target_block_idx)->second;
for (size_t col = 0; col < block.columns(); ++col)
@@ -68,30 +70,76 @@ private:
void writeImpl(const Block & block, const size_t shard_id = 0)
{
const auto & dir_name = cluster.shard_info_vec[shard_id].dir_name;
const auto & path = storage.getPath() + dir_name + '/';
const auto & shard_info = storage.cluster.shard_info_vec[shard_id];
if (shard_info.has_local_node)
writeToLocal(block);
/// ensure shard subdirectory creation and notify storage if necessary
if (Poco::File(path).createDirectory())
storage.createDirectoryMonitor(dir_name);
if (!shard_info.dir_names.empty())
writeToShard(block, shard_info.dir_names);
}
const auto number = Increment(path + "increment").get(true);
const auto block_file_path = path + std::to_string(number);
void writeToLocal(const Block & block)
{
InterpreterInsertQuery interp{query_ast, storage.context};
DB::WriteBufferFromFile out{block_file_path};
DB::CompressedWriteBuffer compress{out};
DB::NativeBlockOutputStream stream{compress};
auto block_io = interp.execute();
block_io.out->writePrefix();
block_io.out->write(block);
block_io.out->writeSuffix();
}
DB::writeStringBinary(query_string, out);
void writeToShard(const Block & block, const std::vector<std::string> & dir_names)
{
/** The tmp directory is used to ensure atomicity of transactions
* and to keep the monitor thread from reading incomplete data
*/
std::string first_file_tmp_path{};
std::string first_file_path{};
auto first = true;
const auto & query_string = queryToString<ASTInsertQuery>(query_ast);
stream.writePrefix();
stream.write(block);
stream.writeSuffix();
/// write first file, hardlink the others
for (const auto & dir_name : dir_names)
{
const auto & path = storage.getPath() + dir_name + '/';
/// ensure shard subdirectory creation and notify storage if necessary
if (Poco::File(path).createDirectory())
storage.createDirectoryMonitor(dir_name);
const auto & file_name = std::to_string(Increment(path + "increment.txt").get(true)) + ".bin";
const auto & block_file_path = path + file_name;
if (first)
{
first = false;
const auto & tmp_path = path + "tmp/";
Poco::File(tmp_path).createDirectory();
const auto & block_file_tmp_path = tmp_path + file_name;
first_file_path = block_file_path;
first_file_tmp_path = block_file_tmp_path;
DB::WriteBufferFromFile out{block_file_tmp_path};
DB::CompressedWriteBuffer compress{out};
DB::NativeBlockOutputStream stream{compress};
DB::writeStringBinary(query_string, out);
stream.writePrefix();
stream.write(block);
stream.writeSuffix();
}
else if (link(first_file_tmp_path.data(), block_file_path.data()))
throw Exception{"Could not link " + block_file_path + " to " + first_file_tmp_path};
}
Poco::File(first_file_tmp_path).renameTo(first_file_path);
}
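/// Resulting on-disk layout (illustrative): each shard directory receives "<increment>.bin"
/// holding the serialized INSERT query followed by the compressed native block; the first
/// file is staged under tmp/, the remaining directories get hard links to it.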
StorageDistributed & storage;
Cluster & cluster;
std::string query_string;
ASTPtr query_ast;
};
}

View File

@@ -0,0 +1,17 @@
#pragma once
#include <DB/Parsers/formatAST.h>
namespace DB
{
template <typename ASTType>
inline std::string queryToString(const ASTPtr & query)
{
const auto & query_ast = typeid_cast<const ASTType &>(*query);
std::ostringstream s;
formatAST(query_ast, s, 0, false, true);
return s.str();
}
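/// In this commit the helper is used as queryToString<ASTInsertQuery>(query_ast) to serialize
/// the rewritten INSERT that is stored in front of each queued block (see DistributedBlockOutputStream::writeToShard).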
}

View File

@@ -18,6 +18,9 @@ namespace DB
*/
class StorageDistributed : public IStorage
{
friend class DistributedBlockOutputStream;
friend class DirectoryMonitor;
public:
static StoragePtr create(
const std::string & name_, /// Table name.
@@ -73,8 +76,6 @@ public:
const String & getShardingKeyColumnName() const { return sharding_key_column_name; }
const String & getPath() const { return path; }
/// create directory monitor thread by subdirectory name
void createDirectoryMonitor(const std::string & name);
private:
StorageDistributed(
@@ -83,20 +84,22 @@ private:
const String & remote_database_,
const String & remote_table_,
Cluster & cluster_,
const Context & context_,
Context & context_,
const ASTPtr & sharding_key_ = nullptr,
const String & data_path_ = String{});
void createDirectoryMonitors();
void directoryMonitorFunc(const std::string & path);
/// create directory monitor thread by subdirectory name
void createDirectoryMonitor(const std::string & name);
/// create directory monitors for each existing subdirectory
void createDirectoryMonitors();
String name;
NamesAndTypesListPtr columns;
String remote_database;
String remote_table;
const Context & context;
Context & context;
/// Temporary tables that need to be sent to the server. The variable is cleared after each call to the read method
/// To prepare them for sending, use the storeExternalTables method

View File

@@ -79,8 +79,12 @@ Cluster::Cluster(const Settings & settings, const DataTypeFactory & data_type_fa
continue;
addresses.emplace_back(prefix);
slot_to_shard.insert(std::end(slot_to_shard), weight, shard_info_vec.size());
shard_info_vec.push_back({addressToDirName(addresses.back()), weight});
if (const auto is_local = isLocal(addresses.back()))
shard_info_vec.push_back({{}, weight, is_local });
else
shard_info_vec.push_back({{addressToDirName(addresses.back())}, weight, is_local});
}
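/// A local single-replica shard gets an empty dir_names list, so nothing is queued on disk for it;
/// DistributedBlockOutputStream::writeToLocal() inserts into the local table directly.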
else if (0 == strncmp(it->c_str(), "shard", strlen("shard")))
{
@@ -95,9 +99,14 @@ Cluster::Cluster(const Settings & settings, const DataTypeFactory & data_type_fa
if (weight == 0)
continue;
// const auto internal_replication = config.getBool(partial_prefix + ".internal_replication", false);
const auto internal_replication = config.getBool(partial_prefix + ".internal_replication", false);
std::string dir_name{};
/** in case of internal_replication we will be appending names to
* the first element of the vector, so that element must be created in advance;
* otherwise we just .emplace_back a new element per replica
*/
std::vector<std::string> dir_names(internal_replication);
auto has_local_node = false;
auto first = true;
for (auto jt = replica_keys.begin(); jt != replica_keys.end(); ++jt)
@@ -110,16 +119,26 @@ Cluster::Cluster(const Settings & settings, const DataTypeFactory & data_type_fa
{
replica_addresses.emplace_back(partial_prefix + *jt);
dir_name += (first ? "" : ",") + addressToDirName(replica_addresses.back());
if (isLocal(replica_addresses.back()))
{
has_local_node = true;
}
else
{
if (internal_replication)
dir_names.front() += (first ? "" : ",") + addressToDirName(replica_addresses.back());
else
dir_names.emplace_back(addressToDirName(replica_addresses.back()));
if (first) first = false;
}
}
else
throw Exception("Unknown element in config: " + *jt, ErrorCodes::UNKNOWN_ELEMENT_IN_CONFIG);
}
slot_to_shard.insert(std::end(slot_to_shard), weight, shard_info_vec.size());
shard_info_vec.push_back({dir_name, weight});
shard_info_vec.push_back({std::move(dir_names), weight, has_local_node});
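/// Each shard occupies `weight` consecutive entries of slot_to_shard, so the writer's
/// slot_to_shard[key % total_weight] lookup distributes rows proportionally to shard weight.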
}
else
throw Exception("Unknown element in config: " + *it, ErrorCodes::UNKNOWN_ELEMENT_IN_CONFIG);
@@ -138,7 +157,7 @@ Cluster::Cluster(const Settings & settings, const DataTypeFactory & data_type_fa
bool has_local_replics = false;
for (Addresses::const_iterator jt = it->begin(); jt != it->end(); ++jt)
{
if (addressIsLocal(jt->host_port))
if (isLocal(*jt))
{
has_local_replics = true;
break;
@@ -164,7 +183,7 @@ Cluster::Cluster(const Settings & settings, const DataTypeFactory & data_type_fa
{
for (Addresses::const_iterator it = addresses.begin(); it != addresses.end(); ++it)
{
if (addressIsLocal(it->host_port))
if (isLocal(*it))
{
++local_nodes_num;
}
@@ -223,7 +242,7 @@ Poco::Timespan Cluster::saturate(const Poco::Timespan & v, const Poco::Timespan
}
bool Cluster::addressIsLocal(const Poco::Net::SocketAddress & address)
bool Cluster::isLocal(const Address & address)
{
/// If among the replicas there is one such that:
/// - its port is the same as the port the server is listening on;
@@ -232,11 +251,11 @@ bool Cluster::addressIsLocal(const Poco::Net::SocketAddress & address)
const UInt16 clickhouse_port = Poco::Util::Application::instance().config().getInt("tcp_port", 0);
static auto interfaces = Poco::Net::NetworkInterface::list();
if (clickhouse_port == address.port() &&
if (clickhouse_port == address.host_port.port() &&
interfaces.end() != std::find_if(interfaces.begin(), interfaces.end(),
[&](const Poco::Net::NetworkInterface & interface) { return interface.address() == address.host(); }))
[&](const Poco::Net::NetworkInterface & interface) { return interface.address() == address.host_port.host(); }))
{
LOG_INFO(&Poco::Util::Application::instance().logger(), "Replica with address " << address.toString() << " will be processed as local.");
LOG_INFO(&Poco::Util::Application::instance().logger(), "Replica with address " << address.host_port.toString() << " will be processed as local.");
return true;
}
return false;

View File

@@ -1,15 +1,11 @@
#include <DB/Parsers/formatAST.h>
#include <DB/DataStreams/RemoteBlockInputStream.h>
#include <DB/DataStreams/RemoveColumnsBlockInputStream.h>
#include <DB/Storages/StorageDistributed.h>
#include <DB/Storages/VirtualColumnFactory.h>
#include <DB/Storages/Distributed/DistributedBlockOutputStream.h>
#include <Poco/Net/NetworkInterface.h>
#include <DB/Client/ConnectionPool.h>
#include <DB/Storages/Distributed/DirectoryMonitor.h>
#include <DB/Storages/Distributed/queryToString.h>
#include <DB/Common/escapeForFileName.h>
#include <DB/Interpreters/InterpreterSelectQuery.h>
@@ -17,24 +13,10 @@
#include <DB/Core/Field.h>
#include <boost/algorithm/string/find_iterator.hpp>
#include <boost/algorithm/string/finder.hpp>
namespace DB
{
namespace {
template <typename ASTType>
inline std::string queryToString(const ASTPtr & query)
{
const auto & query_ast = typeid_cast<const ASTType &>(*query);
std::ostringstream s;
formatAST(query_ast, s, 0, false, true);
return s.str();
}
/// select and insert queries have different types for database and table, hence two specializations
template <typename ASTType> struct rewrite_traits;
template <> struct rewrite_traits<ASTSelectQuery> { using type = ASTPtr; };
@@ -81,7 +63,7 @@ StorageDistributed::StorageDistributed(
const String & remote_database_,
const String & remote_table_,
Cluster & cluster_,
const Context & context_,
Context & context_,
const ASTPtr & sharding_key_,
const String & data_path_)
: name(name_), columns(columns_),
@@ -92,8 +74,6 @@ StorageDistributed::StorageDistributed(
write_enabled(cluster.getLocalNodesNum() + cluster.pools.size() < 2 || sharding_key_),
path(data_path_ + escapeForFileName(name) + '/')
{
std::cout << "table `" << name << "` in " << path << std::endl;
createDirectoryMonitors();
}
@@ -194,12 +174,9 @@ BlockOutputStreamPtr StorageDistributed::write(ASTPtr query)
ErrorCodes::NOT_IMPLEMENTED
};
return new DistributedBlockOutputStream{
*this, this->cluster,
queryToString<ASTInsertQuery>(rewriteQuery<ASTInsertQuery>(
query, remote_database, remote_table
))
};
const auto & modified_query = rewriteQuery<ASTInsertQuery>(query, remote_database, remote_table);
return new DistributedBlockOutputStream{*this, modified_query};
}
void StorageDistributed::alter(const AlterCommands & params, const String & database_name, const String & table_name, Context & context)
@@ -219,9 +196,8 @@ void StorageDistributed::shutdown()
NameAndTypePair StorageDistributed::getColumn(const String & column_name) const
{
auto type = VirtualColumnFactory::tryGetType(column_name);
if (type)
return NameAndTypePair(column_name, type);
if (const auto & type = VirtualColumnFactory::tryGetType(column_name))
return { column_name, type };
return getRealColumn(column_name);
}
@@ -231,89 +207,28 @@ bool StorageDistributed::hasColumn(const String & column_name) const
return VirtualColumnFactory::hasColumn(column_name) || hasRealColumn(column_name);
}
void StorageDistributed::createDirectoryMonitors()
{
Poco::File(path).createDirectory();
Poco::DirectoryIterator end;
for (Poco::DirectoryIterator it(path); it != end; ++it)
if (it->isDirectory())
createDirectoryMonitor(it.name());
}
void StorageDistributed::createDirectoryMonitor(const std::string & name)
{
if (directory_monitor_threads.count(name))
return;
directory_monitor_threads.emplace(
name,
std::thread{
&StorageDistributed::directoryMonitorFunc, this, name
name,
std::thread{[this, name] {
DirectoryMonitor monitor{*this, name};
monitor.run();
}
);
});
}
void StorageDistributed::directoryMonitorFunc(const std::string & name)
void StorageDistributed::createDirectoryMonitors()
{
const auto & path = this->path + name + '/';
std::cout << "created monitor for directory " << path << std::endl;
Poco::File{path}.createDirectory();
auto is_local = false;
ConnectionPools pools;
for (auto it = boost::make_split_iterator(name, boost::first_finder(",")); it != decltype(it){}; ++it)
{
const auto & address = boost::copy_range<std::string>(*it);
const auto user_pw_end = strchr(address.data(), '@');
const auto colon = strchr(address.data(), ':');
if (!user_pw_end || !colon)
throw Exception{"Shard address '" + address + "' does not match to 'user[:password]@host:port' pattern"};
const auto has_pw = colon < user_pw_end;
const auto host_end = has_pw ? strchr(user_pw_end + 1, ':') : colon;
if (!host_end)
throw Exception{"Shard address '" + address + "' does not contain port"};
const auto user = unescapeForFileName({address.data(), has_pw ? colon : user_pw_end});
const auto password = has_pw ? unescapeForFileName({colon + 1, user_pw_end}) : std::string{};
const auto host = unescapeForFileName({user_pw_end + 1, host_end});
const auto port = DB::parse<UInt16>(host_end + 1);
std::cout
<< "\taddress " << host
<< " port " << port
<< " user " << user
<< " password " << password
<< std::endl;
if (Cluster::addressIsLocal({host, port}))
{
is_local = true;
break;
}
pools.emplace_back(new ConnectionPool{
1, host, port, "",
user, password, context.getDataTypeFactory(),
getName() + '_' + name
});
}
std::cout << "local? " << std::boolalpha << is_local << std::endl;
const auto pool = is_local
? (pools.size() == 1
? pools[0]
: new ConnectionPoolWithFailover(pools, DB::LoadBalancing::RANDOM)
)
: nullptr;
while (!quit.load(std::memory_order_relaxed))
{
}
std::cout << "exiting monitor for directory " << path << std::endl;
Poco::DirectoryIterator end;
for (Poco::DirectoryIterator it{path}; it != end; ++it)
if (it->isDirectory())
createDirectoryMonitor(it.name());
}
}