mirror of
https://github.com/ClickHouse/ClickHouse.git
synced 2024-11-24 08:32:02 +00:00
Updated integration tests. Add integration test for DDL. [#CLICKHOUSE-5]
This commit is contained in:
parent
226d84be09
commit
73e2aab9ec
4
.gitignore
vendored
4
.gitignore
vendored
@ -33,6 +33,10 @@ CTestTestfile.cmake
|
||||
*.a
|
||||
*.o
|
||||
|
||||
# Python cache
|
||||
*.pyc
|
||||
__pycache__
|
||||
|
||||
# ignore generated files
|
||||
*-metrika-yandex
|
||||
|
||||
|
@ -41,15 +41,6 @@ inline bool isLocal(const Cluster::Address & address)
|
||||
return address.default_database.empty() && isLocalAddress(address.resolved_address);
|
||||
}
|
||||
|
||||
inline std::string addressToDirName(const Cluster::Address & address)
|
||||
{
|
||||
return
|
||||
escapeForFileName(address.user) +
|
||||
(address.password.empty() ? "" : (':' + escapeForFileName(address.password))) + '@' +
|
||||
escapeForFileName(address.resolved_address.host().toString()) + ':' +
|
||||
std::to_string(address.resolved_address.port()) +
|
||||
(address.default_database.empty() ? "" : ('#' + escapeForFileName(address.default_database)));
|
||||
}
|
||||
|
||||
/// To cache DNS requests.
|
||||
Poco::Net::SocketAddress resolveSocketAddressImpl1(const String & host, UInt16 port)
|
||||
@ -109,11 +100,29 @@ Cluster::Address::Address(const String & host_port_, const String & user_, const
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
String Cluster::Address::toString() const
|
||||
{
|
||||
return host_name + ':' + DB::toString(port);
|
||||
return toString(host_name, port);
|
||||
}
|
||||
|
||||
String Cluster::Address::toString(const String & host_name, UInt16 port)
|
||||
{
|
||||
return escapeForFileName(host_name) + ':' + DB::toString(port);
|
||||
}
|
||||
|
||||
|
||||
String Cluster::Address::toStringFull() const
|
||||
{
|
||||
return
|
||||
escapeForFileName(user) +
|
||||
(password.empty() ? "" : (':' + escapeForFileName(password))) + '@' +
|
||||
escapeForFileName(resolved_address.host().toString()) + ':' +
|
||||
std::to_string(resolved_address.port()) +
|
||||
(default_database.empty() ? "" : ('#' + escapeForFileName(default_database)));
|
||||
}
|
||||
|
||||
|
||||
/// Implementation of Clusters class
|
||||
|
||||
Clusters::Clusters(Poco::Util::AbstractConfiguration & config, const Settings & settings, const String & config_name)
|
||||
@ -201,7 +210,7 @@ Cluster::Cluster(Poco::Util::AbstractConfiguration & config, const Settings & se
|
||||
info.local_addresses.push_back(address);
|
||||
else
|
||||
{
|
||||
info.dir_names.push_back(addressToDirName(address));
|
||||
info.dir_names.push_back(address.toStringFull());
|
||||
ConnectionPoolPtrs pools;
|
||||
pools.push_back(std::make_shared<ConnectionPool>(
|
||||
settings.distributed_connections_pool_size,
|
||||
@ -235,7 +244,7 @@ Cluster::Cluster(Poco::Util::AbstractConfiguration & config, const Settings & se
|
||||
if (weight == 0)
|
||||
continue;
|
||||
|
||||
const auto internal_replication = config.getBool(partial_prefix + ".internal_replication", false);
|
||||
bool internal_replication = config.getBool(partial_prefix + ".internal_replication", false);
|
||||
|
||||
/** in case of internal_replication we will be appending names to
|
||||
* the first element of vector; otherwise we will just .emplace_back
|
||||
@ -258,14 +267,14 @@ Cluster::Cluster(Poco::Util::AbstractConfiguration & config, const Settings & se
|
||||
{
|
||||
if (internal_replication)
|
||||
{
|
||||
auto dir_name = addressToDirName(replica_addresses.back());
|
||||
auto dir_name = replica_addresses.back().toStringFull();
|
||||
if (first)
|
||||
dir_names.emplace_back(std::move(dir_name));
|
||||
else
|
||||
dir_names.front() += "," + dir_name;
|
||||
}
|
||||
else
|
||||
dir_names.emplace_back(addressToDirName(replica_addresses.back()));
|
||||
dir_names.emplace_back(replica_addresses.back().toStringFull());
|
||||
|
||||
if (first) first = false;
|
||||
}
|
||||
@ -302,7 +311,7 @@ Cluster::Cluster(Poco::Util::AbstractConfiguration & config, const Settings & se
|
||||
std::move(replicas), settings.load_balancing, settings.connections_with_failover_max_tries);
|
||||
|
||||
slot_to_shard.insert(std::end(slot_to_shard), weight, shards_info.size());
|
||||
shards_info.push_back({std::move(dir_names), current_shard_num, weight, shard_local_addresses, shard_pool});
|
||||
shards_info.push_back({std::move(dir_names), current_shard_num, weight, shard_local_addresses, shard_pool, internal_replication});
|
||||
}
|
||||
else
|
||||
throw Exception("Unknown element in config: " + key, ErrorCodes::UNKNOWN_ELEMENT_IN_CONFIG);
|
||||
|
@ -59,8 +59,13 @@ public:
|
||||
Address(Poco::Util::AbstractConfiguration & config, const String & config_prefix);
|
||||
Address(const String & host_port_, const String & user_, const String & password_);
|
||||
|
||||
/// Returns 'host_name:port'
|
||||
/// Returns escaped 'host_name:port'
|
||||
String toString() const;
|
||||
|
||||
static String toString(const String & host_name, UInt16 port);
|
||||
|
||||
/// Retrurns escaped user:password@resolved_host_address:resolved_host_port#default_database
|
||||
String toStringFull() const;
|
||||
};
|
||||
|
||||
using Addresses = std::vector<Address>;
|
||||
@ -72,7 +77,7 @@ public:
|
||||
bool isLocal() const { return !local_addresses.empty(); }
|
||||
bool hasRemoteConnections() const { return pool != nullptr; }
|
||||
size_t getLocalNodeCount() const { return local_addresses.size(); }
|
||||
bool hasInternalReplication() const { return dir_names.size() == 1; }
|
||||
bool hasInternalReplication() const { return has_internal_replication; }
|
||||
|
||||
public:
|
||||
/// Contains names of directories for asynchronous write to StorageDistributed
|
||||
@ -82,6 +87,7 @@ public:
|
||||
int weight;
|
||||
Addresses local_addresses;
|
||||
ConnectionPoolWithFailoverPtr pool;
|
||||
bool has_internal_replication;
|
||||
};
|
||||
|
||||
using ShardsInfo = std::vector<ShardInfo>;
|
||||
|
@ -32,6 +32,8 @@
|
||||
#include <zkutil/Lock.h>
|
||||
#include <Poco/Timestamp.h>
|
||||
|
||||
#include <experimental/optional>
|
||||
|
||||
|
||||
namespace DB
|
||||
{
|
||||
@ -52,9 +54,9 @@ struct DDLLogEntry
|
||||
{
|
||||
String query;
|
||||
Strings hosts;
|
||||
String initiator;
|
||||
String initiator; // optional
|
||||
|
||||
static constexpr char CURRENT_VERSION = '1';
|
||||
static constexpr int CURRENT_VERSION = '1';
|
||||
|
||||
String toString()
|
||||
{
|
||||
@ -62,8 +64,7 @@ struct DDLLogEntry
|
||||
{
|
||||
WriteBufferFromString wb(res);
|
||||
|
||||
writeChar(CURRENT_VERSION, wb);
|
||||
wb << "\n";
|
||||
wb << "version: " << CURRENT_VERSION << "\n";
|
||||
wb << "query: " << query << "\n";
|
||||
wb << "hosts: " << hosts << "\n";
|
||||
wb << "initiator: " << initiator << "\n";
|
||||
@ -76,22 +77,27 @@ struct DDLLogEntry
|
||||
{
|
||||
ReadBufferFromString rb(data);
|
||||
|
||||
char version;
|
||||
readChar(version, rb);
|
||||
int version;
|
||||
rb >> "version: " >> version >> "\n";
|
||||
|
||||
if (version != CURRENT_VERSION)
|
||||
throw Exception("Unknown DDLLogEntry format version: " + version, ErrorCodes::UNKNOWN_FORMAT_VERSION);
|
||||
|
||||
rb >> "\n";
|
||||
rb >> "query: " >> query >> "\n";
|
||||
rb >> "hosts: " >> hosts >> "\n";
|
||||
rb >> "initiator: " >> initiator >> "\n";
|
||||
|
||||
if (!rb.eof())
|
||||
rb >> "initiator: " >> initiator >> "\n";
|
||||
else
|
||||
initiator.clear();
|
||||
|
||||
assertEOF(rb);
|
||||
}
|
||||
};
|
||||
|
||||
|
||||
static const std::pair<ssize_t, ssize_t> tryGetShardAndHostNum(const Cluster::AddressesWithFailover & cluster, const String & host_name, UInt16 port)
|
||||
using ShardAndHostNum = std::experimental::optional<std::pair<size_t, size_t>>;
|
||||
static ShardAndHostNum tryGetShardAndHostNum(const Cluster::AddressesWithFailover & cluster, const String & host_name, UInt16 port)
|
||||
{
|
||||
for (size_t shard_num = 0; shard_num < cluster.size(); ++shard_num)
|
||||
{
|
||||
@ -99,11 +105,11 @@ static const std::pair<ssize_t, ssize_t> tryGetShardAndHostNum(const Cluster::Ad
|
||||
{
|
||||
const Cluster::Address & address = cluster[shard_num][host_num];
|
||||
if (address.host_name == host_name && address.port == port)
|
||||
return {shard_num, host_num};
|
||||
return std::make_pair(shard_num, host_num);
|
||||
}
|
||||
}
|
||||
|
||||
return {-1, -1};
|
||||
return {};
|
||||
}
|
||||
|
||||
|
||||
@ -130,7 +136,7 @@ DDLWorker::DDLWorker(const std::string & zk_root_dir, Context & context_)
|
||||
|
||||
host_name = getFQDNOrHostName();
|
||||
port = context.getTCPPort();
|
||||
host_id = host_name + ':' + DB::toString(port);
|
||||
host_id = Cluster::Address::toString(host_name, port);
|
||||
|
||||
event_queue_updated = std::make_shared<Poco::Event>();
|
||||
|
||||
@ -263,14 +269,16 @@ void DDLWorker::processTask(const DDLLogEntry & node, const std::string & node_n
|
||||
String cluster_name = query->cluster;
|
||||
auto cluster = context.getCluster(cluster_name);
|
||||
|
||||
ssize_t shard_num, host_num;
|
||||
std::tie(shard_num, host_num) = tryGetShardAndHostNum(cluster->getShardsWithFailoverAddresses(), host_name, port);
|
||||
if (shard_num < 0 || host_num < 0)
|
||||
auto shard_host_num = tryGetShardAndHostNum(cluster->getShardsWithFailoverAddresses(), host_name, port);
|
||||
if (!shard_host_num)
|
||||
{
|
||||
throw Exception("Cannot find own address (" + host_id + ") in cluster " + cluster_name + " configuration",
|
||||
ErrorCodes::INCONSISTENT_CLUSTER_DEFINITION);
|
||||
}
|
||||
|
||||
size_t shard_num = shard_host_num->first;
|
||||
size_t host_num = shard_host_num->second;
|
||||
|
||||
const auto & host_address = cluster->getShardsWithFailoverAddresses().at(shard_num).at(host_num);
|
||||
ASTPtr rewritten_ast = query->getRewrittenASTWithoutOnCluster(host_address.default_database);
|
||||
String rewritten_query = queryToString(rewritten_ast);
|
||||
@ -353,7 +361,8 @@ void DDLWorker::processTaskAlter(
|
||||
/// current secver aquires lock, executes replicated alter,
|
||||
/// losts zookeeper connection and doesn't have time to create /executed node, second server executes replicated alter again
|
||||
/// To avoid this problem alter() method of replicated tables should be changed and takes into account ddl query id tag.
|
||||
throw Exception("Distributed DDL alters don't work properly yet", ErrorCodes::NOT_IMPLEMENTED);
|
||||
if (!context.getSettingsRef().distributed_ddl_allow_replicated_alter)
|
||||
throw Exception("Distributed DDL alters don't work properly yet", ErrorCodes::NOT_IMPLEMENTED);
|
||||
|
||||
Strings replica_names;
|
||||
for (const auto & address : cluster->getShardsWithFailoverAddresses().at(shard_num))
|
||||
@ -581,7 +590,7 @@ public:
|
||||
|
||||
auto elapsed_seconds = watch.elapsedSeconds();
|
||||
if (elapsed_seconds > timeout_seconds)
|
||||
throw Exception("Watching query is executing too long (" + toString(elapsed_seconds) + " sec.)", ErrorCodes::TIMEOUT_EXCEEDED);
|
||||
throw Exception("Watching query is executing too long (" + toString(std::round(elapsed_seconds)) + " sec.)", ErrorCodes::TIMEOUT_EXCEEDED);
|
||||
|
||||
if (num_hosts_finished != 0 || try_number != 0)
|
||||
std::this_thread::sleep_for(std::chrono::milliseconds(50 * std::min(20LU, try_number + 1)));
|
||||
|
@ -275,7 +275,9 @@ struct Settings
|
||||
/** Suppose max_replica_delay_for_distributed_queries is set and all replicas for the queried table are stale. \
|
||||
* If this setting is enabled, the query will be performed anyway, otherwise the error will be reported. \
|
||||
*/ \
|
||||
M(SettingBool, fallback_to_stale_replicas_for_distributed_queries, 1)
|
||||
M(SettingBool, fallback_to_stale_replicas_for_distributed_queries, 1) \
|
||||
/** For development and testing purposes only still */ \
|
||||
M(SettingBool, distributed_ddl_allow_replicated_alter, 0)
|
||||
|
||||
|
||||
/// Possible limits for query execution.
|
||||
|
@ -47,14 +47,16 @@ protected:
|
||||
<< (if_exists ? "IF EXISTS " : "")
|
||||
<< (settings.hilite ? hilite_none : "")
|
||||
<< backQuoteIfNeed(database);
|
||||
return;
|
||||
formatOnCluster(settings);
|
||||
}
|
||||
else
|
||||
{
|
||||
settings.ostr << (settings.hilite ? hilite_keyword : "")
|
||||
<< (detach ? "DETACH TABLE " : "DROP TABLE ")
|
||||
<< (if_exists ? "IF EXISTS " : "") << (settings.hilite ? hilite_none : "")
|
||||
<< (!database.empty() ? backQuoteIfNeed(database) + "." : "") << backQuoteIfNeed(table);
|
||||
formatOnCluster(settings);
|
||||
}
|
||||
|
||||
settings.ostr << (settings.hilite ? hilite_keyword : "")
|
||||
<< (detach ? "DETACH TABLE " : "DROP TABLE ")
|
||||
<< (if_exists ? "IF EXISTS " : "") << (settings.hilite ? hilite_none : "")
|
||||
<< (!database.empty() ? backQuoteIfNeed(database) + "." : "") << backQuoteIfNeed(table);
|
||||
formatOnCluster(settings);
|
||||
}
|
||||
};
|
||||
|
||||
|
@ -55,6 +55,14 @@ bool ParserDropQuery::parseImpl(Pos & pos, Pos end, ASTPtr & node, Pos & max_par
|
||||
|
||||
if (!name_p.parse(pos, end, database, max_parsed_pos, expected))
|
||||
return false;
|
||||
|
||||
ws.ignore(pos, end);
|
||||
|
||||
if (ParserString{"ON", true, true}.ignore(pos, end, max_parsed_pos, expected))
|
||||
{
|
||||
if (!ASTQueryWithOnCluster::parse(pos, end, cluster_str, max_parsed_pos, expected))
|
||||
return false;
|
||||
}
|
||||
}
|
||||
else
|
||||
{
|
||||
|
2
dbms/tests/integration/.gitignore
vendored
Normal file
2
dbms/tests/integration/.gitignore
vendored
Normal file
@ -0,0 +1,2 @@
|
||||
.cache
|
||||
_instances
|
@ -8,8 +8,8 @@ Prerequisites:
|
||||
* Ubuntu 14.04 (Trusty).
|
||||
* [docker](https://www.docker.com/community-edition#/download). Minimum required API version: 1.25, check with `docker version`.
|
||||
* [pip](https://pypi.python.org/pypi/pip). To install: `sudo apt-get install python-pip`
|
||||
* [docker-compose](https://docs.docker.com/compose/). To install: `sudo pip install docker-compose`
|
||||
* [py.test](https://docs.pytest.org/) testing framework. To install: `sudo pip install pytest`
|
||||
* [py.test](https://docs.pytest.org/) testing framework. To install: `sudo -H pip install pytest`
|
||||
* [docker-compose](https://docs.docker.com/compose/) and additional python libraries. To install: `sudo -H pip install docker-compose docker dicttoxml`
|
||||
|
||||
If you want to run the tests under a non-privileged user, you must add this user to `docker` group: `sudo usermod -aG docker $USER` and re-login.
|
||||
|
||||
@ -28,7 +28,7 @@ To add new test named `foo`, create a directory `test_foo` with an empty `__init
|
||||
named `test.py` containing tests in it. All functions with names starting with `test` will become test cases.
|
||||
|
||||
`helpers` directory contains utilities for:
|
||||
* Launching a ClickHouse cluster with or without ZooKeeper in docker containers.
|
||||
* Launching a ClickHouse cluster with or without ZooKeeper indocker containers.
|
||||
* Sending queries to launched instances.
|
||||
* Introducing network failures such as severing network link between two instances.
|
||||
|
||||
|
@ -1,6 +1,7 @@
|
||||
import errno
|
||||
import subprocess as sp
|
||||
from threading import Timer
|
||||
import tempfile
|
||||
|
||||
|
||||
class Client:
|
||||
@ -9,36 +10,62 @@ class Client:
|
||||
self.port = port
|
||||
self.command = [command, '--host', self.host, '--port', str(self.port)]
|
||||
|
||||
def query(self, sql, stdin=None, timeout=10.0):
|
||||
|
||||
def query(self, sql, stdin=None, timeout=None):
|
||||
return QueryRequest(self, sql, stdin, timeout).get_answer()
|
||||
|
||||
|
||||
def get_query_request(self, sql, stdin=None, timeout=None):
|
||||
return QueryRequest(self, sql, stdin, timeout)
|
||||
|
||||
|
||||
class QueryRequest:
|
||||
def __init__(self, client, sql, stdin=None, timeout=None):
|
||||
self.client = client
|
||||
|
||||
command = self.client.command[:]
|
||||
if stdin is None:
|
||||
command = self.command + ['--multiquery']
|
||||
command += ['--multiquery']
|
||||
stdin = sql
|
||||
else:
|
||||
command = self.command + ['--query', sql]
|
||||
command += ['--query', sql]
|
||||
|
||||
process = sp.Popen(command, stdin=sp.PIPE, stdout=sp.PIPE, stderr=sp.PIPE)
|
||||
# Write data to tmp file to avoid PIPEs and execution blocking
|
||||
stdin_file = tempfile.TemporaryFile()
|
||||
stdin_file.write(stdin)
|
||||
stdin_file.seek(0)
|
||||
self.stdout_file = tempfile.TemporaryFile()
|
||||
self.stderr_file = tempfile.TemporaryFile()
|
||||
|
||||
timer = None
|
||||
#print " ".join(command), "\nQuery:", sql
|
||||
|
||||
self.process = sp.Popen(command, stdin=stdin_file, stdout=self.stdout_file, stderr=self.stderr_file)
|
||||
|
||||
self.timer = None
|
||||
self.process_finished_before_timeout = True
|
||||
if timeout is not None:
|
||||
def kill_process():
|
||||
try:
|
||||
process.kill()
|
||||
except OSError as e:
|
||||
if e.errno != errno.ESRCH:
|
||||
raise
|
||||
if self.process.poll() is None:
|
||||
self.process.kill()
|
||||
self.process_finished_before_timeout = False
|
||||
|
||||
timer = Timer(timeout, kill_process)
|
||||
timer.start()
|
||||
self.timer = Timer(timeout, kill_process)
|
||||
self.timer.start()
|
||||
|
||||
stdout, stderr = process.communicate(stdin)
|
||||
|
||||
if timer is not None:
|
||||
if timer.finished.is_set():
|
||||
raise Exception('Client timed out!')
|
||||
else:
|
||||
timer.cancel()
|
||||
def get_answer(self):
|
||||
self.process.wait()
|
||||
self.stdout_file.seek(0)
|
||||
self.stderr_file.seek(0)
|
||||
|
||||
if process.returncode != 0:
|
||||
raise Exception('Client failed! return code: {}, stderr: {}'.format(process.returncode, stderr))
|
||||
stdout = self.stdout_file.read()
|
||||
stderr = self.stderr_file.read()
|
||||
|
||||
if self.process.returncode != 0 or stderr:
|
||||
raise Exception('Client failed! Return code: {}, stderr: {}'.format(self.process.returncode, stderr))
|
||||
|
||||
if self.timer is not None and not self.process_finished_before_timeout:
|
||||
raise Exception('Client timed out!')
|
||||
|
||||
return stdout
|
||||
|
||||
|
@ -4,9 +4,12 @@ import pwd
|
||||
import re
|
||||
import subprocess
|
||||
import shutil
|
||||
import distutils.dir_util
|
||||
import socket
|
||||
import time
|
||||
import errno
|
||||
from dicttoxml import dicttoxml
|
||||
import xml.dom.minidom
|
||||
|
||||
import docker
|
||||
|
||||
@ -44,11 +47,13 @@ class ClickHouseCluster:
|
||||
self.is_up = False
|
||||
|
||||
|
||||
def add_instance(self, name, custom_configs, with_zookeeper=False):
|
||||
def add_instance(self, name, config_dir=None, main_configs=[], user_configs=[], macroses={}, with_zookeeper=False):
|
||||
"""Add an instance to the cluster.
|
||||
|
||||
name - the name of the instance directory and the value of the 'instance' macro in ClickHouse.
|
||||
custom_configs - a list of config files that will be added to config.d/ directory
|
||||
config_dir - a directory with config files which content will be copied to /etc/clickhouse-server/ directory
|
||||
main_configs - a list of config files that will be added to config.d/ directory
|
||||
user_configs - a list of config files that will be added to users.d/ directory
|
||||
with_zookeeper - if True, add ZooKeeper configuration to configs and ZooKeeper instances to the cluster.
|
||||
"""
|
||||
|
||||
@ -58,7 +63,7 @@ class ClickHouseCluster:
|
||||
if name in self.instances:
|
||||
raise Exception("Can\'t add instance `%s': there is already an instance with the same name!" % name)
|
||||
|
||||
instance = ClickHouseInstance(self.base_dir, name, custom_configs, with_zookeeper, self.base_configs_dir, self.server_bin_path)
|
||||
instance = ClickHouseInstance(self, self.base_dir, name, config_dir, main_configs, user_configs, macroses, with_zookeeper, self.base_configs_dir, self.server_bin_path)
|
||||
self.instances[name] = instance
|
||||
self.base_cmd.extend(['--file', instance.docker_compose_path])
|
||||
if with_zookeeper and not self.with_zookeeper:
|
||||
@ -68,6 +73,11 @@ class ClickHouseCluster:
|
||||
return instance
|
||||
|
||||
|
||||
def get_instance_docker_id(self, instance_name):
|
||||
# According to how docker-compose names containers.
|
||||
return self.project_name + '_' + instance_name + '_1'
|
||||
|
||||
|
||||
def start(self, destroy_dirs=True):
|
||||
if self.is_up:
|
||||
return
|
||||
@ -79,13 +89,10 @@ class ClickHouseCluster:
|
||||
|
||||
self.docker_client = docker.from_env()
|
||||
|
||||
start_deadline = time.time() + 10.0 # seconds
|
||||
for instance in self.instances.values():
|
||||
start_deadline = time.time() + 20.0 # seconds
|
||||
for instance in self.instances.itervalues():
|
||||
instance.docker_client = self.docker_client
|
||||
|
||||
# According to how docker-compose names containers.
|
||||
instance.docker_id = self.project_name + '_' + instance.name + '_1'
|
||||
|
||||
container = self.docker_client.containers.get(instance.docker_id)
|
||||
instance.ip_address = container.attrs['NetworkSettings']['Networks'].values()[0]['IPAddress']
|
||||
|
||||
@ -116,6 +123,7 @@ version: '2'
|
||||
services:
|
||||
{name}:
|
||||
image: ubuntu:14.04
|
||||
hostname: {name}
|
||||
user: '{uid}'
|
||||
volumes:
|
||||
- {binary_path}:/usr/bin/clickhouse:ro
|
||||
@ -137,39 +145,72 @@ MACROS_CONFIG_TEMPLATE = '''
|
||||
</yandex>
|
||||
'''
|
||||
|
||||
|
||||
class ClickHouseInstance:
|
||||
def __init__(
|
||||
self, base_path, name, custom_configs, with_zookeeper,
|
||||
base_configs_dir, server_bin_path):
|
||||
self, cluster, base_path, name, custom_config_dir, custom_main_configs, custom_user_configs, macroses,
|
||||
with_zookeeper, base_configs_dir, server_bin_path):
|
||||
|
||||
self.name = name
|
||||
self.custom_config_paths = [p.abspath(p.join(base_path, c)) for c in custom_configs]
|
||||
self.base_cmd = cluster.base_cmd[:]
|
||||
self.docker_id = cluster.get_instance_docker_id(self.name)
|
||||
self.cluster = cluster
|
||||
|
||||
self.custom_config_dir = p.abspath(p.join(base_path, custom_config_dir)) if custom_config_dir else None
|
||||
self.custom_main_config_paths = [p.abspath(p.join(base_path, c)) for c in custom_main_configs]
|
||||
self.custom_user_config_paths = [p.abspath(p.join(base_path, c)) for c in custom_user_configs]
|
||||
self.macroses = macroses if macroses is not None else {}
|
||||
self.with_zookeeper = with_zookeeper
|
||||
|
||||
self.base_configs_dir = base_configs_dir
|
||||
self.server_bin_path = server_bin_path
|
||||
|
||||
self.path = p.abspath(p.join(base_path, name))
|
||||
self.path = p.abspath(p.join(base_path, '_instances', name))
|
||||
self.docker_compose_path = p.join(self.path, 'docker_compose.yml')
|
||||
|
||||
self.docker_client = None
|
||||
self.docker_id = None
|
||||
self.ip_address = None
|
||||
self.client = None
|
||||
self.default_timeout = 20.0 # 20 sec
|
||||
|
||||
|
||||
def query(self, sql, stdin=None):
|
||||
return self.client.query(sql, stdin)
|
||||
def query(self, *args, **kwargs):
|
||||
return self.client.query(*args, **kwargs)
|
||||
|
||||
|
||||
def wait_for_start(self, deadline):
|
||||
def get_query_request(self, *args, **kwargs):
|
||||
return self.client.get_query_request(*args, **kwargs)
|
||||
|
||||
|
||||
def get_docker_handle(self):
|
||||
return self.docker_client.containers.get(self.docker_id)
|
||||
|
||||
|
||||
def stop(self):
|
||||
self.get_docker_handle().stop(self.default_timeout)
|
||||
|
||||
|
||||
def start(self):
|
||||
self.get_docker_handle().start()
|
||||
|
||||
|
||||
def wait_for_start(self, deadline=None, timeout=None):
|
||||
start_time = time.time()
|
||||
|
||||
if timeout is not None:
|
||||
deadline = start_time + timeout
|
||||
|
||||
while True:
|
||||
if self.docker_client.containers.get(self.docker_id).status == 'exited':
|
||||
raise Exception("Instance `{}' failed to start".format(self.name))
|
||||
status = self.get_docker_handle().status
|
||||
|
||||
time_left = deadline - time.time()
|
||||
if time_left <= 0:
|
||||
raise Exception("Timed out while waiting for instance `{}' with ip address {} to start".format(self.name, self.ip_address))
|
||||
if status == 'exited':
|
||||
raise Exception("Instance `{}' failed to start. Container status: {}".format(self.name, status))
|
||||
|
||||
current_time = time.time()
|
||||
time_left = deadline - current_time
|
||||
if deadline is not None and current_time >= deadline:
|
||||
raise Exception("Timed out while waiting for instance `{}' with ip address {} to start. "
|
||||
"Container status: {}".format(self.name, self.ip_address, status))
|
||||
|
||||
# Repeatedly poll the instance address until there is something that listens there.
|
||||
# Usually it means that ClickHouse is ready to accept queries.
|
||||
@ -189,6 +230,12 @@ class ClickHouseInstance:
|
||||
sock.close()
|
||||
|
||||
|
||||
@staticmethod
|
||||
def dict_to_xml(dictionary):
|
||||
xml_str = dicttoxml(dictionary, custom_root="yandex", attr_type=False)
|
||||
return xml.dom.minidom.parseString(xml_str).toprettyxml()
|
||||
|
||||
|
||||
def create_dir(self, destroy_dir=True):
|
||||
"""Create the instance directory and all the needed files there."""
|
||||
|
||||
@ -197,7 +244,7 @@ class ClickHouseInstance:
|
||||
elif p.exists(self.path):
|
||||
return
|
||||
|
||||
os.mkdir(self.path)
|
||||
os.makedirs(self.path)
|
||||
|
||||
configs_dir = p.join(self.path, 'configs')
|
||||
os.mkdir(configs_dir)
|
||||
@ -206,19 +253,33 @@ class ClickHouseInstance:
|
||||
shutil.copy(p.join(self.base_configs_dir, 'users.xml'), configs_dir)
|
||||
|
||||
config_d_dir = p.join(configs_dir, 'config.d')
|
||||
users_d_dir = p.join(configs_dir, 'users.d')
|
||||
os.mkdir(config_d_dir)
|
||||
|
||||
shutil.copy(p.join(HELPERS_DIR, 'common_instance_config.xml'), config_d_dir)
|
||||
|
||||
# Generate and write macroses file
|
||||
macroses = self.macroses.copy()
|
||||
macroses['instance'] = self.name
|
||||
with open(p.join(config_d_dir, 'macros.xml'), 'w') as macros_config:
|
||||
macros_config.write(MACROS_CONFIG_TEMPLATE.format(name=self.name))
|
||||
macros_config.write(self.dict_to_xml({"macros" : macroses}))
|
||||
|
||||
# Put ZooKeeper config
|
||||
if self.with_zookeeper:
|
||||
shutil.copy(p.join(HELPERS_DIR, 'zookeeper_config.xml'), config_d_dir)
|
||||
|
||||
for path in self.custom_config_paths:
|
||||
# Copy config dir
|
||||
if self.custom_config_dir:
|
||||
distutils.dir_util.copy_tree(self.custom_config_dir, configs_dir)
|
||||
|
||||
# Copy config.d configs
|
||||
for path in self.custom_main_config_paths:
|
||||
shutil.copy(path, config_d_dir)
|
||||
|
||||
# Copy users.d configs
|
||||
for path in self.custom_user_config_paths:
|
||||
shutil.copy(path, users_d_dir)
|
||||
|
||||
db_dir = p.join(self.path, 'database')
|
||||
os.mkdir(db_dir)
|
||||
|
||||
|
@ -22,12 +22,20 @@ class PartitionManager:
|
||||
def __init__(self):
|
||||
self._iptables_rules = []
|
||||
|
||||
def isolate_instance_from_zk(self, instance, action='DROP'):
|
||||
|
||||
def drop_instance_zk_connections(self, instance, action='DROP'):
|
||||
self._check_instance(instance)
|
||||
|
||||
self._add_rule({'source': instance.ip_address, 'destination_port': 2181, 'action': action})
|
||||
self._add_rule({'destination': instance.ip_address, 'source_port': 2181, 'action': action})
|
||||
|
||||
def restore_instance_zk_connections(self, instance, action='DROP'):
|
||||
self._check_instance(instance)
|
||||
|
||||
self._delete_rule({'source': instance.ip_address, 'destination_port': 2181, 'action': action})
|
||||
self._delete_rule({'destination': instance.ip_address, 'source_port': 2181, 'action': action})
|
||||
|
||||
|
||||
def partition_instances(self, left, right, action='DROP'):
|
||||
self._check_instance(left)
|
||||
self._check_instance(right)
|
||||
@ -35,11 +43,13 @@ class PartitionManager:
|
||||
self._add_rule({'source': left.ip_address, 'destination': right.ip_address, 'action': action})
|
||||
self._add_rule({'source': right.ip_address, 'destination': left.ip_address, 'action': action})
|
||||
|
||||
|
||||
def heal_all(self):
|
||||
while self._iptables_rules:
|
||||
rule = self._iptables_rules.pop()
|
||||
_NetworkManager.get().delete_iptables_rule(**rule)
|
||||
|
||||
|
||||
@staticmethod
|
||||
def _check_instance(instance):
|
||||
if instance.ip_address is None:
|
||||
@ -49,6 +59,10 @@ class PartitionManager:
|
||||
_NetworkManager.get().add_iptables_rule(**rule)
|
||||
self._iptables_rules.append(rule)
|
||||
|
||||
def _delete_rule(self, rule):
|
||||
_NetworkManager.get().delete_iptables_rule(**rule)
|
||||
self._iptables_rules.remove(rule)
|
||||
|
||||
def __enter__(self):
|
||||
return self
|
||||
|
||||
|
@ -11,3 +11,7 @@ class TSV:
|
||||
|
||||
def diff(self, other):
|
||||
return list(line.rstrip() for line in difflib.context_diff(self.lines, other.lines))[2:]
|
||||
|
||||
@staticmethod
|
||||
def toMat(contents):
|
||||
return [line.split("\t") for line in contents.split("\n") if line.strip()]
|
||||
|
@ -12,6 +12,6 @@
|
||||
<host>zoo3</host>
|
||||
<port>2181</port>
|
||||
</node>
|
||||
<session_timeout_ms>1000</session_timeout_ms>
|
||||
<session_timeout_ms>3000</session_timeout_ms>
|
||||
</zookeeper>
|
||||
</yandex>
|
||||
|
@ -0,0 +1,5 @@
|
||||
<yandex>
|
||||
<zookeeper>
|
||||
<session_timeout_ms replace="1">2000</session_timeout_ms>
|
||||
</zookeeper>
|
||||
</yandex>
|
@ -7,9 +7,9 @@ from helpers.network import PartitionManager
|
||||
|
||||
cluster = ClickHouseCluster(__file__)
|
||||
|
||||
instance_with_dist_table = cluster.add_instance('instance_with_dist_table', ['configs/remote_servers.xml'])
|
||||
replica1 = cluster.add_instance('replica1', [], with_zookeeper=True)
|
||||
replica2 = cluster.add_instance('replica2', [], with_zookeeper=True)
|
||||
instance_with_dist_table = cluster.add_instance('instance_with_dist_table', main_configs=['configs/remote_servers.xml'])
|
||||
replica1 = cluster.add_instance('replica1', with_zookeeper=True)
|
||||
replica2 = cluster.add_instance('replica2', with_zookeeper=True)
|
||||
|
||||
@pytest.fixture(scope="module")
|
||||
def started_cluster():
|
||||
@ -53,9 +53,9 @@ SELECT count() FROM distributed SETTINGS
|
||||
max_replica_delay_for_distributed_queries=1
|
||||
''').strip() == '1'
|
||||
|
||||
pm.isolate_instance_from_zk(replica2)
|
||||
pm.drop_instance_zk_connections(replica2)
|
||||
|
||||
time.sleep(2) # allow pings to zookeeper to timeout
|
||||
time.sleep(2.5) # allow pings to zookeeper to timeout
|
||||
|
||||
# At this point all replicas are stale, but the query must still go to replica2 which is the least stale one.
|
||||
assert instance_with_dist_table.query('''
|
||||
@ -66,7 +66,7 @@ SELECT count() FROM distributed SETTINGS
|
||||
|
||||
# If we forbid stale replicas, the query must fail.
|
||||
with pytest.raises(Exception):
|
||||
instance_with_dist_table.query('''
|
||||
print instance_with_dist_table.query('''
|
||||
SELECT count() FROM distributed SETTINGS
|
||||
load_balancing='in_order',
|
||||
max_replica_delay_for_distributed_queries=1,
|
||||
|
@ -0,0 +1,28 @@
|
||||
<yandex>
|
||||
<remote_servers>
|
||||
<cluster_without_replication>
|
||||
<shard>
|
||||
<internal_replication>false</internal_replication>
|
||||
<replica>
|
||||
<host>ch1</host>
|
||||
<port>9000</port>
|
||||
</replica>
|
||||
<replica>
|
||||
<host>ch2</host>
|
||||
<port>9000</port>
|
||||
</replica>
|
||||
</shard>
|
||||
<shard>
|
||||
<internal_replication>false</internal_replication>
|
||||
<replica>
|
||||
<host>ch3</host>
|
||||
<port>9000</port>
|
||||
</replica>
|
||||
<replica>
|
||||
<host>ch4</host>
|
||||
<port>9000</port>
|
||||
</replica>
|
||||
</shard>
|
||||
</cluster_without_replication>
|
||||
</remote_servers>
|
||||
</yandex>
|
@ -0,0 +1,6 @@
|
||||
<yandex>
|
||||
<zookeeper>
|
||||
<!-- Required for correct timing in current test case -->
|
||||
<session_timeout_ms replace="1">3000</session_timeout_ms>
|
||||
</zookeeper>
|
||||
</yandex>
|
@ -0,0 +1,9 @@
|
||||
<yandex>
|
||||
<profiles>
|
||||
<!-- Default profile settings. -->
|
||||
<default>
|
||||
<log_queries>0</log_queries>
|
||||
<distributed_ddl_allow_replicated_alter>1</distributed_ddl_allow_replicated_alter>
|
||||
</default>
|
||||
</profiles>
|
||||
</yandex>
|
218
dbms/tests/integration/test_distributed_ddl/test.py
Normal file
218
dbms/tests/integration/test_distributed_ddl/test.py
Normal file
@ -0,0 +1,218 @@
|
||||
import os.path as p
|
||||
import time
|
||||
import datetime
|
||||
import pytest
|
||||
|
||||
from helpers.cluster import ClickHouseCluster
|
||||
from helpers.network import PartitionManager
|
||||
from helpers.test_tools import TSV
|
||||
|
||||
|
||||
def check_all_hosts_sucesfully_executed(tsv_content, num_hosts=None):
|
||||
if num_hosts is None:
|
||||
num_hosts = len(cluster.instances)
|
||||
|
||||
M = TSV.toMat(tsv_content)
|
||||
hosts = [l[0] for l in M]
|
||||
codes = [l[1] for l in M]
|
||||
messages = [l[2] for l in M]
|
||||
|
||||
assert len(hosts) == num_hosts and len(set(hosts)) == num_hosts, tsv_content
|
||||
assert len(set(codes)) == 1, tsv_content
|
||||
assert codes[0] == "0", tsv_content
|
||||
|
||||
|
||||
def ddl_check_query(instance, query, num_hosts=None):
|
||||
contents = instance.query(query)
|
||||
check_all_hosts_sucesfully_executed(contents, num_hosts)
|
||||
return contents
|
||||
|
||||
|
||||
cluster = ClickHouseCluster(__file__)
|
||||
TEST_REPLICATED_ALTERS=True
|
||||
|
||||
|
||||
@pytest.fixture(scope="module")
|
||||
def started_cluster():
|
||||
try:
|
||||
for i in xrange(4):
|
||||
macroses = {"layer": 0, "shard": i/2 + 1, "replica": i%2 + 1}
|
||||
cluster.add_instance('ch{}'.format(i+1), config_dir="configs", macroses=macroses, with_zookeeper=True)
|
||||
|
||||
cluster.start()
|
||||
|
||||
# Initialize databases and service tables
|
||||
instance = cluster.instances['ch1']
|
||||
|
||||
ddl_check_query(instance, """
|
||||
CREATE TABLE IF NOT EXISTS all_tables ON CLUSTER 'cluster_no_replicas'
|
||||
(database String, name String, engine String, metadata_modification_time DateTime)
|
||||
ENGINE = Distributed('cluster_no_replicas', 'system', 'tables')
|
||||
""")
|
||||
|
||||
ddl_check_query(instance, "CREATE DATABASE IF NOT EXISTS test ON CLUSTER 'cluster'")
|
||||
|
||||
yield cluster
|
||||
|
||||
ddl_check_query(instance, "DROP DATABASE test ON CLUSTER 'cluster'")
|
||||
ddl_check_query(instance, "DROP DATABASE IF EXISTS test2 ON CLUSTER 'cluster'")
|
||||
|
||||
finally:
|
||||
pass
|
||||
cluster.shutdown()
|
||||
|
||||
|
||||
def test_default_database(started_cluster):
|
||||
instance = cluster.instances['ch3']
|
||||
|
||||
ddl_check_query(instance, "CREATE DATABASE IF NOT EXISTS test2 ON CLUSTER 'cluster'")
|
||||
ddl_check_query(instance, "DROP TABLE IF EXISTS null ON CLUSTER 'cluster'")
|
||||
ddl_check_query(instance, "CREATE TABLE null ON CLUSTER 'cluster2' (s String DEFAULT 'escape\t\nme') ENGINE = Null")
|
||||
|
||||
contents = instance.query("SELECT hostName() AS h, database FROM all_tables WHERE name = 'null' ORDER BY h")
|
||||
assert TSV(contents) == TSV("ch1\tdefault\nch2\ttest2\nch3\tdefault\nch4\ttest2\n")
|
||||
|
||||
ddl_check_query(instance, "DROP TABLE IF EXISTS null ON CLUSTER cluster2")
|
||||
ddl_check_query(instance, "DROP DATABASE IF EXISTS test2 ON CLUSTER 'cluster'")
|
||||
|
||||
|
||||
def test_on_server_fail(started_cluster):
|
||||
instance = cluster.instances['ch1']
|
||||
kill_instance = cluster.instances['ch2']
|
||||
|
||||
ddl_check_query(instance, "DROP TABLE IF EXISTS test.test_server_fail ON CLUSTER 'cluster'")
|
||||
|
||||
kill_instance.get_docker_handle().stop()
|
||||
request = instance.get_query_request("CREATE TABLE test.test_server_fail ON CLUSTER 'cluster' (i Int8) ENGINE=Null", timeout=30)
|
||||
kill_instance.get_docker_handle().start()
|
||||
|
||||
ddl_check_query(instance, "DROP TABLE IF EXISTS test.__nope__ ON CLUSTER 'cluster'")
|
||||
|
||||
# Check query itself
|
||||
check_all_hosts_sucesfully_executed(request.get_answer())
|
||||
|
||||
# And check query artefacts
|
||||
contents = instance.query("SELECT hostName() AS h FROM all_tables WHERE database='test' AND name='test_server_fail' ORDER BY h")
|
||||
assert TSV(contents) == TSV("ch1\nch2\nch3\nch4\n")
|
||||
|
||||
ddl_check_query(instance, "DROP TABLE IF EXISTS test.test_server_fail ON CLUSTER 'cluster'")
|
||||
|
||||
|
||||
def _test_on_connection_losses(cluster, zk_timeout):
|
||||
instance = cluster.instances['ch1']
|
||||
kill_instance = cluster.instances['ch2']
|
||||
|
||||
with PartitionManager() as pm:
|
||||
pm.drop_instance_zk_connections(kill_instance)
|
||||
request = instance.get_query_request("DROP TABLE IF EXISTS test.__nope__ ON CLUSTER 'cluster'", timeout=10)
|
||||
time.sleep(zk_timeout)
|
||||
pm.restore_instance_zk_connections(kill_instance)
|
||||
|
||||
check_all_hosts_sucesfully_executed(request.get_answer())
|
||||
|
||||
|
||||
def test_on_connection_loss(started_cluster):
|
||||
_test_on_connection_losses(cluster, 1.5) # connection loss will occur only (3 sec ZK timeout in config)
|
||||
|
||||
|
||||
def test_on_session_expired(started_cluster):
|
||||
_test_on_connection_losses(cluster, 4) # session should be expired (3 sec ZK timeout in config)
|
||||
|
||||
|
||||
def test_replicated_alters(started_cluster):
|
||||
instance = cluster.instances['ch2']
|
||||
|
||||
ddl_check_query(instance, "DROP TABLE IF EXISTS merge ON CLUSTER cluster")
|
||||
ddl_check_query(instance, "DROP TABLE IF EXISTS all_merge_32 ON CLUSTER cluster")
|
||||
ddl_check_query(instance, "DROP TABLE IF EXISTS all_merge_64 ON CLUSTER cluster")
|
||||
|
||||
if not TEST_REPLICATED_ALTERS:
|
||||
return
|
||||
|
||||
ddl_check_query(instance, """
|
||||
CREATE TABLE IF NOT EXISTS merge ON CLUSTER cluster (p Date, i Int32)
|
||||
ENGINE = ReplicatedMergeTree('/clickhouse/tables/{layer}-{shard}/hits', '{replica}', p, p, 1)
|
||||
""")
|
||||
ddl_check_query(instance, """
|
||||
CREATE TABLE IF NOT EXISTS all_merge_32 ON CLUSTER cluster (p Date, i Int32)
|
||||
ENGINE = Distributed(cluster, default, merge, i)
|
||||
""")
|
||||
ddl_check_query(instance, """
|
||||
CREATE TABLE IF NOT EXISTS all_merge_64 ON CLUSTER cluster (p Date, i Int64, s String)
|
||||
ENGINE = Distributed(cluster, default, merge, i)
|
||||
""")
|
||||
|
||||
for i in xrange(4):
|
||||
k = (i / 2) * 2
|
||||
cluster.instances['ch{}'.format(i + 1)].query("INSERT INTO merge (i) VALUES ({})({})".format(k, k+1))
|
||||
|
||||
assert TSV(instance.query("SELECT i FROM all_merge_32 ORDER BY i")) == TSV(''.join(['{}\n'.format(x) for x in xrange(4)]))
|
||||
|
||||
|
||||
ddl_check_query(instance, "ALTER TABLE merge ON CLUSTER cluster MODIFY COLUMN i Int64")
|
||||
ddl_check_query(instance, "ALTER TABLE merge ON CLUSTER cluster ADD COLUMN s DEFAULT toString(i)")
|
||||
|
||||
assert TSV(instance.query("SELECT i, s FROM all_merge_64 ORDER BY i")) == TSV(''.join(['{}\t{}\n'.format(x,x) for x in xrange(4)]))
|
||||
|
||||
|
||||
for i in xrange(4):
|
||||
k = (i / 2) * 2 + 4
|
||||
cluster.instances['ch{}'.format(i + 1)].query("INSERT INTO merge (p, i) VALUES (31, {})(31, {})".format(k, k+1))
|
||||
|
||||
assert TSV(instance.query("SELECT i, s FROM all_merge_64 ORDER BY i")) == TSV(''.join(['{}\t{}\n'.format(x,x) for x in xrange(8)]))
|
||||
|
||||
|
||||
ddl_check_query(instance, "ALTER TABLE merge ON CLUSTER cluster DETACH PARTITION 197002")
|
||||
assert TSV(instance.query("SELECT i, s FROM all_merge_64 ORDER BY i")) == TSV(''.join(['{}\t{}\n'.format(x,x) for x in xrange(4)]))
|
||||
|
||||
ddl_check_query(instance, "DROP TABLE merge ON CLUSTER cluster")
|
||||
ddl_check_query(instance, "DROP TABLE all_merge_32 ON CLUSTER cluster")
|
||||
ddl_check_query(instance, "DROP TABLE all_merge_64 ON CLUSTER cluster")
|
||||
|
||||
|
||||
def test_simple_alters(started_cluster):
|
||||
instance = cluster.instances['ch2']
|
||||
|
||||
ddl_check_query(instance, "DROP TABLE IF EXISTS merge ON CLUSTER cluster_without_replication")
|
||||
ddl_check_query(instance, "DROP TABLE IF EXISTS all_merge_32 ON CLUSTER cluster_without_replication")
|
||||
ddl_check_query(instance, "DROP TABLE IF EXISTS all_merge_64 ON CLUSTER cluster_without_replication")
|
||||
|
||||
ddl_check_query(instance, """
|
||||
CREATE TABLE IF NOT EXISTS merge ON CLUSTER cluster_without_replication (p Date, i Int32)
|
||||
ENGINE = MergeTree(p, p, 1)
|
||||
""")
|
||||
ddl_check_query(instance, """
|
||||
CREATE TABLE IF NOT EXISTS all_merge_32 ON CLUSTER cluster_without_replication (p Date, i Int32)
|
||||
ENGINE = Distributed(cluster_without_replication, default, merge, i)
|
||||
""")
|
||||
ddl_check_query(instance, """
|
||||
CREATE TABLE IF NOT EXISTS all_merge_64 ON CLUSTER cluster_without_replication (p Date, i Int64, s String)
|
||||
ENGINE = Distributed(cluster_without_replication, default, merge, i)
|
||||
""")
|
||||
|
||||
for i in xrange(4):
|
||||
k = (i / 2) * 2
|
||||
cluster.instances['ch{}'.format(i + 1)].query("INSERT INTO merge (i) VALUES ({})({})".format(k, k+1))
|
||||
|
||||
assert TSV(instance.query("SELECT i FROM all_merge_32 ORDER BY i")) == TSV(''.join(['{}\n'.format(x) for x in xrange(4)]))
|
||||
|
||||
|
||||
ddl_check_query(instance, "ALTER TABLE merge ON CLUSTER cluster_without_replication MODIFY COLUMN i Int64")
|
||||
ddl_check_query(instance, "ALTER TABLE merge ON CLUSTER cluster_without_replication ADD COLUMN s DEFAULT toString(i)")
|
||||
|
||||
assert TSV(instance.query("SELECT i, s FROM all_merge_64 ORDER BY i")) == TSV(''.join(['{}\t{}\n'.format(x,x) for x in xrange(4)]))
|
||||
|
||||
|
||||
for i in xrange(4):
|
||||
k = (i / 2) * 2 + 4
|
||||
cluster.instances['ch{}'.format(i + 1)].query("INSERT INTO merge (p, i) VALUES (31, {})(31, {})".format(k, k+1))
|
||||
|
||||
assert TSV(instance.query("SELECT i, s FROM all_merge_64 ORDER BY i")) == TSV(''.join(['{}\t{}\n'.format(x,x) for x in xrange(8)]))
|
||||
|
||||
|
||||
ddl_check_query(instance, "ALTER TABLE merge ON CLUSTER cluster_without_replication DETACH PARTITION 197002")
|
||||
assert TSV(instance.query("SELECT i, s FROM all_merge_64 ORDER BY i")) == TSV(''.join(['{}\t{}\n'.format(x,x) for x in xrange(4)]))
|
||||
|
||||
ddl_check_query(instance, "DROP TABLE merge ON CLUSTER cluster_without_replication")
|
||||
ddl_check_query(instance, "DROP TABLE all_merge_32 ON CLUSTER cluster_without_replication")
|
||||
ddl_check_query(instance, "DROP TABLE all_merge_64 ON CLUSTER cluster_without_replication")
|
@ -8,7 +8,7 @@ from helpers.test_tools import TSV
|
||||
|
||||
|
||||
cluster = ClickHouseCluster(__file__)
|
||||
instance = cluster.add_instance('instance', ['configs/graphite_rollup.xml'])
|
||||
instance = cluster.add_instance('instance', main_configs=['configs/graphite_rollup.xml'])
|
||||
|
||||
@pytest.fixture(scope="module")
|
||||
def started_cluster():
|
||||
|
@ -1,38 +0,0 @@
|
||||
FROM ubuntu:14.04
|
||||
|
||||
ARG repository="deb https://repo.yandex.ru/clickhouse/trusty/ dists/stable/main/binary-amd64/"
|
||||
ARG version=\*
|
||||
|
||||
RUN apt-get update && \
|
||||
apt-get install -y apt-transport-https && \
|
||||
mkdir -p /etc/apt/sources.list.d && \
|
||||
echo $repository | tee /etc/apt/sources.list.d/clickhouse.list && \
|
||||
apt-get update && \
|
||||
apt-get install --allow-unauthenticated -y clickhouse-server-common=$version && \
|
||||
apt-get install --allow-unauthenticated -y clickhouse-client=$version && \
|
||||
rm -rf /var/lib/apt/lists/* /var/cache/debconf && \
|
||||
apt-get clean
|
||||
|
||||
RUN sed -i 's,<listen_host>127.0.0.1</listen_host>,<listen_host>::</listen_host>,' /etc/clickhouse-server/config.xml
|
||||
RUN chown -R clickhouse /etc/clickhouse-server/
|
||||
|
||||
EXPOSE 9000 8123 9009
|
||||
#VOLUME /var/lib/clickhouse
|
||||
|
||||
RUN mkdir -p /var/run/clickhouse-server && \
|
||||
chown -R clickhouse /var/run/clickhouse-server
|
||||
#RUN mkdir -p /etc/clickhouse-server/conf.d/ /etc/clickhouse-server/users.d/ && \
|
||||
# chown -R clickhouse /etc/clickhouse-server/conf.d /etc/clickhouse-server/users.d/
|
||||
RUN rm -rf /etc/clickhouse-server/conf.d /etc/clickhouse-server/users.d
|
||||
RUN rm -f /usr/bin/clickhouse
|
||||
|
||||
VOLUME /etc/clickhouse-server/conf.d/
|
||||
VOLUME /etc/clickhouse-server/users.d/
|
||||
VOLUME /usr/bin/clickhouse
|
||||
|
||||
ENV LAYER 1
|
||||
ENV SHARD 1
|
||||
ENV REPLICA 1
|
||||
|
||||
COPY docker-entrypoint.sh /
|
||||
ENTRYPOINT ["/docker-entrypoint.sh"]
|
@ -1,23 +0,0 @@
|
||||
#!/bin/bash
|
||||
set -e
|
||||
|
||||
echo "
|
||||
<yandex>
|
||||
<macros>
|
||||
<layer>${LAYER}</layer>
|
||||
<shard>${SHARD}</shard>
|
||||
<replica>${REPLICA}</replica>
|
||||
</macros>
|
||||
</yandex>" > /etc/metrika.xml
|
||||
|
||||
ls -l /etc/clickhouse-server/
|
||||
ls -l /etc/clickhouse-server/conf.d/
|
||||
ls -l /etc/clickhouse-server/users.d/
|
||||
|
||||
service clickhouse-server start
|
||||
|
||||
PID=`pidof clickhouse-server`
|
||||
while [ -e /proc/$PID ]; do
|
||||
sleep 2
|
||||
done
|
||||
echo "ClickHouse process $PID has finished"
|
@ -1,16 +0,0 @@
|
||||
<yandex>
|
||||
<zookeeper>
|
||||
<node index="1">
|
||||
<host>zoo1</host>
|
||||
<port>2181</port>
|
||||
</node>
|
||||
<node index="2">
|
||||
<host>zoo2</host>
|
||||
<port>2181</port>
|
||||
</node>
|
||||
<node index="3">
|
||||
<host>zoo3</host>
|
||||
<port>2181</port>
|
||||
</node>
|
||||
</zookeeper>
|
||||
</yandex>
|
@ -1,88 +0,0 @@
|
||||
version: '2'
|
||||
services:
|
||||
# ClickHouses
|
||||
ch1:
|
||||
hostname: ch1
|
||||
environment:
|
||||
SHARD: 1
|
||||
REPLICA: 1
|
||||
ports:
|
||||
- "9001:9000"
|
||||
- "8124:8123"
|
||||
volumes:
|
||||
# Use system clickhouse binary
|
||||
# Use configs from the testcase dir
|
||||
- /usr/bin/clickhouse:/usr/bin/clickhouse:ro
|
||||
- ./conf.d:/etc/clickhouse-server/conf.d
|
||||
- ./users.d:/etc/clickhouse-server/users.d
|
||||
build: ../image
|
||||
|
||||
ch2:
|
||||
hostname: ch2
|
||||
environment:
|
||||
SHARD: 1
|
||||
REPLICA: 2
|
||||
ports:
|
||||
- "9002:9000"
|
||||
- "8125:8123"
|
||||
volumes:
|
||||
- /usr/bin/clickhouse:/usr/bin/clickhouse:ro
|
||||
- ./conf.d:/etc/clickhouse-server/conf.d
|
||||
- ./users.d:/etc/clickhouse-server/users.d
|
||||
build: ../image
|
||||
|
||||
ch3:
|
||||
hostname: ch3
|
||||
environment:
|
||||
SHARD: 2
|
||||
REPLICA: 1
|
||||
ports:
|
||||
- "9003:9000"
|
||||
- "8126:8123"
|
||||
volumes:
|
||||
- /usr/bin/clickhouse:/usr/bin/clickhouse:ro
|
||||
- ./conf.d:/etc/clickhouse-server/conf.d
|
||||
- ./users.d:/etc/clickhouse-server/users.d
|
||||
build: ../image
|
||||
|
||||
ch4:
|
||||
hostname: ch4
|
||||
environment:
|
||||
SHARD: 2
|
||||
REPLICA: 2
|
||||
ports:
|
||||
- "9004:9000"
|
||||
- "8127:8123"
|
||||
volumes:
|
||||
- /usr/bin/clickhouse:/usr/bin/clickhouse:ro
|
||||
- ./conf.d:/etc/clickhouse-server/conf.d
|
||||
- ./users.d:/etc/clickhouse-server/users.d
|
||||
build: ../image
|
||||
|
||||
# ZooKeepers
|
||||
zoo1:
|
||||
image: 31z4/zookeeper
|
||||
restart: always
|
||||
ports:
|
||||
- 2182:2181
|
||||
environment:
|
||||
ZOO_MY_ID: 1
|
||||
ZOO_SERVERS: server.1=zoo1:2888:3888 server.2=zoo2:2888:3888 server.3=zoo3:2888:3888
|
||||
|
||||
zoo2:
|
||||
image: 31z4/zookeeper
|
||||
restart: always
|
||||
ports:
|
||||
- 2183:2181
|
||||
environment:
|
||||
ZOO_MY_ID: 2
|
||||
ZOO_SERVERS: server.1=zoo1:2888:3888 server.2=zoo2:2888:3888 server.3=zoo3:2888:3888
|
||||
|
||||
zoo3:
|
||||
image: 31z4/zookeeper
|
||||
restart: always
|
||||
ports:
|
||||
- 2184:2181
|
||||
environment:
|
||||
ZOO_MY_ID: 3
|
||||
ZOO_SERVERS: server.1=zoo1:2888:3888 server.2=zoo2:2888:3888 server.3=zoo3:2888:3888
|
@ -1,3 +0,0 @@
|
||||
#!/bin/bash
|
||||
docker-compose kill && docker-compose rm -fv && docker-compose up -d
|
||||
|
@ -1,16 +0,0 @@
|
||||
4
|
||||
|
||||
ch1 default
|
||||
ch2 test2
|
||||
ch3 default
|
||||
ch4 test2
|
||||
|
||||
ch1:9000
|
||||
ch2:9000
|
||||
ch3:9000
|
||||
ch4:9000
|
||||
|
||||
ch1:9000
|
||||
ch2:9000
|
||||
ch3:9000
|
||||
ch4:9000
|
@ -1,73 +0,0 @@
|
||||
#!/bin/bash
|
||||
set -e
|
||||
|
||||
[[ `id -u -n` -ne root ]] && su
|
||||
dce='docker-compose exec -T'
|
||||
|
||||
$dce ch1 clickhouse-client -q "CREATE TABLE IF NOT EXISTS all_tables ON CLUSTER 'cluster_no_replicas'
|
||||
(database String, name String, engine String, metadata_modification_time DateTime)
|
||||
ENGINE = Distributed('cluster_no_replicas', 'system', 'tables')" | cut -f 1-2 | sort
|
||||
|
||||
echo "###"
|
||||
|
||||
# Test default_database
|
||||
$dce ch1 clickhouse-client -q "CREATE DATABASE IF NOT EXISTS test2 ON CLUSTER 'cluster'" | cut -f 1-2 | sort
|
||||
$dce ch1 clickhouse-client -q "CREATE TABLE null ON CLUSTER 'cluster2' (i Int8) ENGINE = Null" | cut -f 1-2 | sort
|
||||
$dce ch1 clickhouse-client -q "SELECT hostName() AS h, database FROM all_tables WHERE name = 'null' ORDER BY h"
|
||||
$dce ch1 clickhouse-client -q "DROP TABLE IF EXISTS null ON CLUSTER cluster2" | cut -f 1-2 | sort
|
||||
|
||||
echo "###"
|
||||
|
||||
# Replicated alter
|
||||
$dce ch1 clickhouse-client -q "DROP TABLE IF EXISTS merge ON CLUSTER cluster" 1>/dev/null
|
||||
$dce ch1 clickhouse-client -q "CREATE TABLE IF NOT EXISTS merge ON CLUSTER cluster (p Date, i Int32)
|
||||
ENGINE = ReplicatedMergeTree('/clickhouse/tables/{layer}-{shard}/hits', '{replica}', p, p, 1)" | cut -f 1-3 | sort
|
||||
$dce ch1 clickhouse-client -q "CREATE TABLE IF NOT EXISTS all_merge_1 ON CLUSTER cluster (p Date, i Int32)
|
||||
ENGINE = Distributed(cluster, default, merge, i)" | cut -f 1-2 | uniq | wc -l
|
||||
$dce ch1 clickhouse-client -q "CREATE TABLE IF NOT EXISTS all_merge_2 ON CLUSTER cluster (p Date, i Int64, s String)
|
||||
ENGINE = Distributed(cluster, default, merge, i)" | cut -f 1-2 | uniq | wc -l
|
||||
|
||||
$dce ch1 clickhouse-client -q "INSERT INTO all_merge_1 (i) VALUES (1) (2) (3) (4)"
|
||||
$dce ch1 clickhouse-client -q "SELECT i FROM all_merge_1 ORDER BY i"
|
||||
$dce ch1 clickhouse-client -q "ALTER TABLE merge ON CLUSTER cluster MODIFY COLUMN i Int64" | cut -f 1-2 | sort
|
||||
$dce ch1 clickhouse-client -q "ALTER TABLE merge ON CLUSTER cluster ADD COLUMN s DEFAULT toString(i)" | cut -f 1-2 | sort
|
||||
$dce ch1 clickhouse-client -q "SELECT i, s FROM all_merge_2 ORDER BY i"
|
||||
|
||||
echo "###"
|
||||
|
||||
# detach partition
|
||||
$dce ch1 clickhouse-client -q "INSERT INTO all_merge_2 (p, i) VALUES (31, 5) (31, 6) (31, 7) (31, 8)"
|
||||
$dce ch1 clickhouse-client -q "SELECT i FROM all_merge_2 ORDER BY i"
|
||||
$dce ch1 clickhouse-client -q "ALTER TABLE merge ON CLUSTER cluster DETACH PARTITION 197002" | cut -f 1-2 | sort
|
||||
$dce ch1 clickhouse-client -q "SELECT i FROM all_merge_2 ORDER BY i"
|
||||
|
||||
$dce ch1 clickhouse-client -q "DROP TABLE all_merge_1 ON CLUSTER cluster" | cut -f 1-3 | sort
|
||||
$dce ch1 clickhouse-client -q "DROP TABLE all_merge_2 ON CLUSTER cluster" | cut -f 1-3 | sort
|
||||
$dce ch1 clickhouse-client -q "DROP TABLE merge ON CLUSTER cluster" | cut -f 1-3 | sort
|
||||
|
||||
echo "###"
|
||||
|
||||
# Server fail
|
||||
docker-compose kill ch2 1>/dev/null 2>/dev/null
|
||||
($dce ch1 clickhouse-client -q "CREATE DATABASE IF NOT EXISTS test2 ON CLUSTER 'cluster'" | cut -f 1-2 | sort) &
|
||||
sleep 1
|
||||
docker-compose start ch2 1>/dev/null 2>/dev/null
|
||||
wait
|
||||
|
||||
echo "###"
|
||||
|
||||
# Connection loss
|
||||
docker-compose pause ch2 zoo1 1>/dev/null 2>/dev/null
|
||||
($dce ch1 clickhouse-client -q "CREATE DATABASE IF NOT EXISTS test2 ON CLUSTER 'cluster'" | cut -f 1-2 | sort) &
|
||||
sleep 10
|
||||
docker-compose unpause ch2 zoo1 1>/dev/null 2>/dev/null
|
||||
wait
|
||||
|
||||
echo "###"
|
||||
|
||||
# Session expired
|
||||
docker-compose pause ch2 zoo1 1>/dev/null 2>/dev/null
|
||||
($dce ch1 clickhouse-client -q "CREATE DATABASE IF NOT EXISTS test2 ON CLUSTER 'cluster'" | cut -f 1-2 | sort) &
|
||||
sleep 31
|
||||
docker-compose unpause ch2 zoo1 1>/dev/null 2>/dev/null
|
||||
wait
|
@ -1,8 +0,0 @@
|
||||
<yandex>
|
||||
<profiles>
|
||||
<!-- Default settings. -->
|
||||
<default replace="1">
|
||||
<log_queries>1</log_queries>
|
||||
</default>
|
||||
</profiles>
|
||||
</yandex>
|
Loading…
Reference in New Issue
Block a user