Merge branch 'master' of github.com:yandex/ClickHouse

Alexey Milovidov 2017-08-13 12:20:15 +03:00
commit 1cc50263d8
72 changed files with 1856 additions and 1417 deletions

View File

@ -79,10 +79,17 @@ std::vector<IConnectionPool::Entry> ConnectionPoolWithFailover::getMany(const Se
{
return tryGetEntry(pool, fail_message, settings);
};
return getManyImpl(settings, pool_mode, try_get_entry);
std::vector<TryResult> results = getManyImpl(settings, pool_mode, try_get_entry);
std::vector<Entry> entries;
entries.reserve(results.size());
for (auto & result : results)
entries.emplace_back(std::move(result.entry));
return entries;
}
std::vector<IConnectionPool::Entry> ConnectionPoolWithFailover::getManyChecked(
std::vector<ConnectionPoolWithFailover::TryResult> ConnectionPoolWithFailover::getManyChecked(
const Settings * settings, PoolMode pool_mode, const QualifiedTableName & table_to_check)
{
TryGetEntryFunc try_get_entry = [&](NestedPool & pool, std::string & fail_message)
@ -92,7 +99,7 @@ std::vector<IConnectionPool::Entry> ConnectionPoolWithFailover::getManyChecked(
return getManyImpl(settings, pool_mode, try_get_entry);
}
std::vector<ConnectionPool::Entry> ConnectionPoolWithFailover::getManyImpl(
std::vector<ConnectionPoolWithFailover::TryResult> ConnectionPoolWithFailover::getManyImpl(
const Settings * settings,
PoolMode pool_mode,
const TryGetEntryFunc & try_get_entry)
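
Net effect of this hunk: getManyImpl() and getManyChecked() now return full TryResult objects, while the plain getMany() converts back to bare pool entries for existing callers. A minimal caller-side sketch of that conversion, assuming a pool, settings and table name in scope (names here are illustrative):

    std::vector<ConnectionPoolWithFailover::TryResult> results =
        pool.getManyChecked(&settings, PoolMode::GET_MANY, table_to_check);

    std::vector<IConnectionPool::Entry> entries;
    entries.reserve(results.size());
    for (auto & result : results)
        entries.emplace_back(std::move(result.entry));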

View File

@ -47,16 +47,17 @@ public:
*/
std::vector<Entry> getMany(const Settings * settings, PoolMode pool_mode);
using Base = PoolWithFailoverBase<IConnectionPool>;
using TryResult = Base::TryResult;
/// The same as getMany(), but check that replication delay for table_to_check is acceptable.
/// Delay threshold is taken from settings.
std::vector<Entry> getManyChecked(
std::vector<TryResult> getManyChecked(
const Settings * settings, PoolMode pool_mode, const QualifiedTableName & table_to_check);
private:
using Base = PoolWithFailoverBase<IConnectionPool>;
/// Get the values of relevant settings and call Base::getMany()
std::vector<Entry> getManyImpl(
std::vector<TryResult> getManyImpl(
const Settings * settings,
PoolMode pool_mode,
const TryGetEntryFunc & try_get_entry);

View File

@ -12,64 +12,45 @@ namespace ErrorCodes
}
MultiplexedConnections::MultiplexedConnections(Connection * connection_, const Settings * settings_, ThrottlerPtr throttler_)
: settings(settings_), throttler(throttler_), supports_parallel_execution(false)
MultiplexedConnections::MultiplexedConnections(Connection & connection, const Settings & settings_, const ThrottlerPtr & throttler)
: settings(settings_)
{
if (connection_ == nullptr)
throw Exception("Invalid connection specified", ErrorCodes::LOGICAL_ERROR);
active_connection_total_count = 1;
ShardState shard_state;
shard_state.allocated_connection_count = active_connection_total_count;
shard_state.active_connection_count = active_connection_total_count;
shard_states.push_back(shard_state);
connection.setThrottler(throttler);
ReplicaState replica_state;
replica_state.connection_index = 0;
replica_state.shard_state = &shard_states[0];
replica_state.connection = &connection;
replica_states.push_back(replica_state);
fd_to_replica_state_idx.emplace(connection.socket.impl()->sockfd(), 0);
connection_->setThrottler(throttler);
connections.push_back(connection_);
auto res = replica_map.emplace(connections[0]->socket.impl()->sockfd(), replica_state);
if (!res.second)
throw Exception("Invalid set of connections", ErrorCodes::LOGICAL_ERROR);
active_connection_count = 1;
}
MultiplexedConnections::MultiplexedConnections(
ConnectionPoolWithFailover & pool_, const Settings * settings_, ThrottlerPtr throttler_,
bool append_extra_info, PoolMode pool_mode_, const QualifiedTableName * main_table)
: settings(settings_), throttler(throttler_), pool_mode(pool_mode_)
std::vector<IConnectionPool::Entry> && connections,
const Settings & settings_, const ThrottlerPtr & throttler, bool append_extra_info)
: settings(settings_)
{
initFromShard(pool_, main_table);
registerShards();
/// If we didn't get any connections from the pool and getMany() did not throw exceptions, this means that
/// `skip_unavailable_shards` was set. Then just return.
if (connections.empty())
return;
supports_parallel_execution = active_connection_total_count > 1;
if (append_extra_info)
block_extra_info = std::make_unique<BlockExtraInfo>();
}
MultiplexedConnections::MultiplexedConnections(
const ConnectionPoolWithFailoverPtrs & pools_, const Settings * settings_, ThrottlerPtr throttler_,
bool append_extra_info, PoolMode pool_mode_, const QualifiedTableName * main_table)
: settings(settings_), throttler(throttler_), pool_mode(pool_mode_)
{
if (pools_.empty())
throw Exception("Pools are not specified", ErrorCodes::LOGICAL_ERROR);
for (auto & pool : pools_)
replica_states.reserve(connections.size());
fd_to_replica_state_idx.reserve(connections.size());
for (size_t i = 0; i < connections.size(); ++i)
{
if (!pool)
throw Exception("Invalid pool specified", ErrorCodes::LOGICAL_ERROR);
initFromShard(*pool, main_table);
Connection * connection = &(*connections[i]);
connection->setThrottler(throttler);
ReplicaState replica_state;
replica_state.pool_entry = std::move(connections[i]);
replica_state.connection = connection;
replica_states.push_back(std::move(replica_state));
fd_to_replica_state_idx.emplace(connection->socket.impl()->sockfd(), i);
}
registerShards();
supports_parallel_execution = active_connection_total_count > 1;
active_connection_count = connections.size();
if (append_extra_info)
block_extra_info = std::make_unique<BlockExtraInfo>();
@ -82,18 +63,19 @@ void MultiplexedConnections::sendExternalTablesData(std::vector<ExternalTablesDa
if (!sent_query)
throw Exception("Cannot send external tables data: query not yet sent.", ErrorCodes::LOGICAL_ERROR);
if (data.size() < active_connection_total_count)
if (data.size() != active_connection_count)
throw Exception("Mismatch between replicas and data sources", ErrorCodes::MISMATCH_REPLICAS_DATA_SOURCES);
auto it = data.begin();
for (auto & e : replica_map)
for (ReplicaState & state : replica_states)
{
ReplicaState & state = e.second;
Connection * connection = connections[state.connection_index];
Connection * connection = state.connection;
if (connection != nullptr)
{
connection->sendExternalTablesData(*it);
++it;
}
}
}
void MultiplexedConnections::sendQuery(
@ -108,54 +90,28 @@ void MultiplexedConnections::sendQuery(
if (sent_query)
throw Exception("Query already sent.", ErrorCodes::LOGICAL_ERROR);
if (supports_parallel_execution)
if (replica_states.size() > 1)
{
if (settings == nullptr)
Settings query_settings = settings;
query_settings.parallel_replicas_count = replica_states.size();
for (size_t i = 0; i < replica_states.size(); ++i)
{
/// Each shard has one address.
auto it = connections.begin();
for (size_t i = 0; i < shard_states.size(); ++i)
{
Connection * connection = *it;
Connection * connection = replica_states[i].connection;
if (connection == nullptr)
throw Exception("MultiplexedConnections: Internal error", ErrorCodes::LOGICAL_ERROR);
connection->sendQuery(query, query_id, stage, nullptr, client_info, with_pending_data);
++it;
}
}
else
{
/// Each shard has one or more replicas.
auto it = connections.begin();
for (const auto & shard_state : shard_states)
{
Settings query_settings = *settings;
query_settings.parallel_replicas_count = shard_state.active_connection_count;
UInt64 offset = 0;
for (size_t i = 0; i < shard_state.allocated_connection_count; ++i)
{
Connection * connection = *it;
if (connection == nullptr)
throw Exception("MultiplexedConnections: Internal error", ErrorCodes::LOGICAL_ERROR);
query_settings.parallel_replica_offset = offset;
query_settings.parallel_replica_offset = i;
connection->sendQuery(query, query_id, stage, &query_settings, client_info, with_pending_data);
++offset;
++it;
}
}
}
}
else
{
Connection * connection = connections[0];
Connection * connection = replica_states[0].connection;
if (connection == nullptr)
throw Exception("MultiplexedConnections: Internal error", ErrorCodes::LOGICAL_ERROR);
connection->sendQuery(query, query_id, stage, settings, client_info, with_pending_data);
connection->sendQuery(query, query_id, stage, &settings, client_info, with_pending_data);
}
sent_query = true;
@ -187,14 +143,13 @@ void MultiplexedConnections::disconnect()
{
std::lock_guard<std::mutex> lock(cancel_mutex);
for (auto it = replica_map.begin(); it != replica_map.end(); ++it)
for (ReplicaState & state : replica_states)
{
ReplicaState & state = it->second;
Connection * connection = connections[state.connection_index];
Connection * connection = state.connection;
if (connection != nullptr)
{
connection->disconnect();
invalidateReplica(it);
invalidateReplica(state);
}
}
}
@ -206,10 +161,9 @@ void MultiplexedConnections::sendCancel()
if (!sent_query || cancelled)
throw Exception("Cannot cancel. Either no query sent or already cancelled.", ErrorCodes::LOGICAL_ERROR);
for (const auto & e : replica_map)
for (ReplicaState & state : replica_states)
{
const ReplicaState & state = e.second;
Connection * connection = connections[state.connection_index];
Connection * connection = state.connection;
if (connection != nullptr)
connection->sendCancel();
}
@ -243,7 +197,7 @@ Connection::Packet MultiplexedConnections::drain()
case Protocol::Server::Exception:
default:
/// If we receive an exception or an unknown package, we save it.
/// If we receive an exception or an unknown packet, we save it.
res = std::move(packet);
break;
}
@ -262,10 +216,9 @@ std::string MultiplexedConnections::dumpAddressesUnlocked() const
{
bool is_first = true;
std::ostringstream os;
for (const auto & e : replica_map)
for (const ReplicaState & state : replica_states)
{
const ReplicaState & state = e.second;
const Connection * connection = connections[state.connection_index];
const Connection * connection = state.connection;
if (connection != nullptr)
{
os << (is_first ? "" : "; ") << connection->getDescription();
@ -276,65 +229,6 @@ std::string MultiplexedConnections::dumpAddressesUnlocked() const
return os.str();
}
void MultiplexedConnections::initFromShard(ConnectionPoolWithFailover & pool, const QualifiedTableName * main_table)
{
std::vector<IConnectionPool::Entry> entries;
if (main_table)
entries = pool.getManyChecked(settings, pool_mode, *main_table);
else
entries = pool.getMany(settings, pool_mode);
/// If getMany() did not allocate connections and did not throw exceptions, this means that
/// `skip_unavailable_shards` was set. Then just return.
if (entries.empty())
return;
ShardState shard_state;
shard_state.allocated_connection_count = entries.size();
shard_state.active_connection_count = entries.size();
active_connection_total_count += shard_state.active_connection_count;
shard_states.push_back(shard_state);
pool_entries.insert(pool_entries.end(), entries.begin(), entries.end());
}
void MultiplexedConnections::registerShards()
{
replica_map.reserve(pool_entries.size());
connections.reserve(pool_entries.size());
size_t offset = 0;
for (auto & shard_state : shard_states)
{
size_t index_begin = offset;
size_t index_end = offset + shard_state.allocated_connection_count;
registerReplicas(index_begin, index_end, shard_state);
offset = index_end;
}
}
void MultiplexedConnections::registerReplicas(size_t index_begin, size_t index_end, ShardState & shard_state)
{
for (size_t i = index_begin; i < index_end; ++i)
{
ReplicaState replica_state;
replica_state.connection_index = i;
replica_state.shard_state = &shard_state;
Connection * connection = &*(pool_entries[i]);
if (connection == nullptr)
throw Exception("MultiplexedConnections: Internal error", ErrorCodes::LOGICAL_ERROR);
connection->setThrottler(throttler);
connections.push_back(connection);
auto res = replica_map.emplace(connection->socket.impl()->sockfd(), replica_state);
if (!res.second)
throw Exception("Invalid set of connections", ErrorCodes::LOGICAL_ERROR);
}
}
Connection::Packet MultiplexedConnections::receivePacketUnlocked()
{
if (!sent_query)
@ -342,14 +236,10 @@ Connection::Packet MultiplexedConnections::receivePacketUnlocked()
if (!hasActiveConnections())
throw Exception("No more packets are available.", ErrorCodes::LOGICAL_ERROR);
auto it = getReplicaForReading();
if (it == replica_map.end())
throw Exception("Logical error: no available replica", ErrorCodes::NO_AVAILABLE_REPLICA);
ReplicaState & state = it->second;
current_connection = connections[state.connection_index];
ReplicaState & state = getReplicaForReading();
current_connection = state.connection;
if (current_connection == nullptr)
throw Exception("MultiplexedConnections: Internal error", ErrorCodes::LOGICAL_ERROR);
throw Exception("Logical error: no available replica", ErrorCodes::NO_AVAILABLE_REPLICA);
Connection::Packet packet = current_connection->receivePacket();
@ -363,48 +253,32 @@ Connection::Packet MultiplexedConnections::receivePacketUnlocked()
break;
case Protocol::Server::EndOfStream:
invalidateReplica(it);
invalidateReplica(state);
break;
case Protocol::Server::Exception:
default:
current_connection->disconnect();
invalidateReplica(it);
invalidateReplica(state);
break;
}
return packet;
}
MultiplexedConnections::ReplicaMap::iterator MultiplexedConnections::getReplicaForReading()
MultiplexedConnections::ReplicaState & MultiplexedConnections::getReplicaForReading()
{
ReplicaMap::iterator it;
if (replica_states.size() == 1)
return replica_states[0];
if (supports_parallel_execution)
it = waitForReadEvent();
else
{
it = replica_map.begin();
const ReplicaState & state = it->second;
Connection * connection = connections[state.connection_index];
if (connection == nullptr)
it = replica_map.end();
}
return it;
}
MultiplexedConnections::ReplicaMap::iterator MultiplexedConnections::waitForReadEvent()
{
Poco::Net::Socket::SocketList read_list;
read_list.reserve(active_connection_total_count);
read_list.reserve(active_connection_count);
/// First, we check if there are data already in the buffer
/// of at least one connection.
for (const auto & e : replica_map)
for (const ReplicaState & state : replica_states)
{
const ReplicaState & state = e.second;
Connection * connection = connections[state.connection_index];
Connection * connection = state.connection;
if ((connection != nullptr) && connection->hasReadBufferPendingData())
read_list.push_back(connection->socket);
}
@ -416,32 +290,28 @@ MultiplexedConnections::ReplicaMap::iterator MultiplexedConnections::waitForRead
Poco::Net::Socket::SocketList write_list;
Poco::Net::Socket::SocketList except_list;
for (const auto & e : replica_map)
for (const ReplicaState & state : replica_states)
{
const ReplicaState & state = e.second;
Connection * connection = connections[state.connection_index];
Connection * connection = state.connection;
if (connection != nullptr)
read_list.push_back(connection->socket);
}
int n = Poco::Net::Socket::select(read_list, write_list, except_list, settings->receive_timeout);
int n = Poco::Net::Socket::select(read_list, write_list, except_list, settings.receive_timeout);
if (n == 0)
throw Exception("Timeout exceeded while reading from " + dumpAddressesUnlocked(), ErrorCodes::TIMEOUT_EXCEEDED);
}
auto & socket = read_list[rand() % read_list.size()];
return replica_map.find(socket.impl()->sockfd());
return replica_states[fd_to_replica_state_idx.at(socket.impl()->sockfd())];
}
void MultiplexedConnections::invalidateReplica(MultiplexedConnections::ReplicaMap::iterator it)
void MultiplexedConnections::invalidateReplica(ReplicaState & state)
{
ReplicaState & state = it->second;
ShardState * shard_state = state.shard_state;
connections[state.connection_index] = nullptr;
--shard_state->active_connection_count;
--active_connection_total_count;
state.connection = nullptr;
state.pool_entry = IConnectionPool::Entry();
--active_connection_count;
}
}
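
The refactoring above replaces the fd-keyed replica_map (plus per-shard bookkeeping) with a flat replica_states vector and a separate fd_to_replica_state_idx index, so iteration is a plain loop and invalidation is an in-place reset. A self-contained sketch of this indexing pattern, with simplified stand-in types rather than the real ClickHouse classes:

    #include <unordered_map>
    #include <vector>

    struct Replica { int fd; bool active = true; };    // simplified stand-in for ReplicaState

    int main()
    {
        std::vector<Replica> replicas = {{3}, {7}, {9}};
        std::unordered_map<int, size_t> fd_to_idx;      // socket fd -> index into `replicas`
        for (size_t i = 0; i < replicas.size(); ++i)
            fd_to_idx.emplace(replicas[i].fd, i);

        Replica & ready = replicas[fd_to_idx.at(7)];    // O(1) lookup by fd, as in waitForReadEvent()
        ready.active = false;                           // invalidation: reset in place, nothing is erased
        return 0;
    }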

View File

@ -10,7 +10,7 @@ namespace DB
{
/** To retrieve data directly from multiple replicas (connections) from one or several shards
/** To retrieve data directly from multiple replicas (connections) from one shard
* within a single thread. As a degenerate case, it can also work with one connection.
* It is assumed that all functions except sendCancel are always executed in one thread.
*
@ -20,23 +20,14 @@ class MultiplexedConnections final : private boost::noncopyable
{
public:
/// Accepts ready connection.
MultiplexedConnections(Connection * connection_, const Settings * settings_, ThrottlerPtr throttler_);
MultiplexedConnections(Connection & connection, const Settings & settings_, const ThrottlerPtr & throttler_);
/** Accepts a pool from which it will be necessary to get one or more connections.
/** Accepts a vector of connections to replicas of one shard, already taken from the pool.
* If the append_extra_info flag is set, additional information is appended to each received block.
* If the get_all_replicas flag is set, all connections are selected.
*/
MultiplexedConnections(
ConnectionPoolWithFailover & pool_, const Settings * settings_, ThrottlerPtr throttler_,
bool append_extra_info, PoolMode pool_mode_, const QualifiedTableName * main_table = nullptr);
/** Accepts pools, one for each shard, from which one will need to get one or more connections.
* If the append_extra_info flag is set, additional information is appended to each received block.
* If the do_broadcast flag is set, all connections are received.
*/
MultiplexedConnections(
const ConnectionPoolWithFailoverPtrs & pools_, const Settings * settings_, ThrottlerPtr throttler_,
bool append_extra_info, PoolMode pool_mode_, const QualifiedTableName * main_table = nullptr);
std::vector<IConnectionPool::Entry> && connections,
const Settings & settings_, const ThrottlerPtr & throttler_, bool append_extra_info);
/// Send all content of external tables to replicas.
void sendExternalTablesData(std::vector<ExternalTablesData> & data);
@ -72,92 +63,50 @@ public:
/// Returns the number of replicas.
/// Without locking, because sendCancel() does not change this number.
size_t size() const { return replica_map.size(); }
size_t size() const { return replica_states.size(); }
/// Check if there are any valid replicas.
/// Without locking, because sendCancel() does not change the state of the replicas.
bool hasActiveConnections() const { return active_connection_total_count > 0; }
bool hasActiveConnections() const { return active_connection_count > 0; }
private:
/// Connections of the 1st shard, then the connections of the 2nd shard, etc.
using Connections = std::vector<Connection *>;
/// The state of the connections of one shard.
struct ShardState
{
/// The number of connections allocated, i.e. replicas for this shard.
size_t allocated_connection_count;
/// The current number of valid connections to the replicas of this shard.
size_t active_connection_count;
};
/// Description of a single replica.
struct ReplicaState
{
size_t connection_index;
/// The owner of this replica.
ShardState * shard_state;
};
/// Replicas hashed by id of the socket.
using ReplicaMap = std::unordered_map<int, ReplicaState>;
/// The state of each shard.
using ShardStates = std::vector<ShardState>;
private:
void initFromShard(ConnectionPoolWithFailover & pool, const QualifiedTableName * main_table);
void registerShards();
/// Register replicas of one shard.
void registerReplicas(size_t index_begin, size_t index_end, ShardState & shard_state);
/// Internal version of `receivePacket` function without locking.
Connection::Packet receivePacketUnlocked();
/// Internal version of `dumpAddresses` function without locking.
std::string dumpAddressesUnlocked() const;
/// Get a replica where you can read the data.
ReplicaMap::iterator getReplicaForReading();
/// Description of a single replica.
struct ReplicaState
{
Connection * connection = nullptr;
ConnectionPool::Entry pool_entry;
};
/** Check if there are any data that can be read on any of the replicas.
* Returns one such replica if it exists.
*/
ReplicaMap::iterator waitForReadEvent();
/// Get a replica where you can read the data.
ReplicaState & getReplicaForReading();
/// Mark the replica as invalid.
void invalidateReplica(ReplicaMap::iterator it);
void invalidateReplica(ReplicaState & replica_state);
private:
const Settings * settings;
const Settings & settings;
Connections connections;
ReplicaMap replica_map;
ShardStates shard_states;
/// The current number of valid connections to the replicas of this shard.
size_t active_connection_count = 0;
/// If not nullptr, then it is used to restrict network traffic.
ThrottlerPtr throttler;
std::vector<ConnectionPool::Entry> pool_entries;
std::vector<ReplicaState> replica_states;
std::unordered_map<int, size_t> fd_to_replica_state_idx;
/// Connection that received last block.
Connection * current_connection;
Connection * current_connection = nullptr;
/// Information about the last received block, if supported.
std::unique_ptr<BlockExtraInfo> block_extra_info;
/// The current number of valid connections to replicas.
size_t active_connection_total_count = 0;
/// The query is run in parallel on multiple replicas.
bool supports_parallel_execution;
bool sent_query = false;
bool cancelled = false;
PoolMode pool_mode = PoolMode::GET_MANY;
/// A mutex for the sendCancel function to execute safely
/// in separate thread.
mutable std::mutex cancel_mutex;
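
With the pool-aware constructors removed, a call site now draws the entries itself and hands them over. A hedged sketch of the new usage (pool, settings and throttler are assumed to be in scope; the real call sites are in RemoteBlockInputStream later in this commit):

    std::vector<IConnectionPool::Entry> connections = pool->getMany(&settings, PoolMode::GET_MANY);
    auto multiplexed = std::make_unique<MultiplexedConnections>(
        std::move(connections), settings, throttler, /* append_extra_info = */ false);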

View File

@ -242,9 +242,24 @@ void ExecutionStatus::deserializeText(const std::string & data)
rb >> code >> "\n" >> escape >> message;
}
bool ExecutionStatus::tryDeserializeText(const std::string & data)
{
try
{
deserializeText(data);
}
catch (...)
{
return false;
}
return true;
}
ExecutionStatus ExecutionStatus::fromCurrentException(const std::string & start_of_message)
{
return ExecutionStatus(getCurrentExceptionCode(), start_of_message + ": " + getCurrentExceptionMessage(false, true));
String msg = (start_of_message.empty() ? "" : (start_of_message + ": ")) + getCurrentExceptionMessage(false, true);
return ExecutionStatus(getCurrentExceptionCode(), msg);
}
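
tryDeserializeText() is the non-throwing counterpart of deserializeText() for input that may be malformed, e.g. a status string read back from ZooKeeper. A hedged usage sketch (the variable name and the fallback status are illustrative):

    ExecutionStatus status;
    if (!status.tryDeserializeText(serialized))
        status = ExecutionStatus(-1, "Cannot parse execution status");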

View File

@ -107,6 +107,8 @@ struct ExecutionStatus
std::string serializeText() const;
void deserializeText(const std::string & data);
bool tryDeserializeText(const std::string & data);
};

View File

@ -107,7 +107,7 @@ public:
/// Returns at least min_entries and at most max_entries connections (at most one connection per nested pool).
/// The method will throw if it is unable to get min_entries alive connections or
/// if fallback_to_stale_replicas is false and it is unable to get min_entries connections to up-to-date replicas.
std::vector<Entry> getMany(
std::vector<TryResult> getMany(
size_t min_entries, size_t max_entries,
const TryGetEntryFunc & try_get_entry,
const GetPriorityFunc & get_priority = GetPriorityFunc(),
@ -141,16 +141,16 @@ template<typename TNestedPool>
typename TNestedPool::Entry
PoolWithFailoverBase<TNestedPool>::get(const TryGetEntryFunc & try_get_entry, const GetPriorityFunc & get_priority)
{
std::vector<Entry> entries = getMany(1, 1, try_get_entry, get_priority);
if (entries.empty() || entries[0].isNull())
std::vector<TryResult> results = getMany(1, 1, try_get_entry, get_priority);
if (results.empty() || results[0].entry.isNull())
throw DB::Exception(
"PoolWithFailoverBase::getMany() returned less than min_entries entries.",
DB::ErrorCodes::LOGICAL_ERROR);
return entries[0];
return results[0].entry;
}
template<typename TNestedPool>
std::vector<typename TNestedPool::Entry>
std::vector<typename PoolWithFailoverBase<TNestedPool>::TryResult>
PoolWithFailoverBase<TNestedPool>::getMany(
size_t min_entries, size_t max_entries,
const TryGetEntryFunc & try_get_entry,
@ -262,22 +262,7 @@ PoolWithFailoverBase<TNestedPool>::getMany(
[](const TryResult & r) { return r.entry.isNull() || !r.is_usable; }),
try_results.end());
std::vector<Entry> entries;
if (up_to_date_count >= min_entries)
{
/// There are enough up-to-date entries.
entries.reserve(up_to_date_count);
for (const TryResult & result: try_results)
{
if (result.is_up_to_date)
entries.push_back(result.entry);
}
}
else if (fallback_to_stale_replicas)
{
/// There are not enough up-to-date entries but we are allowed to return stale entries.
/// Gather all up-to-date ones and least-bad stale ones.
/// Sort so that preferred items are near the beginning.
std::stable_sort(
try_results.begin(), try_results.end(),
[](const TryResult & left, const TryResult & right)
@ -286,10 +271,18 @@ PoolWithFailoverBase<TNestedPool>::getMany(
< std::forward_as_tuple(!right.is_up_to_date, right.staleness);
});
if (up_to_date_count >= min_entries)
{
/// There are enough up-to-date entries.
try_results.resize(up_to_date_count);
}
else if (fallback_to_stale_replicas)
{
/// There are not enough up-to-date entries but we are allowed to return stale entries.
/// Gather all up-to-date ones and least-bad stale ones.
size_t size = std::min(try_results.size(), max_entries);
entries.reserve(size);
for (size_t i = 0; i < size; ++i)
entries.push_back(try_results[i].entry);
try_results.resize(size);
}
else
throw DB::Exception(
@ -297,7 +290,7 @@ PoolWithFailoverBase<TNestedPool>::getMany(
+ ", needed: " + std::to_string(min_entries),
DB::ErrorCodes::ALL_REPLICAS_ARE_STALE);
return entries;
return try_results;
}
template<typename TNestedPool>
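
The reordered logic above sorts all usable results once and only then decides how many to keep, so the up-to-date and fallback branches share a single ordering: up-to-date results first, stale ones by increasing staleness. A self-contained sketch of that comparator with a simplified TryResult (not the real template):

    #include <algorithm>
    #include <tuple>
    #include <vector>

    struct TryResult { bool is_up_to_date = false; double staleness = 0.0; };  // simplified

    int main()
    {
        std::vector<TryResult> try_results = {{false, 30.0}, {true, 0.0}, {false, 5.0}};

        // Preferred results sort to the front: up-to-date first, then by staleness.
        std::stable_sort(
            try_results.begin(), try_results.end(),
            [](const TryResult & left, const TryResult & right)
            {
                return std::forward_as_tuple(!left.is_up_to_date, left.staleness)
                    < std::forward_as_tuple(!right.is_up_to_date, right.staleness);
            });

        // Order is now: up-to-date, stale(5.0), stale(30.0).
        return 0;
    }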

View File

@ -138,3 +138,8 @@ void Lock::unlockOrMoveIfFailed(std::vector<zkutil::Lock> & failed_to_unlock_loc
}
}
void Lock::unlockAssumeLockNodeRemovedManually()
{
locked.reset(nullptr);
}

View File

@ -60,6 +60,7 @@ namespace zkutil
void unlock();
void unlockOrMoveIfFailed(std::vector<zkutil::Lock> & failed_to_unlock_locks);
void unlockAssumeLockNodeRemovedManually();
bool tryLock();

View File

@ -95,7 +95,7 @@ public:
/// Throw an exception if something went wrong.
std::string create(const std::string & path, const std::string & data, int32_t mode);
/// Doesn not throw in the following cases:
/// Does not throw in the following cases:
/// * The parent for the created node does not exist
/// * The parent is ephemeral.
/// * The node already exists.

View File

@ -381,6 +381,7 @@ namespace ErrorCodes
extern const int CANNOT_PARSE_UUID = 376;
extern const int ILLEGAL_SYNTAX_FOR_DATA_TYPE = 377;
extern const int DATA_TYPE_CANNOT_HAVE_ARGUMENTS = 378;
extern const int UNKNOWN_STATUS_OF_DISTRIBUTED_DDL_TASK = 379;
extern const int KEEPER_EXCEPTION = 999;
extern const int POCO_EXCEPTION = 1000;

View File

@ -16,17 +16,31 @@ public:
using Generator = std::function<BlockInputStreamPtr()>;
LazyBlockInputStream(Generator generator_)
: generator(generator_) {}
: generator(std::move(generator_))
{
}
String getName() const override { return "Lazy"; }
LazyBlockInputStream(const char * name_, Generator generator_)
: name(name_)
, generator(std::move(generator_))
{
}
String getName() const override { return name; }
String getID() const override
{
std::stringstream res;
res << "Lazy(" << this << ")";
res << name << "(" << this << ")";
return res.str();
}
void cancel() override
{
std::lock_guard<std::mutex> lock(cancel_mutex);
IProfilingBlockInputStream::cancel();
}
protected:
Block readImpl() override
{
@ -37,9 +51,9 @@ protected:
if (!input)
return Block();
children.push_back(input);
auto * p_input = dynamic_cast<IProfilingBlockInputStream *>(input.get());
if (IProfilingBlockInputStream * p_input = dynamic_cast<IProfilingBlockInputStream *>(input.get()))
if (p_input)
{
/// They could have been set before, but were not passed into the `input`.
if (progress_callback)
@ -47,14 +61,29 @@ protected:
if (process_list_elem)
p_input->setProcessListElement(process_list_elem);
}
input->readPrefix();
{
std::lock_guard<std::mutex> lock(cancel_mutex);
children.push_back(input);
if (isCancelled() && p_input)
p_input->cancel();
}
}
return input->read();
}
private:
const char * name = "Lazy";
Generator generator;
BlockInputStreamPtr input;
std::mutex cancel_mutex;
};
}
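
Besides the optional name, the stream is now cancellable even before the generator has run: cancel() takes cancel_mutex, and readImpl() re-checks isCancelled() after creating the input. A hedged usage sketch of the named form (the name is illustrative; the factories later in this commit use the unnamed form):

    auto stream = std::make_shared<LazyBlockInputStream>(
        "AlterLazy",                // reported by getName() instead of the default "Lazy"
        [interpreter]() mutable     // `interpreter` assumed to be a shared_ptr kept alive by the lambda
        {
            return interpreter->execute().in;
        });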

View File

@ -16,31 +16,64 @@ namespace ErrorCodes
}
RemoteBlockInputStream::RemoteBlockInputStream(Connection & connection_, const String & query_,
const Settings * settings_, const Context & context_, ThrottlerPtr throttler_,
const Tables & external_tables_, QueryProcessingStage::Enum stage_)
: connection(&connection_), query(query_), throttler(throttler_), external_tables(external_tables_),
stage(stage_), context(context_)
RemoteBlockInputStream::RemoteBlockInputStream(
Connection & connection,
const String & query_, const Context & context_, const Settings * settings,
const ThrottlerPtr & throttler, const Tables & external_tables_, QueryProcessingStage::Enum stage_)
: query(query_), context(context_), external_tables(external_tables_), stage(stage_)
{
init(settings_);
if (settings)
context.setSettings(*settings);
create_multiplexed_connections = [this, &connection, throttler]()
{
return std::make_unique<MultiplexedConnections>(connection, context.getSettingsRef(), throttler);
};
}
RemoteBlockInputStream::RemoteBlockInputStream(const ConnectionPoolWithFailoverPtr & pool_, const String & query_,
const Settings * settings_, const Context & context_, ThrottlerPtr throttler_,
const Tables & external_tables_, QueryProcessingStage::Enum stage_)
: pool(pool_), query(query_), throttler(throttler_), external_tables(external_tables_),
stage(stage_), context(context_)
RemoteBlockInputStream::RemoteBlockInputStream(
std::vector<IConnectionPool::Entry> && connections,
const String & query_, const Context & context_, const Settings * settings,
const ThrottlerPtr & throttler, const Tables & external_tables_, QueryProcessingStage::Enum stage_)
: query(query_), context(context_), external_tables(external_tables_), stage(stage_)
{
init(settings_);
if (settings)
context.setSettings(*settings);
create_multiplexed_connections = [this, connections, throttler]() mutable
{
return std::make_unique<MultiplexedConnections>(
std::move(connections), context.getSettingsRef(), throttler, append_extra_info);
};
}
RemoteBlockInputStream::RemoteBlockInputStream(ConnectionPoolWithFailoverPtrs && pools_, const String & query_,
const Settings * settings_, const Context & context_, ThrottlerPtr throttler_,
const Tables & external_tables_, QueryProcessingStage::Enum stage_)
: pools(std::move(pools_)), query(query_), throttler(throttler_), external_tables(external_tables_),
stage(stage_), context(context_)
RemoteBlockInputStream::RemoteBlockInputStream(
const ConnectionPoolWithFailoverPtr & pool,
const String & query_, const Context & context_, const Settings * settings,
const ThrottlerPtr & throttler, const Tables & external_tables_, QueryProcessingStage::Enum stage_)
: query(query_), context(context_), external_tables(external_tables_), stage(stage_)
{
init(settings_);
if (settings)
context.setSettings(*settings);
create_multiplexed_connections = [this, pool, throttler]()
{
const Settings & settings = context.getSettingsRef();
std::vector<IConnectionPool::Entry> connections;
if (main_table)
{
auto try_results = pool->getManyChecked(&settings, pool_mode, main_table.value());
connections.reserve(try_results.size());
for (auto & try_result : try_results)
connections.emplace_back(std::move(try_result.entry));
}
else
connections = pool->getMany(&settings, pool_mode);
return std::make_unique<MultiplexedConnections>(
std::move(connections), settings, throttler, append_extra_info);
};
}
RemoteBlockInputStream::~RemoteBlockInputStream()
@ -222,39 +255,9 @@ void RemoteBlockInputStream::readSuffixImpl()
}
}
void RemoteBlockInputStream::createMultiplexedConnections()
{
Settings * multiplexed_connections_settings = send_settings ? &context.getSettingsRef() : nullptr;
const QualifiedTableName * main_table_ptr = main_table ? &main_table.value() : nullptr;
if (connection != nullptr)
multiplexed_connections = std::make_unique<MultiplexedConnections>(
connection, multiplexed_connections_settings, throttler);
else if (pool != nullptr)
multiplexed_connections = std::make_unique<MultiplexedConnections>(
*pool, multiplexed_connections_settings, throttler,
append_extra_info, pool_mode, main_table_ptr);
else if (!pools.empty())
multiplexed_connections = std::make_unique<MultiplexedConnections>(
pools, multiplexed_connections_settings, throttler,
append_extra_info, pool_mode, main_table_ptr);
else
throw Exception("Internal error", ErrorCodes::LOGICAL_ERROR);
}
void RemoteBlockInputStream::init(const Settings * settings)
{
if (settings)
{
send_settings = true;
context.setSettings(*settings);
}
else
send_settings = false;
}
void RemoteBlockInputStream::sendQuery()
{
createMultiplexedConnections();
multiplexed_connections = create_multiplexed_connections();
if (context.getSettingsRef().skip_unavailable_shards && 0 == multiplexed_connections->size())
return;
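
Each constructor now captures its arguments in a create_multiplexed_connections closure, so connections are acquired lazily, on the first sendQuery(), rather than in the constructor. A minimal standalone sketch of this deferred-factory pattern (simplified types, not the real classes):

    #include <functional>
    #include <memory>

    struct Conn {};  // simplified stand-in for MultiplexedConnections

    int main()
    {
        // Capture what is needed now; build the object only when first used.
        std::function<std::unique_ptr<Conn>()> create_conn = []
        {
            return std::make_unique<Conn>();    // in the real code: acquire pool entries here
        };

        std::unique_ptr<Conn> conn = create_conn();  // deferred until the equivalent of sendQuery()
        return 0;
    }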

View File

@ -20,19 +20,28 @@ namespace DB
class RemoteBlockInputStream : public IProfilingBlockInputStream
{
public:
/// Takes already set connection
RemoteBlockInputStream(Connection & connection_, const String & query_, const Settings * settings_,
const Context & context_, ThrottlerPtr throttler_ = nullptr, const Tables & external_tables_ = Tables(),
/// Takes already set connection.
/// If `settings` is nullptr, settings will be taken from context.
RemoteBlockInputStream(
Connection & connection,
const String & query_, const Context & context_, const Settings * settings = nullptr,
const ThrottlerPtr & throttler = nullptr, const Tables & external_tables_ = Tables(),
QueryProcessingStage::Enum stage_ = QueryProcessingStage::Complete);
/// Takes a pool and gets one or several connections from it
RemoteBlockInputStream(const ConnectionPoolWithFailoverPtr & pool_, const String & query_, const Settings * settings_,
const Context & context_, ThrottlerPtr throttler_ = nullptr, const Tables & external_tables_ = Tables(),
/// Accepts several connections already taken from the pool.
/// If `settings` is nullptr, settings will be taken from context.
RemoteBlockInputStream(
std::vector<IConnectionPool::Entry> && connections,
const String & query_, const Context & context_, const Settings * settings = nullptr,
const ThrottlerPtr & throttler = nullptr, const Tables & external_tables_ = Tables(),
QueryProcessingStage::Enum stage_ = QueryProcessingStage::Complete);
/// Takes a pool for each shard and gets one or several connections from it
RemoteBlockInputStream(ConnectionPoolWithFailoverPtrs && pools_, const String & query_, const Settings * settings_,
const Context & context_, ThrottlerPtr throttler_ = nullptr, const Tables & external_tables_ = Tables(),
/// Takes a pool and gets one or several connections from it.
/// If `settings` is nullptr, settings will be taken from context.
RemoteBlockInputStream(
const ConnectionPoolWithFailoverPtr & pool,
const String & query_, const Context & context_, const Settings * settings = nullptr,
const ThrottlerPtr & throttler = nullptr, const Tables & external_tables_ = Tables(),
QueryProcessingStage::Enum stage_ = QueryProcessingStage::Complete);
~RemoteBlockInputStream() override;
@ -77,9 +86,6 @@ protected:
void readSuffixImpl() override;
/// Creates an object to talk to one shard's replicas performing query
void createMultiplexedConnections();
/// Returns true if query was sent
bool isQueryPending() const;
@ -87,33 +93,21 @@ protected:
bool hasThrownException() const;
private:
void init(const Settings * settings);
void sendQuery();
/// If the query wasn't sent yet, send a request to cancel all connections to replicas
void tryCancel(const char * reason);
private:
/// Already set connection
Connection * connection = nullptr;
/// One shard's connections pool
ConnectionPoolWithFailoverPtr pool = nullptr;
/// Connections pools of one or several shards
ConnectionPoolWithFailoverPtrs pools;
std::function<std::unique_ptr<MultiplexedConnections>()> create_multiplexed_connections;
std::unique_ptr<MultiplexedConnections> multiplexed_connections;
const String query;
bool send_settings;
/// If != nullptr, used to limit network traffic
ThrottlerPtr throttler;
Context context;
/// Temporary tables needed to be sent to remote servers
Tables external_tables;
QueryProcessingStage::Enum stage;
Context context;
/// Threads for reading from temporary tables and following sending of data
/// to remote servers for GLOBAL-subqueries
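
A hedged sketch of the call-site simplification these defaults enable (matching ClickHouseDictionarySource later in this commit): settings may be omitted and are then taken from the context.

    auto stream = std::make_shared<RemoteBlockInputStream>(pool, query, context);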

View File

@ -351,6 +351,10 @@ void DatabaseOrdinary::renameTable(
to_database_concrete->name,
to_table_name);
}
catch (const Exception & e)
{
throw;
}
catch (const Poco::Exception & e)
{
/// Better diagnostics.

View File

@ -71,7 +71,7 @@ BlockInputStreamPtr ClickHouseDictionarySource::loadAll()
*/
if (is_local)
return executeQuery(load_all_query, context, true).in;
return std::make_shared<RemoteBlockInputStream>(pool, load_all_query, nullptr, context);
return std::make_shared<RemoteBlockInputStream>(pool, load_all_query, context);
}
@ -101,7 +101,7 @@ BlockInputStreamPtr ClickHouseDictionarySource::createStreamForSelectiveLoad(con
{
if (is_local)
return executeQuery(query, context, true).in;
return std::make_shared<RemoteBlockInputStream>(pool, query, nullptr, context);
return std::make_shared<RemoteBlockInputStream>(pool, query, context);
}
}

View File

@ -5,6 +5,7 @@
#include <Common/StringUtils.h>
#include <IO/HexWriteBuffer.h>
#include <IO/WriteHelpers.h>
#include <IO/ReadHelpers.h>
#include <Poco/Util/AbstractConfiguration.h>
#include <Poco/Util/Application.h>
#include <openssl/sha.h>
@ -18,6 +19,7 @@ namespace ErrorCodes
extern const int EXCESSIVE_ELEMENT_IN_CONFIG;
extern const int LOGICAL_ERROR;
extern const int SHARD_HAS_NO_CONNECTIONS;
extern const int SYNTAX_ERROR;
}
namespace
@ -113,6 +115,21 @@ String Cluster::Address::toString(const String & host_name, UInt16 port)
return escapeForFileName(host_name) + ':' + DB::toString(port);
}
String Cluster::Address::readableString() const
{
return host_name + ':' + DB::toString(port);
}
void Cluster::Address::fromString(const String & host_port_string, String & host_name, UInt16 & port)
{
auto pos = host_port_string.find_last_of(':');
if (pos == std::string::npos)
throw Exception("Incorrect <host>:<port> format " + host_port_string, ErrorCodes::SYNTAX_ERROR);
host_name = unescapeForFileName(host_port_string.substr(0, pos));
port = parse<UInt16>(host_port_string.substr(pos + 1));
}
String Cluster::Address::toStringFull() const
{
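
toString() produces the escaped 'host_name:port' form used in filesystem and ZooKeeper node names; the new fromString() inverts it. A hedged round-trip sketch (host and port values are illustrative):

    String host;
    UInt16 port;
    Cluster::Address::fromString(Cluster::Address::toString("some-host", 9000), host, port);
    /// Now host == "some-host" and port == 9000; the intermediate string held the escaped host name.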

View File

@ -57,14 +57,20 @@ public:
UInt32 replica_num;
bool is_local;
Address() = default;
Address(Poco::Util::AbstractConfiguration & config, const String & config_prefix);
Address(const String & host_port_, const String & user_, const String & password_);
/// Returns escaped 'host_name:port'
/// Returns 'escaped_host_name:port'
String toString() const;
/// Returns 'host_name:port'
String readableString() const;
static String toString(const String & host_name, UInt16 port);
static void fromString(const String & host_port_string, String & host_name, UInt16 & port);
/// Returns escaped user:password@resolved_host_address:resolved_host_port#default_database
String toStringFull() const;
};

View File

@ -1,59 +0,0 @@
#include <Interpreters/ClusterProxy/AlterQueryConstructor.h>
#include <Interpreters/InterpreterAlterQuery.h>
#include <DataStreams/RemoteBlockInputStream.h>
#include <DataStreams/LazyBlockInputStream.h>
namespace DB
{
namespace
{
constexpr PoolMode pool_mode = PoolMode::GET_ONE;
}
namespace ClusterProxy
{
BlockInputStreamPtr AlterQueryConstructor::createLocal(const ASTPtr & query_ast, const Context & context, const Cluster::Address & address)
{
/// The ALTER query may be a resharding query that is a part of a distributed
/// job. Since the latter heavily relies on synchronization among its participating
/// nodes, it is very important to defer the execution of a local query so as
/// to prevent any deadlock.
auto interpreter = std::make_shared<InterpreterAlterQuery>(query_ast, context);
auto stream = std::make_shared<LazyBlockInputStream>(
[interpreter]() mutable
{
return interpreter->execute().in;
});
return stream;
}
BlockInputStreamPtr AlterQueryConstructor::createRemote(
const ConnectionPoolWithFailoverPtr & pool, const std::string & query,
const Settings & settings, ThrottlerPtr throttler, const Context & context)
{
auto stream = std::make_shared<RemoteBlockInputStream>(pool, query, &settings, context, throttler);
stream->setPoolMode(pool_mode);
return stream;
}
BlockInputStreamPtr AlterQueryConstructor::createRemote(
ConnectionPoolWithFailoverPtrs && pools, const std::string & query,
const Settings & settings, ThrottlerPtr throttler, const Context & context)
{
auto stream = std::make_shared<RemoteBlockInputStream>(std::move(pools), query, &settings, context, throttler);
stream->setPoolMode(pool_mode);
return stream;
}
PoolMode AlterQueryConstructor::getPoolMode() const
{
return pool_mode;
}
}
}

View File

@ -1,28 +0,0 @@
#pragma once
#include <Interpreters/ClusterProxy/IQueryConstructor.h>
namespace DB
{
namespace ClusterProxy
{
class AlterQueryConstructor final : public IQueryConstructor
{
public:
AlterQueryConstructor() = default;
BlockInputStreamPtr createLocal(const ASTPtr & query_ast, const Context & context, const Cluster::Address & address) override;
BlockInputStreamPtr createRemote(
const ConnectionPoolWithFailoverPtr & pool, const std::string & query,
const Settings & settings, ThrottlerPtr throttler, const Context & context) override;
BlockInputStreamPtr createRemote(
ConnectionPoolWithFailoverPtrs && pools, const std::string & query,
const Settings & settings, ThrottlerPtr throttler, const Context & context) override;
PoolMode getPoolMode() const override;
};
}
}

View File

@ -0,0 +1,40 @@
#include <Interpreters/ClusterProxy/AlterStreamFactory.h>
#include <Interpreters/InterpreterAlterQuery.h>
#include <DataStreams/RemoteBlockInputStream.h>
#include <DataStreams/LazyBlockInputStream.h>
namespace DB
{
namespace ClusterProxy
{
void AlterStreamFactory::createForShard(
const Cluster::ShardInfo & shard_info,
const String & query, const ASTPtr & query_ast,
const Context & context, const ThrottlerPtr & throttler,
BlockInputStreams & res)
{
if (shard_info.isLocal())
{
/// The ALTER query may be a resharding query that is a part of a distributed
/// job. Since the latter heavily relies on synchronization among its participating
/// nodes, it is very important to defer the execution of a local query so as
/// to prevent any deadlock.
auto interpreter = std::make_shared<InterpreterAlterQuery>(query_ast, context);
res.emplace_back(std::make_shared<LazyBlockInputStream>(
[interpreter]() mutable
{
return interpreter->execute().in;
}));
}
else
{
auto stream = std::make_shared<RemoteBlockInputStream>(shard_info.pool, query, context, nullptr, throttler);
stream->setPoolMode(PoolMode::GET_ONE);
res.emplace_back(std::move(stream));
}
}
}
}

View File

@ -0,0 +1,25 @@
#pragma once
#include <Interpreters/ClusterProxy/IStreamFactory.h>
namespace DB
{
namespace ClusterProxy
{
class AlterStreamFactory final : public IStreamFactory
{
public:
AlterStreamFactory() = default;
virtual void createForShard(
const Cluster::ShardInfo & shard_info,
const String & query, const ASTPtr & query_ast,
const Context & context, const ThrottlerPtr & throttler,
BlockInputStreams & res) override;
};
}
}

View File

@ -1,73 +0,0 @@
#include <Interpreters/ClusterProxy/DescribeQueryConstructor.h>
#include <Interpreters/InterpreterDescribeQuery.h>
#include <DataStreams/MaterializingBlockInputStream.h>
#include <DataStreams/BlockExtraInfoInputStream.h>
#include <DataStreams/RemoteBlockInputStream.h>
namespace DB
{
namespace
{
constexpr PoolMode pool_mode = PoolMode::GET_ALL;
BlockExtraInfo toBlockExtraInfo(const Cluster::Address & address)
{
BlockExtraInfo block_extra_info;
block_extra_info.host = address.host_name;
block_extra_info.resolved_address = address.resolved_address.toString();
block_extra_info.port = address.port;
block_extra_info.user = address.user;
block_extra_info.is_valid = true;
return block_extra_info;
}
}
namespace ClusterProxy
{
BlockInputStreamPtr DescribeQueryConstructor::createLocal(const ASTPtr & query_ast, const Context & context, const Cluster::Address & address)
{
InterpreterDescribeQuery interpreter{query_ast, context};
BlockInputStreamPtr stream = interpreter.execute().in;
/** Materialization is needed, since from remote servers the constants come materialized.
* If you do not do this, different types (Const and non-Const) columns will be produced in different threads,
* And this is not allowed, since all code is based on the assumption that in the block stream all types are the same.
*/
BlockInputStreamPtr materialized_stream = std::make_shared<MaterializingBlockInputStream>(stream);
return std::make_shared<BlockExtraInfoInputStream>(materialized_stream, toBlockExtraInfo(address));
}
BlockInputStreamPtr DescribeQueryConstructor::createRemote(
const ConnectionPoolWithFailoverPtr & pool, const std::string & query,
const Settings & settings, ThrottlerPtr throttler, const Context & context)
{
auto stream = std::make_shared<RemoteBlockInputStream>(pool, query, &settings, context, throttler);
stream->setPoolMode(pool_mode);
stream->appendExtraInfo();
return stream;
}
BlockInputStreamPtr DescribeQueryConstructor::createRemote(
ConnectionPoolWithFailoverPtrs && pools, const std::string & query,
const Settings & settings, ThrottlerPtr throttler, const Context & context)
{
auto stream = std::make_shared<RemoteBlockInputStream>(std::move(pools), query, &settings, context, throttler);
stream->setPoolMode(pool_mode);
stream->appendExtraInfo();
return stream;
}
PoolMode DescribeQueryConstructor::getPoolMode() const
{
return pool_mode;
}
}
}

View File

@ -1,28 +0,0 @@
#pragma once
#include <Interpreters/ClusterProxy/IQueryConstructor.h>
namespace DB
{
namespace ClusterProxy
{
class DescribeQueryConstructor final : public IQueryConstructor
{
public:
DescribeQueryConstructor() = default;
BlockInputStreamPtr createLocal(const ASTPtr & query_ast, const Context & context, const Cluster::Address & address) override;
BlockInputStreamPtr createRemote(
const ConnectionPoolWithFailoverPtr & pool, const std::string & query,
const Settings & settings, ThrottlerPtr throttler, const Context & context) override;
BlockInputStreamPtr createRemote(
ConnectionPoolWithFailoverPtrs && pools, const std::string & query,
const Settings & settings, ThrottlerPtr throttler, const Context & context) override;
PoolMode getPoolMode() const override;
};
}
}

View File

@ -0,0 +1,56 @@
#include <Interpreters/ClusterProxy/DescribeStreamFactory.h>
#include <Interpreters/InterpreterDescribeQuery.h>
#include <DataStreams/MaterializingBlockInputStream.h>
#include <DataStreams/BlockExtraInfoInputStream.h>
#include <DataStreams/RemoteBlockInputStream.h>
namespace DB
{
namespace
{
BlockExtraInfo toBlockExtraInfo(const Cluster::Address & address)
{
BlockExtraInfo block_extra_info;
block_extra_info.host = address.host_name;
block_extra_info.resolved_address = address.resolved_address.toString();
block_extra_info.port = address.port;
block_extra_info.user = address.user;
block_extra_info.is_valid = true;
return block_extra_info;
}
}
namespace ClusterProxy
{
void DescribeStreamFactory::createForShard(
const Cluster::ShardInfo & shard_info,
const String & query, const ASTPtr & query_ast,
const Context & context, const ThrottlerPtr & throttler,
BlockInputStreams & res)
{
for (const Cluster::Address & local_address : shard_info.local_addresses)
{
InterpreterDescribeQuery interpreter{query_ast, context};
BlockInputStreamPtr stream = interpreter.execute().in;
/** Materialization is needed, since from remote servers the constants come materialized.
* If you do not do this, different types (Const and non-Const) columns will be produced in different threads,
* And this is not allowed, since all code is based on the assumption that in the block stream all types are the same.
*/
BlockInputStreamPtr materialized_stream = std::make_shared<MaterializingBlockInputStream>(stream);
res.emplace_back(std::make_shared<BlockExtraInfoInputStream>(materialized_stream, toBlockExtraInfo(local_address)));
}
auto remote_stream = std::make_shared<RemoteBlockInputStream>(
shard_info.pool, query, context, nullptr, throttler);
remote_stream->setPoolMode(PoolMode::GET_ALL);
remote_stream->appendExtraInfo();
res.emplace_back(std::move(remote_stream));
}
}
}

View File

@ -0,0 +1,25 @@
#pragma once
#include <Interpreters/ClusterProxy/IStreamFactory.h>
namespace DB
{
namespace ClusterProxy
{
class DescribeStreamFactory final : public IStreamFactory
{
public:
DescribeStreamFactory() = default;
virtual void createForShard(
const Cluster::ShardInfo & shard_info,
const String & query, const ASTPtr & query_ast,
const Context & context, const ThrottlerPtr & throttler,
BlockInputStreams & res) override;
};
}
}

View File

@ -1,42 +0,0 @@
#pragma once
#include <Interpreters/Cluster.h>
#include <Parsers/IAST.h>
#include <Storages/IStorage.h>
#include <Client/ConnectionPool.h>
namespace DB
{
struct Settings;
class Context;
class Cluster;
class Throttler;
namespace ClusterProxy
{
/// Base class for the implementation of the details of distributed query
/// execution that are specific to the query type.
class IQueryConstructor
{
public:
virtual ~IQueryConstructor() {}
/// Create an input stream for local query execution.
virtual BlockInputStreamPtr createLocal(const ASTPtr & query_ast, const Context & context, const Cluster::Address & address) = 0;
/// Create an input stream for remote query execution on one shard.
virtual BlockInputStreamPtr createRemote(
const ConnectionPoolWithFailoverPtr & pool, const std::string & query,
const Settings & settings, ThrottlerPtr throttler, const Context & context) = 0;
/// Create an input stream for remote query execution on one or more shards.
virtual BlockInputStreamPtr createRemote(
ConnectionPoolWithFailoverPtrs && pools, const std::string & query,
const Settings & new_settings, ThrottlerPtr throttler, const Context & context) = 0;
/// Specify how we allocate connections on a shard.
virtual PoolMode getPoolMode() const = 0;
};
}
}

View File

@ -0,0 +1,35 @@
#pragma once
#include <Interpreters/Cluster.h>
#include <Parsers/IAST.h>
#include <Storages/IStorage.h>
#include <Client/ConnectionPool.h>
namespace DB
{
struct Settings;
class Context;
class Cluster;
class Throttler;
namespace ClusterProxy
{
/// Base class for the implementation of the details of distributed query
/// execution that are specific to the query type.
class IStreamFactory
{
public:
virtual ~IStreamFactory() {}
virtual void createForShard(
const Cluster::ShardInfo & shard_info,
const String & query, const ASTPtr & query_ast, const Context & context,
const ThrottlerPtr & throttler,
BlockInputStreams & res) = 0;
};
}
}
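
Compared with the removed IQueryConstructor (createLocal plus two createRemote overloads plus getPoolMode), a new query type now plugs in by overriding a single method that sees the whole ShardInfo. A hedged skeleton (the class name is hypothetical; the commit's real implementations are the Select/Describe/Alter factories):

    class MyStreamFactory final : public IStreamFactory
    {
    public:
        void createForShard(
            const Cluster::ShardInfo & shard_info,
            const String & query, const ASTPtr & query_ast,
            const Context & context, const ThrottlerPtr & throttler,
            BlockInputStreams & res) override
        {
            if (shard_info.isLocal())
            {
                /// push a locally executed stream into `res`
            }
            else
            {
                /// push a RemoteBlockInputStream built from shard_info.pool into `res`
            }
        }
    };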

View File

@ -1,138 +0,0 @@
#include <Interpreters/ClusterProxy/Query.h>
#include <Interpreters/ClusterProxy/IQueryConstructor.h>
#include <Interpreters/Settings.h>
#include <Interpreters/Context.h>
#include <Interpreters/Cluster.h>
#include <Interpreters/IInterpreter.h>
#include <Parsers/queryToString.h>
#include <DataStreams/RemoteBlockInputStream.h>
namespace DB
{
namespace ClusterProxy
{
Query::Query(IQueryConstructor & query_constructor_, const ClusterPtr & cluster_,
const ASTPtr & query_ast_, const Context & context_, const Settings & settings_, bool enable_shard_multiplexing_)
: query_constructor{query_constructor_}, cluster{cluster_}, query_ast{query_ast_},
context{context_}, settings{settings_}, enable_shard_multiplexing{enable_shard_multiplexing_}
{
}
BlockInputStreams Query::execute()
{
BlockInputStreams res;
const std::string query = queryToString(query_ast);
Settings new_settings = settings;
new_settings.queue_max_wait_ms = Cluster::saturate(new_settings.queue_max_wait_ms, settings.limits.max_execution_time);
/// Does not matter on remote servers, because queries are sent under different user.
new_settings.max_concurrent_queries_for_user = 0;
new_settings.limits.max_memory_usage_for_user = 0;
/// This setting is really not for user and should not be sent to remote server.
new_settings.limits.max_memory_usage_for_all_queries = 0;
/// Set as unchanged to avoid sending to remote server.
new_settings.max_concurrent_queries_for_user.changed = false;
new_settings.limits.max_memory_usage_for_user.changed = false;
new_settings.limits.max_memory_usage_for_all_queries.changed = false;
/// Network bandwidth limit, if needed.
ThrottlerPtr throttler;
if (settings.limits.max_network_bandwidth || settings.limits.max_network_bytes)
throttler = std::make_shared<Throttler>(
settings.limits.max_network_bandwidth,
settings.limits.max_network_bytes,
"Limit for bytes to send or receive over network exceeded.");
/// Spread shards by threads uniformly.
size_t remote_count = 0;
if (query_constructor.getPoolMode() == PoolMode::GET_ALL)
{
for (const auto & shard_info : cluster->getShardsInfo())
{
if (shard_info.hasRemoteConnections())
++remote_count;
}
}
else
remote_count = cluster->getRemoteShardCount();
size_t thread_count;
if (!enable_shard_multiplexing)
thread_count = remote_count;
else if (remote_count == 0)
thread_count = 0;
else if (settings.max_distributed_processing_threads == 0)
thread_count = 1;
else
thread_count = std::min(remote_count, static_cast<size_t>(settings.max_distributed_processing_threads));
size_t pools_per_thread = (thread_count > 0) ? (remote_count / thread_count) : 0;
size_t remainder = (thread_count > 0) ? (remote_count % thread_count) : 0;
ConnectionPoolWithFailoverPtrs pools;
/// Loop over shards.
size_t current_thread = 0;
for (const auto & shard_info : cluster->getShardsInfo())
{
bool create_local_queries = shard_info.isLocal();
bool create_remote_queries;
if (query_constructor.getPoolMode() == PoolMode::GET_ALL)
create_remote_queries = shard_info.hasRemoteConnections();
else
create_remote_queries = !create_local_queries;
if (create_local_queries)
{
/// Add queries to localhost (they are processed in-process, without network communication).
Context new_context = context;
new_context.setSettings(new_settings);
for (const auto & address : shard_info.local_addresses)
{
BlockInputStreamPtr stream = query_constructor.createLocal(query_ast, new_context, address);
if (stream)
res.emplace_back(stream);
}
}
if (create_remote_queries)
{
size_t excess = (current_thread < remainder) ? 1 : 0;
size_t actual_pools_per_thread = pools_per_thread + excess;
if (actual_pools_per_thread == 1)
{
res.emplace_back(query_constructor.createRemote(shard_info.pool, query, new_settings, throttler, context));
++current_thread;
}
else
{
pools.push_back(shard_info.pool);
if (pools.size() == actual_pools_per_thread)
{
res.emplace_back(query_constructor.createRemote(std::move(pools), query, new_settings, throttler, context));
pools = ConnectionPoolWithFailoverPtrs();
++current_thread;
}
}
}
}
return res;
}
}
}

View File

@ -1,46 +0,0 @@
#pragma once
#include <Parsers/IAST.h>
#include <Storages/IStorage.h>
#include <Interpreters/Cluster.h>
namespace DB
{
struct Settings;
class Context;
class Cluster;
namespace ClusterProxy
{
class IQueryConstructor;
/// This class is designed for distributed queries execution. It hides from
/// the caller the details about the actual locations at which a distributed
/// query is performed. Depending on the type of query to be performed,
/// (currently SELECT, DESCRIBE, or ALTER (for resharding)), a so-called
/// query constructor is specified. Such an object states, among other things,
/// how connections must be allocated for remote execution.
class Query
{
public:
Query(IQueryConstructor & query_constructor_, const ClusterPtr & cluster_,
const ASTPtr & query_ast_, const Context & context_, const Settings & settings_, bool enable_shard_multiplexing_);
/// For each location at which we perform the query, create an input stream
/// from which we can fetch the result.
BlockInputStreams execute();
private:
IQueryConstructor & query_constructor;
ClusterPtr cluster;
ASTPtr query_ast;
const Context & context;
const Settings & settings;
bool enable_shard_multiplexing;
};
}
}

View File

@ -1,68 +0,0 @@
#include <Interpreters/ClusterProxy/SelectQueryConstructor.h>
#include <Interpreters/InterpreterSelectQuery.h>
#include <DataStreams/RemoteBlockInputStream.h>
#include <DataStreams/MaterializingBlockInputStream.h>
namespace DB
{
namespace
{
constexpr PoolMode pool_mode = PoolMode::GET_MANY;
}
namespace ClusterProxy
{
SelectQueryConstructor::SelectQueryConstructor(
QueryProcessingStage::Enum processed_stage_,
QualifiedTableName main_table_,
const Tables & external_tables_)
: processed_stage{processed_stage_}
, main_table(std::move(main_table_))
, external_tables{external_tables_}
{
}
BlockInputStreamPtr SelectQueryConstructor::createLocal(const ASTPtr & query_ast, const Context & context, const Cluster::Address & address)
{
InterpreterSelectQuery interpreter{query_ast, context, processed_stage};
BlockInputStreamPtr stream = interpreter.execute().in;
/** Materialization is needed, since constants arrive from remote servers already materialized.
* If we do not do this, columns of different kinds (Const and non-Const) will be produced in different threads,
* and this is not allowed, since all code is based on the assumption that all blocks in a stream have the same types.
*/
return std::make_shared<MaterializingBlockInputStream>(stream);
}
BlockInputStreamPtr SelectQueryConstructor::createRemote(
const ConnectionPoolWithFailoverPtr & pool, const std::string & query,
const Settings & settings, ThrottlerPtr throttler, const Context & context)
{
auto stream = std::make_shared<RemoteBlockInputStream>(pool, query, &settings, context, throttler, external_tables, processed_stage);
stream->setPoolMode(pool_mode);
stream->setMainTable(main_table);
return stream;
}
BlockInputStreamPtr SelectQueryConstructor::createRemote(
ConnectionPoolWithFailoverPtrs && pools, const std::string & query,
const Settings & settings, ThrottlerPtr throttler, const Context & context)
{
auto stream = std::make_shared<RemoteBlockInputStream>(std::move(pools), query, &settings, context, throttler, external_tables, processed_stage);
stream->setPoolMode(pool_mode);
stream->setMainTable(main_table);
return stream;
}
PoolMode SelectQueryConstructor::getPoolMode() const
{
return pool_mode;
}
}
}

View File

@ -1,38 +0,0 @@
#pragma once
#include <Interpreters/ClusterProxy/IQueryConstructor.h>
#include <Core/QueryProcessingStage.h>
#include <Storages/IStorage.h>
namespace DB
{
namespace ClusterProxy
{
class SelectQueryConstructor final : public IQueryConstructor
{
public:
SelectQueryConstructor(
QueryProcessingStage::Enum processed_stage,
QualifiedTableName main_table,
const Tables & external_tables);
BlockInputStreamPtr createLocal(const ASTPtr & query_ast, const Context & context, const Cluster::Address & address) override;
BlockInputStreamPtr createRemote(
const ConnectionPoolWithFailoverPtr & pool, const std::string & query,
const Settings & settings, ThrottlerPtr throttler, const Context & context) override;
BlockInputStreamPtr createRemote(
ConnectionPoolWithFailoverPtrs && pools, const std::string & query,
const Settings & settings, ThrottlerPtr throttler, const Context & context) override;
PoolMode getPoolMode() const override;
private:
QueryProcessingStage::Enum processed_stage;
QualifiedTableName main_table;
const Tables & external_tables;
};
}
}

View File

@ -0,0 +1,183 @@
#include <Interpreters/ClusterProxy/SelectStreamFactory.h>
#include <Interpreters/InterpreterSelectQuery.h>
#include <DataStreams/RemoteBlockInputStream.h>
#include <DataStreams/MaterializingBlockInputStream.h>
#include <DataStreams/LazyBlockInputStream.h>
#include <Storages/StorageReplicatedMergeTree.h>
#include <Common/Exception.h>
#include <common/logger_useful.h>
namespace DB
{
namespace ErrorCodes
{
extern const int ALL_REPLICAS_ARE_STALE;
}
namespace ClusterProxy
{
SelectStreamFactory::SelectStreamFactory(
QueryProcessingStage::Enum processed_stage_,
QualifiedTableName main_table_,
const Tables & external_tables_)
: processed_stage{processed_stage_}
, main_table(std::move(main_table_))
, external_tables{external_tables_}
{
}
namespace
{
BlockInputStreamPtr createLocalStream(const ASTPtr & query_ast, const Context & context, QueryProcessingStage::Enum processed_stage)
{
InterpreterSelectQuery interpreter{query_ast, context, processed_stage};
BlockInputStreamPtr stream = interpreter.execute().in;
/** Materialization is needed, since constants arrive from remote servers already materialized.
* If we do not do this, columns of different kinds (Const and non-Const) will be produced in different threads,
* and this is not allowed, since all code is based on the assumption that all blocks in a stream have the same types.
*/
return std::make_shared<MaterializingBlockInputStream>(stream);
}
}
void SelectStreamFactory::createForShard(
const Cluster::ShardInfo & shard_info,
const String & query, const ASTPtr & query_ast,
const Context & context, const ThrottlerPtr & throttler,
BlockInputStreams & res)
{
auto emplace_local_stream = [&]()
{
res.emplace_back(createLocalStream(query_ast, context, processed_stage));
};
auto emplace_remote_stream = [&]()
{
auto stream = std::make_shared<RemoteBlockInputStream>(shard_info.pool, query, context, nullptr, throttler, external_tables, processed_stage);
stream->setPoolMode(PoolMode::GET_MANY);
stream->setMainTable(main_table);
res.emplace_back(std::move(stream));
};
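/// The branches below implement the fallback ladder: prefer the local replica when the table
/// is present, replicated and fresh enough; otherwise weigh the local delay against the remote
/// replicas, possibly deferring the decision to connection time.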
if (shard_info.isLocal())
{
StoragePtr main_table_storage = context.tryGetTable(main_table.database, main_table.table);
if (!main_table_storage) /// The table is absent on the local server.
{
if (shard_info.pool)
{
LOG_WARNING(
&Logger::get("ClusterProxy::SelectStreamFactory"),
"There is no table " << main_table.database << "." << main_table.table
<< " on local replica of shard " << shard_info.shard_num << ", will try remote replicas.");
emplace_remote_stream();
return;
}
else
{
/// Let it fail the usual way.
emplace_local_stream();
return;
}
}
const auto * replicated_storage = dynamic_cast<const StorageReplicatedMergeTree *>(main_table_storage.get());
if (!replicated_storage)
{
/// Table is not replicated, use local server.
emplace_local_stream();
return;
}
const Settings & settings = context.getSettingsRef();
UInt64 max_allowed_delay = settings.max_replica_delay_for_distributed_queries;
if (!max_allowed_delay)
{
emplace_local_stream();
return;
}
UInt32 local_delay = replicated_storage->getAbsoluteDelay();
if (local_delay < max_allowed_delay)
{
emplace_local_stream();
return;
}
/// If we have reached this point, the local replica is stale.
if (!settings.fallback_to_stale_replicas_for_distributed_queries)
{
if (shard_info.pool)
{
/// If we cannot fall back, then we cannot use the local replica. Try our luck with remote replicas.
emplace_remote_stream();
return;
}
else
throw Exception(
"Local replica for shard " + toString(shard_info.shard_num)
+ " is stale (delay: " + toString(local_delay) + "), but no other replica configured.",
ErrorCodes::ALL_REPLICAS_ARE_STALE);
}
if (!shard_info.pool)
{
/// There are no remote replicas, but we are allowed to fall back to the stale local replica.
emplace_local_stream();
return;
}
/// Try our luck with remote replicas, but if they are stale too, fall back to the local replica.
/// Do it lazily to avoid connecting in the main thread.
auto lazily_create_stream = [
pool = shard_info.pool, query, query_ast, context, throttler,
main_table = main_table, external_tables = external_tables, stage = processed_stage,
local_delay]()
-> BlockInputStreamPtr
{
std::vector<ConnectionPoolWithFailover::TryResult> try_results =
pool->getManyChecked(&context.getSettingsRef(), PoolMode::GET_MANY, main_table);
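/// getManyChecked() returns one TryResult per acquired connection; replicas whose replication
/// lag exceeded the threshold come back with is_up_to_date == false and the lag in `staleness`.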
double max_remote_delay = 0.0;
for (const auto & try_result : try_results)
{
if (!try_result.is_up_to_date)
max_remote_delay = std::max(try_result.staleness, max_remote_delay);
}
if (local_delay < max_remote_delay)
return createLocalStream(query_ast, context, stage);
else
{
std::vector<IConnectionPool::Entry> connections;
connections.reserve(try_results.size());
for (auto & try_result : try_results)
connections.emplace_back(std::move(try_result.entry));
return std::make_shared<RemoteBlockInputStream>(
std::move(connections), query, context, nullptr, throttler, external_tables, stage);
}
};
res.emplace_back(std::make_shared<LazyBlockInputStream>("LazyShardWithLocalReplica", lazily_create_stream));
}
else
emplace_remote_stream();
}
}
}

View File

@ -0,0 +1,35 @@
#pragma once
#include <Interpreters/ClusterProxy/IStreamFactory.h>
#include <Core/QueryProcessingStage.h>
#include <Storages/IStorage.h>
namespace DB
{
namespace ClusterProxy
{
class SelectStreamFactory final : public IStreamFactory
{
public:
SelectStreamFactory(
QueryProcessingStage::Enum processed_stage,
QualifiedTableName main_table,
const Tables & external_tables);
virtual void createForShard(
const Cluster::ShardInfo & shard_info,
const String & query, const ASTPtr & query_ast,
const Context & context, const ThrottlerPtr & throttler,
BlockInputStreams & res) override;
private:
QueryProcessingStage::Enum processed_stage;
QualifiedTableName main_table;
const Tables & external_tables;
};
}
}

View File

@ -0,0 +1,58 @@
#include <Interpreters/ClusterProxy/executeQuery.h>
#include <Interpreters/ClusterProxy/IStreamFactory.h>
#include <Interpreters/Settings.h>
#include <Interpreters/Context.h>
#include <Interpreters/Cluster.h>
#include <Interpreters/IInterpreter.h>
#include <Parsers/queryToString.h>
#include <DataStreams/RemoteBlockInputStream.h>
namespace DB
{
namespace ClusterProxy
{
BlockInputStreams executeQuery(
IStreamFactory & stream_factory, const ClusterPtr & cluster,
const ASTPtr & query_ast, const Context & context, const Settings & settings)
{
BlockInputStreams res;
const std::string query = queryToString(query_ast);
Settings new_settings = settings;
new_settings.queue_max_wait_ms = Cluster::saturate(new_settings.queue_max_wait_ms, settings.limits.max_execution_time);
/// Does not matter on remote servers, because queries are sent under a different user.
new_settings.max_concurrent_queries_for_user = 0;
new_settings.limits.max_memory_usage_for_user = 0;
/// This setting is not really meant for the user and should not be sent to remote servers.
new_settings.limits.max_memory_usage_for_all_queries = 0;
/// Set as unchanged to avoid sending to remote server.
new_settings.max_concurrent_queries_for_user.changed = false;
new_settings.limits.max_memory_usage_for_user.changed = false;
new_settings.limits.max_memory_usage_for_all_queries.changed = false;
Context new_context(context);
new_context.setSettings(new_settings);
/// Network bandwidth limit, if needed.
ThrottlerPtr throttler;
if (settings.limits.max_network_bandwidth || settings.limits.max_network_bytes)
throttler = std::make_shared<Throttler>(
settings.limits.max_network_bandwidth,
settings.limits.max_network_bytes,
"Limit for bytes to send or receive over network exceeded.");
for (const auto & shard_info : cluster->getShardsInfo())
stream_factory.createForShard(shard_info, query, query_ast, new_context, throttler, res);
return res;
}
}
}

View File

@ -0,0 +1,28 @@
#pragma once
#include <Parsers/IAST.h>
#include <Storages/IStorage.h>
#include <Interpreters/Cluster.h>
namespace DB
{
struct Settings;
class Context;
class Cluster;
namespace ClusterProxy
{
class IStreamFactory;
/// Execute a distributed query, creating a vector of BlockInputStreams from which the result can be read.
/// The `stream_factory` object encapsulates the logic of creating streams for the particular type of query
/// (currently SELECT, DESCRIBE, or ALTER (for resharding)).
BlockInputStreams executeQuery(
IStreamFactory & stream_factory, const ClusterPtr & cluster,
const ASTPtr & query_ast, const Context & context, const Settings & settings);
}
}
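(Illustration, not part of the diff: a caller wires a stream factory into executeQuery() roughly as in this minimal sketch. The table name "hits" is hypothetical, and `cluster`, `query_ast` and `context` are assumed to be in scope; the StorageDistributed::read() hunk later in this commit shows the real call site.)

Tables external_tables;
ClusterProxy::SelectStreamFactory factory(
    QueryProcessingStage::Complete, QualifiedTableName{"default", "hits"}, external_tables);
BlockInputStreams streams = ClusterProxy::executeQuery(
    factory, cluster, query_ast, context, context.getSettingsRef());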

File diff suppressed because it is too large

View File

@ -15,6 +15,7 @@ namespace DB
class ASTAlterQuery;
struct DDLLogEntry;
struct DDLTask;
BlockIO executeDDLQueryOnCluster(const ASTPtr & query_ptr, Context & context);
@ -23,70 +24,80 @@ BlockIO executeDDLQueryOnCluster(const ASTPtr & query_ptr, Context & context);
class DDLWorker
{
public:
DDLWorker(const std::string & zk_root_dir, Context & context_);
DDLWorker(const std::string & zk_root_dir, Context & context_, const Poco::Util::AbstractConfiguration * config, const String & prefix);
~DDLWorker();
/// Pushes the query into the DDL queue, returns the path to the created node
String enqueueQuery(DDLLogEntry & entry);
std::string getHostName() const
/// Host ID (name:port) for logging purposes
/// Note that in each task, hosts are identified individually by name:port taken from the initiator server's cluster config
std::string getCommonHostID() const
{
return host_id;
return host_fqdn_id;
}
private:
void processTasks();
void processTask(const DDLLogEntry & node, const std::string & node_path);
/// Reads the entry and checks that the host belongs to the host list of the task.
/// Returns true and sets current_task if the entry is parsed and the check passes.
bool initAndCheckTask(const String & entry_name, String & out_reason);
void processTask(DDLTask & task);
void processTaskAlter(
const ASTAlterQuery * query_alter,
DDLTask & task,
const ASTAlterQuery * ast_alter,
const String & rewritten_query,
const std::shared_ptr<Cluster> & cluster,
ssize_t shard_num,
const String & node_path);
/// Checks and cleans up the queue's nodes
void cleanupQueue(const Strings * node_names_to_check = nullptr);
void parseQueryAndResolveHost(DDLTask & task);
bool tryExecuteQuery(const String & query, const DDLTask & task, ExecutionStatus & status);
/// Checks and cleans up the queue's nodes
void cleanupQueue();
/// Init task node
void createStatusDirs(const std::string & node_name);
ASTPtr getRewrittenQuery(const DDLLogEntry & node);
void run();
private:
Context & context;
Logger * log = &Logger::get("DDLWorker");
std::string host_id; /// host_name:port
std::string host_name;
UInt16 port;
Logger * log;
std::string host_fqdn; /// current host domain name
std::string host_fqdn_id; /// host_name:port
std::string queue_dir; /// dir with queue of queries
std::string master_dir; /// dir with queries initiated by this server
/// Used to skip already processed nodes. Perhaps using a set would be more obvious.
std::string last_processed_node_name;
/// Name of last task that was skipped or successfully executed
std::string last_processed_task_name;
std::shared_ptr<zkutil::ZooKeeper> zookeeper;
/// Save state of executed task to avoid duplicate execution on ZK error
std::string current_node = {};
bool current_node_was_executed = false;
ExecutionStatus current_node_execution_status;
using DDLTaskPtr = std::unique_ptr<DDLTask>;
DDLTaskPtr current_task;
std::shared_ptr<Poco::Event> event_queue_updated;
std::atomic<bool> stop_flag{false};
std::thread thread;
size_t last_cleanup_time_seconds = 0;
Int64 last_cleanup_time_seconds = 0;
/// Delete a node if its age is greater than this
static const size_t node_max_lifetime_seconds;
/// Cleaning starts after a new-node event is received, if the last cleaning was done more than N seconds ago
static const size_t cleanup_min_period_seconds;
Int64 cleanup_delay_period = 60; // minute (in seconds)
/// Delete a node if its age is greater than this
Int64 task_max_lifetime = 7 * 24 * 60 * 60; // week (in seconds)
/// How many tasks can be in the queue
size_t max_tasks_in_queue = 1000;
friend class DDLQueryStatusInputSream;
friend class DDLTask;
};

View File

@ -191,9 +191,6 @@ struct Settings
M(SettingUInt64, select_sequential_consistency, 0) \
/** The maximum number of different shards and the maximum number of replicas of one shard in the `remote` function. */ \
M(SettingUInt64, table_function_remote_max_addresses, 1000) \
/** Maximum number of threads for distributed processing of one query */ \
M(SettingUInt64, max_distributed_processing_threads, 8) \
\
/** Settings to reduce the number of threads in case of slow reads. */ \
/** Pay attention only to reads that took at least that much time. */ \
M(SettingMilliseconds, read_backoff_min_latency_ms, 1000) \
@ -294,7 +291,9 @@ struct Settings
/** Timeout for insert query into distributed. Setting is used only with insert_distributed_sync enabled. \
* Zero value means no timeout. \
*/ \
M(SettingUInt64, insert_distributed_timeout, 0)
M(SettingUInt64, insert_distributed_timeout, 0) \
/* Timeout for DDL query responses from all hosts in cluster. Negative value means infinite. */ \
M(SettingInt64, distributed_ddl_task_timeout, 120)
/// Possible limits for query execution.

View File

@ -147,7 +147,9 @@ static std::tuple<ASTPtr, BlockIO> executeQueryImpl(
ast = parseQuery(parser, begin, end, "");
/// Copy the query into a string. It will be written to the log and shown in the processlist. For an INSERT query, the string will not include the data to be inserted.
query_size = ast->range.second - ast->range.first;
if (!(begin <= ast->range.first && ast->range.second <= end))
throw Exception("Unexpected behavior: AST chars range is not inside source range", ErrorCodes::LOGICAL_ERROR);
query_size = ast->range.second - begin;
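/// The size is now measured from the beginning of the buffer to the end of the AST,
/// so leading whitespace and comments count towards the max_query_size check below.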
if (max_query_size && query_size > max_query_size)
throw Exception("Query is too large (" + toString(query_size) + ")."

View File

@ -10,7 +10,7 @@ namespace ErrorCodes
extern const int UNEXPECTED_AST_STRUCTURE;
}
ASTAlterQuery::Parameters::Parameters() : type(NO_TYPE) {}
ASTAlterQuery::Parameters::Parameters() {}
void ASTAlterQuery::Parameters::clone(Parameters & p) const
{
@ -42,7 +42,7 @@ void ASTAlterQuery::addParameters(const Parameters & params)
children.push_back(params.primary_key);
}
ASTAlterQuery::ASTAlterQuery(StringRange range_) : IAST(range_)
ASTAlterQuery::ASTAlterQuery(StringRange range_) : ASTQueryWithOutput(range_)
{
}
@ -57,13 +57,14 @@ ASTPtr ASTAlterQuery::clone() const
auto res = std::make_shared<ASTAlterQuery>(*this);
for (ParameterContainer::size_type i = 0; i < parameters.size(); ++i)
parameters[i].clone(res->parameters[i]);
cloneOutputOptions(*res);
return res;
}
ASTPtr ASTAlterQuery::getRewrittenASTWithoutOnCluster(const std::string & new_database) const
{
auto query_ptr = clone();
ASTAlterQuery & query = static_cast<ASTAlterQuery &>(*query_ptr);
auto & query = static_cast<ASTAlterQuery &>(*query_ptr);
query.cluster.clear();
if (query.database.empty())
@ -72,11 +73,11 @@ ASTPtr ASTAlterQuery::getRewrittenASTWithoutOnCluster(const std::string & new_da
return query_ptr;
}
void ASTAlterQuery::formatImpl(const FormatSettings & settings, FormatState & state, FormatStateStacked frame) const
void ASTAlterQuery::formatQueryImpl(const FormatSettings & settings, FormatState & state, FormatStateStacked frame) const
{
frame.need_parens = false;
std::string indent_str = settings.one_line ? "" : std::string(4 * frame.indent, ' ');
std::string indent_str = settings.one_line ? "" : std::string(4u * frame.indent, ' ');
settings.ostr << (settings.hilite ? hilite_keyword : "") << indent_str << "ALTER TABLE " << (settings.hilite ? hilite_none : "");

View File

@ -1,6 +1,7 @@
#pragma once
#include <Parsers/IAST.h>
#include <Parsers/ASTQueryWithOutput.h>
#include <Parsers/ASTQueryWithOnCluster.h>
@ -19,7 +20,7 @@ namespace DB
* [COORDINATE WITH 'coordinator_id']
*/
class ASTAlterQuery : public IAST, public ASTQueryWithOnCluster
class ASTAlterQuery : public ASTQueryWithOutput, public ASTQueryWithOnCluster
{
public:
enum ParameterType
@ -96,17 +97,17 @@ public:
void addParameters(const Parameters & params);
ASTAlterQuery(StringRange range_ = StringRange());
explicit ASTAlterQuery(StringRange range_ = StringRange());
/** Get the text that identifies this element. */
String getID() const override;
ASTPtr clone() const override;
ASTPtr getRewrittenASTWithoutOnCluster(const std::string & new_database = {}) const override;
ASTPtr getRewrittenASTWithoutOnCluster(const std::string & new_database) const override;
protected:
void formatImpl(const FormatSettings & settings, FormatState & state, FormatStateStacked frame) const override;
void formatQueryImpl(const FormatSettings & settings, FormatState & state, FormatStateStacked frame) const override;
};
}

View File

@ -2,6 +2,7 @@
#include <Parsers/ASTExpressionList.h>
#include <Parsers/ASTFunction.h>
#include <Parsers/ASTQueryWithOutput.h>
#include <Parsers/ASTQueryWithOnCluster.h>
@ -10,7 +11,7 @@ namespace DB
/// CREATE TABLE or ATTACH TABLE query
class ASTCreateQuery : public IAST, public ASTQueryWithOnCluster
class ASTCreateQuery : public ASTQueryWithOutput, public ASTQueryWithOnCluster
{
public:
bool attach{false}; /// Query ATTACH TABLE, not CREATE TABLE.
@ -29,7 +30,7 @@ public:
ASTPtr select;
ASTCreateQuery() = default;
ASTCreateQuery(const StringRange range_) : IAST(range_) {}
ASTCreateQuery(const StringRange range_) : ASTQueryWithOutput(range_) {}
/** Get the text that identifies this element. */
String getID() const override { return (attach ? "AttachQuery_" : "CreateQuery_") + database + "_" + table; };
@ -44,6 +45,8 @@ public:
if (select) { res->select = select->clone(); res->children.push_back(res->select); }
if (inner_storage) { res->inner_storage = inner_storage->clone(); res->children.push_back(res->inner_storage); }
cloneOutputOptions(*res);
return res;
}
@ -60,7 +63,7 @@ public:
}
protected:
void formatImpl(const FormatSettings & settings, FormatState & state, FormatStateStacked frame) const override
void formatQueryImpl(const FormatSettings & settings, FormatState & state, FormatStateStacked frame) const override
{
frame.need_parens = false;

View File

@ -1,6 +1,7 @@
#pragma once
#include <Parsers/IAST.h>
#include <Parsers/ASTQueryWithOutput.h>
#include <Parsers/ASTQueryWithOnCluster.h>
namespace DB
@ -9,7 +10,7 @@ namespace DB
/** DROP query
*/
class ASTDropQuery : public IAST, public ASTQueryWithOnCluster
class ASTDropQuery : public ASTQueryWithOutput, public ASTQueryWithOnCluster
{
public:
bool detach{false}; /// DETACH query, not DROP.
@ -18,17 +19,22 @@ public:
String table;
ASTDropQuery() = default;
ASTDropQuery(const StringRange range_) : IAST(range_) {}
explicit ASTDropQuery(const StringRange range_) : ASTQueryWithOutput(range_) {}
/** Get the text that identifies this element. */
String getID() const override { return (detach ? "DetachQuery_" : "DropQuery_") + database + "_" + table; };
ASTPtr clone() const override { return std::make_shared<ASTDropQuery>(*this); }
ASTPtr clone() const override
{
auto res = std::make_shared<ASTDropQuery>(*this);
cloneOutputOptions(*res);
return res;
}
ASTPtr getRewrittenASTWithoutOnCluster(const std::string & new_database) const override
{
auto query_ptr = clone();
ASTDropQuery & query = static_cast<ASTDropQuery &>(*query_ptr);
auto & query = static_cast<ASTDropQuery &>(*query_ptr);
query.cluster.clear();
if (query.database.empty())
@ -38,7 +44,7 @@ public:
}
protected:
void formatImpl(const FormatSettings & settings, FormatState & state, FormatStateStacked frame) const override
void formatQueryImpl(const FormatSettings & settings, FormatState & state, FormatStateStacked frame) const override
{
if (table.empty() && !database.empty())
{

View File

@ -21,7 +21,7 @@ void ASTQueryWithOutput::formatImpl(const FormatSettings & s, FormatState & stat
{
formatQueryImpl(s, state, frame);
std::string indent_str = s.one_line ? "" : std::string(4 * frame.indent, ' ');
std::string indent_str = s.one_line ? "" : std::string(4u * frame.indent, ' ');
if (out_file)
{

View File

@ -15,7 +15,7 @@ public:
ASTPtr format;
ASTQueryWithOutput() = default;
ASTQueryWithOutput(const StringRange range_) : IAST(range_) {}
explicit ASTQueryWithOutput(const StringRange range_) : IAST(range_) {}
protected:
/// NOTE: call this helper at the end of the clone() method of descendant class.

View File

@ -1,6 +1,7 @@
#pragma once
#include <Parsers/IAST.h>
#include <Parsers/ASTQueryWithOutput.h>
#include <Parsers/ASTQueryWithOnCluster.h>
namespace DB
@ -9,7 +10,7 @@ namespace DB
/** RENAME query
*/
class ASTRenameQuery : public IAST, public ASTQueryWithOnCluster
class ASTRenameQuery : public ASTQueryWithOutput, public ASTQueryWithOnCluster
{
public:
struct Table
@ -28,17 +29,22 @@ public:
Elements elements;
ASTRenameQuery() = default;
ASTRenameQuery(const StringRange range_) : IAST(range_) {}
explicit ASTRenameQuery(const StringRange range_) : ASTQueryWithOutput(range_) {}
/** Get the text that identifies this element. */
String getID() const override { return "Rename"; };
ASTPtr clone() const override { return std::make_shared<ASTRenameQuery>(*this); }
ASTPtr clone() const override
{
auto res = std::make_shared<ASTRenameQuery>(*this);
cloneOutputOptions(*res);
return res;
}
ASTPtr getRewrittenASTWithoutOnCluster(const std::string & new_database = {}) const override
ASTPtr getRewrittenASTWithoutOnCluster(const std::string & new_database) const override
{
auto query_ptr = clone();
ASTRenameQuery & query = static_cast<ASTRenameQuery &>(*query_ptr);
auto & query = static_cast<ASTRenameQuery &>(*query_ptr);
query.cluster.clear();
for (Element & elem : query.elements)
@ -53,7 +59,7 @@ public:
}
protected:
void formatImpl(const FormatSettings & settings, FormatState & state, FormatStateStacked frame) const override
void formatQueryImpl(const FormatSettings & settings, FormatState & state, FormatStateStacked frame) const override
{
settings.ostr << (settings.hilite ? hilite_keyword : "") << "RENAME TABLE " << (settings.hilite ? hilite_none : "");

View File

@ -298,6 +298,12 @@ bool ParserCreateQuery::parseImpl(Pos & pos, ASTPtr & node, Expected & expected)
return false;
}
if (ParserKeyword{"ON"}.ignore(pos, expected))
{
if (!ASTQueryWithOnCluster::parse(pos, cluster_str, expected))
return false;
}
/// Optional - a list of columns can be specified. It must fully comply with SELECT.
if (s_lparen.ignore(pos, expected))
{

View File

@ -18,20 +18,13 @@ bool ParserQuery::parseImpl(Pos & pos, ASTPtr & node, Expected & expected)
{
ParserQueryWithOutput query_with_output_p;
ParserInsertQuery insert_p(end);
ParserCreateQuery create_p;
ParserRenameQuery rename_p;
ParserDropQuery drop_p;
ParserAlterQuery alter_p;
ParserUseQuery use_p;
ParserSetQuery set_p;
ParserOptimizeQuery optimize_p;
bool res = query_with_output_p.parse(pos, node, expected)
|| insert_p.parse(pos, node, expected)
|| create_p.parse(pos, node, expected)
|| rename_p.parse(pos, node, expected)
|| drop_p.parse(pos, node, expected)
|| alter_p.parse(pos, node, expected)
|| use_p.parse(pos, node, expected)
|| set_p.parse(pos, node, expected)
|| optimize_p.parse(pos, node, expected);

View File

@ -4,10 +4,11 @@
#include <Parsers/ParserTablePropertiesQuery.h>
#include <Parsers/ParserShowProcesslistQuery.h>
#include <Parsers/ParserCheckQuery.h>
#include <Parsers/ParserCreateQuery.h>
#include <Parsers/ParserRenameQuery.h>
#include <Parsers/ParserAlterQuery.h>
#include <Parsers/ParserDropQuery.h>
#include <Parsers/ParserKillQueryQuery.h>
#include <Parsers/ASTIdentifier.h>
#include <Parsers/ExpressionElementParsers.h>
#include <Common/typeid_cast.h>
namespace DB
@ -19,6 +20,10 @@ bool ParserQueryWithOutput::parseImpl(Pos & pos, ASTPtr & node, Expected & expec
ParserSelectQuery select_p;
ParserTablePropertiesQuery table_p;
ParserShowProcesslistQuery show_processlist_p;
ParserCreateQuery create_p;
ParserAlterQuery alter_p;
ParserRenameQuery rename_p;
ParserDropQuery drop_p;
ParserCheckQuery check_p;
ParserKillQueryQuery kill_query_p;
@ -28,6 +33,10 @@ bool ParserQueryWithOutput::parseImpl(Pos & pos, ASTPtr & node, Expected & expec
|| show_tables_p.parse(pos, query, expected)
|| table_p.parse(pos, query, expected)
|| show_processlist_p.parse(pos, query, expected)
|| create_p.parse(pos, query, expected)
|| alter_p.parse(pos, query, expected)
|| rename_p.parse(pos, query, expected)
|| drop_p.parse(pos, query, expected)
|| check_p.parse(pos, query, expected)
|| kill_query_p.parse(pos, query, expected);

View File

@ -296,7 +296,7 @@ private:
void execute(ConnectionPool::Entry & connection, Query & query)
{
Stopwatch watch;
RemoteBlockInputStream stream(*connection, query, &settings, global_context, nullptr, Tables(), query_processing_stage);
RemoteBlockInputStream stream(*connection, query, global_context, &settings, nullptr, Tables(), query_processing_stage);
Progress progress;
stream.setProgressCallback([&progress](const Progress & value) { progress.incrementPiecewiseAtomically(value); });

View File

@ -1072,7 +1072,7 @@ private:
statistics.last_query_rows_read = 0;
statistics.last_query_bytes_read = 0;
RemoteBlockInputStream stream(connection, query, &settings, global_context, nullptr, Tables() /*, query_processing_stage*/);
RemoteBlockInputStream stream(connection, query, global_context, &settings);
stream.setProgressCallback([&](const Progress & value)
{

View File

@ -278,7 +278,7 @@ int Server::main(const std::vector<std::string> & args)
{
/// DDL worker should be started after all tables were loaded
String ddl_zookeeper_path = config().getString("distributed_ddl.path", "/clickhouse/task_queue/ddl/");
global_context->setDDLWorker(std::make_shared<DDLWorker>(ddl_zookeeper_path, *global_context));
global_context->setDDLWorker(std::make_shared<DDLWorker>(ddl_zookeeper_path, *global_context, &config(), "distributed_ddl."));
}
SCOPE_EXIT({

View File

@ -28,10 +28,10 @@
#include <Interpreters/InterpreterAlterQuery.h>
#include <Interpreters/InterpreterDescribeQuery.h>
#include <Interpreters/ExpressionAnalyzer.h>
#include <Interpreters/ClusterProxy/Query.h>
#include <Interpreters/ClusterProxy/SelectQueryConstructor.h>
#include <Interpreters/ClusterProxy/DescribeQueryConstructor.h>
#include <Interpreters/ClusterProxy/AlterQueryConstructor.h>
#include <Interpreters/ClusterProxy/executeQuery.h>
#include <Interpreters/ClusterProxy/SelectStreamFactory.h>
#include <Interpreters/ClusterProxy/DescribeStreamFactory.h>
#include <Interpreters/ClusterProxy/AlterStreamFactory.h>
#include <Core/Field.h>
@ -213,20 +213,11 @@ BlockInputStreams StorageDistributed::read(
if (settings.global_subqueries_method == GlobalSubqueriesMethod::PUSH)
external_tables = context.getExternalTables();
/// Disable multiplexing of shards if there is an ORDER BY without GROUP BY.
//const ASTSelectQuery & ast = *(static_cast<const ASTSelectQuery *>(modified_query_ast.get()));
/** The functionality of shard_multiplexing is not completed - turn it off.
* (Because connecting to different shards within a single thread is not done in parallel.)
*/
//bool enable_shard_multiplexing = !(ast.order_expression_list && !ast.group_expression_list);
bool enable_shard_multiplexing = false;
ClusterProxy::SelectQueryConstructor select_query_constructor(
ClusterProxy::SelectStreamFactory select_stream_factory(
processed_stage, QualifiedTableName{remote_database, remote_table}, external_tables);
return ClusterProxy::Query{select_query_constructor, cluster, modified_query_ast,
context, settings, enable_shard_multiplexing}.execute();
return ClusterProxy::executeQuery(
select_stream_factory, cluster, modified_query_ast, context, settings);
}
@ -351,15 +342,10 @@ void StorageDistributed::reshardPartitions(
resharding_worker.registerQuery(coordinator_id, queryToString(alter_query_ptr));
/** The functionality of shard_multiplexing is not completed - turn it off.
* (Because connecting to different shards within a single thread is not done in parallel.)
*/
bool enable_shard_multiplexing = false;
ClusterProxy::AlterStreamFactory alter_stream_factory;
ClusterProxy::AlterQueryConstructor alter_query_constructor;
BlockInputStreams streams = ClusterProxy::Query{alter_query_constructor, cluster, alter_query_ptr,
context, context.getSettingsRef(), enable_shard_multiplexing}.execute();
BlockInputStreams streams = ClusterProxy::executeQuery(
alter_stream_factory, cluster, alter_query_ptr, context, context.getSettingsRef());
/// This callback is called if an exception has occurred while attempting to read
/// a block from a shard. This is to avoid a potential deadlock if other shards are
@ -429,15 +415,10 @@ BlockInputStreams StorageDistributed::describe(const Context & context, const Se
describe_query.database = remote_database;
describe_query.table = remote_table;
/** The functionality of shard_multiplexing is not completed - turn it off.
* (Because connecting to different shards within a single thread is not done in parallel.)
*/
bool enable_shard_multiplexing = false;
ClusterProxy::DescribeStreamFactory describe_stream_factory;
ClusterProxy::DescribeQueryConstructor describe_query_constructor;
return ClusterProxy::Query{describe_query_constructor, cluster, describe_query_ptr,
context, settings, enable_shard_multiplexing}.execute();
return ClusterProxy::executeQuery(
describe_stream_factory, cluster, describe_query_ptr, context, settings);
}

View File

@ -3086,7 +3086,7 @@ void StorageReplicatedMergeTree::sendRequestToLeaderReplica(const ASTPtr & query
leader_address.database,
"", "", "ClickHouse replica");
RemoteBlockInputStream stream(connection, formattedAST(new_query), &settings, context);
RemoteBlockInputStream stream(connection, formattedAST(new_query), context, &settings);
NullBlockOutputStream output;
copyData(stream, output);

View File

@ -25,7 +25,6 @@ NamesAndTypesList getStructureOfRemoteTable(
{
/// Request for a table description
String query = "DESC TABLE " + backQuoteIfNeed(database) + "." + backQuoteIfNeed(table);
Settings settings = context.getSettings();
NamesAndTypesList res;
/// Send the query to the first available remote shard.
@ -34,9 +33,7 @@ NamesAndTypesList getStructureOfRemoteTable(
if (shard_info.isLocal())
return context.getTable(database, table)->getColumnsList();
BlockInputStreamPtr input = std::make_shared<RemoteBlockInputStream>(
shard_info.pool, query, &settings, context, nullptr,
Tables(), QueryProcessingStage::Complete);
BlockInputStreamPtr input = std::make_shared<RemoteBlockInputStream>(shard_info.pool, query, context);
input->readPrefix();
const DataTypeFactory & data_type_factory = DataTypeFactory::instance();

View File

@ -8,7 +8,7 @@ class Client:
def __init__(self, host, port=9000, command='/usr/bin/clickhouse-client'):
self.host = host
self.port = port
self.command = [command, '--host', self.host, '--port', str(self.port)]
self.command = [command, '--host', self.host, '--port', str(self.port), '--stacktrace']
def query(self, sql, stdin=None, timeout=None):

View File

@ -49,7 +49,8 @@ class ClickHouseCluster:
self.is_up = False
def add_instance(self, name, config_dir=None, main_configs=[], user_configs=[], macroses={}, with_zookeeper=False, clickhouse_path_dir=None):
def add_instance(self, name, config_dir=None, main_configs=[], user_configs=[], macroses={}, with_zookeeper=False,
clickhouse_path_dir=None, hostname=None):
"""Add an instance to the cluster.
name - the name of the instance directory and the value of the 'instance' macro in ClickHouse.
@ -65,7 +66,10 @@ class ClickHouseCluster:
if name in self.instances:
raise Exception("Can\'t add instance `%s': there is already an instance with the same name!" % name)
instance = ClickHouseInstance(self, self.base_dir, name, config_dir, main_configs, user_configs, macroses, with_zookeeper, self.base_configs_dir, self.server_bin_path, clickhouse_path_dir)
instance = ClickHouseInstance(
self, self.base_dir, name, config_dir, main_configs, user_configs, macroses, with_zookeeper,
self.base_configs_dir, self.server_bin_path, clickhouse_path_dir, hostname=hostname)
self.instances[name] = instance
self.base_cmd.extend(['--file', instance.docker_compose_path])
if with_zookeeper and not self.with_zookeeper:
@ -84,6 +88,13 @@ class ClickHouseCluster:
if self.is_up:
return
# Kill unstopped containers from previous launch
try:
subprocess.check_call(self.base_cmd + ['kill'])
subprocess.check_call(self.base_cmd + ['down', '--volumes'])
except:
pass
if destroy_dirs and p.exists(self.instances_dir):
print "Removing instances dir", self.instances_dir
shutil.rmtree(self.instances_dir)
@ -128,7 +139,7 @@ version: '2'
services:
{name}:
image: ubuntu:14.04
hostname: {name}
hostname: {hostname}
user: '{uid}'
volumes:
- {binary_path}:/usr/bin/clickhouse:ro
@ -146,12 +157,13 @@ services:
class ClickHouseInstance:
def __init__(
self, cluster, base_path, name, custom_config_dir, custom_main_configs, custom_user_configs, macroses,
with_zookeeper, base_configs_dir, server_bin_path, clickhouse_path_dir):
with_zookeeper, base_configs_dir, server_bin_path, clickhouse_path_dir, hostname=None):
self.name = name
self.base_cmd = cluster.base_cmd[:]
self.docker_id = cluster.get_instance_docker_id(self.name)
self.cluster = cluster
self.hostname = hostname if hostname is not None else self.name
self.custom_config_dir = p.abspath(p.join(base_path, custom_config_dir)) if custom_config_dir else None
self.custom_main_config_paths = [p.abspath(p.join(base_path, c)) for c in custom_main_configs]
@ -180,6 +192,16 @@ class ClickHouseInstance:
return self.client.get_query_request(*args, **kwargs)
def exec_in_container(self, cmd, **kwargs):
container = self.get_docker_handle()
handle = self.docker_client.api.exec_create(container.id, cmd, **kwargs)
output = self.docker_client.api.exec_start(handle).decode('utf8')
exit_code = self.docker_client.api.exec_inspect(handle)['ExitCode']
if exit_code:
raise Exception('Cmd {} failed! Return code {}. Output {}'.format(' '.join(cmd), exit_code, output))
return output
def get_docker_handle(self):
return self.docker_client.containers.get(self.docker_id)
@ -294,6 +316,7 @@ class ClickHouseInstance:
with open(self.docker_compose_path, 'w') as docker_compose:
docker_compose.write(DOCKER_COMPOSE_TEMPLATE.format(
name=self.name,
hostname=self.hostname,
uid=os.getuid(),
binary_path=self.server_bin_path,
configs_dir=configs_dir,

View File

@ -29,6 +29,7 @@ class PartitionManager:
self._add_rule({'source': instance.ip_address, 'destination_port': 2181, 'action': action})
self._add_rule({'destination': instance.ip_address, 'source_port': 2181, 'action': action})
def restore_instance_zk_connections(self, instance, action='DROP'):
self._check_instance(instance)
@ -36,12 +37,18 @@ class PartitionManager:
self._delete_rule({'destination': instance.ip_address, 'source_port': 2181, 'action': action})
def partition_instances(self, left, right, action='DROP'):
def partition_instances(self, left, right, port=None, action='DROP'):
self._check_instance(left)
self._check_instance(right)
self._add_rule({'source': left.ip_address, 'destination': right.ip_address, 'action': action})
self._add_rule({'source': right.ip_address, 'destination': left.ip_address, 'action': action})
def create_rule(src, dst):
rule = {'source': src.ip_address, 'destination': dst.ip_address, 'action': action}
if port is not None:
rule['destination_port'] = port
return rule
self._add_rule(create_rule(left, right))
self._add_rule(create_rule(right, left))
def heal_all(self):
@ -49,6 +56,15 @@ class PartitionManager:
rule = self._iptables_rules.pop()
_NetworkManager.get().delete_iptables_rule(**rule)
def pop_rules(self):
res = self._iptables_rules[:]
self.heal_all()
return res
def push_rules(self, rules):
for rule in rules:
self._add_rule(rule)
@staticmethod
def _check_instance(instance):
@ -70,6 +86,18 @@ class PartitionManager:
self.heal_all()
class PartitionManagerDisbaler:
def __init__(self, manager):
self.manager = manager
self.rules = self.manager.pop_rules()
def __enter__(self):
return self
def __exit__(self, exc_type, exc_val, exc_tb):
self.manager.push_rules(self.rules)
class _NetworkManager:
"""Execute commands inside a container with access to network settings.
@ -103,8 +131,11 @@ class _NetworkManager:
def _iptables_cmd_suffix(
source=None, destination=None,
source_port=None, destination_port=None,
action=None):
ret = ['-p', 'tcp']
action=None, probability=None):
ret = []
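# The iptables 'statistic' match in random mode applies the rule to each packet
# independently with the given probability, so tests can inject intermittent
# rather than total network failures.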
if probability is not None:
ret.extend(['-m', 'statistic', '--mode', 'random', '--probability', str(probability)])
ret.extend(['-p', 'tcp'])
if source is not None:
ret.extend(['-s', source])
if destination is not None:

View File

@ -4,11 +4,22 @@
<shard>
<internal_replication>true</internal_replication>
<replica>
<host>replica1</host>
<host>node_1_1</host>
<port>9000</port>
</replica>
<replica>
<host>replica2</host>
<host>node_1_2</host>
<port>9000</port>
</replica>
</shard>
<shard>
<internal_replication>true</internal_replication>
<replica>
<host>node_2_1</host>
<port>9000</port>
</replica>
<replica>
<host>node_2_2</host>
<port>9000</port>
</replica>
</shard>

View File

@ -7,21 +7,28 @@ from helpers.network import PartitionManager
cluster = ClickHouseCluster(__file__)
instance_with_dist_table = cluster.add_instance('instance_with_dist_table', main_configs=['configs/remote_servers.xml'])
replica1 = cluster.add_instance('replica1', with_zookeeper=True)
replica2 = cluster.add_instance('replica2', with_zookeeper=True)
# Cluster with 2 shards of 2 replicas each. node_1_1 is the instance with the Distributed table.
# Thus we have a shard with a local replica and a shard with remote replicas.
node_1_1 = instance_with_dist_table = cluster.add_instance(
'node_1_1', with_zookeeper=True, main_configs=['configs/remote_servers.xml'])
node_1_2 = cluster.add_instance('node_1_2', with_zookeeper=True)
node_2_1 = cluster.add_instance('node_2_1', with_zookeeper=True)
node_2_2 = cluster.add_instance('node_2_2', with_zookeeper=True)
@pytest.fixture(scope="module")
def started_cluster():
try:
cluster.start()
for replica in (replica1, replica2):
replica.query(
"CREATE TABLE replicated (d Date, x UInt32) ENGINE = "
"ReplicatedMergeTree('/clickhouse/tables/replicated', '{instance}', d, d, 8192)")
for shard in (1, 2):
for replica in (1, 2):
node = cluster.instances['node_{}_{}'.format(shard, replica)]
node.query('''
CREATE TABLE replicated (d Date, x UInt32) ENGINE =
ReplicatedMergeTree('/clickhouse/tables/{shard}/replicated', '{instance}', d, d, 8192)'''
.format(shard=shard, instance=node.name))
instance_with_dist_table.query(
node_1_1.query(
"CREATE TABLE distributed (d Date, x UInt32) ENGINE = "
"Distributed('test_cluster', 'default', 'replicated')")
@ -33,36 +40,42 @@ def started_cluster():
def test(started_cluster):
with PartitionManager() as pm:
pm.partition_instances(replica1, replica2)
# Hinder replication between replicas of the same shard, but leave the possibility of distributed connection.
pm.partition_instances(node_1_1, node_1_2, port=9009)
pm.partition_instances(node_2_1, node_2_2, port=9009)
replica2.query("INSERT INTO replicated VALUES ('2017-05-08', 1)")
node_1_2.query("INSERT INTO replicated VALUES ('2017-05-08', 1)")
node_2_2.query("INSERT INTO replicated VALUES ('2017-05-08', 2)")
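# Shard 1 now has x=1 only on node_1_2 and shard 2 has x=2 only on node_2_2
# (replication between replicas is blocked), so reading the second replicas
# of both shards yields sum(x) = 1 + 2 = 3.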
time.sleep(1) # accrue replica delay
assert replica1.query("SELECT count() FROM replicated").strip() == ''
assert replica2.query("SELECT count() FROM replicated").strip() == '1'
assert node_1_1.query("SELECT sum(x) FROM replicated").strip() == ''
assert node_1_2.query("SELECT sum(x) FROM replicated").strip() == '1'
assert node_2_1.query("SELECT sum(x) FROM replicated").strip() == ''
assert node_2_2.query("SELECT sum(x) FROM replicated").strip() == '2'
# With in_order balancing replica1 is chosen.
# With in_order balancing, the first replicas are chosen.
assert instance_with_dist_table.query(
"SELECT count() FROM distributed SETTINGS load_balancing='in_order'").strip() == ''
# When we set max_replica_delay, replica1 must be excluded.
# When we set max_replica_delay, the first replicas must be excluded.
assert instance_with_dist_table.query('''
SELECT count() FROM distributed SETTINGS
SELECT sum(x) FROM distributed SETTINGS
load_balancing='in_order',
max_replica_delay_for_distributed_queries=1
''').strip() == '1'
''').strip() == '3'
pm.drop_instance_zk_connections(replica2)
pm.drop_instance_zk_connections(node_1_2)
pm.drop_instance_zk_connections(node_2_2)
time.sleep(4) # allow pings to zookeeper to timeout (must be greater than ZK session timeout).
# At this point all replicas are stale, but the query must still go to replica2 which is the least stale one.
# At this point all replicas are stale, but the query must still go to the second replicas, which are the least stale ones.
assert instance_with_dist_table.query('''
SELECT count() FROM distributed SETTINGS
SELECT sum(x) FROM distributed SETTINGS
load_balancing='in_order',
max_replica_delay_for_distributed_queries=1
''').strip() == '1'
''').strip() == '3'
# If we forbid stale replicas, the query must fail.
with pytest.raises(Exception):

View File

@ -1,28 +0,0 @@
<yandex>
<remote_servers>
<cluster>
<shard>
<internal_replication>true</internal_replication>
<replica>
<host>ch1</host>
<port>9000</port>
</replica>
<replica>
<host>ch2</host>
<port>9000</port>
</replica>
</shard>
<shard>
<internal_replication>true</internal_replication>
<replica>
<host>ch3</host>
<port>9000</port>
</replica>
<replica>
<host>ch4</host>
<port>9000</port>
</replica>
</shard>
</cluster>
</remote_servers>
</yandex>

View File

@ -1,32 +0,0 @@
<yandex>
<remote_servers>
<cluster2>
<shard>
<internal_replication>true</internal_replication>
<replica>
<host>ch1</host>
<port>9000</port>
<default_database>default</default_database>
</replica>
<replica>
<host>ch2</host>
<port>9000</port>
<default_database>test2</default_database>
</replica>
</shard>
<shard>
<internal_replication>true</internal_replication>
<replica>
<host>ch3</host>
<port>9000</port>
<default_database>default</default_database>
</replica>
<replica>
<host>ch4</host>
<port>9000</port>
<default_database>test2</default_database>
</replica>
</shard>
</cluster2>
</remote_servers>
</yandex>

View File

@ -1,34 +0,0 @@
<yandex>
<remote_servers>
<cluster_no_replicas>
<shard>
<internal_replication>false</internal_replication>
<replica>
<host>ch1</host>
<port>9000</port>
</replica>
</shard>
<shard>
<internal_replication>false</internal_replication>
<replica>
<host>ch2</host>
<port>9000</port>
</replica>
</shard>
<shard>
<internal_replication>false</internal_replication>
<replica>
<host>ch3</host>
<port>9000</port>
</replica>
</shard>
<shard>
<internal_replication>false</internal_replication>
<replica>
<host>ch4</host>
<port>9000</port>
</replica>
</shard>
</cluster_no_replicas>
</remote_servers>
</yandex>

View File

@ -1,28 +0,0 @@
<yandex>
<remote_servers>
<cluster_without_replication>
<shard>
<internal_replication>false</internal_replication>
<replica>
<host>ch1</host>
<port>9000</port>
</replica>
<replica>
<host>ch2</host>
<port>9000</port>
</replica>
</shard>
<shard>
<internal_replication>false</internal_replication>
<replica>
<host>ch3</host>
<port>9000</port>
</replica>
<replica>
<host>ch4</host>
<port>9000</port>
</replica>
</shard>
</cluster_without_replication>
</remote_servers>
</yandex>

View File

@ -0,0 +1,119 @@
<yandex>
<remote_servers>
<!-- Main cluster -->
<cluster>
<shard>
<internal_replication>true</internal_replication>
<replica>
<host>ch1</host>
<port>9000</port>
</replica>
<replica>
<host>ch2</host>
<port>9000</port>
</replica>
</shard>
<shard>
<internal_replication>true</internal_replication>
<replica>
<host>ch3</host>
<port>9000</port>
</replica>
<replica>
<host>ch4</host>
<port>9000</port>
</replica>
</shard>
</cluster>
<!-- Cluster with specified default database -->
<cluster2>
<shard>
<internal_replication>true</internal_replication>
<replica>
<host>ch1</host>
<port>9000</port>
<default_database>default</default_database>
</replica>
<replica>
<host>ch2</host>
<port>9000</port>
<default_database>test2</default_database>
</replica>
</shard>
<shard>
<internal_replication>true</internal_replication>
<replica>
<host>ch3</host>
<port>9000</port>
<default_database>default</default_database>
</replica>
<replica>
<host>ch4</host>
<port>9000</port>
<default_database>test2</default_database>
</replica>
</shard>
</cluster2>
<!-- Cluster without replicas -->
<cluster_no_replicas>
<shard>
<internal_replication>false</internal_replication>
<replica>
<host>ch1</host>
<port>9000</port>
</replica>
</shard>
<shard>
<internal_replication>false</internal_replication>
<replica>
<host>ch2</host>
<port>9000</port>
</replica>
</shard>
<shard>
<internal_replication>false</internal_replication>
<replica>
<host>ch3</host>
<port>9000</port>
</replica>
</shard>
<shard>
<internal_replication>false</internal_replication>
<replica>
<host>ch4</host>
<port>9000</port>
</replica>
</shard>
</cluster_no_replicas>
<!-- Cluster without internal replication -->
<cluster_without_replication>
<shard>
<internal_replication>false</internal_replication>
<replica>
<host>ch1</host>
<port>9000</port>
</replica>
<replica>
<host>ch2</host>
<port>9000</port>
</replica>
</shard>
<shard>
<internal_replication>false</internal_replication>
<replica>
<host>ch3</host>
<port>9000</port>
</replica>
<replica>
<host>ch4</host>
<port>9000</port>
</replica>
</shard>
</cluster_without_replication>
</remote_servers>
</yandex>

View File

@ -1,5 +1,8 @@
<yandex>
<distributed_ddl>
<path>/clickhouse/task_queue/ddl</path>
<max_tasks_in_queue>10</max_tasks_in_queue>
<task_max_lifetime>3600</task_max_lifetime>
<cleanup_delay_period>5</cleanup_delay_period>
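<!-- These keys correspond to the DDLWorker settings of the same names
     (see DDLWorker.h above); the two lifetimes/periods are given in seconds. -->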
</distributed_ddl>
</yandex>

View File

@ -9,6 +9,6 @@
<table>query_log</table>
<!-- Interval of flushing data. -->
<flush_interval_milliseconds>7500</flush_interval_milliseconds>
<flush_interval_milliseconds>1000</flush_interval_milliseconds>
</query_log>
</yandex>

View File

@ -2,7 +2,7 @@
<profiles>
<!-- Default profile settings. -->
<default>
<log_queries>0</log_queries>
<log_queries>1</log_queries>
<distributed_ddl_allow_replicated_alter>1</distributed_ddl_allow_replicated_alter>
</default>
</profiles>

View File

@ -4,7 +4,7 @@ import datetime
import pytest
from helpers.cluster import ClickHouseCluster
from helpers.network import PartitionManager
from helpers.network import PartitionManager, PartitionManagerDisbaler
from helpers.test_tools import TSV
@ -13,13 +13,13 @@ def check_all_hosts_sucesfully_executed(tsv_content, num_hosts=None):
num_hosts = len(cluster.instances)
M = TSV.toMat(tsv_content)
hosts = [l[0] for l in M]
codes = [l[1] for l in M]
messages = [l[2] for l in M]
hosts = [(l[0], l[1]) for l in M] # (host, port)
codes = [l[2] for l in M]
messages = [l[3] for l in M]
assert len(hosts) == num_hosts and len(set(hosts)) == num_hosts, tsv_content
assert len(set(codes)) == 1, tsv_content
assert codes[0] == "0", tsv_content
assert len(hosts) == num_hosts and len(set(hosts)) == num_hosts, "\n" + tsv_content
assert len(set(codes)) == 1, "\n" + tsv_content
assert codes[0] == "0", "\n" + tsv_content
def ddl_check_query(instance, query, num_hosts=None):
@ -27,26 +27,65 @@ def ddl_check_query(instance, query, num_hosts=None):
check_all_hosts_sucesfully_executed(contents, num_hosts)
return contents
def ddl_check_there_are_no_duplicates(instance):
rows = instance.query("SELECT max(c), argMax(q, c) FROM (SELECT lower(query) AS q, count() AS c FROM system.query_log WHERE type=2 AND q LIKE '/*ddl_entry=query-%' GROUP BY query)")
assert len(rows) == 0 or rows[0][0] == "1", "duplicates on {} {}, query {}".format(instance.name, instance.ip_address, rows)
# Retry in case of UNKNOWN_STATUS_OF_INSERT or zkutil::KeeperException errors
def insert_reliable(instance, query_insert):
for i in xrange(100):
try:
instance.query(query_insert)
return
except Exception as e:
last_exception = e
s = str(e)
if not (s.find('Unknown status, client must retry') >= 0 or s.find('zkutil::KeeperException') >= 0):
raise e
raise last_exception
TEST_REPLICATED_ALTERS=True
cluster = ClickHouseCluster(__file__)
for i in xrange(4):
def replace_domains_to_ip_addresses_in_cluster_config(instances_to_replace):
clusters_config = open(p.join(cluster.base_dir, 'configs/config.d/clusters.xml')).read()
for inst_name, inst in cluster.instances.items():
clusters_config = clusters_config.replace(inst_name, str(inst.ip_address))
for inst_name in instances_to_replace:
inst = cluster.instances[inst_name]
inst.exec_in_container(['bash', '-c', 'echo "$NEW_CONFIG" > /etc/clickhouse-server/config.d/clusters.xml'], environment={"NEW_CONFIG": clusters_config}, privileged=True)
# print cluster.instances[inst_name].exec_in_container(['cat', "/etc/clickhouse-server/config.d/clusters.xml"])
def init_cluster(cluster):
try:
for i in xrange(4):
cluster.add_instance(
'ch{}'.format(i+1),
config_dir="configs",
macroses={"layer": 0, "shard": i/2 + 1, "replica": i%2 + 1},
with_zookeeper=True)
@pytest.fixture(scope="module")
def started_cluster():
try:
cluster.start()
# Replace config files to test the ability to specify hosts in both DNS-name and IP formats
replace_domains_to_ip_addresses_in_cluster_config(['ch1', 'ch3'])
# Select a sacrifice instance to test CONNECTION_LOSS and server failure on it
sacrifice = cluster.instances['ch4']
cluster.pm_random_drops = PartitionManager()
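# 'REJECT --reject-with tcp-reset' makes matched ZK packets fail fast with a
# connection reset instead of hanging until the session timeout.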
cluster.pm_random_drops._add_rule({'probability': 0.01, 'destination': sacrifice.ip_address, 'source_port': 2181, 'action': 'REJECT --reject-with tcp-reset'})
cluster.pm_random_drops._add_rule({'probability': 0.01, 'source': sacrifice.ip_address, 'destination_port': 2181, 'action': 'REJECT --reject-with tcp-reset'})
# Initialize databases and service tables
instance = cluster.instances['ch1']
instance.query("SELECT 1")
ddl_check_query(instance, """
CREATE TABLE IF NOT EXISTS all_tables ON CLUSTER 'cluster_no_replicas'
(database String, name String, engine String, metadata_modification_time DateTime)
@ -55,21 +94,39 @@ CREATE TABLE IF NOT EXISTS all_tables ON CLUSTER 'cluster_no_replicas'
ddl_check_query(instance, "CREATE DATABASE IF NOT EXISTS test ON CLUSTER 'cluster'")
except Exception as e:
print e
raise
@pytest.fixture(scope="module")
def started_cluster():
try:
init_cluster(cluster)
yield cluster
instance = cluster.instances['ch1']
ddl_check_query(instance, "DROP DATABASE test ON CLUSTER 'cluster'")
ddl_check_query(instance, "DROP DATABASE IF EXISTS test2 ON CLUSTER 'cluster'")
# Check query log to ensure that DDL queries are not executed twice
time.sleep(1.5)
for instance in cluster.instances.values():
ddl_check_there_are_no_duplicates(instance)
finally:
pass
cluster.shutdown()
# Remove iptables rules for sacrifice instance
cluster.pm_random_drops.heal_all()
#cluster.shutdown()
def test_default_database(started_cluster):
instance = cluster.instances['ch3']
ddl_check_query(instance, "CREATE DATABASE IF NOT EXISTS test2 ON CLUSTER 'cluster'")
ddl_check_query(instance, "DROP TABLE IF EXISTS null ON CLUSTER 'cluster'")
ddl_check_query(instance, "CREATE DATABASE IF NOT EXISTS test2 ON CLUSTER 'cluster' FORMAT TSV")
ddl_check_query(instance, "DROP TABLE IF EXISTS null ON CLUSTER 'cluster' FORMAT TSV")
ddl_check_query(instance, "CREATE TABLE null ON CLUSTER 'cluster2' (s String DEFAULT 'escape\t\nme') ENGINE = Null")
contents = instance.query("SELECT hostName() AS h, database FROM all_tables WHERE name = 'null' ORDER BY h")
@ -79,6 +136,18 @@ def test_default_database(started_cluster):
ddl_check_query(instance, "DROP DATABASE IF EXISTS test2 ON CLUSTER 'cluster'")
def test_create_view(started_cluster):
instance = cluster.instances['ch3']
ddl_check_query(instance, "CREATE VIEW test.super_simple_view ON CLUSTER 'cluster' AS SELECT * FROM system.numbers FORMAT TSV")
ddl_check_query(instance, "CREATE MATERIALIZED VIEW test.simple_mat_view ON CLUSTER 'cluster' ENGINE = Memory AS SELECT * FROM system.numbers FORMAT TSV")
ddl_check_query(instance, "DROP TABLE test.simple_mat_view ON CLUSTER 'cluster' FORMAT TSV")
ddl_check_query(instance, "DROP TABLE IF EXISTS test.super_simple_view2 ON CLUSTER 'cluster' FORMAT TSV")
ddl_check_query(instance, "CREATE TABLE test.super_simple ON CLUSTER 'cluster' (i Int8) ENGINE = Memory")
ddl_check_query(instance, "RENAME TABLE test.super_simple TO test.super_simple2 ON CLUSTER 'cluster' FORMAT TSV")
ddl_check_query(instance, "DROP TABLE test.super_simple2 ON CLUSTER 'cluster'")
def test_on_server_fail(started_cluster):
instance = cluster.instances['ch1']
kill_instance = cluster.instances['ch2']
@ -98,7 +167,7 @@ def test_on_server_fail(started_cluster):
contents = instance.query("SELECT hostName() AS h FROM all_tables WHERE database='test' AND name='test_server_fail' ORDER BY h")
assert TSV(contents) == TSV("ch1\nch2\nch3\nch4\n")
ddl_check_query(instance, "DROP TABLE IF EXISTS test.test_server_fail ON CLUSTER 'cluster'")
ddl_check_query(instance, "DROP TABLE test.test_server_fail ON CLUSTER 'cluster'")
def _test_on_connection_losses(cluster, zk_timeout):
@ -132,10 +201,14 @@ def test_replicated_alters(started_cluster):
if not TEST_REPLICATED_ALTERS:
return
# Temporarily disable random ZK packet drops, they might break creation of ReplicatedMergeTree replicas
firewall_drops_rules = cluster.pm_random_drops.pop_rules()
ddl_check_query(instance, """
CREATE TABLE IF NOT EXISTS merge ON CLUSTER cluster (p Date, i Int32)
ENGINE = ReplicatedMergeTree('/clickhouse/tables/{layer}-{shard}/hits', '{replica}', p, p, 1)
""")
ddl_check_query(instance, """
CREATE TABLE IF NOT EXISTS all_merge_32 ON CLUSTER cluster (p Date, i Int32)
ENGINE = Distributed(cluster, default, merge, i)
@ -147,7 +220,7 @@ ENGINE = Distributed(cluster, default, merge, i)
for i in xrange(4):
k = (i / 2) * 2
cluster.instances['ch{}'.format(i + 1)].query("INSERT INTO merge (i) VALUES ({})({})".format(k, k+1))
insert_reliable(cluster.instances['ch{}'.format(i + 1)], "INSERT INTO merge (i) VALUES ({})({})".format(k, k+1))
assert TSV(instance.query("SELECT i FROM all_merge_32 ORDER BY i")) == TSV(''.join(['{}\n'.format(x) for x in xrange(4)]))
@ -160,7 +233,7 @@ ENGINE = Distributed(cluster, default, merge, i)
for i in xrange(4):
k = (i / 2) * 2 + 4
cluster.instances['ch{}'.format(i + 1)].query("INSERT INTO merge (p, i) VALUES (31, {})(31, {})".format(k, k+1))
insert_reliable(cluster.instances['ch{}'.format(i + 1)], "INSERT INTO merge (p, i) VALUES (31, {})(31, {})".format(k, k+1))
assert TSV(instance.query("SELECT i, s FROM all_merge_64 ORDER BY i")) == TSV(''.join(['{}\t{}\n'.format(x,x) for x in xrange(8)]))
@ -169,6 +242,10 @@ ENGINE = Distributed(cluster, default, merge, i)
assert TSV(instance.query("SELECT i, s FROM all_merge_64 ORDER BY i")) == TSV(''.join(['{}\t{}\n'.format(x,x) for x in xrange(4)]))
ddl_check_query(instance, "DROP TABLE merge ON CLUSTER cluster")
# Re-enable random ZK packet drops
cluster.pm_random_drops.push_rules(firewall_drops_rules)
ddl_check_query(instance, "DROP TABLE all_merge_32 ON CLUSTER cluster")
ddl_check_query(instance, "DROP TABLE all_merge_64 ON CLUSTER cluster")
@ -201,7 +278,7 @@ ENGINE = Distributed(cluster_without_replication, default, merge, i)
ddl_check_query(instance, "ALTER TABLE merge ON CLUSTER cluster_without_replication MODIFY COLUMN i Int64")
ddl_check_query(instance, "ALTER TABLE merge ON CLUSTER cluster_without_replication ADD COLUMN s DEFAULT toString(i)")
ddl_check_query(instance, "ALTER TABLE merge ON CLUSTER cluster_without_replication ADD COLUMN s DEFAULT toString(i) FORMAT TSV")
assert TSV(instance.query("SELECT i, s FROM all_merge_64 ORDER BY i")) == TSV(''.join(['{}\t{}\n'.format(x,x) for x in xrange(4)]))