Merge branch 'master' of github.com:yandex/ClickHouse

Alexey Milovidov 2017-08-30 21:53:37 +03:00
commit 9fdbd576f6
10 changed files with 250 additions and 18 deletions

View File

@@ -71,11 +71,12 @@ void ZooKeeper::processCallback(zhandle_t * zh, int type, int state, const char
destroyContext(context);
}
void ZooKeeper::init(const std::string & hosts_, int32_t session_timeout_ms_)
void ZooKeeper::init(const std::string & hosts_, const std::string & identity_, int32_t session_timeout_ms_)
{
log = &Logger::get("ZooKeeper");
zoo_set_debug_level(ZOO_LOG_LEVEL_ERROR);
hosts = hosts_;
identity = identity_;
session_timeout_ms = session_timeout_ms_;
impl = zookeeper_init(hosts.c_str(), nullptr, session_timeout_ms, nullptr, nullptr, 0);
@@ -84,12 +85,23 @@ void ZooKeeper::init(const std::string & hosts_, int32_t session_timeout_ms_)
if (!impl)
throw KeeperException("Fail to initialize zookeeper. Hosts are " + hosts);
default_acl = &ZOO_OPEN_ACL_UNSAFE;
if (!identity.empty())
{
auto code = zoo_add_auth(impl, "digest", identity.c_str(), static_cast<int>(identity.size()), 0, 0);
if (code != ZOK)
throw KeeperException("Zookeeper authentication failed. Hosts are " + hosts, code);
default_acl = &ZOO_CREATOR_ALL_ACL;
}
else
default_acl = &ZOO_OPEN_ACL_UNSAFE;
LOG_TRACE(log, "initialized, hosts: " << hosts);
}
ZooKeeper::ZooKeeper(const std::string & hosts, int32_t session_timeout_ms)
ZooKeeper::ZooKeeper(const std::string & hosts, const std::string & identity, int32_t session_timeout_ms)
{
init(hosts, session_timeout_ms);
init(hosts, identity, session_timeout_ms);
}
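
For reference, the new digest handshake can be reproduced outside the C++ client. A minimal sketch using the Python kazoo library (not part of this commit; hosts and credentials are placeholders):

from kazoo.client import KazooClient
from kazoo.security import make_digest_acl

# kazoo re-sends the credentials via add_auth whenever the session is
# (re)established, mirroring the zoo_add_auth call above.
zk = KazooClient(hosts='zoo1:2181,zoo2:2181,zoo3:2181',    # placeholder hosts
                 auth_data=[('digest', 'user:password')])  # same identity format as the config
zk.start()

# With credentials supplied, nodes can be restricted to their creator,
# which is what switching the default ACL to ZOO_CREATOR_ALL_ACL does.
acl = make_digest_acl('user', 'password', all=True)
zk.create('/secured', b'', acl=[acl])
zk.stop()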
struct ZooKeeperArgs
@@ -100,6 +112,7 @@ struct ZooKeeperArgs
config.keys(config_name, keys);
std::vector<std::string> hosts_strings;
std::string root;
session_timeout_ms = DEFAULT_SESSION_TIMEOUT;
for (const auto & key : keys)
@@ -107,12 +120,22 @@ struct ZooKeeperArgs
if (startsWith(key, "node"))
{
hosts_strings.push_back(
config.getString(config_name + "." + key + ".host") + ":" + config.getString(config_name + "." + key + ".port", "2181"));
config.getString(config_name + "." + key + ".host") + ":"
+ config.getString(config_name + "." + key + ".port", "2181")
);
}
else if (key == "session_timeout_ms")
{
session_timeout_ms = config.getInt(config_name + "." + key);
}
else if (key == "identity")
{
identity = config.getString(config_name + "." + key);
}
else if (key == "root")
{
root = config.getString(config_name + "." + key);
}
else throw KeeperException(std::string("Unknown key ") + key + " in config file");
}
@@ -127,16 +150,24 @@ struct ZooKeeperArgs
hosts += ",";
hosts += host;
}
if (!root.empty())
{
if (root.front() != '/')
throw KeeperException(std::string("Root path in config file should start with '/', but got ") + root);
hosts += root;
}
}
std::string hosts;
size_t session_timeout_ms;
std::string identity;
int session_timeout_ms;
};
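
The resulting connection string is plain 'host:port,host:port' with the optional chroot appended at the end; the ZooKeeper client library treats everything after the last host as a root prefix for every path of the session. A small sketch of the same assembly logic (hypothetical Python helper, for illustration only):

def make_zk_connection_string(nodes, root=''):
    # nodes: list of (host, port) pairs; root: optional chroot path.
    hosts = ','.join('%s:%s' % (host, port) for host, port in nodes)
    if root:
        if not root.startswith('/'):
            raise ValueError("Root path should start with '/', but got " + root)
        hosts += root  # chroot suffix goes after the last host
    return hosts

print(make_zk_connection_string([('zoo1', 2181), ('zoo2', 2181)], '/root_a'))
# -> zoo1:2181,zoo2:2181/root_a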
ZooKeeper::ZooKeeper(const Poco::Util::AbstractConfiguration & config, const std::string & config_name)
{
ZooKeeperArgs args(config, config_name);
init(args.hosts, args.session_timeout_ms);
init(args.hosts, args.identity, args.session_timeout_ms);
}
WatchCallback ZooKeeper::callbackForEvent(const EventPtr & event)
@@ -710,7 +741,7 @@ ZooKeeper::~ZooKeeper()
ZooKeeperPtr ZooKeeper::startNewSession() const
{
return std::make_shared<ZooKeeper>(hosts, session_timeout_ms);
return std::make_shared<ZooKeeper>(hosts, identity, session_timeout_ms);
}
Op::Create::Create(const std::string & path_, const std::string & value_, ACLPtr acl_, int32_t flags_)

View File

@@ -54,7 +54,7 @@ class ZooKeeper
public:
using Ptr = std::shared_ptr<ZooKeeper>;
ZooKeeper(const std::string & hosts, int32_t session_timeout_ms = DEFAULT_SESSION_TIMEOUT);
ZooKeeper(const std::string & hosts, const std::string & identity = "", int32_t session_timeout_ms = DEFAULT_SESSION_TIMEOUT);
/** Config of the form:
<zookeeper>
@@ -67,6 +67,10 @@ public:
<port>2181</port>
</node>
<session_timeout_ms>30000</session_timeout_ms>
<!-- Optional. Chroot suffix. Should exist. -->
<root>/path/to/zookeeper/node</root>
<!-- Optional. Zookeeper digest ACL string. -->
<identity>user:password</identity>
</zookeeper>
*/
ZooKeeper(const Poco::Util::AbstractConfiguration & config, const std::string & config_name);
@@ -353,7 +357,7 @@ private:
friend struct WatchContext;
friend class EphemeralNodeHolder;
void init(const std::string & hosts, int32_t session_timeout_ms);
void init(const std::string & hosts, const std::string & identity, int32_t session_timeout_ms);
void removeChildrenRecursive(const std::string & path);
void tryRemoveChildrenRecursive(const std::string & path);
@@ -397,6 +401,7 @@ private:
MultiFuture asyncMultiImpl(const zkutil::Ops & ops_, bool throw_exception);
std::string hosts;
std::string identity;
int32_t session_timeout_ms;
std::mutex mutex;

View File

@@ -11,7 +11,7 @@ int main()
{
try
{
ZooKeeper zk("mtfilter01t:2181,metrika-test:2181,mtweb01t:2181", 5000);
ZooKeeper zk("mtfilter01t:2181,metrika-test:2181,mtweb01t:2181", "", 5000);
Strings children;
std::cout << "create path" << std::endl;

View File

@@ -12,6 +12,7 @@ from dicttoxml import dicttoxml
import xml.dom.minidom
import docker
from docker.errors import ContainerError
from .client import Client, CommandRequest
@@ -28,13 +29,15 @@ class ClickHouseCluster:
these directories will contain logs, database files, docker-compose config, ClickHouse configs, etc.
"""
def __init__(self, base_path, name=None, base_configs_dir=None, server_bin_path=None, client_bin_path=None):
def __init__(self, base_path, name=None, base_configs_dir=None, server_bin_path=None, client_bin_path=None,
zookeeper_config_path=None):
self.base_dir = p.dirname(base_path)
self.name = name if name is not None else ''
self.base_configs_dir = base_configs_dir or os.environ.get('CLICKHOUSE_TESTS_BASE_CONFIG_DIR', '/etc/clickhouse-server/')
self.server_bin_path = server_bin_path or os.environ.get('CLICKHOUSE_TESTS_SERVER_BIN_PATH', '/usr/bin/clickhouse')
self.client_bin_path = client_bin_path or os.environ.get('CLICKHOUSE_TESTS_CLIENT_BIN_PATH', '/usr/bin/clickhouse-client')
self.zookeeper_config_path = p.join(self.base_dir, zookeeper_config_path) if zookeeper_config_path else p.join(HELPERS_DIR, 'zookeeper_config.xml')
self.project_name = pwd.getpwuid(os.getuid()).pw_name + p.basename(self.base_dir) + self.name
# docker-compose removes everything non-alphanumeric from project names so we do it too.
@@ -42,6 +45,8 @@ class ClickHouseCluster:
self.instances_dir = p.join(self.base_dir, '_instances' + ('' if not self.name else '_' + self.name))
self.base_cmd = ['docker-compose', '--project-directory', self.base_dir, '--project-name', self.project_name]
self.base_zookeeper_cmd = None
self.pre_zookeeper_commands = []
self.instances = {}
self.with_zookeeper = False
@@ -68,13 +73,15 @@ class ClickHouseCluster:
instance = ClickHouseInstance(
self, self.base_dir, name, config_dir, main_configs, user_configs, macroses, with_zookeeper,
self.base_configs_dir, self.server_bin_path, clickhouse_path_dir, hostname=hostname)
self.zookeeper_config_path, self.base_configs_dir, self.server_bin_path, clickhouse_path_dir, hostname=hostname)
self.instances[name] = instance
self.base_cmd.extend(['--file', instance.docker_compose_path])
if with_zookeeper and not self.with_zookeeper:
self.with_zookeeper = True
self.base_cmd.extend(['--file', p.join(HELPERS_DIR, 'docker_compose_zookeeper.yml')])
self.base_zookeeper_cmd = ['docker-compose', '--project-directory', self.base_dir, '--project-name',
self.project_name, '--file', p.join(HELPERS_DIR, 'docker_compose_zookeeper.yml')]
return instance
@@ -102,10 +109,15 @@ class ClickHouseCluster:
for instance in self.instances.values():
instance.create_dir(destroy_dir=destroy_dirs)
subprocess.check_call(self.base_cmd + ['up', '-d'])
self.docker_client = docker.from_env()
if self.with_zookeeper and self.base_zookeeper_cmd:
subprocess.check_call(self.base_zookeeper_cmd + ['up', '-d', '--no-recreate'])
for command in self.pre_zookeeper_commands:
self.run_zookeeper_client_command(command, repeats=5)
subprocess.check_call(self.base_cmd + ['up', '-d', '--no-recreate'])
start_deadline = time.time() + 20.0 # seconds
for instance in self.instances.itervalues():
instance.docker_client = self.docker_client
@@ -123,7 +135,7 @@ class ClickHouseCluster:
def shutdown(self, kill=True):
if kill:
subprocess.check_call(self.base_cmd + ['kill'])
subprocess.check_call(self.base_cmd + ['down', '--volumes'])
subprocess.check_call(self.base_cmd + ['down', '--volumes', '--remove-orphans'])
self.is_up = False
self.docker_client = None
@@ -134,6 +146,22 @@ class ClickHouseCluster:
instance.client = None
def run_zookeeper_client_command(self, command, zoo_node='zoo1', repeats=1, sleep_for=1):
cli_cmd = 'zkCli.sh ' + command
zoo_name = self.get_instance_docker_id(zoo_node)
network_mode = 'container:' + zoo_name
# Make up to `repeats` attempts; all but the last swallow ContainerError
# (the zookeeper container may not be serving yet) and sleep before retrying.
for i in range(repeats - 1):
try:
return self.docker_client.containers.run('zookeeper', cli_cmd, remove=True, network_mode=network_mode)
except ContainerError:
time.sleep(sleep_for)
return self.docker_client.containers.run('zookeeper', cli_cmd, remove=True, network_mode=network_mode)
def add_zookeeper_startup_command(self, command):
self.pre_zookeeper_commands.append(command)
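
Together, these two helpers let a test pre-create chroot nodes before any ClickHouse server connects; the retry loop in run_zookeeper_client_command covers the window in which the zookeeper containers are up but not yet serving. Typical usage, mirroring the tests added below:

cluster = ClickHouseCluster(__file__, zookeeper_config_path='configs/zookeeper_config_root_a.xml')
cluster.add_instance('node1', config_dir='configs', with_zookeeper=True)
# Queued commands are executed via zkCli.sh right after the zookeeper
# containers start, with up to 5 attempts each, before the instances come up.
cluster.add_zookeeper_startup_command('create /root_a ""')
cluster.start()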
DOCKER_COMPOSE_TEMPLATE = '''
version: '2'
services:
@@ -157,7 +185,7 @@ services:
class ClickHouseInstance:
def __init__(
self, cluster, base_path, name, custom_config_dir, custom_main_configs, custom_user_configs, macroses,
with_zookeeper, base_configs_dir, server_bin_path, clickhouse_path_dir, hostname=None):
with_zookeeper, zookeeper_config_path, base_configs_dir, server_bin_path, clickhouse_path_dir, hostname=None):
self.name = name
self.base_cmd = cluster.base_cmd[:]
@@ -171,6 +199,7 @@ class ClickHouseInstance:
self.clickhouse_path_dir = p.abspath(p.join(base_path, clickhouse_path_dir)) if clickhouse_path_dir else None
self.macroses = macroses if macroses is not None else {}
self.with_zookeeper = with_zookeeper
self.zookeeper_config_path = zookeeper_config_path
self.base_configs_dir = base_configs_dir
self.server_bin_path = server_bin_path
@@ -287,7 +316,7 @@ class ClickHouseInstance:
# Put ZooKeeper config
if self.with_zookeeper:
shutil.copy(p.join(HELPERS_DIR, 'zookeeper_config.xml'), config_d_dir)
shutil.copy(self.zookeeper_config_path, config_d_dir)
# Copy config dir
if self.custom_config_dir:

View File

@@ -0,0 +1,17 @@
<yandex>
<remote_servers>
<test_cluster>
<shard>
<replica>
<host>node1</host>
<port>9000</port>
</replica>
<replica>
<host>node2</host>
<port>9000</port>
</replica>
</shard>
</test_cluster>
</remote_servers>
</yandex>

View File

@@ -0,0 +1,18 @@
<yandex>
<zookeeper>
<node index="1">
<host>zoo1</host>
<port>2181</port>
</node>
<node index="2">
<host>zoo2</host>
<port>2181</port>
</node>
<node index="3">
<host>zoo3</host>
<port>2181</port>
</node>
<session_timeout_ms>3000</session_timeout_ms>
<root>/root_a</root>
</zookeeper>
</yandex>

View File

@@ -0,0 +1,18 @@
<yandex>
<zookeeper>
<node index="1">
<host>zoo1</host>
<port>2181</port>
</node>
<node index="2">
<host>zoo2</host>
<port>2181</port>
</node>
<node index="3">
<host>zoo3</host>
<port>2181</port>
</node>
<session_timeout_ms>3000</session_timeout_ms>
<root>/root_b</root>
</zookeeper>
</yandex>

View File

@@ -0,0 +1,18 @@
<yandex>
<zookeeper>
<node index="1">
<host>zoo1</host>
<port>2181</port>
</node>
<node index="2">
<host>zoo2</host>
<port>2181</port>
</node>
<node index="3">
<host>zoo3</host>
<port>2181</port>
</node>
<session_timeout_ms>3000</session_timeout_ms>
<identity>user:password</identity>
</zookeeper>
</yandex>

View File

@@ -0,0 +1,96 @@
from helpers.cluster import ClickHouseCluster
import pytest
def test_chroot_with_same_root():
cluster_1 = ClickHouseCluster(__file__, zookeeper_config_path='configs/zookeeper_config_root_a.xml')
cluster_2 = ClickHouseCluster(__file__, zookeeper_config_path='configs/zookeeper_config_root_a.xml')
node1 = cluster_1.add_instance('node1', config_dir='configs', with_zookeeper=True)
node2 = cluster_2.add_instance('node2', config_dir='configs', with_zookeeper=True)
nodes = [node1, node2]
cluster_1.add_zookeeper_startup_command('create /root_a ""')
cluster_1.add_zookeeper_startup_command('ls / ')
try:
cluster_1.start()
try:
cluster_2.start(destroy_dirs=False)
for i, node in enumerate(nodes):
node.query('''
CREATE TABLE simple (date Date, id UInt32)
ENGINE = ReplicatedMergeTree('/clickhouse/tables/0/simple', '{replica}', date, id, 8192);
'''.format(replica=node.name))
node.query("INSERT INTO simple VALUES ({0}, {0})".format(i))
assert node1.query('select count() from simple').strip() == '2'
assert node2.query('select count() from simple').strip() == '2'
finally:
cluster_2.shutdown()
finally:
cluster_1.shutdown()
def test_chroot_with_different_root():
cluster_1 = ClickHouseCluster(__file__, zookeeper_config_path='configs/zookeeper_config_root_a.xml')
cluster_2 = ClickHouseCluster(__file__, zookeeper_config_path='configs/zookeeper_config_root_b.xml')
node1 = cluster_1.add_instance('node1', config_dir='configs', with_zookeeper=True)
node2 = cluster_2.add_instance('node2', config_dir='configs', with_zookeeper=True)
nodes = [node1, node2]
cluster_1.add_zookeeper_startup_command('create /root_a ""')
cluster_1.add_zookeeper_startup_command('create /root_b ""')
cluster_1.add_zookeeper_startup_command('ls / ')
try:
cluster_1.start()
try:
cluster_2.start(destroy_dirs=False)
for i, node in enumerate(nodes):
node.query('''
CREATE TABLE simple (date Date, id UInt32)
ENGINE = ReplicatedMergeTree('/clickhouse/tables/0/simple', '{replica}', date, id, 8192);
'''.format(replica=node.name))
node.query("INSERT INTO simple VALUES ({0}, {0})".format(i))
assert node1.query('select count() from simple').strip() == '1'
assert node2.query('select count() from simple').strip() == '1'
finally:
cluster_2.shutdown()
finally:
cluster_1.shutdown()
def test_identity():
cluster_1 = ClickHouseCluster(__file__, zookeeper_config_path='configs/zookeeper_config_with_password.xml')
cluster_2 = ClickHouseCluster(__file__)
node1 = cluster_1.add_instance('node1', config_dir='configs', with_zookeeper=True)
node2 = cluster_2.add_instance('node2', config_dir='configs', with_zookeeper=True)
try:
cluster_1.start()
# node1.query('''
# CREATE TABLE simple (date Date, id UInt32)
# ENGINE = ReplicatedMergeTree('/clickhouse/tables/0/simple', '{replica}', date, id, 8192);
# '''.format(replica=node1.name))
# cluster_2 uses the default ZooKeeper config without <identity>, so it
# cannot access the nodes secured by cluster_1 and startup fails.
with pytest.raises(Exception):
cluster_2.start(destroy_dirs=False)
finally:
cluster_1.shutdown()
cluster_2.shutdown()