Merge branch 'master' into zookeeper_snapshots

This commit is contained in:
alesapin 2021-06-22 13:50:27 +03:00
commit 6b815b4f17
28 changed files with 307 additions and 150 deletions

View File

@ -8,13 +8,6 @@
extern "C" {
#endif
#include <pthread.h>
size_t __pthread_get_minstack(const pthread_attr_t * attr)
{
return 1048576; /// This is a guess. Don't sure it is correct.
}
#include <signal.h>
#include <unistd.h>
#include <string.h>
@ -141,6 +134,8 @@ int __open_2(const char *path, int oflag)
}
#include <pthread.h>
/// No-ops.
int pthread_setname_np(pthread_t thread, const char *name) { return 0; }
int pthread_getname_np(pthread_t thread, char *name, size_t len) { name[0] = '\0'; return 0; };

View File

@ -2,7 +2,7 @@
#include <errmsg.h>
#include <mysql.h>
#else
#include <mysql/errmsg.h>
#include <mysql/errmsg.h> //Y_IGNORE
#include <mysql/mysql.h>
#endif

39
base/mysqlxx/ya.make Normal file
View File

@ -0,0 +1,39 @@
# This file is generated automatically, do not edit. See 'ya.make.in' and use 'utils/generate-ya-make' to regenerate it.
LIBRARY()
OWNER(g:clickhouse)
CFLAGS(-g0)
PEERDIR(
contrib/restricted/boost/libs
contrib/libs/libmysql_r
contrib/libs/poco/Foundation
contrib/libs/poco/Util
)
ADDINCL(
GLOBAL clickhouse/base
clickhouse/base
contrib/libs/libmysql_r
)
NO_COMPILER_WARNINGS()
NO_UTIL()
SRCS(
Connection.cpp
Exception.cpp
Pool.cpp
PoolFactory.cpp
PoolWithFailover.cpp
Query.cpp
ResultBase.cpp
Row.cpp
UseQueryResult.cpp
Value.cpp
)
END()

28
base/mysqlxx/ya.make.in Normal file
View File

@ -0,0 +1,28 @@
LIBRARY()
OWNER(g:clickhouse)
CFLAGS(-g0)
PEERDIR(
contrib/restricted/boost/libs
contrib/libs/libmysql_r
contrib/libs/poco/Foundation
contrib/libs/poco/Util
)
ADDINCL(
GLOBAL clickhouse/base
clickhouse/base
contrib/libs/libmysql_r
)
NO_COMPILER_WARNINGS()
NO_UTIL()
SRCS(
<? find . -name '*.cpp' | grep -v -F tests/ | grep -v -F examples | sed 's/^\.\// /' | sort ?>
)
END()

View File

@ -4,6 +4,7 @@ RECURSE(
common
daemon
loggers
mysqlxx
pcg-random
widechar_width
readpassphrase

View File

@ -132,7 +132,7 @@ void ODBCBlockInputStream::insertValue(
auto value = row.get<std::string>(idx);
ReadBufferFromString in(value);
time_t time = 0;
readDateTimeText(time, in);
readDateTimeText(time, in, assert_cast<const DataTypeDateTime *>(data_type.get())->getTimeZone());
if (time < 0)
time = 0;
assert_cast<ColumnUInt32 &>(column).insertValue(time);

View File

@ -39,7 +39,7 @@ public:
void setFileProgressCallback(ContextMutablePtr context, bool write_progress_on_update = false);
/// How much seconds passed since query execution start.
UInt64 elapsedSeconds() const { return watch.elapsedSeconds(); }
double elapsedSeconds() const { return watch.elapsedSeconds(); }
private:
/// This flag controls whether to show the progress bar. We start showing it after

View File

@ -7,8 +7,9 @@
#include <Poco/Logger.h>
#include <common/getThreadId.h>
#include <common/getPageSize.h>
#include <signal.h>
#include <csignal>
namespace DB
@ -25,8 +26,48 @@ thread_local ThreadStatus * current_thread = nullptr;
thread_local ThreadStatus * main_thread = nullptr;
#if !defined(SANITIZER) && !defined(ARCADIA_BUILD)
alignas(4096) static thread_local char alt_stack[std::max<size_t>(MINSIGSTKSZ, 4096)];
static thread_local bool has_alt_stack = false;
namespace
{
/// Alternative stack for signal handling.
///
/// This stack should not be located in the TLS (thread local storage), since:
/// - TLS locates data on the per-thread stack
/// - And in case of stack in the signal handler will grow too much,
/// it will start overwriting TLS storage
/// (and note, that it is not too small, due to StackTrace obtaining)
/// - Plus there is no way to determine TLS block size, yes there is
/// __pthread_get_minstack() in glibc, but it is private and hence not portable.
///
/// Also we should not use getStackSize() (pthread_attr_getstack()) since it
/// will return 8MB, and this is too huge for signal stack.
struct ThreadStack
{
ThreadStack()
: data(aligned_alloc(getPageSize(), size))
{
/// Add a guard page
/// (and since the stack grows downward, we need to protect the first page).
mprotect(data, getPageSize(), PROT_NONE);
}
~ThreadStack()
{
mprotect(data, getPageSize(), PROT_WRITE|PROT_READ);
free(data);
}
static size_t getSize() { return size; }
void * getData() const { return data; }
private:
static constexpr size_t size = 16 << 10; /// 16 KiB - not too big but enough to handle error.
void * data;
};
}
static thread_local ThreadStack alt_stack;
static thread_local bool has_alt_stack = false;
#endif
@ -54,9 +95,9 @@ ThreadStatus::ThreadStatus()
/// We have to call 'sigaltstack' before first 'sigaction'. (It does not work other way, for unknown reason).
stack_t altstack_description{};
altstack_description.ss_sp = alt_stack;
altstack_description.ss_sp = alt_stack.getData();
altstack_description.ss_flags = 0;
altstack_description.ss_size = sizeof(alt_stack);
altstack_description.ss_size = alt_stack.getSize();
if (0 != sigaltstack(&altstack_description, nullptr))
{

View File

@ -22,6 +22,52 @@ namespace DB
static thread_local void * stack_address = nullptr;
static thread_local size_t max_stack_size = 0;
/**
* @param out_address - if not nullptr, here the address of the stack will be written.
* @return stack size
*/
size_t getStackSize(void ** out_address)
{
using namespace DB;
size_t size;
void * address;
#if defined(OS_DARWIN)
// pthread_get_stacksize_np() returns a value too low for the main thread on
// OSX 10.9, http://mail.openjdk.java.net/pipermail/hotspot-dev/2013-October/011369.html
//
// Multiple workarounds possible, adopt the one made by https://github.com/robovm/robovm/issues/274
// https://developer.apple.com/library/mac/documentation/Cocoa/Conceptual/Multithreading/CreatingThreads/CreatingThreads.html
// Stack size for the main thread is 8MB on OSX excluding the guard page size.
pthread_t thread = pthread_self();
size = pthread_main_np() ? (8 * 1024 * 1024) : pthread_get_stacksize_np(thread);
// stack address points to the start of the stack, not the end how it's returned by pthread_get_stackaddr_np
address = reinterpret_cast<void*>(reinterpret_cast<uintptr_t>(pthread_get_stackaddr_np(thread)) - max_stack_size);
#else
pthread_attr_t attr;
# if defined(__FreeBSD__) || defined(OS_SUNOS)
pthread_attr_init(&attr);
if (0 != pthread_attr_get_np(pthread_self(), &attr))
throwFromErrno("Cannot pthread_attr_get_np", ErrorCodes::CANNOT_PTHREAD_ATTR);
# else
if (0 != pthread_getattr_np(pthread_self(), &attr))
throwFromErrno("Cannot pthread_getattr_np", ErrorCodes::CANNOT_PTHREAD_ATTR);
# endif
SCOPE_EXIT({ pthread_attr_destroy(&attr); });
if (0 != pthread_attr_getstack(&attr, &address, &size))
throwFromErrno("Cannot pthread_getattr_np", ErrorCodes::CANNOT_PTHREAD_ATTR);
#endif // OS_DARWIN
if (out_address)
*out_address = address;
return size;
}
/** It works fine when interpreters are instantiated by ClickHouse code in properly prepared threads,
* but there are cases when ClickHouse runs as a library inside another application.
* If application is using user-space lightweight threads with manually allocated stacks,
@ -34,36 +80,7 @@ __attribute__((__weak__)) void checkStackSize()
using namespace DB;
if (!stack_address)
{
#if defined(OS_DARWIN)
// pthread_get_stacksize_np() returns a value too low for the main thread on
// OSX 10.9, http://mail.openjdk.java.net/pipermail/hotspot-dev/2013-October/011369.html
//
// Multiple workarounds possible, adopt the one made by https://github.com/robovm/robovm/issues/274
// https://developer.apple.com/library/mac/documentation/Cocoa/Conceptual/Multithreading/CreatingThreads/CreatingThreads.html
// Stack size for the main thread is 8MB on OSX excluding the guard page size.
pthread_t thread = pthread_self();
max_stack_size = pthread_main_np() ? (8 * 1024 * 1024) : pthread_get_stacksize_np(thread);
// stack_address points to the start of the stack, not the end how it's returned by pthread_get_stackaddr_np
stack_address = reinterpret_cast<void*>(reinterpret_cast<uintptr_t>(pthread_get_stackaddr_np(thread)) - max_stack_size);
#else
pthread_attr_t attr;
# if defined(__FreeBSD__) || defined(OS_SUNOS)
pthread_attr_init(&attr);
if (0 != pthread_attr_get_np(pthread_self(), &attr))
throwFromErrno("Cannot pthread_attr_get_np", ErrorCodes::CANNOT_PTHREAD_ATTR);
# else
if (0 != pthread_getattr_np(pthread_self(), &attr))
throwFromErrno("Cannot pthread_getattr_np", ErrorCodes::CANNOT_PTHREAD_ATTR);
# endif
SCOPE_EXIT({ pthread_attr_destroy(&attr); });
if (0 != pthread_attr_getstack(&attr, &stack_address, &max_stack_size))
throwFromErrno("Cannot pthread_getattr_np", ErrorCodes::CANNOT_PTHREAD_ATTR);
#endif // OS_DARWIN
}
max_stack_size = getStackSize(&stack_address);
const void * frame_address = __builtin_frame_address(0);
uintptr_t int_frame_address = reinterpret_cast<uintptr_t>(frame_address);

View File

@ -170,7 +170,7 @@ void PostgreSQLBlockInputStream::insertValue(IColumn & column, std::string_view
{
ReadBufferFromString in(value);
time_t time = 0;
readDateTimeText(time, in);
readDateTimeText(time, in, assert_cast<const DataTypeDateTime *>(data_type.get())->getTimeZone());
if (time < 0)
time = 0;
assert_cast<ColumnUInt32 &>(column).insertValue(time);
@ -272,11 +272,11 @@ void PostgreSQLBlockInputStream::prepareArrayInfo(size_t column_idx, const DataT
else if (which.isDate())
parser = [](std::string & field) -> Field { return UInt16{LocalDate{field}.getDayNum()}; };
else if (which.isDateTime())
parser = [](std::string & field) -> Field
parser = [nested](std::string & field) -> Field
{
ReadBufferFromString in(field);
time_t time = 0;
readDateTimeText(time, in);
readDateTimeText(time, in, assert_cast<const DataTypeDateTime *>(nested.get())->getTimeZone());
return time;
};
else if (which.isDecimal32())

View File

@ -169,7 +169,7 @@ namespace
{
ReadBufferFromString in(value);
time_t time = 0;
readDateTimeText(time, in);
readDateTimeText(time, in, assert_cast<const DataTypeDateTime &>(data_type).getTimeZone());
if (time < 0)
time = 0;
assert_cast<ColumnUInt32 &>(column).insertValue(time);

View File

@ -1,6 +1,7 @@
import os
import subprocess as sp
import tempfile
import logging
from threading import Timer
@ -105,6 +106,7 @@ class CommandRequest:
stderr = self.stderr_file.read().decode('utf-8', errors='replace')
if self.timer is not None and not self.process_finished_before_timeout and not self.ignore_error:
logging.debug(f"Timed out. Last stdout:{stdout}, stderr:{stderr}")
raise QueryTimeoutExceedException('Client timed out!')
if (self.process.returncode != 0 or stderr) and not self.ignore_error:

View File

@ -598,7 +598,7 @@ class ClickHouseCluster:
self.base_cmd.extend(['--file', p.join(docker_compose_yml_dir, 'docker_compose_hdfs.yml')])
self.base_hdfs_cmd = ['docker-compose', '--env-file', instance.env_file, '--project-name', self.project_name,
'--file', p.join(docker_compose_yml_dir, 'docker_compose_hdfs.yml')]
print("HDFS BASE CMD:{}".format(self.base_hdfs_cmd))
logging.debug("HDFS BASE CMD:{self.base_hdfs_cmd)}")
return self.base_hdfs_cmd
def setup_kerberized_hdfs_cmd(self, instance, env_variables, docker_compose_yml_dir):
@ -1489,9 +1489,9 @@ class ClickHouseCluster:
instance.docker_client = self.docker_client
instance.ip_address = self.get_instance_ip(instance.name)
logging.debug("Waiting for ClickHouse start...")
logging.debug("Waiting for ClickHouse start in {instance}, ip: {instance.ip_address}...")
instance.wait_for_start(start_timeout)
logging.debug("ClickHouse started")
logging.debug("ClickHouse {instance} started")
instance.client = Client(instance.ip_address, command=self.client_bin_path)
@ -1885,8 +1885,7 @@ class ClickHouseInstance:
self.start_clickhouse(stop_start_wait_sec)
def exec_in_container(self, cmd, detach=False, nothrow=False, **kwargs):
container_id = self.get_docker_handle().id
return self.cluster.exec_in_container(container_id, cmd, detach, nothrow, **kwargs)
return self.cluster.exec_in_container(self.docker_id, cmd, detach, nothrow, **kwargs)
def contains_in_log(self, substring):
result = self.exec_in_container(
@ -1926,8 +1925,7 @@ class ClickHouseInstance:
["bash", "-c", "echo $(if [ -e '{}' ]; then echo 'yes'; else echo 'no'; fi)".format(path)]) == 'yes\n'
def copy_file_to_container(self, local_path, dest_path):
container_id = self.get_docker_handle().id
return self.cluster.copy_file_to_container(container_id, local_path, dest_path)
return self.cluster.copy_file_to_container(self.docker_id, local_path, dest_path)
def get_process_pid(self, process_name):
output = self.exec_in_container(["bash", "-c",
@ -1982,6 +1980,7 @@ class ClickHouseInstance:
self.get_docker_handle().start()
def wait_for_start(self, start_timeout=None, connection_timeout=None):
handle = self.get_docker_handle()
if start_timeout is None or start_timeout <= 0:
raise Exception("Invalid timeout: {}".format(start_timeout))
@ -2004,11 +2003,10 @@ class ClickHouseInstance:
return False
while True:
handle = self.get_docker_handle()
handle.reload()
status = handle.status
if status == 'exited':
raise Exception("Instance `{}' failed to start. Container status: {}, logs: {}"
.format(self.name, status, handle.logs().decode('utf-8')))
raise Exception(f"Instance `{self.name}' failed to start. Container status: {status}, logs: {handle.logs().decode('utf-8')}")
deadline = start_time + start_timeout
# It is possible that server starts slowly.
@ -2018,9 +2016,8 @@ class ClickHouseInstance:
current_time = time.time()
if current_time >= deadline:
raise Exception("Timed out while waiting for instance `{}' with ip address {} to start. "
"Container status: {}, logs: {}".format(self.name, self.ip_address, status,
handle.logs().decode('utf-8')))
raise Exception(f"Timed out while waiting for instance `{self.name}' with ip address {self.ip_address} to start. " \
f"Container status: {status}, logs: {handle.logs().decode('utf-8')}")
socket_timeout = min(start_timeout, deadline - current_time)

View File

@ -1,5 +1,6 @@
import difflib
import time
import logging
from io import IOBase
@ -56,7 +57,7 @@ def assert_eq_with_retry(instance, query, expectation, retry_count=20, sleep_tim
break
time.sleep(sleep_time)
except Exception as ex:
print(("assert_eq_with_retry retry {} exception {}".format(i + 1, ex)))
logging.exception(f"assert_eq_with_retry retry {i+1} exception {ex}")
time.sleep(sleep_time)
else:
val = TSV(get_result(instance.query(query, user=user, stdin=stdin, timeout=timeout, settings=settings,
@ -76,7 +77,7 @@ def assert_logs_contain_with_retry(instance, substring, retry_count=20, sleep_ti
break
time.sleep(sleep_time)
except Exception as ex:
print("contains_in_log_with_retry retry {} exception {}".format(i + 1, ex))
logging.exception(f"contains_in_log_with_retry retry {i+1} exception {ex}")
time.sleep(sleep_time)
else:
raise AssertionError("'{}' not found in logs".format(substring))
@ -89,7 +90,7 @@ def exec_query_with_retry(instance, query, retry_count=40, sleep_time=0.5, setti
break
except Exception as ex:
exception = ex
print("Failed to execute query '", query, "' on instance", instance.name, "will retry")
logging.exception(f"Failed to execute query '{query}' on instance '{instance.name}' will retry")
time.sleep(sleep_time)
else:
raise exception

View File

@ -43,8 +43,8 @@ def test_backup_from_old_version(started_cluster):
assert node1.query("SELECT COUNT() FROM dest_table") == "1\n"
node1.exec_in_container(['bash', '-c',
'cp -r /var/lib/clickhouse/shadow/1/data/default/source_table/all_1_1_0/ /var/lib/clickhouse/data/default/dest_table/detached'])
node1.exec_in_container(['find', '/var/lib/clickhouse/shadow/1/data/default/source_table'])
node1.exec_in_container(['cp', '-r', '/var/lib/clickhouse/shadow/1/data/default/source_table/all_1_1_0/', '/var/lib/clickhouse/data/default/dest_table/detached'])
assert node1.query("SELECT COUNT() FROM dest_table") == "1\n"
@ -81,8 +81,7 @@ def test_backup_from_old_version_setting(started_cluster):
assert node2.query("SELECT COUNT() FROM dest_table") == "1\n"
node2.exec_in_container(['bash', '-c',
'cp -r /var/lib/clickhouse/shadow/1/data/default/source_table/all_1_1_0/ /var/lib/clickhouse/data/default/dest_table/detached'])
node2.exec_in_container(['cp', '-r', '/var/lib/clickhouse/shadow/1/data/default/source_table/all_1_1_0/', '/var/lib/clickhouse/data/default/dest_table/detached'])
assert node2.query("SELECT COUNT() FROM dest_table") == "1\n"
@ -123,8 +122,7 @@ def test_backup_from_old_version_config(started_cluster):
assert node3.query("SELECT COUNT() FROM dest_table") == "1\n"
node3.exec_in_container(['bash', '-c',
'cp -r /var/lib/clickhouse/shadow/1/data/default/source_table/all_1_1_0/ /var/lib/clickhouse/data/default/dest_table/detached'])
node3.exec_in_container(['cp', '-r', '/var/lib/clickhouse/shadow/1/data/default/source_table/all_1_1_0/', '/var/lib/clickhouse/data/default/dest_table/detached'])
assert node3.query("SELECT COUNT() FROM dest_table") == "1\n"
@ -156,8 +154,7 @@ def test_backup_and_alter(started_cluster):
node4.query("ALTER TABLE test.backup_table DROP PARTITION tuple()")
node4.exec_in_container(['bash', '-c',
'cp -r /var/lib/clickhouse/shadow/1/data/test/backup_table/all_1_1_0/ /var/lib/clickhouse/data/test/backup_table/detached'])
node4.exec_in_container(['cp', '-r', '/var/lib/clickhouse/shadow/1/data/test/backup_table/all_1_1_0/', '/var/lib/clickhouse/data/test/backup_table/detached'])
node4.query("ALTER TABLE test.backup_table ATTACH PARTITION tuple()")

View File

@ -39,7 +39,7 @@ class Task:
for instance_name, _ in cluster.instances.items():
instance = cluster.instances[instance_name]
instance.copy_file_to_container(os.path.join(CURRENT_TEST_DIR, './task_taxi_data.xml'), self.container_task_file)
print("Copied task file to container of '{}' instance. Path {}".format(instance_name, self.container_task_file))
logging.debug(f"Copied task file to container of '{instance_name}' instance. Path {self.container_task_file}")
def start(self):
@ -48,11 +48,11 @@ class Task:
node.query("DROP DATABASE IF EXISTS dailyhistory SYNC;")
node.query("DROP DATABASE IF EXISTS monthlyhistory SYNC;")
instance = cluster.instances['first']
first = cluster.instances['first']
# daily partition database
instance.query("CREATE DATABASE IF NOT EXISTS dailyhistory on cluster events;")
instance.query("""CREATE TABLE dailyhistory.yellow_tripdata_staging ON CLUSTER events
first.query("CREATE DATABASE IF NOT EXISTS dailyhistory on cluster events;")
first.query("""CREATE TABLE dailyhistory.yellow_tripdata_staging ON CLUSTER events
(
id UUID DEFAULT generateUUIDv4(),
vendor_id String,
@ -84,12 +84,12 @@ class Task:
ORDER BY (tpep_pickup_datetime, id)
PARTITION BY (toYYYYMMDD(tpep_pickup_datetime))""")
instance.query("""CREATE TABLE dailyhistory.yellow_tripdata
first.query("""CREATE TABLE dailyhistory.yellow_tripdata
ON CLUSTER events
AS dailyhistory.yellow_tripdata_staging
ENGINE = Distributed('events', 'dailyhistory', yellow_tripdata_staging, sipHash64(id) % 3);""")
instance.query("""INSERT INTO dailyhistory.yellow_tripdata
first.query("""INSERT INTO dailyhistory.yellow_tripdata
SELECT * FROM generateRandom(
'id UUID DEFAULT generateUUIDv4(),
vendor_id String,
@ -119,8 +119,8 @@ class Task:
1, 10, 2) LIMIT 50;""")
# monthly partition database
instance.query("create database IF NOT EXISTS monthlyhistory on cluster events;")
instance.query("""CREATE TABLE monthlyhistory.yellow_tripdata_staging ON CLUSTER events
first.query("create database IF NOT EXISTS monthlyhistory on cluster events;")
first.query("""CREATE TABLE monthlyhistory.yellow_tripdata_staging ON CLUSTER events
(
id UUID DEFAULT generateUUIDv4(),
vendor_id String,
@ -153,16 +153,16 @@ class Task:
ORDER BY (tpep_pickup_datetime, id)
PARTITION BY (pickup_location_id, toYYYYMM(tpep_pickup_datetime))""")
instance.query("""CREATE TABLE monthlyhistory.yellow_tripdata
first.query("""CREATE TABLE monthlyhistory.yellow_tripdata
ON CLUSTER events
AS monthlyhistory.yellow_tripdata_staging
ENGINE = Distributed('events', 'monthlyhistory', yellow_tripdata_staging, sipHash64(id) % 3);""")
def check(self):
instance = cluster.instances["first"]
a = TSV(instance.query("SELECT count() from dailyhistory.yellow_tripdata"))
b = TSV(instance.query("SELECT count() from monthlyhistory.yellow_tripdata"))
first = cluster.instances["first"]
a = TSV(first.query("SELECT count() from dailyhistory.yellow_tripdata"))
b = TSV(first.query("SELECT count() from monthlyhistory.yellow_tripdata"))
assert a == b, "Distributed tables"
for instance_name, instance in cluster.instances.items():
@ -187,10 +187,10 @@ def execute_task(started_cluster, task, cmd_options):
task.start()
zk = started_cluster.get_kazoo_client('zoo1')
print("Use ZooKeeper server: {}:{}".format(zk.hosts[0][0], zk.hosts[0][1]))
logging.debug("Use ZooKeeper server: {}:{}".format(zk.hosts[0][0], zk.hosts[0][1]))
# Run cluster-copier processes on each node
docker_api = docker.from_env().api
docker_api = started_cluster.docker_client.api
copiers_exec_ids = []
cmd = ['/usr/bin/clickhouse', 'copier',
@ -201,9 +201,9 @@ def execute_task(started_cluster, task, cmd_options):
'--base-dir', '/var/log/clickhouse-server/copier']
cmd += cmd_options
print(cmd)
logging.debug(f"execute_task cmd: {cmd}")
for instance_name, instance in started_cluster.instances.items():
for instance_name in started_cluster.instances.keys():
instance = started_cluster.instances[instance_name]
container = instance.get_docker_handle()
instance.copy_file_to_container(os.path.join(CURRENT_TEST_DIR, "configs_three_nodes/config-copier.xml"), "/etc/clickhouse-server/config-copier.xml")

View File

@ -430,7 +430,7 @@ def execute_task(started_cluster, task, cmd_options):
print("Use ZooKeeper server: {}:{}".format(zk.hosts[0][0], zk.hosts[0][1]))
# Run cluster-copier processes on each node
docker_api = docker.from_env().api
docker_api = started_cluster.docker_client.api
copiers_exec_ids = []
cmd = ['/usr/bin/clickhouse', 'copier',
@ -443,7 +443,7 @@ def execute_task(started_cluster, task, cmd_options):
print(cmd)
for instance_name, instance in started_cluster.instances.items():
for instance_name in started_cluster.instances.keys():
instance = started_cluster.instances[instance_name]
container = instance.get_docker_handle()
instance.copy_file_to_container(os.path.join(CURRENT_TEST_DIR, "configs_two_nodes/config-copier.xml"), "/etc/clickhouse-server/config-copier.xml")

View File

@ -150,7 +150,7 @@ def test_reload_after_loading(started_cluster):
time.sleep(1) # see the comment above
replace_in_file_in_container('/etc/clickhouse-server/dictionaries/executable.xml', '82', '83')
replace_in_file_in_container('/etc/clickhouse-server/dictionaries/file.txt', '102', '103')
time.sleep(7)
time.sleep(10)
assert query("SELECT dictGetInt32('file', 'a', toUInt64(9))") == "103\n"
assert query("SELECT dictGetInt32('executable', 'a', toUInt64(7))") == "83\n"

View File

@ -1,6 +1,6 @@
<yandex>
<zookeeper>
<!-- Required for correct timing in current test case -->
<session_timeout_ms replace="1">10000</session_timeout_ms>
<session_timeout_ms replace="1">15000</session_timeout_ms>
</zookeeper>
</yandex>

View File

@ -1,6 +1,6 @@
<yandex>
<zookeeper>
<!-- Required for correct timing in current test case -->
<session_timeout_ms replace="1">10000</session_timeout_ms>
<session_timeout_ms replace="1">15000</session_timeout_ms>
</zookeeper>
</yandex>

View File

@ -53,6 +53,7 @@ def test_default_database(test_cluster):
def test_create_view(test_cluster):
instance = test_cluster.instances['ch3']
test_cluster.ddl_check_query(instance, "DROP TABLE IF EXISTS test.super_simple_view ON CLUSTER 'cluster'")
test_cluster.ddl_check_query(instance,
"CREATE VIEW test.super_simple_view ON CLUSTER 'cluster' AS SELECT * FROM system.numbers FORMAT TSV")
test_cluster.ddl_check_query(instance,
@ -76,7 +77,7 @@ def test_on_server_fail(test_cluster):
kill_instance.get_docker_handle().stop()
request = instance.get_query_request("CREATE TABLE test.test_server_fail ON CLUSTER 'cluster' (i Int8) ENGINE=Null",
timeout=30)
timeout=180)
kill_instance.get_docker_handle().start()
test_cluster.ddl_check_query(instance, "DROP TABLE IF EXISTS test.__nope__ ON CLUSTER 'cluster'")
@ -92,27 +93,6 @@ def test_on_server_fail(test_cluster):
test_cluster.ddl_check_query(instance, "DROP TABLE test.test_server_fail ON CLUSTER 'cluster'")
def _test_on_connection_losses(test_cluster, zk_timeout):
instance = test_cluster.instances['ch1']
kill_instance = test_cluster.instances['ch2']
with PartitionManager() as pm:
pm.drop_instance_zk_connections(kill_instance)
request = instance.get_query_request("DROP TABLE IF EXISTS test.__nope__ ON CLUSTER 'cluster'", timeout=20)
time.sleep(zk_timeout)
pm.restore_instance_zk_connections(kill_instance)
test_cluster.check_all_hosts_successfully_executed(request.get_answer())
def test_on_connection_loss(test_cluster):
_test_on_connection_losses(test_cluster, 5) # connection loss will occur only (3 sec ZK timeout in config)
def test_on_session_expired(test_cluster):
_test_on_connection_losses(test_cluster, 15) # session should be expired (3 sec ZK timeout in config)
def test_simple_alters(test_cluster):
instance = test_cluster.instances['ch2']
@ -190,7 +170,7 @@ def test_implicit_macros(test_cluster):
instance = test_cluster.instances['ch2']
test_cluster.ddl_check_query(instance, "DROP DATABASE IF EXISTS test_db ON CLUSTER '{cluster}'")
test_cluster.ddl_check_query(instance, "DROP DATABASE IF EXISTS test_db ON CLUSTER '{cluster}' SYNC")
test_cluster.ddl_check_query(instance, "CREATE DATABASE IF NOT EXISTS test_db ON CLUSTER '{cluster}'")
test_cluster.ddl_check_query(instance, """
@ -270,6 +250,15 @@ def test_create_reserved(test_cluster):
def test_rename(test_cluster):
instance = test_cluster.instances['ch1']
rules = test_cluster.pm_random_drops.pop_rules()
test_cluster.ddl_check_query(instance,
"DROP TABLE IF EXISTS rename_shard ON CLUSTER cluster SYNC")
test_cluster.ddl_check_query(instance,
"DROP TABLE IF EXISTS rename_new ON CLUSTER cluster SYNC")
test_cluster.ddl_check_query(instance,
"DROP TABLE IF EXISTS rename_old ON CLUSTER cluster SYNC")
test_cluster.ddl_check_query(instance,
"DROP TABLE IF EXISTS rename ON CLUSTER cluster SYNC")
test_cluster.ddl_check_query(instance,
"CREATE TABLE rename_shard ON CLUSTER cluster (id Int64, sid String DEFAULT concat('old', toString(id))) ENGINE = ReplicatedMergeTree('/clickhouse/tables/{shard}/staging/test_shard', '{replica}') ORDER BY (id)")
test_cluster.ddl_check_query(instance,
@ -326,12 +315,15 @@ def test_socket_timeout(test_cluster):
def test_replicated_without_arguments(test_cluster):
rules = test_cluster.pm_random_drops.pop_rules()
instance = test_cluster.instances['ch1']
test_cluster.ddl_check_query(instance, "DROP TABLE IF EXISTS test_atomic.rmt ON CLUSTER cluster SYNC")
test_cluster.ddl_check_query(instance, "DROP DATABASE IF EXISTS test_atomic ON CLUSTER cluster SYNC")
test_cluster.ddl_check_query(instance, "CREATE DATABASE test_atomic ON CLUSTER cluster ENGINE=Atomic")
assert "are supported only for ON CLUSTER queries with Atomic database engine" in \
instance.query_and_get_error("CREATE TABLE test_atomic.rmt (n UInt64, s String) ENGINE=ReplicatedMergeTree ORDER BY n")
test_cluster.ddl_check_query(instance,
"CREATE TABLE test_atomic.rmt ON CLUSTER cluster (n UInt64, s String) ENGINE=ReplicatedMergeTree() ORDER BY n")
test_cluster.ddl_check_query(instance, "DROP TABLE test_atomic.rmt ON CLUSTER cluster")
test_cluster.ddl_check_query(instance, "DROP TABLE test_atomic.rmt ON CLUSTER cluster SYNC")
test_cluster.ddl_check_query(instance,
"CREATE TABLE test_atomic.rmt UUID '12345678-0000-4000-8000-000000000001' ON CLUSTER cluster (n UInt64, s String) ENGINE=ReplicatedMergeTree ORDER BY n")
assert instance.query("SHOW CREATE test_atomic.rmt FORMAT TSVRaw") == \
@ -349,7 +341,7 @@ def test_replicated_without_arguments(test_cluster):
"CREATE TABLE test_atomic.rsmt ON CLUSTER cluster (n UInt64, m UInt64, k UInt64) ENGINE=ReplicatedSummingMergeTree((m, k)) ORDER BY n")
test_cluster.ddl_check_query(instance,
"CREATE TABLE test_atomic.rvcmt ON CLUSTER cluster (n UInt64, m Int8, k UInt64) ENGINE=ReplicatedVersionedCollapsingMergeTree(m, k) ORDER BY n")
test_cluster.ddl_check_query(instance, "DROP DATABASE test_atomic ON CLUSTER cluster")
test_cluster.ddl_check_query(instance, "DROP DATABASE test_atomic ON CLUSTER cluster SYNC")
test_cluster.ddl_check_query(instance, "CREATE DATABASE test_ordinary ON CLUSTER cluster ENGINE=Ordinary")
assert "are supported only for ON CLUSTER queries with Atomic database engine" in \
@ -359,7 +351,7 @@ def test_replicated_without_arguments(test_cluster):
test_cluster.ddl_check_query(instance, "CREATE TABLE test_ordinary.rmt ON CLUSTER cluster (n UInt64, s String) ENGINE=ReplicatedMergeTree('/{shard}/{table}/', '{replica}') ORDER BY n")
assert instance.query("SHOW CREATE test_ordinary.rmt FORMAT TSVRaw") == \
"CREATE TABLE test_ordinary.rmt\n(\n `n` UInt64,\n `s` String\n)\nENGINE = ReplicatedMergeTree('/{shard}/rmt/', '{replica}')\nORDER BY n\nSETTINGS index_granularity = 8192\n"
test_cluster.ddl_check_query(instance, "DROP DATABASE test_ordinary ON CLUSTER cluster")
test_cluster.ddl_check_query(instance, "DROP DATABASE test_ordinary ON CLUSTER cluster SYNC")
test_cluster.pm_random_drops.push_rules(rules)

View File

@ -38,9 +38,9 @@ def test_cluster(request):
def test_replicated_alters(test_cluster):
instance = test_cluster.instances['ch2']
test_cluster.ddl_check_query(instance, "DROP TABLE IF EXISTS merge_for_alter ON CLUSTER cluster")
test_cluster.ddl_check_query(instance, "DROP TABLE IF EXISTS all_merge_32 ON CLUSTER cluster")
test_cluster.ddl_check_query(instance, "DROP TABLE IF EXISTS all_merge_64 ON CLUSTER cluster")
test_cluster.ddl_check_query(instance, "DROP TABLE IF EXISTS merge_for_alter ON CLUSTER cluster SYNC")
test_cluster.ddl_check_query(instance, "DROP TABLE IF EXISTS all_merge_32 ON CLUSTER cluster SYNC")
test_cluster.ddl_check_query(instance, "DROP TABLE IF EXISTS all_merge_64 ON CLUSTER cluster SYNC")
# Temporarily disable random ZK packet drops, they might broke creation if ReplicatedMergeTree replicas
firewall_drops_rules = test_cluster.pm_random_drops.pop_rules()
@ -90,10 +90,10 @@ ENGINE = Distributed(cluster, default, merge_for_alter, i)
assert TSV(instance.query("SELECT i, s FROM all_merge_64 ORDER BY i")) == TSV(
''.join(['{}\t{}\n'.format(x, x) for x in range(4)]))
test_cluster.ddl_check_query(instance, "DROP TABLE merge_for_alter ON CLUSTER cluster")
test_cluster.ddl_check_query(instance, "DROP TABLE merge_for_alter ON CLUSTER cluster SYNC")
# Enable random ZK packet drops
test_cluster.pm_random_drops.push_rules(firewall_drops_rules)
test_cluster.ddl_check_query(instance, "DROP TABLE all_merge_32 ON CLUSTER cluster")
test_cluster.ddl_check_query(instance, "DROP TABLE all_merge_64 ON CLUSTER cluster")
test_cluster.ddl_check_query(instance, "DROP TABLE all_merge_32 ON CLUSTER cluster SYNC")
test_cluster.ddl_check_query(instance, "DROP TABLE all_merge_64 ON CLUSTER cluster SYNC")

View File

@ -21,16 +21,27 @@ create_table_sql_template = """
PRIMARY KEY (`id`)) ENGINE=InnoDB;
"""
def create_mysql_db(conn, name):
with conn.cursor() as cursor:
cursor.execute(
"CREATE DATABASE {} DEFAULT CHARACTER SET 'utf8'".format(name))
drop_table_sql_template = """
DROP TABLE IF EXISTS `clickhouse`.`{}`;
"""
def get_mysql_conn(started_cluster, host):
conn = pymysql.connect(user='root', password='clickhouse', host=host, port=started_cluster.mysql_port)
return conn
def create_mysql_table(conn, tableName):
with conn.cursor() as cursor:
cursor.execute(create_table_sql_template.format(tableName))
def drop_mysql_table(conn, tableName):
with conn.cursor() as cursor:
cursor.execute(drop_table_sql_template.format(tableName))
def create_mysql_db(conn, name):
with conn.cursor() as cursor:
cursor.execute("DROP DATABASE IF EXISTS {}".format(name))
cursor.execute("CREATE DATABASE {} DEFAULT CHARACTER SET 'utf8'".format(name))
@pytest.fixture(scope="module")
def started_cluster():
@ -51,7 +62,10 @@ def started_cluster():
def test_many_connections(started_cluster):
table_name = 'test_many_connections'
node1.query(f'DROP TABLE IF EXISTS {table_name}')
conn = get_mysql_conn(started_cluster, cluster.mysql_ip)
drop_mysql_table(conn, table_name)
create_mysql_table(conn, table_name)
node1.query('''
@ -66,14 +80,18 @@ CREATE TABLE {}(id UInt32, name String, age UInt32, money UInt32) ENGINE = MySQL
query += "SELECT id FROM {t})"
assert node1.query(query.format(t=table_name)) == '250\n'
drop_mysql_table(conn, table_name)
conn.close()
def test_insert_select(started_cluster):
table_name = 'test_insert_select'
node1.query(f'DROP TABLE IF EXISTS {table_name}')
conn = get_mysql_conn(started_cluster, cluster.mysql_ip)
drop_mysql_table(conn, table_name)
create_mysql_table(conn, table_name)
node1.query('''
CREATE TABLE {}(id UInt32, name String, age UInt32, money UInt32) ENGINE = MySQL('mysql57:3306', 'clickhouse', '{}', 'root', 'clickhouse');
'''.format(table_name, table_name))
@ -87,7 +105,9 @@ CREATE TABLE {}(id UInt32, name String, age UInt32, money UInt32) ENGINE = MySQL
def test_replace_select(started_cluster):
table_name = 'test_replace_select'
node1.query(f'DROP TABLE IF EXISTS {table_name}')
conn = get_mysql_conn(started_cluster, cluster.mysql_ip)
drop_mysql_table(conn, table_name)
create_mysql_table(conn, table_name)
node1.query('''
@ -106,7 +126,9 @@ CREATE TABLE {}(id UInt32, name String, age UInt32, money UInt32) ENGINE = MySQL
def test_insert_on_duplicate_select(started_cluster):
table_name = 'test_insert_on_duplicate_select'
node1.query(f'DROP TABLE IF EXISTS {table_name}')
conn = get_mysql_conn(started_cluster, cluster.mysql_ip)
drop_mysql_table(conn, table_name)
create_mysql_table(conn, table_name)
node1.query('''
@ -125,7 +147,10 @@ CREATE TABLE {}(id UInt32, name String, age UInt32, money UInt32) ENGINE = MySQL
def test_where(started_cluster):
table_name = 'test_where'
node1.query(f'DROP TABLE IF EXISTS {table_name}')
conn = get_mysql_conn(started_cluster, cluster.mysql_ip)
drop_mysql_table(conn, table_name)
create_mysql_table(conn, table_name)
node1.query('''
CREATE TABLE {}(id UInt32, name String, age UInt32, money UInt32) ENGINE = MySQL('mysql57:3306', 'clickhouse', '{}', 'root', 'clickhouse');
@ -146,6 +171,7 @@ CREATE TABLE {}(id UInt32, name String, age UInt32, money UInt32) ENGINE = MySQL
def test_table_function(started_cluster):
conn = get_mysql_conn(started_cluster, cluster.mysql_ip)
drop_mysql_table(conn, 'table_function')
create_mysql_table(conn, 'table_function')
table_function = "mysql('mysql57:3306', 'clickhouse', '{}', 'root', 'clickhouse')".format('table_function')
assert node1.query("SELECT count() FROM {}".format(table_function)).rstrip() == '0'
@ -168,6 +194,8 @@ def test_table_function(started_cluster):
def test_binary_type(started_cluster):
conn = get_mysql_conn(started_cluster, cluster.mysql_ip)
drop_mysql_table(conn, 'binary_type')
with conn.cursor() as cursor:
cursor.execute("CREATE TABLE clickhouse.binary_type (id INT PRIMARY KEY, data BINARY(16) NOT NULL)")
table_function = "mysql('mysql57:3306', 'clickhouse', '{}', 'root', 'clickhouse')".format('binary_type')
@ -177,7 +205,10 @@ def test_binary_type(started_cluster):
def test_enum_type(started_cluster):
table_name = 'test_enum_type'
node1.query(f'DROP TABLE IF EXISTS {table_name}')
conn = get_mysql_conn(started_cluster, cluster.mysql_ip)
drop_mysql_table(conn, table_name)
create_mysql_table(conn, table_name)
node1.query('''
CREATE TABLE {}(id UInt32, name String, age UInt32, money UInt32, source Enum8('IP' = 1, 'URL' = 2)) ENGINE = MySQL('mysql57:3306', 'clickhouse', '{}', 'root', 'clickhouse', 1);
@ -186,20 +217,8 @@ CREATE TABLE {}(id UInt32, name String, age UInt32, money UInt32, source Enum8('
assert node1.query("SELECT source FROM {} LIMIT 1".format(table_name)).rstrip() == 'URL'
conn.close()
def get_mysql_conn(started_cluster, host):
conn = pymysql.connect(user='root', password='clickhouse', host=host, port=started_cluster.mysql_port)
return conn
def create_mysql_db(conn, name):
with conn.cursor() as cursor:
cursor.execute("DROP DATABASE IF EXISTS {}".format(name))
cursor.execute("CREATE DATABASE {} DEFAULT CHARACTER SET 'utf8'".format(name))
def create_mysql_table(conn, tableName):
with conn.cursor() as cursor:
cursor.execute(create_table_sql_template.format(tableName))
def test_mysql_distributed(started_cluster):
table_name = 'test_replicas'
@ -218,6 +237,8 @@ def test_mysql_distributed(started_cluster):
create_mysql_table(conn3, table_name)
create_mysql_table(conn4, table_name)
node2.query('DROP TABLE IF EXISTS test_replicas')
# Storage with with 3 replicas
node2.query('''
CREATE TABLE test_replicas
@ -227,6 +248,7 @@ def test_mysql_distributed(started_cluster):
# Fill remote tables with different data to be able to check
nodes = [node1, node2, node2, node2]
for i in range(1, 5):
nodes[i-1].query('DROP TABLE IF EXISTS test_replica{}'.format(i))
nodes[i-1].query('''
CREATE TABLE test_replica{}
(id UInt32, name String, age UInt32, money UInt32)
@ -249,6 +271,8 @@ def test_mysql_distributed(started_cluster):
assert(result == 'host2\nhost3\nhost4\n')
# Storage with with two shards, each has 2 replicas
node2.query('DROP TABLE IF EXISTS test_shards')
node2.query('''
CREATE TABLE test_shards
(id UInt32, name String, age UInt32, money UInt32)
@ -275,9 +299,12 @@ def test_mysql_distributed(started_cluster):
def test_external_settings(started_cluster):
table_name = 'test_external_settings'
node1.query(f'DROP TABLE IF EXISTS {table_name}')
conn = get_mysql_conn(started_cluster, started_cluster.mysql_ip)
drop_mysql_table(conn, table_name)
create_mysql_table(conn, table_name)
node3.query(f'DROP TABLE IF EXISTS {table_name}')
node3.query('''
CREATE TABLE {}(id UInt32, name String, age UInt32, money UInt32) ENGINE = MySQL('mysql57:3306', 'clickhouse', '{}', 'root', 'clickhouse');
'''.format(table_name, table_name))

View File

@ -308,6 +308,21 @@ def test_postgres_distributed(started_cluster):
assert(result == 'host2\nhost4\n' or result == 'host3\nhost4\n')
def test_datetime_with_timezone(started_cluster):
conn = get_postgres_conn(started_cluster, started_cluster.postgres_ip, True)
cursor = conn.cursor()
cursor.execute("CREATE TABLE test_timezone (ts timestamp without time zone, ts_z timestamp with time zone)")
cursor.execute("insert into test_timezone select '2014-04-04 20:00:00', '2014-04-04 20:00:00'::timestamptz at time zone 'America/New_York';")
cursor.execute("select * from test_timezone")
result = cursor.fetchall()[0]
print(result[0], str(result[1])[:-6])
node1.query("create table test_timezone ( ts DateTime, ts_z DateTime('America/New_York')) ENGINE PostgreSQL('postgres1:5432', 'clickhouse', 'test_timezone', 'postgres', 'mysecretpassword');")
assert(node1.query("select ts from test_timezone").strip() == str(result[0]))
# [:-6] because 2014-04-04 16:00:00+00:00 -> 2014-04-04 16:00:00
assert(node1.query("select ts_z from test_timezone").strip() == str(result[1])[:-6])
assert(node1.query("select * from test_timezone") == "2014-04-04 20:00:00\t2014-04-04 16:00:00\n")
if __name__ == '__main__':
cluster.start()
input("Cluster created, press any key to destroy...")

View File

@ -30,6 +30,7 @@ def started_cluster():
def test_chroot_with_same_root(started_cluster):
for i, node in enumerate([node1, node2]):
node.query('DROP TABLE IF EXISTS simple SYNC')
node.query('''
CREATE TABLE simple (date Date, id UInt32)
ENGINE = ReplicatedMergeTree('/clickhouse/tables/0/simple', '{replica}', date, id, 8192);
@ -44,6 +45,7 @@ def test_chroot_with_same_root(started_cluster):
def test_chroot_with_different_root(started_cluster):
for i, node in [(1, node1), (3, node3)]:
node.query('DROP TABLE IF EXISTS simple_different SYNC')
node.query('''
CREATE TABLE simple_different (date Date, id UInt32)
ENGINE = ReplicatedMergeTree('/clickhouse/tables/0/simple_different', '{replica}', date, id, 8192);

View File

@ -22,6 +22,8 @@ def started_cluster():
cluster.shutdown()
def test_identity(started_cluster):
node1.query('DROP TABLE IF EXISTS simple SYNC')
node1.query('''
CREATE TABLE simple (date Date, id UInt32)
ENGINE = ReplicatedMergeTree('/clickhouse/tables/0/simple', '{replica}', date, id, 8192);

View File

@ -246,3 +246,4 @@
01901_test_attach_partition_from
01910_view_dictionary
01824_prefer_global_in_and_join
01576_alias_column_rewrite

View File

@ -22,15 +22,15 @@ def regression(self, local, clickhouse_binary_path, stress=None, parallel=None):
tasks = []
with Pool(8) as pool:
try:
run_scenario(pool, tasks, Feature(test=load("example.regression", "regression")), args)
run_scenario(pool, tasks, Feature(test=load("ldap.regression", "regression")), args)
run_scenario(pool, tasks, Feature(test=load("rbac.regression", "regression")), args)
run_scenario(pool, tasks, Feature(test=load("aes_encryption.regression", "regression")), args)
run_scenario(pool, tasks, Feature(test=load("map_type.regression", "regression")), args)
run_scenario(pool, tasks, Feature(test=load("window_functions.regression", "regression")), args)
run_scenario(pool, tasks, Feature(test=load("datetime64_extended_range.regression", "regression")), args)
#run_scenario(pool, tasks, Feature(test=load("example.regression", "regression")), args)
#run_scenario(pool, tasks, Feature(test=load("ldap.regression", "regression")), args)
#run_scenario(pool, tasks, Feature(test=load("rbac.regression", "regression")), args)
#run_scenario(pool, tasks, Feature(test=load("aes_encryption.regression", "regression")), args)
#run_scenario(pool, tasks, Feature(test=load("map_type.regression", "regression")), args)
#run_scenario(pool, tasks, Feature(test=load("window_functions.regression", "regression")), args)
#run_scenario(pool, tasks, Feature(test=load("datetime64_extended_range.regression", "regression")), args)
#run_scenario(pool, tasks, Feature(test=load("kerberos.regression", "regression")), args)
run_scenario(pool, tasks, Feature(test=load("extended_precision_data_types.regression", "regression")), args)
#run_scenario(pool, tasks, Feature(test=load("extended_precision_data_types.regression", "regression")), args)
finally:
join(tasks)