Merge pull request #1031 from yandex/wait-for-insertion-in-distributed

Wait for insertion in distributed
Commit 9b99dfb53d by alexey-milovidov, 2017-08-11 21:35:13 +03:00, committed by GitHub
18 changed files with 554 additions and 117 deletions

View File

@ -123,6 +123,7 @@
M(DictCacheLockWriteNs) \
M(DictCacheLockReadNs) \
\
M(DistributedSyncInsertionTimeoutExceeded) \
M(DataAfterMergeDiffersFromReplica)

View File

@ -77,6 +77,7 @@ Cluster::Address::Address(Poco::Util::AbstractConfiguration & config, const Stri
user = config.getString(config_prefix + ".user", "default");
password = config.getString(config_prefix + ".password", "");
default_database = config.getString(config_prefix + ".default_database", "");
is_local = isLocal(*this);
}
@ -98,6 +99,7 @@ Cluster::Address::Address(const String & host_port_, const String & user_, const
host_name = host_port_;
port = default_port;
}
is_local = isLocal(*this);
}
@ -193,6 +195,8 @@ Cluster::Cluster(Poco::Util::AbstractConfiguration & config, const Settings & se
{
/// Shard without replicas.
Addresses addresses;
const auto & prefix = config_prefix + key;
const auto weight = config.getInt(prefix + ".weight", default_weight);
@ -204,11 +208,10 @@ Cluster::Cluster(Poco::Util::AbstractConfiguration & config, const Settings & se
info.shard_num = current_shard_num;
info.weight = weight;
if (isLocal(address))
if (address.is_local)
info.local_addresses.push_back(address);
else
{
info.dir_names.push_back(address.toStringFull());
ConnectionPoolPtrs pools;
pools.push_back(std::make_shared<ConnectionPool>(
settings.distributed_connections_pool_size,
@ -227,6 +230,7 @@ Cluster::Cluster(Poco::Util::AbstractConfiguration & config, const Settings & se
slot_to_shard.insert(std::end(slot_to_shard), weight, shards_info.size());
shards_info.push_back(info);
addresses_with_failover.push_back(addresses);
}
else if (startsWith(key, "shard"))
{
@ -244,10 +248,8 @@ Cluster::Cluster(Poco::Util::AbstractConfiguration & config, const Settings & se
bool internal_replication = config.getBool(partial_prefix + ".internal_replication", false);
/** in case of internal_replication we will be appending names to
* the first element of vector; otherwise we will just .emplace_back
*/
std::vector<std::string> dir_names{};
/// in case of internal_replication we will be appending names to dir_name_for_internal_replication
std::string dir_name_for_internal_replication;
auto first = true;
for (const auto & replica_key : replica_keys)
@ -261,18 +263,16 @@ Cluster::Cluster(Poco::Util::AbstractConfiguration & config, const Settings & se
replica_addresses.back().replica_num = current_replica_num;
++current_replica_num;
if (!isLocal(replica_addresses.back()))
if (!replica_addresses.back().is_local)
{
if (internal_replication)
{
auto dir_name = replica_addresses.back().toStringFull();
if (first)
dir_names.emplace_back(std::move(dir_name));
dir_name_for_internal_replication = dir_name;
else
dir_names.front() += "," + dir_name;
dir_name_for_internal_replication += "," + dir_name;
}
else
dir_names.emplace_back(replica_addresses.back().toStringFull());
if (first) first = false;
}
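
In other words, with internal_replication the replica addresses of a shard are now joined into one comma-separated directory name built from Address::toStringFull() (illustratively, something like "host1:9000,host2:9000"), so a single directory monitor with a failover connection pool sends each block to only one replica and the replicas synchronize among themselves; without internal_replication, every remote replica keeps its own directory as before.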
@ -288,7 +288,7 @@ Cluster::Cluster(Poco::Util::AbstractConfiguration & config, const Settings & se
for (const auto & replica : replica_addresses)
{
if (isLocal(replica))
if (replica.is_local)
shard_local_addresses.push_back(replica);
else
{
@ -311,17 +311,18 @@ Cluster::Cluster(Poco::Util::AbstractConfiguration & config, const Settings & se
if (weight)
slot_to_shard.insert(std::end(slot_to_shard), weight, shards_info.size());
shards_info.push_back({std::move(dir_names), current_shard_num, weight, shard_local_addresses, shard_pool, internal_replication});
shards_info.push_back({std::move(dir_name_for_internal_replication), current_shard_num, weight,
shard_local_addresses, shard_pool, internal_replication});
}
else
throw Exception("Unknown element in config: " + key, ErrorCodes::UNKNOWN_ELEMENT_IN_CONFIG);
if (!addresses_with_failover.empty() && !addresses.empty())
throw Exception("There must be either 'node' or 'shard' elements in config", ErrorCodes::EXCESSIVE_ELEMENT_IN_CONFIG);
++current_shard_num;
}
if (addresses_with_failover.empty())
throw Exception("There must be either 'node' or 'shard' elements in config", ErrorCodes::EXCESSIVE_ELEMENT_IN_CONFIG);
initMisc();
}
@ -409,12 +410,7 @@ void Cluster::calculateHashOfAddresses()
{
std::vector<std::string> elements;
if (!addresses.empty())
{
for (const auto & address : addresses)
elements.push_back(address.host_name + ":" + toString(address.port));
}
else if (!addresses_with_failover.empty())
if (!addresses_with_failover.empty())
{
for (const auto & addresses : addresses_with_failover)
{
@ -453,8 +449,6 @@ std::unique_ptr<Cluster> Cluster::getClusterWithSingleShard(size_t index) const
Cluster::Cluster(const Cluster & from, size_t index)
: shards_info{from.shards_info[index]}
{
if (!from.addresses.empty())
addresses.emplace_back(from.addresses[index]);
if (!from.addresses_with_failover.empty())
addresses_with_failover.emplace_back(from.addresses_with_failover[index]);

View File

@ -55,6 +55,7 @@ public:
String password;
String default_database; /// this database is selected when no database is specified for Distributed table
UInt32 replica_num;
bool is_local;
Address(Poco::Util::AbstractConfiguration & config, const String & config_prefix);
Address(const String & host_port_, const String & user_, const String & password_);
@ -80,8 +81,8 @@ public:
bool hasInternalReplication() const { return has_internal_replication; }
public:
/// Contains names of directories for asynchronous write to StorageDistributed
std::vector<std::string> dir_names;
/// Name of directory for asynchronous write to StorageDistributed if has_internal_replication
std::string dir_name_for_internal_replication;
/// Number of the shard; numbering starts with 1
UInt32 shard_num;
UInt32 weight;
@ -94,8 +95,7 @@ public:
String getHashOfAddresses() const { return hash_of_addresses; }
const ShardsInfo & getShardsInfo() const { return shards_info; }
const Addresses & getShardsAddresses() const { return addresses; }
const AddressesWithFailover & getShardsWithFailoverAddresses() const { return addresses_with_failover; }
const AddressesWithFailover & getShardsAddresses() const { return addresses_with_failover; }
const ShardInfo & getAnyShardInfo() const
{
@ -144,8 +144,6 @@ private:
/// Non-empty is either addresses or addresses_with_failover.
/// The size and order of the elements in the corresponding array corresponds to shards_info.
/// An array of shards. Each shard is the address of one server.
Addresses addresses;
/// An array of shards. For each shard, an array of replica addresses (servers that are considered identical).
AddressesWithFailover addresses_with_failover;

View File

@ -273,7 +273,7 @@ void DDLWorker::processTask(const DDLLogEntry & node, const std::string & node_n
String cluster_name = query->cluster;
auto cluster = context.getCluster(cluster_name);
auto shard_host_num = tryGetShardAndHostNum(cluster->getShardsWithFailoverAddresses(), host_name, port);
auto shard_host_num = tryGetShardAndHostNum(cluster->getShardsAddresses(), host_name, port);
if (!shard_host_num)
{
throw Exception("Cannot find own address (" + host_id + ") in cluster " + cluster_name + " configuration",
@ -283,7 +283,7 @@ void DDLWorker::processTask(const DDLLogEntry & node, const std::string & node_n
size_t shard_num = shard_host_num->first;
size_t host_num = shard_host_num->second;
const auto & host_address = cluster->getShardsWithFailoverAddresses().at(shard_num).at(host_num);
const auto & host_address = cluster->getShardsAddresses().at(shard_num).at(host_num);
ASTPtr rewritten_ast = query->getRewrittenASTWithoutOnCluster(host_address.default_database);
String rewritten_query = queryToString(rewritten_ast);
@ -369,7 +369,7 @@ void DDLWorker::processTaskAlter(
throw Exception("Distributed DDL alters don't work properly yet", ErrorCodes::NOT_IMPLEMENTED);
Strings replica_names;
for (const auto & address : cluster->getShardsWithFailoverAddresses().at(shard_num))
for (const auto & address : cluster->getShardsAddresses().at(shard_num))
replica_names.emplace_back(address.toString());
std::sort(replica_names.begin(), replica_names.end());
@ -700,7 +700,7 @@ BlockIO executeDDLQueryOnCluster(const ASTPtr & query_ptr, Context & context)
entry.query = queryToString(query_ptr);
entry.initiator = ddl_worker.getHostName();
Cluster::AddressesWithFailover shards = cluster->getShardsWithFailoverAddresses();
Cluster::AddressesWithFailover shards = cluster->getShardsAddresses();
for (const auto & shard : shards)
{
for (const auto & addr : shard)

View File

@ -286,7 +286,15 @@ struct Settings
M(SettingBool, distributed_ddl_allow_replicated_alter, 0) \
/** Limit on max column size in block while reading. Helps to decrease the cache miss count. \
* Should be close to L2 cache size. */ \
M(SettingUInt64, preferred_max_column_in_block_size_bytes, 0)
M(SettingUInt64, preferred_max_column_in_block_size_bytes, 0) \
\
/** If enabled, an INSERT query into a Distributed table waits until the data has been sent to all nodes in the cluster. \
*/ \
M(SettingBool, insert_distributed_sync, false) \
/** Timeout for an INSERT query into a Distributed table. Used only when insert_distributed_sync is enabled. \
* Zero means no timeout. \
*/ \
M(SettingUInt64, insert_distributed_timeout, 0)
/// Possible limits for query execution.
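
To make the two new settings concrete, here is a usage sketch in the spirit of the integration test further below (the table name is illustrative):

SET insert_distributed_sync = 1, insert_distributed_timeout = 5;
-- The INSERT now waits until the data has been pushed to every shard and
-- fails with TIMEOUT_EXCEEDED if this takes longer than 5 seconds.
INSERT INTO distributed_table SELECT today() AS date, number AS val FROM system.numbers LIMIT 10000;

SET insert_distributed_sync = 1, insert_distributed_timeout = 0;
-- A zero timeout disables the deadline: the synchronous INSERT waits indefinitely.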

View File

@ -86,8 +86,8 @@ namespace
}
StorageDistributedDirectoryMonitor::StorageDistributedDirectoryMonitor(StorageDistributed & storage, const std::string & name)
: storage(storage), pool{createPool(name)}, path{storage.path + name + '/'}
StorageDistributedDirectoryMonitor::StorageDistributedDirectoryMonitor(StorageDistributed & storage, const std::string & name, const ConnectionPoolPtr & pool)
: storage(storage), pool{pool}, path{storage.path + name + '/'}
, current_batch_file_path{path + "current_batch.txt"}
, default_sleep_time{storage.context.getSettingsRef().distributed_directory_monitor_sleep_time_ms.totalMilliseconds()}
, sleep_time{default_sleep_time}
@ -150,11 +150,11 @@ void StorageDistributedDirectoryMonitor::run()
}
ConnectionPoolPtr StorageDistributedDirectoryMonitor::createPool(const std::string & name)
ConnectionPoolPtr StorageDistributedDirectoryMonitor::createPool(const std::string & name, const StorageDistributed & storage)
{
const auto pool_factory = [this, &name] (const std::string & host, const UInt16 port,
const std::string & user, const std::string & password,
const std::string & default_database)
const auto pool_factory = [&storage, &name] (const std::string & host, const UInt16 port,
const std::string & user, const std::string & password,
const std::string & default_database)
{
return std::make_shared<ConnectionPool>(
1, host, port, default_database,

View File

@ -16,12 +16,13 @@ namespace DB
class StorageDistributedDirectoryMonitor
{
public:
StorageDistributedDirectoryMonitor(StorageDistributed & storage, const std::string & name);
StorageDistributedDirectoryMonitor(StorageDistributed & storage, const std::string & name, const ConnectionPoolPtr & pool);
~StorageDistributedDirectoryMonitor();
static ConnectionPoolPtr createPool(const std::string & name, const StorageDistributed & storage);
private:
void run();
ConnectionPoolPtr createPool(const std::string & name);
bool findFiles();
void processFile(const std::string & file_path);
void processFilesWithBatching(const std::map<UInt64, std::string> & files);

View File

@ -1,4 +1,5 @@
#include <Storages/Distributed/DistributedBlockOutputStream.h>
#include <Storages/Distributed/DirectoryMonitor.h>
#include <Storages/StorageDistributed.h>
#include <Parsers/formatAST.h>
@ -6,36 +7,280 @@
#include <IO/WriteBufferFromFile.h>
#include <IO/CompressedWriteBuffer.h>
#include <IO/Operators.h>
#include <IO/WriteBufferFromString.h>
#include <DataStreams/NativeBlockOutputStream.h>
#include <DataStreams/RemoteBlockOutputStream.h>
#include <Interpreters/InterpreterInsertQuery.h>
#include <Interpreters/Cluster.h>
#include <Interpreters/createBlockSelector.h>
#include <DataTypes/DataTypesNumber.h>
#include <Common/setThreadName.h>
#include <Common/ClickHouseRevision.h>
#include <Common/CurrentMetrics.h>
#include <Common/typeid_cast.h>
#include <Common/Exception.h>
#include <Common/ProfileEvents.h>
#include <Common/MemoryTracker.h>
#include <Common/escapeForFileName.h>
#include <common/logger_useful.h>
#include <ext/range.h>
#include <Poco/DirectoryIterator.h>
#include <memory>
#include <iostream>
#include <future>
#include <condition_variable>
#include <mutex>
namespace CurrentMetrics
{
extern const Metric DistributedSend;
}
namespace ProfileEvents
{
extern const Event DistributedSyncInsertionTimeoutExceeded;
}
namespace DB
{
DistributedBlockOutputStream::DistributedBlockOutputStream(StorageDistributed & storage, const ASTPtr & query_ast, const ClusterPtr & cluster_)
: storage(storage), query_ast(query_ast), cluster(cluster_)
namespace ErrorCodes
{
extern const int TIMEOUT_EXCEEDED;
}
DistributedBlockOutputStream::DistributedBlockOutputStream(StorageDistributed & storage, const ASTPtr & query_ast,
const ClusterPtr & cluster_, bool insert_sync_, UInt64 insert_timeout_)
: storage(storage), query_ast(query_ast), cluster(cluster_), insert_sync(insert_sync_), insert_timeout(insert_timeout_)
{
}
void DistributedBlockOutputStream::writePrefix()
{
deadline = std::chrono::steady_clock::now() + std::chrono::seconds(insert_timeout);
}
void DistributedBlockOutputStream::write(const Block & block)
{
if (storage.getShardingKeyExpr() && (cluster->getShardsInfo().size() > 1))
return writeSplit(block);
if (insert_sync)
writeSync(block);
else
writeAsync(block);
}
writeImpl(block);
void DistributedBlockOutputStream::writeAsync(const Block & block)
{
if (storage.getShardingKeyExpr() && (cluster->getShardsInfo().size() > 1))
return writeSplitAsync(block);
writeAsyncImpl(block);
++blocks_inserted;
}
ThreadPool::Job DistributedBlockOutputStream::createWritingJob(
WritingJobContext & context, const Block & block, const Cluster::Address & address, size_t shard_id, size_t job_id)
{
auto memory_tracker = current_memory_tracker;
return [this, memory_tracker, & context, & block, & address, shard_id, job_id]()
{
if (!current_memory_tracker)
{
current_memory_tracker = memory_tracker;
setThreadName("DistrOutStrProc");
}
try
{
const auto & shard_info = cluster->getShardsInfo()[shard_id];
if (address.is_local)
{
writeToLocal(block, shard_info.getLocalNodeCount());
context.done_local_jobs[job_id] = true;
}
else
{
writeToShardSync(block, shard_info.hasInternalReplication()
? shard_info.dir_name_for_internal_replication
: address.toStringFull());
context.done_remote_jobs[job_id] = true;
}
++context.finished_jobs_count;
context.cond_var.notify_one();
}
catch (...)
{
++context.finished_jobs_count;
context.cond_var.notify_one();
throw;
}
};
}
std::string DistributedBlockOutputStream::getCurrentStateDescription(const WritingJobContext & context)
{
const Cluster::ShardsInfo & shards_info = cluster->getShardsInfo();
String description;
WriteBufferFromString buffer(description);
buffer << "Insertion status:\n";
auto writeDescription = [&buffer](const Cluster::Address & address, size_t shard_id, size_t blocks_written)
{
buffer << "Wrote " << blocks_written << " blocks on shard " << shard_id << " replica ";
buffer << address.toString() << '\n';
};
const auto addresses_with_failovers = cluster->getShardsAddresses();
size_t remote_job_id = 0;
size_t local_job_id = 0;
for (size_t shard_id : ext::range(0, shards_info.size()))
{
const auto & shard_info = shards_info[shard_id];
/// If hasInternalReplication, then prefer the local replica
if (!shard_info.hasInternalReplication() || !shard_info.isLocal())
{
for (const auto & address : addresses_with_failovers[shard_id])
if (!address.is_local)
{
writeDescription(address, shard_id, blocks_inserted + (context.done_remote_jobs[remote_job_id] ? 1 : 0));
++remote_job_id;
if (shard_info.hasInternalReplication())
break;
}
}
if (shard_info.isLocal())
{
const auto & address = shard_info.local_addresses.front();
writeDescription(address, shard_id, blocks_inserted + (context.done_local_jobs[local_job_id] ? 1 : 0));
++local_job_id;
}
}
return description;
}
void DistributedBlockOutputStream::createWritingJobs(WritingJobContext & context, const Blocks & blocks)
{
const auto & addresses_with_failovers = cluster->getShardsAddresses();
const Cluster::ShardsInfo & shards_info = cluster->getShardsInfo();
size_t remote_job_id = 0;
size_t local_job_id = 0;
for (size_t shard_id : ext::range(0, blocks.size()))
{
const auto & shard_info = shards_info[shard_id];
/// If hasInternalReplication, then prefer the local replica
if (!shard_info.hasInternalReplication() || !shard_info.isLocal())
{
for (const auto & address : addresses_with_failovers[shard_id])
if (!address.is_local)
{
pool->schedule(createWritingJob(context, blocks[shard_id], address, shard_id, remote_job_id));
++remote_job_id;
if (shard_info.hasInternalReplication())
break;
}
}
if (shards_info[shard_id].isLocal())
{
const auto & address = shards_info[shard_id].local_addresses.front();
pool->schedule(createWritingJob(context, blocks[shard_id], address, shard_id, local_job_id));
++local_job_id;
}
}
}
void DistributedBlockOutputStream::calculateJobsCount()
{
remote_jobs_count = 0;
local_jobs_count = 0;
const auto & addresses_with_failovers = cluster->getShardsAddresses();
const auto & shards_info = cluster->getShardsInfo();
for (size_t shard_id : ext::range(0, shards_info.size()))
{
const auto & shard_info = shards_info[shard_id];
/// If hasInternalReplication, then prefer the local replica
if (!shard_info.hasInternalReplication() || !shard_info.isLocal())
{
for (const auto & address : addresses_with_failovers[shard_id])
if (!address.is_local)
{
++remote_jobs_count;
if (shard_info.hasInternalReplication())
break;
}
}
local_jobs_count += shard_info.isLocal() ? 1 : 0;
}
}
void DistributedBlockOutputStream::waitForUnfinishedJobs(WritingJobContext & context)
{
std::unique_lock<std::mutex> lock(context.mutex);
size_t jobs_count = remote_jobs_count + local_jobs_count;
auto cond = [& context, jobs_count] { return context.finished_jobs_count == jobs_count; };
if (insert_timeout)
{
if (!context.cond_var.wait_until(lock, deadline, cond))
{
pool->wait();
ProfileEvents::increment(ProfileEvents::DistributedSyncInsertionTimeoutExceeded);
throw Exception("Timeout exceeded.", ErrorCodes::TIMEOUT_EXCEEDED);
}
}
else
context.cond_var.wait(lock, cond);
pool->wait();
}
void DistributedBlockOutputStream::writeSync(const Block & block)
{
if (!pool)
{
/// Deferred initialization. Only for sync insertion.
calculateJobsCount();
pool.emplace(remote_jobs_count + local_jobs_count);
}
WritingJobContext context;
context.done_remote_jobs.assign(remote_jobs_count, false);
context.done_local_jobs.assign(local_jobs_count, false);
context.finished_jobs_count = 0;
const Cluster::ShardsInfo & shards_info = cluster->getShardsInfo();
Blocks blocks = shards_info.size() > 1 ? splitBlock(block) : Blocks({block});
createWritingJobs(context, blocks);
try
{
waitForUnfinishedJobs(context);
}
catch (Exception & exception)
{
exception.addMessage(getCurrentStateDescription(context));
throw;
}
++blocks_inserted;
}
@ -65,7 +310,7 @@ IColumn::Selector DistributedBlockOutputStream::createSelector(Block block)
}
void DistributedBlockOutputStream::writeSplit(const Block & block)
Blocks DistributedBlockOutputStream::splitBlock(const Block & block)
{
const auto num_cols = block.columns();
/// cache column pointers for later reuse
@ -91,21 +336,39 @@ void DistributedBlockOutputStream::writeSplit(const Block & block)
splitted_blocks[shard_idx].getByPosition(col_idx_in_block).column = std::move(splitted_columns[shard_idx]);
}
for (size_t shard_idx = 0; shard_idx < num_shards; ++shard_idx)
if (splitted_blocks[shard_idx].rows())
writeImpl(splitted_blocks[shard_idx], shard_idx);
return splitted_blocks;
}
void DistributedBlockOutputStream::writeImpl(const Block & block, const size_t shard_id)
void DistributedBlockOutputStream::writeSplitAsync(const Block & block)
{
Blocks splitted_blocks = splitBlock(block);
const size_t num_shards = splitted_blocks.size();
for (size_t shard_idx = 0; shard_idx < num_shards; ++shard_idx)
if (splitted_blocks[shard_idx].rows())
writeAsyncImpl(splitted_blocks[shard_idx], shard_idx);
++blocks_inserted;
}
void DistributedBlockOutputStream::writeAsyncImpl(const Block & block, const size_t shard_id)
{
const auto & shard_info = cluster->getShardsInfo()[shard_id];
if (shard_info.getLocalNodeCount() > 0)
writeToLocal(block, shard_info.getLocalNodeCount());
/// dir_names is empty if shard has only local addresses
if (!shard_info.dir_names.empty())
writeToShard(block, shard_info.dir_names);
if (shard_info.hasInternalReplication())
writeToShard(block, {shard_info.dir_name_for_internal_replication});
else
{
std::vector<std::string> dir_names;
for (const auto & address : cluster->getShardsAddresses()[shard_id])
if (!address.is_local)
dir_names.push_back(address.toStringFull());
writeToShard(block, dir_names);
}
}
@ -123,6 +386,22 @@ void DistributedBlockOutputStream::writeToLocal(const Block & block, const size_
}
void DistributedBlockOutputStream::writeToShardSync(const Block & block, const std::string & connection_pool_name)
{
auto pool = storage.requireConnectionPool(connection_pool_name);
auto connection = pool->get();
const auto & query_string = queryToString(query_ast);
RemoteBlockOutputStream remote{*connection, query_string};
CurrentMetrics::Increment metric_increment{CurrentMetrics::DistributedSend};
remote.writePrefix();
remote.write(block);
remote.writeSuffix();
}
void DistributedBlockOutputStream::writeToShard(const Block & block, const std::vector<std::string> & dir_names)
{
/** tmp directory is used to ensure atomicity of transactions
@ -177,4 +456,5 @@ void DistributedBlockOutputStream::writeToShard(const Block & block, const std::
Poco::File(first_file_tmp_path).remove();
}
}

View File

@ -3,14 +3,25 @@
#include <Parsers/formatAST.h>
#include <DataStreams/IBlockOutputStream.h>
#include <Core/Block.h>
#include <common/ThreadPool.h>
#include <atomic>
#include <memory>
#include <chrono>
#include <experimental/optional>
#include <Interpreters/Cluster.h>
namespace Poco
{
class Logger;
}
namespace DB
{
class StorageDistributed;
/** The write is asynchronous - the data is first written to the local filesystem, and then sent to the remote servers.
/** If insert_sync_ is true, the write is synchronous. Uses insert_timeout_ if it is not zero.
* Otherwise, the write is asynchronous - the data is first written to the local filesystem, and then sent to the remote servers.
* If the Distributed table uses more than one shard, then in order to support the write,
* when creating the table, an additional parameter must be specified for ENGINE - the sharding key.
* Sharding key is an arbitrary expression from the columns. For example, rand() or UserID.
@ -21,25 +32,69 @@ class StorageDistributed;
class DistributedBlockOutputStream : public IBlockOutputStream
{
public:
DistributedBlockOutputStream(StorageDistributed & storage, const ASTPtr & query_ast, const ClusterPtr & cluster_);
DistributedBlockOutputStream(StorageDistributed & storage, const ASTPtr & query_ast, const ClusterPtr & cluster_, bool insert_sync_, UInt64 insert_timeout_);
void write(const Block & block) override;
void writePrefix() override;
private:
void writeAsync(const Block & block);
/// Performs synchronous insertion to remote nodes. Throws if the timeout is exceeded.
void writeSync(const Block & block);
void calculateJobsCount();
struct WritingJobContext
{
/// Remote job per replica.
std::vector<bool> done_remote_jobs;
/// Local job per shard.
std::vector<bool> done_local_jobs;
std::atomic<unsigned> finished_jobs_count;
std::mutex mutex;
std::condition_variable cond_var;
};
ThreadPool::Job createWritingJob(WritingJobContext & context, const Block & block,
const Cluster::Address & address, size_t shard_id, size_t job_id);
void createWritingJobs(WritingJobContext & context, const Blocks & blocks);
void waitForUnfinishedJobs(WritingJobContext & context);
/// Returns a description of how many blocks were written for each cluster node. Used during exception handling.
std::string getCurrentStateDescription(const WritingJobContext & context);
IColumn::Selector createSelector(Block block);
void writeSplit(const Block & block);
/// Split block between shards.
Blocks splitBlock(const Block & block);
void writeImpl(const Block & block, const size_t shard_id = 0);
void writeSplitAsync(const Block & block);
void writeAsyncImpl(const Block & block, const size_t shard_id = 0);
/// Performs the insertion into the local table; the write is repeated 'repeats' times (once per local replica).
void writeToLocal(const Block & block, const size_t repeats);
void writeToShard(const Block & block, const std::vector<std::string> & dir_names);
/// Performs synchronous insertion to remote node.
void writeToShardSync(const Block & block, const std::string & connection_pool_name);
private:
StorageDistributed & storage;
ASTPtr query_ast;
ClusterPtr cluster;
bool insert_sync;
UInt64 insert_timeout;
size_t blocks_inserted = 0;
std::chrono::steady_clock::time_point deadline;
size_t remote_jobs_count;
size_t local_jobs_count;
std::experimental::optional<ThreadPool> pool;
};
}
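
To ground the class comment above, a sketch of the corresponding DDL (cluster, database, table, and column names are illustrative, matching the integration test below):

CREATE TABLE distributed_table (date Date, val UInt64)
ENGINE = Distributed(test_cluster, default, local_table, rand());
-- With more than one shard, the fourth ENGINE argument (the sharding key,
-- e.g. rand() or UserID) selects the target shard for each row: splitBlock()
-- partitions every inserted block by this key before per-shard write jobs run.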

View File

@ -1700,17 +1700,7 @@ std::string ReshardingWorker::createCoordinator(const Cluster & cluster)
if (!cluster.getShardsAddresses().empty())
{
size_t shard_no = 0;
for (const auto & address : cluster.getShardsAddresses())
{
publish_address(address.host_name, shard_no);
publish_address(address.resolved_address.host().toString(), shard_no);
++shard_no;
}
}
else if (!cluster.getShardsWithFailoverAddresses().empty())
{
size_t shard_no = 0;
for (const auto & addresses : cluster.getShardsWithFailoverAddresses())
for (const auto & addresses : cluster.getShardsAddresses())
{
for (const auto & address : addresses)
{

View File

@ -245,7 +245,7 @@ BlockOutputStreamPtr StorageDistributed::write(const ASTPtr & query, const Setti
/// DistributedBlockOutputStream will not own cluster, but will own ConnectionPools of the cluster
return std::make_shared<DistributedBlockOutputStream>(
*this, rewriteInsertQuery(query, remote_database, remote_table), cluster);
*this, rewriteInsertQuery(query, remote_database, remote_table), cluster, settings.insert_distributed_sync, settings.insert_distributed_timeout);
}
@ -273,7 +273,7 @@ void StorageDistributed::startup()
void StorageDistributed::shutdown()
{
directory_monitors.clear();
cluster_nodes_data.clear();
}
@ -455,13 +455,6 @@ bool StorageDistributed::hasColumn(const String & column_name) const
return VirtualColumnFactory::hasColumn(column_name) || IStorage::hasColumn(column_name);
}
void StorageDistributed::createDirectoryMonitor(const std::string & name)
{
directory_monitors.emplace(name, std::make_unique<StorageDistributedDirectoryMonitor>(*this, name));
}
void StorageDistributed::createDirectoryMonitors()
{
if (path.empty())
@ -473,14 +466,20 @@ void StorageDistributed::createDirectoryMonitors()
boost::filesystem::directory_iterator end;
for (auto it = begin; it != end; ++it)
if (it->status().type() == boost::filesystem::directory_file)
createDirectoryMonitor(it->path().filename().string());
requireDirectoryMonitor(it->path().filename().string());
}
void StorageDistributed::requireDirectoryMonitor(const std::string & name)
{
if (!directory_monitors.count(name))
createDirectoryMonitor(name);
cluster_nodes_data[name].requireDirectoryMonitor(name, *this);
}
ConnectionPoolPtr StorageDistributed::requireConnectionPool(const std::string & name)
{
auto & node_data = cluster_nodes_data[name];
node_data.requireConnectionPool(name, *this);
return node_data.connection_pool;
}
size_t StorageDistributed::getShardCount() const
@ -494,4 +493,17 @@ ClusterPtr StorageDistributed::getCluster() const
return (owned_cluster) ? owned_cluster : context.getCluster(cluster_name);
}
void StorageDistributed::ClusterNodeData::requireConnectionPool(const std::string & name, const StorageDistributed & storage)
{
if (!connection_pool)
connection_pool = StorageDistributedDirectoryMonitor::createPool(name, storage);
}
void StorageDistributed::ClusterNodeData::requireDirectoryMonitor(const std::string & name, StorageDistributed & storage)
{
requireConnectionPool(name, storage);
if (!directory_monitor)
directory_monitor = std::make_unique<StorageDistributedDirectoryMonitor>(storage, name, connection_pool);
}
}

View File

@ -116,13 +116,12 @@ private:
const ASTPtr & sharding_key_ = nullptr,
const String & data_path_ = String{});
/// create directory monitor thread by subdirectory name
void createDirectoryMonitor(const std::string & name);
/// create directory monitors for each existing subdirectory
void createDirectoryMonitors();
/// ensure directory monitor creation
/// ensure directory monitor thread creation by subdirectory name
void requireDirectoryMonitor(const std::string & name);
/// ensure connection pool creation and return it
ConnectionPoolPtr requireConnectionPool(const std::string & name);
ClusterPtr getCluster() const;
@ -146,7 +145,17 @@ private:
String sharding_key_column_name;
String path; /// Can be empty if data_path_ is empty. In this case, a directory for the data to be sent is not created.
std::unordered_map<std::string, std::unique_ptr<StorageDistributedDirectoryMonitor>> directory_monitors;
struct ClusterNodeData
{
std::unique_ptr<StorageDistributedDirectoryMonitor> directory_monitor;
ConnectionPoolPtr connection_pool;
/// Creates connection_pool if it does not already exist.
void requireConnectionPool(const std::string & name, const StorageDistributed & storage);
/// Creates directory_monitor if it does not already exist.
void requireDirectoryMonitor(const std::string & name, StorageDistributed & storage);
};
std::unordered_map<std::string, ClusterNodeData> cluster_nodes_data;
/// Used for global monotonic ordering of files to send.
SimpleIncrement file_names_increment;

View File

@ -72,27 +72,10 @@ BlockInputStreams StorageSystemClusters::read(
{
const std::string cluster_name = entry.first;
const ClusterPtr cluster = entry.second;
const auto & addresses = cluster->getShardsAddresses();
const auto & addresses_with_failover = cluster->getShardsWithFailoverAddresses();
const auto & addresses_with_failover = cluster->getShardsAddresses();
const auto & shards_info = cluster->getShardsInfo();
if (!addresses.empty())
{
auto it1 = addresses.cbegin();
auto it2 = shards_info.cbegin();
while (it1 != addresses.cend())
{
const auto & address = *it1;
const auto & shard_info = *it2;
updateColumns(cluster_name, shard_info, address);
++it1;
++it2;
}
}
else if (!addresses_with_failover.empty())
if (!addresses_with_failover.empty())
{
auto it1 = addresses_with_failover.cbegin();
auto it2 = shards_info.cbegin();

View File

@ -27,6 +27,14 @@ class Client:
return CommandRequest(command, stdin, timeout)
class QueryTimeoutExceedException(Exception):
pass
class QueryRuntimeException(Exception):
pass
class CommandRequest:
def __init__(self, command, stdin=None, timeout=None):
# Write data to tmp file to avoid PIPEs and execution blocking
@ -60,11 +68,10 @@ class CommandRequest:
stdout = self.stdout_file.read()
stderr = self.stderr_file.read()
if self.process.returncode != 0 or stderr:
raise Exception('Client failed! Return code: {}, stderr: {}'.format(self.process.returncode, stderr))
if self.timer is not None and not self.process_finished_before_timeout:
raise Exception('Client timed out!')
raise QueryTimeoutExceedException('Client timed out!')
if self.process.returncode != 0 or stderr:
raise QueryRuntimeException('Client failed! Return code: {}, stderr: {}'.format(self.process.returncode, stderr))
return stdout

View File

@ -0,0 +1,16 @@
<yandex>
<remote_servers>
<test_cluster>
<shard>
<replica>
<host>node1</host>
<port>9000</port>
</replica>
<replica>
<host>node2</host>
<port>9000</port>
</replica>
</shard>
</test_cluster>
</remote_servers>
</yandex>

View File

@ -0,0 +1,84 @@
from contextlib import contextmanager
from helpers.network import PartitionManager
import pytest
from helpers.cluster import ClickHouseCluster
from helpers.client import QueryRuntimeException, QueryTimeoutExceedException
cluster = ClickHouseCluster(__file__)
node1 = cluster.add_instance('node1', main_configs=['configs/remote_servers.xml'])
node2 = cluster.add_instance('node2', main_configs=['configs/remote_servers.xml'])
@pytest.fixture(scope="module")
def started_cluster():
try:
cluster.start()
for node in (node1, node2):
node.query('''
CREATE TABLE local_table(date Date, val UInt64) ENGINE = MergeTree(date, (date, val), 8192);
''')
node1.query('''
CREATE TABLE distributed_table(date Date, val UInt64) ENGINE = Distributed(test_cluster, default, local_table)
''')
yield cluster
finally:
cluster.shutdown()
def test_insertion_sync(started_cluster):
node1.query('''SET insert_distributed_sync = 1, insert_distributed_timeout = 0;
INSERT INTO distributed_table SELECT today() as date, number as val FROM system.numbers LIMIT 10000''')
assert node2.query("SELECT count() FROM local_table").rstrip() == '10000'
node1.query('''
SET insert_distributed_sync = 1, insert_distributed_timeout = 1;
INSERT INTO distributed_table SELECT today() - 1 as date, number as val FROM system.numbers LIMIT 10000''')
assert node2.query("SELECT count() FROM local_table").rstrip() == '20000'
"""
def test_insertion_sync_fails_on_error(started_cluster):
with PartitionManager() as pm:
pm.partition_instances(node2, node1, action='REJECT --reject-with tcp-reset')
with pytest.raises(QueryRuntimeException):
node1.query('''
SET insert_distributed_sync = 1, insert_distributed_timeout = 0;
INSERT INTO distributed_table SELECT today() as date, number as val FROM system.numbers''', timeout=2)
"""
def test_insertion_sync_fails_with_timeout(started_cluster):
with pytest.raises(QueryRuntimeException):
node1.query('''
SET insert_distributed_sync = 1, insert_distributed_timeout = 1;
INSERT INTO distributed_table SELECT today() as date, number as val FROM system.numbers''', timeout=1.5)
def test_insertion_without_sync_ignores_timeout(started_cluster):
with pytest.raises(QueryTimeoutExceedException):
node1.query('''
SET insert_distributed_sync = 0, insert_distributed_timeout = 1;
INSERT INTO distributed_table SELECT today() as date, number as val FROM system.numbers''', timeout=1.5)
def test_insertion_sync_with_disabled_timeout(started_cluster):
with pytest.raises(QueryTimeoutExceedException):
node1.query('''
SET insert_distributed_sync = 1, insert_distributed_timeout = 0;
INSERT INTO distributed_table SELECT today() as date, number as val FROM system.numbers''', timeout=1)
if __name__ == '__main__':
with contextmanager(started_cluster)() as cluster:
for name, instance in cluster.instances.items():
print name, instance.ip_address
raw_input("Cluster created, press any key to destroy...")

View File

@ -16,10 +16,9 @@
class ThreadPool
{
private:
public:
using Job = std::function<void()>;
public:
/// Size is constant, all threads are created immediately.
ThreadPool(size_t m_size);