Merge branch 'master' into use-dag-in-key-condition

Commit 3dc0b9c096 by Nikolai Kochetov, 2021-06-22 17:02:01 +03:00
52 changed files with 537 additions and 133 deletions


@@ -2,7 +2,7 @@
 #include <errmsg.h>
 #include <mysql.h>
 #else
-#include <mysql/errmsg.h>
+#include <mysql/errmsg.h> //Y_IGNORE
 #include <mysql/mysql.h>
 #endif

base/mysqlxx/ya.make (new file)

@@ -0,0 +1,39 @@
# This file is generated automatically, do not edit. See 'ya.make.in' and use 'utils/generate-ya-make' to regenerate it.
LIBRARY()
OWNER(g:clickhouse)
CFLAGS(-g0)
PEERDIR(
contrib/restricted/boost/libs
contrib/libs/libmysql_r
contrib/libs/poco/Foundation
contrib/libs/poco/Util
)
ADDINCL(
GLOBAL clickhouse/base
clickhouse/base
contrib/libs/libmysql_r
)
NO_COMPILER_WARNINGS()
NO_UTIL()
SRCS(
Connection.cpp
Exception.cpp
Pool.cpp
PoolFactory.cpp
PoolWithFailover.cpp
Query.cpp
ResultBase.cpp
Row.cpp
UseQueryResult.cpp
Value.cpp
)
END()

base/mysqlxx/ya.make.in (new file)

@@ -0,0 +1,28 @@
LIBRARY()
OWNER(g:clickhouse)
CFLAGS(-g0)
PEERDIR(
contrib/restricted/boost/libs
contrib/libs/libmysql_r
contrib/libs/poco/Foundation
contrib/libs/poco/Util
)
ADDINCL(
GLOBAL clickhouse/base
clickhouse/base
contrib/libs/libmysql_r
)
NO_COMPILER_WARNINGS()
NO_UTIL()
SRCS(
<? find . -name '*.cpp' | grep -v -F tests/ | grep -v -F examples | sed 's/^\.\// /' | sort ?>
)
END()


@@ -4,6 +4,7 @@ RECURSE(
     common
     daemon
     loggers
+    mysqlxx
     pcg-random
     widechar_width
     readpassphrase

contrib/NuRaft (submodule)

@@ -1 +1 @@
-Subproject commit 2a1bf7d87b4a03561fc66fbb49cee8a288983c5d
+Subproject commit 976874b7aa7f422bf4ea595bb7d1166c617b1c26

contrib/h3 (submodule)

@@ -1 +1 @@
-Subproject commit e209086ae1b5477307f545a0f6111780edc59940
+Subproject commit 5c44b06c406613b7792a60b11d04b871116f6e30


@@ -148,5 +148,10 @@ toc_title: Adopters
 | <a href="https://www.kakaocorp.com/" class="favicon">kakaocorp</a> | Internet company | — | — | — | [if(kakao)2020 conference](https://if.kakao.com/session/117) |
 | <a href="https://shop.okraina.ru/" class="favicon">ООО «МПЗ Богородский»</a> | Agriculture | — | — | — | [Article in Russian, November 2020](https://cloud.yandex.ru/cases/okraina) |
 | <a href="https://www.tesla.com/" class="favicon">Tesla</a> | Electric vehicle and clean energy company | — | — | — | [Vacancy description, March 2021](https://news.ycombinator.com/item?id=26306170) |
+| <a href="https://www.kgk-global.com/en/" class="favicon">KGK Global</a> | Vehicle monitoring | — | — | — | [Press release, June 2021](https://zoom.cnews.ru/news/item/530921) |
+| <a href="https://www.bilibili.com/" class="favicon">BiliBili</a> | Video sharing | — | — | — | [Blog post, June 2021](https://chowdera.com/2021/06/20210622012241476b.html) |
+| <a href="https://gigapipe.com/" class="favicon">Gigapipe</a> | Managed ClickHouse | Main product | — | — | [Official website](https://gigapipe.com/) |
+| <a href="https://www.hydrolix.io/" class="favicon">Hydrolix</a> | Cloud data platform | Main product | — | — | [Documentation](https://docs.hydrolix.io/guide/query) |
+| <a href="https://www.argedor.com/en/clickhouse/" class="favicon">Argedor</a> | ClickHouse support | — | — | — | [Official website](https://www.argedor.com/en/clickhouse/) |
 
 [Original article](https://clickhouse.tech/docs/en/introduction/adopters/) <!--hide-->


@@ -195,6 +195,41 @@ Result:
 └────────────────────┘
 ```
 
+## h3ToGeo {#h3togeo}
+
+Returns `(lon, lat)` that corresponds to the provided H3 index.
+
+**Syntax**
+
+``` sql
+h3ToGeo(h3Index)
+```
+
+**Arguments**
+
+- `h3Index` — H3 Index. Type: [UInt64](../../../sql-reference/data-types/int-uint.md).
+
+**Returned values**
+
+- `lon` — Longitude. Type: [Float64](../../../sql-reference/data-types/float.md).
+- `lat` — Latitude. Type: [Float64](../../../sql-reference/data-types/float.md).
+
+**Example**
+
+Query:
+
+``` sql
+SELECT h3ToGeo(644325524701193974) coordinates;
+```
+
+Result:
+
+``` text
+┌─coordinates───────────────────────────┐
+│ (37.79506616830252,55.71290243145668) │
+└───────────────────────────────────────┘
+```
+
 ## h3kRing {#h3kring}
 
 Lists all the [H3](#h3index) hexagons in the radius of `k` from the given hexagon in random order.
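Since `h3ToGeo` is the inverse of the existing `geoToH3` function, a round trip is a convenient sanity check for the newly documented function. The query below is an illustrative sketch, not part of the diff; it returns the centre of the resolution-15 cell containing the given point, so the coordinates match the input only approximately:

``` sql
SELECT h3ToGeo(geoToH3(37.79506683, 55.71290588, 15)) AS coordinates;
```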


@@ -132,7 +132,7 @@ void ODBCBlockInputStream::insertValue(
             auto value = row.get<std::string>(idx);
             ReadBufferFromString in(value);
             time_t time = 0;
-            readDateTimeText(time, in);
+            readDateTimeText(time, in, assert_cast<const DataTypeDateTime *>(data_type.get())->getTimeZone());
             if (time < 0)
                 time = 0;
             assert_cast<ColumnUInt32 &>(column).insertValue(time);


@ -39,7 +39,7 @@ public:
void setFileProgressCallback(ContextMutablePtr context, bool write_progress_on_update = false); void setFileProgressCallback(ContextMutablePtr context, bool write_progress_on_update = false);
/// How much seconds passed since query execution start. /// How much seconds passed since query execution start.
UInt64 elapsedSeconds() const { return watch.elapsedSeconds(); } double elapsedSeconds() const { return watch.elapsedSeconds(); }
private: private:
/// This flag controls whether to show the progress bar. We start showing it after /// This flag controls whether to show the progress bar. We start showing it after


@@ -23,9 +23,10 @@ using IndexToLogEntry = std::unordered_map<uint64_t, LogEntryPtr>;
 enum class ChangelogVersion : uint8_t
 {
     V0 = 0,
+    V1 = 1, /// with 64 bit buffer header
 };
 
-static constexpr auto CURRENT_CHANGELOG_VERSION = ChangelogVersion::V0;
+static constexpr auto CURRENT_CHANGELOG_VERSION = ChangelogVersion::V1;
 
 struct ChangelogRecordHeader
 {


@@ -204,7 +204,7 @@ SnapshotMetadataPtr KeeperStorageSnapshot::deserialize(KeeperStorage & storage,
     uint8_t version;
     readBinary(version, in);
     SnapshotVersion current_version = static_cast<SnapshotVersion>(version);
-    if (current_version > SnapshotVersion::V1)
+    if (current_version > CURRENT_SNAPSHOT_VERSION)
         throw Exception(ErrorCodes::UNKNOWN_FORMAT_VERSION, "Unsupported snapshot version {}", version);
 
     SnapshotMetadataPtr result = deserializeSnapshotMetadata(in);


@@ -14,8 +14,11 @@ enum SnapshotVersion : uint8_t
 {
     V0 = 0,
     V1 = 1, /// with ACL map
+    V2 = 2, /// with 64 bit buffer header
 };
 
+static constexpr auto CURRENT_SNAPSHOT_VERSION = SnapshotVersion::V2;
+
 struct KeeperStorageSnapshot
 {
 public:
@@ -30,7 +33,7 @@ public:
     KeeperStorage * storage;
-    SnapshotVersion version = SnapshotVersion::V1;
+    SnapshotVersion version = CURRENT_SNAPSHOT_VERSION;
     SnapshotMetadataPtr snapshot_meta;
     int64_t session_id;
     size_t snapshot_container_size;


@@ -170,7 +170,7 @@ void PostgreSQLBlockInputStream::insertValue(IColumn & column, std::string_view
     {
         ReadBufferFromString in(value);
         time_t time = 0;
-        readDateTimeText(time, in);
+        readDateTimeText(time, in, assert_cast<const DataTypeDateTime *>(data_type.get())->getTimeZone());
         if (time < 0)
             time = 0;
         assert_cast<ColumnUInt32 &>(column).insertValue(time);
@@ -272,11 +272,11 @@ void PostgreSQLBlockInputStream::prepareArrayInfo(size_t column_idx, const DataT
     else if (which.isDate())
         parser = [](std::string & field) -> Field { return UInt16{LocalDate{field}.getDayNum()}; };
     else if (which.isDateTime())
-        parser = [](std::string & field) -> Field
+        parser = [nested](std::string & field) -> Field
         {
             ReadBufferFromString in(field);
             time_t time = 0;
-            readDateTimeText(time, in);
+            readDateTimeText(time, in, assert_cast<const DataTypeDateTime *>(nested.get())->getTimeZone());
             return time;
         };
     else if (which.isDecimal32())


@@ -169,7 +169,7 @@ namespace
         {
             ReadBufferFromString in(value);
             time_t time = 0;
-            readDateTimeText(time, in);
+            readDateTimeText(time, in, assert_cast<const DataTypeDateTime &>(data_type).getTimeZone());
             if (time < 0)
                 time = 0;
             assert_cast<ColumnUInt32 &>(column).insertValue(time);
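The three hunks above all thread the destination column's time zone into `readDateTimeText` when DateTime values are parsed from external ODBC, PostgreSQL and MySQL sources. A minimal SQL sketch (illustrative only, not taken from the diff) of why the time zone matters when a textual timestamp becomes a stored UInt32 value:

``` sql
-- The same wall-clock text maps to different Unix timestamps depending on which
-- time zone the DateTime column carries, so the parser must be told which one to use.
SELECT
    toDateTime('2021-06-22 17:02:01', 'UTC') AS parsed_as_utc,
    toDateTime('2021-06-22 17:02:01', 'Europe/Moscow') AS parsed_as_moscow,
    toUnixTimestamp(parsed_as_utc) - toUnixTimestamp(parsed_as_moscow) AS difference_seconds; -- 10800
```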

src/Functions/h3toGeo.cpp (new file)

@@ -0,0 +1,96 @@
#if !defined(ARCADIA_BUILD)
# include "config_functions.h"
#endif
#if USE_H3
#include <array>
#include <math.h>
#include <Columns/ColumnArray.h>
#include <Columns/ColumnTuple.h>
#include <Columns/ColumnsNumber.h>
#include <DataTypes/DataTypeTuple.h>
#include <DataTypes/DataTypesNumber.h>
#include <Functions/FunctionFactory.h>
#include <Functions/IFunction.h>
#include <Common/typeid_cast.h>
#include <ext/range.h>
#include <h3api.h>
namespace DB
{
namespace ErrorCodes
{
extern const int ILLEGAL_TYPE_OF_ARGUMENT;
}
namespace
{
/// Implements the function h3ToGeo which takes a single argument (h3Index)
/// and returns the longitude and latitude that correspond to the provided h3 index
class FunctionH3ToGeo : public IFunction
{
public:
static constexpr auto name = "h3ToGeo";
static FunctionPtr create(ContextPtr) { return std::make_shared<FunctionH3ToGeo>(); }
std::string getName() const override { return name; }
size_t getNumberOfArguments() const override { return 1; }
bool useDefaultImplementationForConstants() const override { return true; }
DataTypePtr getReturnTypeImpl(const DataTypes & arguments) const override
{
const auto * arg = arguments[0].get();
if (!WhichDataType(arg).isUInt64())
throw Exception(
"Illegal type " + arg->getName() + " of argument " + std::to_string(1) + " of function " + getName() + ". Must be UInt64",
ErrorCodes::ILLEGAL_TYPE_OF_ARGUMENT);
return std::make_shared<DataTypeTuple>(
DataTypes{std::make_shared<DataTypeFloat64>(), std::make_shared<DataTypeFloat64>()},
Strings{"longitude", "latitude"});
}
ColumnPtr executeImpl(const ColumnsWithTypeAndName & arguments, const DataTypePtr &, size_t input_rows_count) const override
{
const auto * col_index = arguments[0].column.get();
auto latitude = ColumnFloat64::create(input_rows_count);
auto longitude = ColumnFloat64::create(input_rows_count);
ColumnFloat64::Container & lon_data = longitude->getData();
ColumnFloat64::Container & lat_data = latitude->getData();
for (size_t row = 0; row < input_rows_count; ++row)
{
H3Index h3index = col_index->getUInt(row);
GeoCoord coord{};
h3ToGeo(h3index,&coord);
lon_data[row] = radsToDegs(coord.lon);
lat_data[row] = radsToDegs(coord.lat);
}
MutableColumns columns;
columns.emplace_back(std::move(longitude));
columns.emplace_back(std::move(latitude));
return ColumnTuple::create(std::move(columns));
}
};
}
void registerFunctionH3ToGeo(FunctionFactory & factory)
{
factory.registerFunction<FunctionH3ToGeo>();
}
}
#endif


@@ -28,6 +28,7 @@ void registerFunctionSvg(FunctionFactory & factory);
 #if USE_H3
 void registerFunctionGeoToH3(FunctionFactory &);
+void registerFunctionH3ToGeo(FunctionFactory &);
 void registerFunctionH3EdgeAngle(FunctionFactory &);
 void registerFunctionH3EdgeLengthM(FunctionFactory &);
 void registerFunctionH3GetResolution(FunctionFactory &);
@@ -66,6 +67,7 @@ void registerFunctionsGeo(FunctionFactory & factory)
 #if USE_H3
     registerFunctionGeoToH3(factory);
+    registerFunctionH3ToGeo(factory);
     registerFunctionH3EdgeAngle(factory);
     registerFunctionH3EdgeLengthM(factory);
     registerFunctionH3GetResolution(factory);


@@ -22,9 +22,10 @@ const char * ParserMultiplicativeExpression::operators[] =
     nullptr
 };
 
-const char * ParserUnaryMinusExpression::operators[] =
+const char * ParserUnaryExpression::operators[] =
 {
     "-", "negate",
+    "NOT", "not",
     nullptr
 };
 
@@ -539,7 +540,7 @@ bool ParserPrefixUnaryOperatorExpression::parseImpl(Pos & pos, ASTPtr & node, Ex
 }
 
-bool ParserUnaryMinusExpression::parseImpl(Pos & pos, ASTPtr & node, Expected & expected)
+bool ParserUnaryExpression::parseImpl(Pos & pos, ASTPtr & node, Expected & expected)
 {
     /// As an exception, negative numbers should be parsed as literals, and not as an application of the operator.


@@ -245,14 +245,14 @@ protected:
 };
 
-class ParserUnaryMinusExpression : public IParserBase
+class ParserUnaryExpression : public IParserBase
 {
 private:
     static const char * operators[];
     ParserPrefixUnaryOperatorExpression operator_parser {operators, std::make_unique<ParserTupleElementExpression>()};
 
 protected:
-    const char * getName() const override { return "unary minus expression"; }
+    const char * getName() const override { return "unary expression"; }
 
     bool parseImpl(Pos & pos, ASTPtr & node, Expected & expected) override;
 };
 
@@ -262,7 +262,7 @@ class ParserMultiplicativeExpression : public IParserBase
 {
 private:
     static const char * operators[];
-    ParserLeftAssociativeBinaryOperatorList operator_parser {operators, std::make_unique<ParserUnaryMinusExpression>()};
+    ParserLeftAssociativeBinaryOperatorList operator_parser {operators, std::make_unique<ParserUnaryExpression>()};
 
 protected:
     const char * getName() const override { return "multiplicative expression"; }
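The parser change above renames `ParserUnaryMinusExpression` to `ParserUnaryExpression` and registers `NOT` alongside the unary minus, so both are handled as prefix unary operators mapped to the `negate` and `not` functions. An illustrative query (not taken from the diff) showing the kind of expressions this parser level covers:

``` sql
SELECT -1 AS negated, NOT 0 AS logical_not, NOT (1 = 2) AS negated_comparison;
```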


@@ -812,6 +812,9 @@ KeyCondition::KeyCondition(
       */
     Block block_with_constants = getBlockWithConstants(query, syntax_analyzer_result, context);
 
+    for (const auto & [name, _] : syntax_analyzer_result->array_join_result_to_source)
+        array_joined_columns.insert(name);
+
     const ASTSelectQuery & select = query->as<ASTSelectQuery &>();
     if (select.where() || select.prewhere())
     {
@@ -1011,6 +1014,10 @@ bool KeyCondition::canConstantBeWrappedByMonotonicFunctions(
     DataTypePtr & out_type)
 {
     String expr_name = node.getColumnName();
+
+    if (array_joined_columns.count(expr_name))
+        return false;
+
     if (key_subexpr_names.count(expr_name) == 0)
         return false;
 
@@ -1115,6 +1122,9 @@ bool KeyCondition::canConstantBeWrappedByFunctions(
 {
     String expr_name = node.getColumnName();
 
+    if (array_joined_columns.count(expr_name))
+        return false;
+
     if (key_subexpr_names.count(expr_name) == 0)
     {
         /// Let's check another one case.
@@ -1447,6 +1457,9 @@ bool KeyCondition::isKeyPossiblyWrappedByMonotonicFunctionsImpl(
     // Key columns should use canonical names for index analysis
     String name = node.getColumnName();
 
+    if (array_joined_columns.count(name))
+        return false;
+
     auto it = key_columns.find(name);
     if (key_columns.end() != it)
     {


@@ -474,6 +474,8 @@ private:
     const ExpressionActionsPtr key_expr;
     /// All intermediate columns are used to calculate key_expr.
     const NameSet key_subexpr_names;
+    NameSet array_joined_columns;
+
     PreparedSets prepared_sets;
 
     // If true, always allow key_expr to be wrapped by function
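The KeyCondition hunks above collect the columns produced by ARRAY JOIN and skip them during index analysis: after ARRAY JOIN, such a name refers to individual unnested elements rather than to the values the primary key was built on. A hypothetical sketch of the situation this guards against (table and column names are illustrative, not from the diff):

``` sql
CREATE TABLE t_array_key (arr Array(UInt32)) ENGINE = MergeTree ORDER BY arr;
INSERT INTO t_array_key VALUES ([1, 2, 3]);

-- `arr` in the WHERE clause is a single unnested element here, so the condition
-- must not be pushed into the primary-key index built on the stored array column.
SELECT arr FROM t_array_key ARRAY JOIN arr WHERE arr = 2;
```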


@@ -377,8 +377,8 @@ class ClickhouseIntegrationTestsRunner:
         test_cmd = ' '.join([test for test in sorted(test_names)])
         parallel_cmd = " --parallel {} ".format(num_workers) if num_workers > 0 else ""
-        cmd = "cd {}/tests/integration && ./runner --tmpfs {} -t {} {} '-ss -rfEp --color=no --durations=0 {}' | tee {}".format(
-            repo_path, image_cmd, test_cmd, parallel_cmd, _get_deselect_option(self.should_skip_tests()), output_path)
+        cmd = "cd {}/tests/integration && ./runner --tmpfs {} -t {} {} '-ss -rfEp --run-id={} --color=no --durations=0 {}' | tee {}".format(
+            repo_path, image_cmd, test_cmd, parallel_cmd, i, _get_deselect_option(self.should_skip_tests()), output_path)
 
         with open(log_path, 'w') as log:
             logging.info("Executing cmd: %s", cmd)


@@ -29,3 +29,9 @@ def cleanup_environment():
         pass
 
     yield
+
+def pytest_addoption(parser):
+    parser.addoption("--run-id", default="", help="run-id is used as postfix in _instances_{} directory")
+
+def pytest_configure(config):
+    os.environ['INTEGRATION_TESTS_RUN_ID'] = config.option.run_id


@@ -1,6 +1,7 @@
 import os
 import subprocess as sp
 import tempfile
+import logging
 
 from threading import Timer
 
@@ -105,6 +106,7 @@ class CommandRequest:
         stderr = self.stderr_file.read().decode('utf-8', errors='replace')
 
         if self.timer is not None and not self.process_finished_before_timeout and not self.ignore_error:
+            logging.debug(f"Timed out. Last stdout:{stdout}, stderr:{stderr}")
             raise QueryTimeoutExceedException('Client timed out!')
 
         if (self.process.returncode != 0 or stderr) and not self.ignore_error:


@@ -29,7 +29,6 @@ from dict2xml import dict2xml
 from kazoo.client import KazooClient
 from kazoo.exceptions import KazooException
 from minio import Minio
-from minio.deleteobjects import DeleteObject
 
 from helpers.test_tools import assert_eq_with_retry
 
 import docker
@@ -172,6 +171,13 @@ def enable_consistent_hash_plugin(rabbitmq_id):
     p.communicate()
     return p.returncode == 0
 
+
+def get_instances_dir():
+    if 'INTEGRATION_TESTS_RUN_ID' in os.environ and os.environ['INTEGRATION_TESTS_RUN_ID']:
+        return '_instances_' + shlex.quote(os.environ['INTEGRATION_TESTS_RUN_ID'])
+    else:
+        return '_instances'
+
 
 class ClickHouseCluster:
     """ClickHouse cluster with several instances and (possibly) ZooKeeper.
@@ -203,7 +209,14 @@ class ClickHouseCluster:
         project_name = pwd.getpwuid(os.getuid()).pw_name + p.basename(self.base_dir) + self.name
         # docker-compose removes everything non-alphanumeric from project names so we do it too.
         self.project_name = re.sub(r'[^a-z0-9]', '', project_name.lower())
-        self.instances_dir = p.join(self.base_dir, '_instances' + ('' if not self.name else '_' + self.name))
+        instances_dir_name = '_instances'
+        if self.name:
+            instances_dir_name += '_' + self.name
+
+        if 'INTEGRATION_TESTS_RUN_ID' in os.environ and os.environ['INTEGRATION_TESTS_RUN_ID']:
+            instances_dir_name += '_' + shlex.quote(os.environ['INTEGRATION_TESTS_RUN_ID'])
+
+        self.instances_dir = p.join(self.base_dir, instances_dir_name)
         self.docker_logs_path = p.join(self.instances_dir, 'docker.log')
         self.env_file = p.join(self.instances_dir, DEFAULT_ENV_NAME)
         self.env_variables = {}
@@ -421,7 +434,15 @@ class ClickHouseCluster:
             pass
 
     def get_docker_handle(self, docker_id):
-        return self.docker_client.containers.get(docker_id)
+        exception = None
+        for i in range(5):
+            try:
+                return self.docker_client.containers.get(docker_id)
+            except Exception as ex:
+                print("Got exception getting docker handle", str(ex))
+                time.sleep(i * 2)
+                exception = ex
+        raise exception
 
     def get_client_cmd(self):
         cmd = self.client_bin_path
@@ -577,7 +598,7 @@ class ClickHouseCluster:
         self.base_cmd.extend(['--file', p.join(docker_compose_yml_dir, 'docker_compose_hdfs.yml')])
         self.base_hdfs_cmd = ['docker-compose', '--env-file', instance.env_file, '--project-name', self.project_name,
                               '--file', p.join(docker_compose_yml_dir, 'docker_compose_hdfs.yml')]
-        print("HDFS BASE CMD:{}".format(self.base_hdfs_cmd))
+        logging.debug("HDFS BASE CMD:{self.base_hdfs_cmd)}")
         return self.base_hdfs_cmd
 
     def setup_kerberized_hdfs_cmd(self, instance, env_variables, docker_compose_yml_dir):
@@ -1217,8 +1238,8 @@ class ClickHouseCluster:
             for bucket in buckets:
                 if minio_client.bucket_exists(bucket):
                     delete_object_list = map(
-                        lambda x: DeleteObject(x.object_name),
-                        minio_client.list_objects(bucket, recursive=True),
+                        lambda x: x.object_name,
+                        minio_client.list_objects_v2(bucket, recursive=True),
                     )
                     errors = minio_client.remove_objects(bucket, delete_object_list)
                     for error in errors:
@@ -1468,9 +1489,9 @@ class ClickHouseCluster:
                 instance.docker_client = self.docker_client
                 instance.ip_address = self.get_instance_ip(instance.name)
 
-                logging.debug("Waiting for ClickHouse start...")
+                logging.debug("Waiting for ClickHouse start in {instance}, ip: {instance.ip_address}...")
                 instance.wait_for_start(start_timeout)
-                logging.debug("ClickHouse started")
+                logging.debug("ClickHouse {instance} started")
 
                 instance.client = Client(instance.ip_address, command=self.client_bin_path)
@@ -1864,8 +1885,7 @@ class ClickHouseInstance:
             self.start_clickhouse(stop_start_wait_sec)
 
     def exec_in_container(self, cmd, detach=False, nothrow=False, **kwargs):
-        container_id = self.get_docker_handle().id
-        return self.cluster.exec_in_container(container_id, cmd, detach, nothrow, **kwargs)
+        return self.cluster.exec_in_container(self.docker_id, cmd, detach, nothrow, **kwargs)
 
     def contains_in_log(self, substring):
         result = self.exec_in_container(
@@ -1905,8 +1925,7 @@ class ClickHouseInstance:
                                        ["bash", "-c", "echo $(if [ -e '{}' ]; then echo 'yes'; else echo 'no'; fi)".format(path)]) == 'yes\n'
 
     def copy_file_to_container(self, local_path, dest_path):
-        container_id = self.get_docker_handle().id
-        return self.cluster.copy_file_to_container(container_id, local_path, dest_path)
+        return self.cluster.copy_file_to_container(self.docker_id, local_path, dest_path)
 
     def get_process_pid(self, process_name):
         output = self.exec_in_container(["bash", "-c",
@@ -1961,6 +1980,7 @@ class ClickHouseInstance:
         self.get_docker_handle().start()
 
     def wait_for_start(self, start_timeout=None, connection_timeout=None):
+        handle = self.get_docker_handle()
         if start_timeout is None or start_timeout <= 0:
             raise Exception("Invalid timeout: {}".format(start_timeout))
 
@@ -1983,11 +2003,10 @@ class ClickHouseInstance:
             return False
 
         while True:
-            handle = self.get_docker_handle()
+            handle.reload()
             status = handle.status
             if status == 'exited':
-                raise Exception("Instance `{}' failed to start. Container status: {}, logs: {}"
-                                .format(self.name, status, handle.logs().decode('utf-8')))
+                raise Exception(f"Instance `{self.name}' failed to start. Container status: {status}, logs: {handle.logs().decode('utf-8')}")
 
             deadline = start_time + start_timeout
             # It is possible that server starts slowly.
@@ -1997,9 +2016,8 @@ class ClickHouseInstance:
             current_time = time.time()
             if current_time >= deadline:
-                raise Exception("Timed out while waiting for instance `{}' with ip address {} to start. "
-                                "Container status: {}, logs: {}".format(self.name, self.ip_address, status,
-                                                                        handle.logs().decode('utf-8')))
+                raise Exception(f"Timed out while waiting for instance `{self.name}' with ip address {self.ip_address} to start. " \
+                                f"Container status: {status}, logs: {handle.logs().decode('utf-8')}")
 
             socket_timeout = min(start_timeout, deadline - current_time)


@@ -1,5 +1,6 @@
 import difflib
 import time
+import logging
 
 from io import IOBase
 
@@ -56,7 +57,7 @@ def assert_eq_with_retry(instance, query, expectation, retry_count=20, sleep_tim
                 break
             time.sleep(sleep_time)
         except Exception as ex:
-            print(("assert_eq_with_retry retry {} exception {}".format(i + 1, ex)))
+            logging.exception(f"assert_eq_with_retry retry {i+1} exception {ex}")
             time.sleep(sleep_time)
     else:
         val = TSV(get_result(instance.query(query, user=user, stdin=stdin, timeout=timeout, settings=settings,
@@ -76,7 +77,7 @@ def assert_logs_contain_with_retry(instance, substring, retry_count=20, sleep_ti
                 break
             time.sleep(sleep_time)
         except Exception as ex:
-            print("contains_in_log_with_retry retry {} exception {}".format(i + 1, ex))
+            logging.exception(f"contains_in_log_with_retry retry {i+1} exception {ex}")
             time.sleep(sleep_time)
     else:
         raise AssertionError("'{}' not found in logs".format(substring))
@@ -89,7 +90,7 @@ def exec_query_with_retry(instance, query, retry_count=40, sleep_time=0.5, setti
             break
         except Exception as ex:
             exception = ex
-            print("Failed to execute query '", query, "' on instance", instance.name, "will retry")
+            logging.exception(f"Failed to execute query '{query}' on instance '{instance.name}' will retry")
             time.sleep(sleep_time)
     else:
         raise exception


@@ -1,6 +1,6 @@
 [pytest]
 python_files = test*.py
-norecursedirs = _instances
+norecursedirs = _instances*
 timeout = 1800
 junit_duration_report = call
 junit_suite_name = integration


@@ -43,8 +43,8 @@ def test_backup_from_old_version(started_cluster):
 
     assert node1.query("SELECT COUNT() FROM dest_table") == "1\n"
 
-    node1.exec_in_container(['bash', '-c',
-                             'cp -r /var/lib/clickhouse/shadow/1/data/default/source_table/all_1_1_0/ /var/lib/clickhouse/data/default/dest_table/detached'])
+    node1.exec_in_container(['find', '/var/lib/clickhouse/shadow/1/data/default/source_table'])
+    node1.exec_in_container(['cp', '-r', '/var/lib/clickhouse/shadow/1/data/default/source_table/all_1_1_0/', '/var/lib/clickhouse/data/default/dest_table/detached'])
 
     assert node1.query("SELECT COUNT() FROM dest_table") == "1\n"
 
@@ -81,8 +81,7 @@
 
     assert node2.query("SELECT COUNT() FROM dest_table") == "1\n"
 
-    node2.exec_in_container(['bash', '-c',
-                             'cp -r /var/lib/clickhouse/shadow/1/data/default/source_table/all_1_1_0/ /var/lib/clickhouse/data/default/dest_table/detached'])
+    node2.exec_in_container(['cp', '-r', '/var/lib/clickhouse/shadow/1/data/default/source_table/all_1_1_0/', '/var/lib/clickhouse/data/default/dest_table/detached'])
 
     assert node2.query("SELECT COUNT() FROM dest_table") == "1\n"
 
@@ -123,8 +122,7 @@
 
     assert node3.query("SELECT COUNT() FROM dest_table") == "1\n"
 
-    node3.exec_in_container(['bash', '-c',
-                             'cp -r /var/lib/clickhouse/shadow/1/data/default/source_table/all_1_1_0/ /var/lib/clickhouse/data/default/dest_table/detached'])
+    node3.exec_in_container(['cp', '-r', '/var/lib/clickhouse/shadow/1/data/default/source_table/all_1_1_0/', '/var/lib/clickhouse/data/default/dest_table/detached'])
 
     assert node3.query("SELECT COUNT() FROM dest_table") == "1\n"
 
@@ -156,8 +154,7 @@
 
     node4.query("ALTER TABLE test.backup_table DROP PARTITION tuple()")
 
-    node4.exec_in_container(['bash', '-c',
-                             'cp -r /var/lib/clickhouse/shadow/1/data/test/backup_table/all_1_1_0/ /var/lib/clickhouse/data/test/backup_table/detached'])
+    node4.exec_in_container(['cp', '-r', '/var/lib/clickhouse/shadow/1/data/test/backup_table/all_1_1_0/', '/var/lib/clickhouse/data/test/backup_table/detached'])
 
     node4.query("ALTER TABLE test.backup_table ATTACH PARTITION tuple()")


@@ -39,7 +39,7 @@ class Task:
         for instance_name, _ in cluster.instances.items():
             instance = cluster.instances[instance_name]
             instance.copy_file_to_container(os.path.join(CURRENT_TEST_DIR, './task_taxi_data.xml'), self.container_task_file)
-            print("Copied task file to container of '{}' instance. Path {}".format(instance_name, self.container_task_file))
+            logging.debug(f"Copied task file to container of '{instance_name}' instance. Path {self.container_task_file}")
 
     def start(self):
@@ -48,11 +48,11 @@ class Task:
             node.query("DROP DATABASE IF EXISTS dailyhistory SYNC;")
             node.query("DROP DATABASE IF EXISTS monthlyhistory SYNC;")
 
-        instance = cluster.instances['first']
+        first = cluster.instances['first']
 
         # daily partition database
-        instance.query("CREATE DATABASE IF NOT EXISTS dailyhistory on cluster events;")
-        instance.query("""CREATE TABLE dailyhistory.yellow_tripdata_staging ON CLUSTER events
+        first.query("CREATE DATABASE IF NOT EXISTS dailyhistory on cluster events;")
+        first.query("""CREATE TABLE dailyhistory.yellow_tripdata_staging ON CLUSTER events
         (
             id UUID DEFAULT generateUUIDv4(),
             vendor_id String,
@@ -84,12 +84,12 @@ class Task:
             ORDER BY (tpep_pickup_datetime, id)
             PARTITION BY (toYYYYMMDD(tpep_pickup_datetime))""")
 
-        instance.query("""CREATE TABLE dailyhistory.yellow_tripdata
+        first.query("""CREATE TABLE dailyhistory.yellow_tripdata
             ON CLUSTER events
             AS dailyhistory.yellow_tripdata_staging
             ENGINE = Distributed('events', 'dailyhistory', yellow_tripdata_staging, sipHash64(id) % 3);""")
 
-        instance.query("""INSERT INTO dailyhistory.yellow_tripdata
+        first.query("""INSERT INTO dailyhistory.yellow_tripdata
             SELECT * FROM generateRandom(
                 'id UUID DEFAULT generateUUIDv4(),
                 vendor_id String,
@@ -119,8 +119,8 @@ class Task:
                 1, 10, 2) LIMIT 50;""")
 
         # monthly partition database
-        instance.query("create database IF NOT EXISTS monthlyhistory on cluster events;")
-        instance.query("""CREATE TABLE monthlyhistory.yellow_tripdata_staging ON CLUSTER events
+        first.query("create database IF NOT EXISTS monthlyhistory on cluster events;")
+        first.query("""CREATE TABLE monthlyhistory.yellow_tripdata_staging ON CLUSTER events
         (
             id UUID DEFAULT generateUUIDv4(),
             vendor_id String,
@@ -153,16 +153,16 @@ class Task:
             ORDER BY (tpep_pickup_datetime, id)
             PARTITION BY (pickup_location_id, toYYYYMM(tpep_pickup_datetime))""")
 
-        instance.query("""CREATE TABLE monthlyhistory.yellow_tripdata
+        first.query("""CREATE TABLE monthlyhistory.yellow_tripdata
            ON CLUSTER events
            AS monthlyhistory.yellow_tripdata_staging
            ENGINE = Distributed('events', 'monthlyhistory', yellow_tripdata_staging, sipHash64(id) % 3);""")
 
     def check(self):
-        instance = cluster.instances["first"]
-        a = TSV(instance.query("SELECT count() from dailyhistory.yellow_tripdata"))
-        b = TSV(instance.query("SELECT count() from monthlyhistory.yellow_tripdata"))
+        first = cluster.instances["first"]
+        a = TSV(first.query("SELECT count() from dailyhistory.yellow_tripdata"))
+        b = TSV(first.query("SELECT count() from monthlyhistory.yellow_tripdata"))
         assert a == b, "Distributed tables"
 
         for instance_name, instance in cluster.instances.items():
@@ -187,10 +187,10 @@ def execute_task(started_cluster, task, cmd_options):
     task.start()
 
     zk = started_cluster.get_kazoo_client('zoo1')
-    print("Use ZooKeeper server: {}:{}".format(zk.hosts[0][0], zk.hosts[0][1]))
+    logging.debug("Use ZooKeeper server: {}:{}".format(zk.hosts[0][0], zk.hosts[0][1]))
 
     # Run cluster-copier processes on each node
-    docker_api = docker.from_env().api
+    docker_api = started_cluster.docker_client.api
     copiers_exec_ids = []
 
     cmd = ['/usr/bin/clickhouse', 'copier',
@@ -201,9 +201,9 @@ def execute_task(started_cluster, task, cmd_options):
            '--base-dir', '/var/log/clickhouse-server/copier']
     cmd += cmd_options
 
-    print(cmd)
+    logging.debug(f"execute_task cmd: {cmd}")
 
-    for instance_name, instance in started_cluster.instances.items():
+    for instance_name in started_cluster.instances.keys():
         instance = started_cluster.instances[instance_name]
         container = instance.get_docker_handle()
         instance.copy_file_to_container(os.path.join(CURRENT_TEST_DIR, "configs_three_nodes/config-copier.xml"), "/etc/clickhouse-server/config-copier.xml")


@@ -430,7 +430,7 @@ def execute_task(started_cluster, task, cmd_options):
     print("Use ZooKeeper server: {}:{}".format(zk.hosts[0][0], zk.hosts[0][1]))
 
     # Run cluster-copier processes on each node
-    docker_api = docker.from_env().api
+    docker_api = started_cluster.docker_client.api
     copiers_exec_ids = []
 
     cmd = ['/usr/bin/clickhouse', 'copier',
@@ -443,7 +443,7 @@ def execute_task(started_cluster, task, cmd_options):
     print(cmd)
 
-    for instance_name, instance in started_cluster.instances.items():
+    for instance_name in started_cluster.instances.keys():
         instance = started_cluster.instances[instance_name]
         container = instance.get_docker_handle()
         instance.copy_file_to_container(os.path.join(CURRENT_TEST_DIR, "configs_two_nodes/config-copier.xml"), "/etc/clickhouse-server/config-copier.xml")


@@ -150,7 +150,7 @@ def test_reload_after_loading(started_cluster):
     time.sleep(1)  # see the comment above
     replace_in_file_in_container('/etc/clickhouse-server/dictionaries/executable.xml', '82', '83')
     replace_in_file_in_container('/etc/clickhouse-server/dictionaries/file.txt', '102', '103')
-    time.sleep(7)
+    time.sleep(10)
     assert query("SELECT dictGetInt32('file', 'a', toUInt64(9))") == "103\n"
     assert query("SELECT dictGetInt32('executable', 'a', toUInt64(7))") == "83\n"


@@ -1,6 +1,6 @@
 <yandex>
     <zookeeper>
         <!-- Required for correct timing in current test case -->
-        <session_timeout_ms replace="1">10000</session_timeout_ms>
+        <session_timeout_ms replace="1">15000</session_timeout_ms>
     </zookeeper>
 </yandex>


@@ -1,6 +1,6 @@
 <yandex>
     <zookeeper>
         <!-- Required for correct timing in current test case -->
-        <session_timeout_ms replace="1">10000</session_timeout_ms>
+        <session_timeout_ms replace="1">15000</session_timeout_ms>
     </zookeeper>
 </yandex>


@@ -53,6 +53,7 @@ def test_default_database(test_cluster):
 
 def test_create_view(test_cluster):
     instance = test_cluster.instances['ch3']
+    test_cluster.ddl_check_query(instance, "DROP TABLE IF EXISTS test.super_simple_view ON CLUSTER 'cluster'")
     test_cluster.ddl_check_query(instance,
                                  "CREATE VIEW test.super_simple_view ON CLUSTER 'cluster' AS SELECT * FROM system.numbers FORMAT TSV")
     test_cluster.ddl_check_query(instance,
@@ -76,7 +77,7 @@ def test_on_server_fail(test_cluster):
     kill_instance.get_docker_handle().stop()
     request = instance.get_query_request("CREATE TABLE test.test_server_fail ON CLUSTER 'cluster' (i Int8) ENGINE=Null",
-                                         timeout=30)
+                                         timeout=180)
     kill_instance.get_docker_handle().start()
 
     test_cluster.ddl_check_query(instance, "DROP TABLE IF EXISTS test.__nope__ ON CLUSTER 'cluster'")
@@ -92,27 +93,6 @@ def test_on_server_fail(test_cluster):
 
     test_cluster.ddl_check_query(instance, "DROP TABLE test.test_server_fail ON CLUSTER 'cluster'")
 
-def _test_on_connection_losses(test_cluster, zk_timeout):
-    instance = test_cluster.instances['ch1']
-    kill_instance = test_cluster.instances['ch2']
-
-    with PartitionManager() as pm:
-        pm.drop_instance_zk_connections(kill_instance)
-        request = instance.get_query_request("DROP TABLE IF EXISTS test.__nope__ ON CLUSTER 'cluster'", timeout=20)
-        time.sleep(zk_timeout)
-        pm.restore_instance_zk_connections(kill_instance)
-
-    test_cluster.check_all_hosts_successfully_executed(request.get_answer())
-
-
-def test_on_connection_loss(test_cluster):
-    _test_on_connection_losses(test_cluster, 5)  # connection loss will occur only (3 sec ZK timeout in config)
-
-
-def test_on_session_expired(test_cluster):
-    _test_on_connection_losses(test_cluster, 15)  # session should be expired (3 sec ZK timeout in config)
-
-
 def test_simple_alters(test_cluster):
     instance = test_cluster.instances['ch2']
@@ -190,7 +170,7 @@ def test_implicit_macros(test_cluster):
     instance = test_cluster.instances['ch2']
 
-    test_cluster.ddl_check_query(instance, "DROP DATABASE IF EXISTS test_db ON CLUSTER '{cluster}'")
+    test_cluster.ddl_check_query(instance, "DROP DATABASE IF EXISTS test_db ON CLUSTER '{cluster}' SYNC")
     test_cluster.ddl_check_query(instance, "CREATE DATABASE IF NOT EXISTS test_db ON CLUSTER '{cluster}'")
 
     test_cluster.ddl_check_query(instance, """
@@ -270,6 +250,15 @@ def test_create_reserved(test_cluster):
 def test_rename(test_cluster):
     instance = test_cluster.instances['ch1']
     rules = test_cluster.pm_random_drops.pop_rules()
+    test_cluster.ddl_check_query(instance,
+                                 "DROP TABLE IF EXISTS rename_shard ON CLUSTER cluster SYNC")
+    test_cluster.ddl_check_query(instance,
+                                 "DROP TABLE IF EXISTS rename_new ON CLUSTER cluster SYNC")
+    test_cluster.ddl_check_query(instance,
+                                 "DROP TABLE IF EXISTS rename_old ON CLUSTER cluster SYNC")
+    test_cluster.ddl_check_query(instance,
+                                 "DROP TABLE IF EXISTS rename ON CLUSTER cluster SYNC")
+
     test_cluster.ddl_check_query(instance,
                                  "CREATE TABLE rename_shard ON CLUSTER cluster (id Int64, sid String DEFAULT concat('old', toString(id))) ENGINE = ReplicatedMergeTree('/clickhouse/tables/{shard}/staging/test_shard', '{replica}') ORDER BY (id)")
     test_cluster.ddl_check_query(instance,
@@ -326,12 +315,15 @@ def test_socket_timeout(test_cluster):
 def test_replicated_without_arguments(test_cluster):
     rules = test_cluster.pm_random_drops.pop_rules()
     instance = test_cluster.instances['ch1']
+    test_cluster.ddl_check_query(instance, "DROP TABLE IF EXISTS test_atomic.rmt ON CLUSTER cluster SYNC")
+    test_cluster.ddl_check_query(instance, "DROP DATABASE IF EXISTS test_atomic ON CLUSTER cluster SYNC")
+
     test_cluster.ddl_check_query(instance, "CREATE DATABASE test_atomic ON CLUSTER cluster ENGINE=Atomic")
     assert "are supported only for ON CLUSTER queries with Atomic database engine" in \
            instance.query_and_get_error("CREATE TABLE test_atomic.rmt (n UInt64, s String) ENGINE=ReplicatedMergeTree ORDER BY n")
     test_cluster.ddl_check_query(instance,
                                  "CREATE TABLE test_atomic.rmt ON CLUSTER cluster (n UInt64, s String) ENGINE=ReplicatedMergeTree() ORDER BY n")
-    test_cluster.ddl_check_query(instance, "DROP TABLE test_atomic.rmt ON CLUSTER cluster")
+    test_cluster.ddl_check_query(instance, "DROP TABLE test_atomic.rmt ON CLUSTER cluster SYNC")
     test_cluster.ddl_check_query(instance,
                                  "CREATE TABLE test_atomic.rmt UUID '12345678-0000-4000-8000-000000000001' ON CLUSTER cluster (n UInt64, s String) ENGINE=ReplicatedMergeTree ORDER BY n")
     assert instance.query("SHOW CREATE test_atomic.rmt FORMAT TSVRaw") == \
@@ -349,7 +341,7 @@ def test_replicated_without_arguments(test_cluster):
                                  "CREATE TABLE test_atomic.rsmt ON CLUSTER cluster (n UInt64, m UInt64, k UInt64) ENGINE=ReplicatedSummingMergeTree((m, k)) ORDER BY n")
     test_cluster.ddl_check_query(instance,
                                  "CREATE TABLE test_atomic.rvcmt ON CLUSTER cluster (n UInt64, m Int8, k UInt64) ENGINE=ReplicatedVersionedCollapsingMergeTree(m, k) ORDER BY n")
-    test_cluster.ddl_check_query(instance, "DROP DATABASE test_atomic ON CLUSTER cluster")
+    test_cluster.ddl_check_query(instance, "DROP DATABASE test_atomic ON CLUSTER cluster SYNC")
 
     test_cluster.ddl_check_query(instance, "CREATE DATABASE test_ordinary ON CLUSTER cluster ENGINE=Ordinary")
     assert "are supported only for ON CLUSTER queries with Atomic database engine" in \
@@ -359,7 +351,7 @@ def test_replicated_without_arguments(test_cluster):
     test_cluster.ddl_check_query(instance, "CREATE TABLE test_ordinary.rmt ON CLUSTER cluster (n UInt64, s String) ENGINE=ReplicatedMergeTree('/{shard}/{table}/', '{replica}') ORDER BY n")
     assert instance.query("SHOW CREATE test_ordinary.rmt FORMAT TSVRaw") == \
         "CREATE TABLE test_ordinary.rmt\n(\n    `n` UInt64,\n    `s` String\n)\nENGINE = ReplicatedMergeTree('/{shard}/rmt/', '{replica}')\nORDER BY n\nSETTINGS index_granularity = 8192\n"
-    test_cluster.ddl_check_query(instance, "DROP DATABASE test_ordinary ON CLUSTER cluster")
+    test_cluster.ddl_check_query(instance, "DROP DATABASE test_ordinary ON CLUSTER cluster SYNC")
     test_cluster.pm_random_drops.push_rules(rules)


@@ -38,9 +38,9 @@ def test_cluster(request):
 
 def test_replicated_alters(test_cluster):
     instance = test_cluster.instances['ch2']
-    test_cluster.ddl_check_query(instance, "DROP TABLE IF EXISTS merge_for_alter ON CLUSTER cluster")
-    test_cluster.ddl_check_query(instance, "DROP TABLE IF EXISTS all_merge_32 ON CLUSTER cluster")
-    test_cluster.ddl_check_query(instance, "DROP TABLE IF EXISTS all_merge_64 ON CLUSTER cluster")
+    test_cluster.ddl_check_query(instance, "DROP TABLE IF EXISTS merge_for_alter ON CLUSTER cluster SYNC")
+    test_cluster.ddl_check_query(instance, "DROP TABLE IF EXISTS all_merge_32 ON CLUSTER cluster SYNC")
+    test_cluster.ddl_check_query(instance, "DROP TABLE IF EXISTS all_merge_64 ON CLUSTER cluster SYNC")
 
     # Temporarily disable random ZK packet drops, they might broke creation if ReplicatedMergeTree replicas
     firewall_drops_rules = test_cluster.pm_random_drops.pop_rules()
@@ -90,10 +90,10 @@ ENGINE = Distributed(cluster, default, merge_for_alter, i)
     assert TSV(instance.query("SELECT i, s FROM all_merge_64 ORDER BY i")) == TSV(
         ''.join(['{}\t{}\n'.format(x, x) for x in range(4)]))
 
-    test_cluster.ddl_check_query(instance, "DROP TABLE merge_for_alter ON CLUSTER cluster")
+    test_cluster.ddl_check_query(instance, "DROP TABLE merge_for_alter ON CLUSTER cluster SYNC")
 
     # Enable random ZK packet drops
     test_cluster.pm_random_drops.push_rules(firewall_drops_rules)
 
-    test_cluster.ddl_check_query(instance, "DROP TABLE all_merge_32 ON CLUSTER cluster")
-    test_cluster.ddl_check_query(instance, "DROP TABLE all_merge_64 ON CLUSTER cluster")
+    test_cluster.ddl_check_query(instance, "DROP TABLE all_merge_32 ON CLUSTER cluster SYNC")
+    test_cluster.ddl_check_query(instance, "DROP TABLE all_merge_64 ON CLUSTER cluster SYNC")


@@ -6,7 +6,7 @@ import threading
 import os
 
 import pytest
-from helpers.cluster import ClickHouseCluster
+from helpers.cluster import ClickHouseCluster, get_instances_dir
 
 
 # By default the exceptions that was throwed in threads will be ignored
@@ -30,7 +30,7 @@ class SafeThread(threading.Thread):
 
 SCRIPT_DIR = os.path.dirname(os.path.realpath(__file__))
-CONFIG_PATH = os.path.join(SCRIPT_DIR, './_instances/node/configs/config.d/storage_conf.xml')
+CONFIG_PATH = os.path.join(SCRIPT_DIR, './{}/node/configs/config.d/storage_conf.xml'.format(get_instances_dir()))
 
 
 def replace_config(old, new):


@@ -5,10 +5,10 @@ import string
 import time
 
 import pytest
-from helpers.cluster import ClickHouseCluster
+from helpers.cluster import ClickHouseCluster, get_instances_dir
 
 SCRIPT_DIR = os.path.dirname(os.path.realpath(__file__))
-NOT_RESTORABLE_CONFIG_PATH = os.path.join(SCRIPT_DIR, './_instances/node_not_restorable/configs/config.d/storage_conf_not_restorable.xml')
+NOT_RESTORABLE_CONFIG_PATH = os.path.join(SCRIPT_DIR, './{}/node_not_restorable/configs/config.d/storage_conf_not_restorable.xml'.format(get_instances_dir()))
 COMMON_CONFIGS = ["configs/config.d/bg_processing_pool_conf.xml", "configs/config.d/log_conf.xml", "configs/config.d/clusters.xml"]


@@ -2,13 +2,14 @@ import os
 import time
 
 import pytest
-from helpers.cluster import ClickHouseCluster
+from helpers.cluster import ClickHouseCluster, get_instances_dir
 
 cluster = ClickHouseCluster(__file__)
 node = cluster.add_instance('node', main_configs=["configs/max_table_size_to_drop.xml"])
 
 SCRIPT_DIR = os.path.dirname(os.path.realpath(__file__))
-CONFIG_PATH = os.path.join(SCRIPT_DIR, './_instances/node/configs/config.d/max_table_size_to_drop.xml')
+
+CONFIG_PATH = os.path.join(SCRIPT_DIR, './{}/node/configs/config.d/max_table_size_to_drop.xml'.format(get_instances_dir()))
 
 
 @pytest.fixture(scope="module")


@ -21,16 +21,27 @@ create_table_sql_template = """
PRIMARY KEY (`id`)) ENGINE=InnoDB; PRIMARY KEY (`id`)) ENGINE=InnoDB;
""" """
def create_mysql_db(conn, name): drop_table_sql_template = """
with conn.cursor() as cursor: DROP TABLE IF EXISTS `clickhouse`.`{}`;
cursor.execute( """
"CREATE DATABASE {} DEFAULT CHARACTER SET 'utf8'".format(name))
def get_mysql_conn(started_cluster, host):
conn = pymysql.connect(user='root', password='clickhouse', host=host, port=started_cluster.mysql_port)
return conn
def create_mysql_table(conn, tableName): def create_mysql_table(conn, tableName):
with conn.cursor() as cursor: with conn.cursor() as cursor:
cursor.execute(create_table_sql_template.format(tableName)) cursor.execute(create_table_sql_template.format(tableName))
def drop_mysql_table(conn, tableName):
with conn.cursor() as cursor:
cursor.execute(drop_table_sql_template.format(tableName))
def create_mysql_db(conn, name):
with conn.cursor() as cursor:
cursor.execute("DROP DATABASE IF EXISTS {}".format(name))
cursor.execute("CREATE DATABASE {} DEFAULT CHARACTER SET 'utf8'".format(name))
@pytest.fixture(scope="module") @pytest.fixture(scope="module")
def started_cluster(): def started_cluster():
@@ -51,7 +62,10 @@ def started_cluster():

 def test_many_connections(started_cluster):
     table_name = 'test_many_connections'
+    node1.query(f'DROP TABLE IF EXISTS {table_name}')
+
     conn = get_mysql_conn(started_cluster, cluster.mysql_ip)
+    drop_mysql_table(conn, table_name)
     create_mysql_table(conn, table_name)

     node1.query('''
@@ -66,14 +80,18 @@ CREATE TABLE {}(id UInt32, name String, age UInt32, money UInt32) ENGINE = MySQL
     query += "SELECT id FROM {t})"
     assert node1.query(query.format(t=table_name)) == '250\n'

+    drop_mysql_table(conn, table_name)
     conn.close()


 def test_insert_select(started_cluster):
     table_name = 'test_insert_select'
+    node1.query(f'DROP TABLE IF EXISTS {table_name}')
+
     conn = get_mysql_conn(started_cluster, cluster.mysql_ip)
+    drop_mysql_table(conn, table_name)
     create_mysql_table(conn, table_name)

     node1.query('''
 CREATE TABLE {}(id UInt32, name String, age UInt32, money UInt32) ENGINE = MySQL('mysql57:3306', 'clickhouse', '{}', 'root', 'clickhouse');
 '''.format(table_name, table_name))
@@ -87,7 +105,9 @@ CREATE TABLE {}(id UInt32, name String, age UInt32, money UInt32) ENGINE = MySQL

 def test_replace_select(started_cluster):
     table_name = 'test_replace_select'
+    node1.query(f'DROP TABLE IF EXISTS {table_name}')
     conn = get_mysql_conn(started_cluster, cluster.mysql_ip)
+    drop_mysql_table(conn, table_name)
     create_mysql_table(conn, table_name)

     node1.query('''
@@ -106,7 +126,9 @@ CREATE TABLE {}(id UInt32, name String, age UInt32, money UInt32) ENGINE = MySQL

 def test_insert_on_duplicate_select(started_cluster):
     table_name = 'test_insert_on_duplicate_select'
+    node1.query(f'DROP TABLE IF EXISTS {table_name}')
     conn = get_mysql_conn(started_cluster, cluster.mysql_ip)
+    drop_mysql_table(conn, table_name)
     create_mysql_table(conn, table_name)

     node1.query('''
@@ -125,7 +147,10 @@ CREATE TABLE {}(id UInt32, name String, age UInt32, money UInt32) ENGINE = MySQL

 def test_where(started_cluster):
     table_name = 'test_where'
+    node1.query(f'DROP TABLE IF EXISTS {table_name}')
+
     conn = get_mysql_conn(started_cluster, cluster.mysql_ip)
+    drop_mysql_table(conn, table_name)
     create_mysql_table(conn, table_name)
     node1.query('''
 CREATE TABLE {}(id UInt32, name String, age UInt32, money UInt32) ENGINE = MySQL('mysql57:3306', 'clickhouse', '{}', 'root', 'clickhouse');
@@ -146,6 +171,7 @@ CREATE TABLE {}(id UInt32, name String, age UInt32, money UInt32) ENGINE = MySQL

 def test_table_function(started_cluster):
     conn = get_mysql_conn(started_cluster, cluster.mysql_ip)
+    drop_mysql_table(conn, 'table_function')
     create_mysql_table(conn, 'table_function')
     table_function = "mysql('mysql57:3306', 'clickhouse', '{}', 'root', 'clickhouse')".format('table_function')
     assert node1.query("SELECT count() FROM {}".format(table_function)).rstrip() == '0'
@@ -168,6 +194,8 @@ def test_table_function(started_cluster):

 def test_binary_type(started_cluster):
     conn = get_mysql_conn(started_cluster, cluster.mysql_ip)
+    drop_mysql_table(conn, 'binary_type')
+
     with conn.cursor() as cursor:
         cursor.execute("CREATE TABLE clickhouse.binary_type (id INT PRIMARY KEY, data BINARY(16) NOT NULL)")
     table_function = "mysql('mysql57:3306', 'clickhouse', '{}', 'root', 'clickhouse')".format('binary_type')
@@ -177,7 +205,10 @@ def test_binary_type(started_cluster):

 def test_enum_type(started_cluster):
     table_name = 'test_enum_type'
+    node1.query(f'DROP TABLE IF EXISTS {table_name}')
+
     conn = get_mysql_conn(started_cluster, cluster.mysql_ip)
+    drop_mysql_table(conn, table_name)
     create_mysql_table(conn, table_name)
     node1.query('''
 CREATE TABLE {}(id UInt32, name String, age UInt32, money UInt32, source Enum8('IP' = 1, 'URL' = 2)) ENGINE = MySQL('mysql57:3306', 'clickhouse', '{}', 'root', 'clickhouse', 1);
@@ -186,20 +217,8 @@ CREATE TABLE {}(id UInt32, name String, age UInt32, money UInt32, source Enum8('
     assert node1.query("SELECT source FROM {} LIMIT 1".format(table_name)).rstrip() == 'URL'
     conn.close()

-def get_mysql_conn(started_cluster, host):
-    conn = pymysql.connect(user='root', password='clickhouse', host=host, port=started_cluster.mysql_port)
-    return conn
-
-def create_mysql_db(conn, name):
-    with conn.cursor() as cursor:
-        cursor.execute("DROP DATABASE IF EXISTS {}".format(name))
-        cursor.execute("CREATE DATABASE {} DEFAULT CHARACTER SET 'utf8'".format(name))
-
-def create_mysql_table(conn, tableName):
-    with conn.cursor() as cursor:
-        cursor.execute(create_table_sql_template.format(tableName))
-
 def test_mysql_distributed(started_cluster):
     table_name = 'test_replicas'
@@ -218,6 +237,8 @@ def test_mysql_distributed(started_cluster):
     create_mysql_table(conn3, table_name)
     create_mysql_table(conn4, table_name)

+    node2.query('DROP TABLE IF EXISTS test_replicas')
+
     # Storage with with 3 replicas
     node2.query('''
         CREATE TABLE test_replicas
@@ -227,6 +248,7 @@ def test_mysql_distributed(started_cluster):
     # Fill remote tables with different data to be able to check
     nodes = [node1, node2, node2, node2]
     for i in range(1, 5):
+        nodes[i-1].query('DROP TABLE IF EXISTS test_replica{}'.format(i))
         nodes[i-1].query('''
             CREATE TABLE test_replica{}
             (id UInt32, name String, age UInt32, money UInt32)
@@ -249,6 +271,8 @@ def test_mysql_distributed(started_cluster):
     assert(result == 'host2\nhost3\nhost4\n')

     # Storage with with two shards, each has 2 replicas
+    node2.query('DROP TABLE IF EXISTS test_shards')
+
     node2.query('''
         CREATE TABLE test_shards
         (id UInt32, name String, age UInt32, money UInt32)
@@ -275,9 +299,12 @@ def test_mysql_distributed(started_cluster):

 def test_external_settings(started_cluster):
     table_name = 'test_external_settings'
+    node1.query(f'DROP TABLE IF EXISTS {table_name}')
     conn = get_mysql_conn(started_cluster, started_cluster.mysql_ip)
+    drop_mysql_table(conn, table_name)
     create_mysql_table(conn, table_name)

+    node3.query(f'DROP TABLE IF EXISTS {table_name}')
     node3.query('''
 CREATE TABLE {}(id UInt32, name String, age UInt32, money UInt32) ENGINE = MySQL('mysql57:3306', 'clickhouse', '{}', 'root', 'clickhouse');
 '''.format(table_name, table_name))

View File

@@ -308,6 +308,21 @@ def test_postgres_distributed(started_cluster):
     assert(result == 'host2\nhost4\n' or result == 'host3\nhost4\n')


+def test_datetime_with_timezone(started_cluster):
+    conn = get_postgres_conn(started_cluster, started_cluster.postgres_ip, True)
+    cursor = conn.cursor()
+    cursor.execute("CREATE TABLE test_timezone (ts timestamp without time zone, ts_z timestamp with time zone)")
+    cursor.execute("insert into test_timezone select '2014-04-04 20:00:00', '2014-04-04 20:00:00'::timestamptz at time zone 'America/New_York';")
+    cursor.execute("select * from test_timezone")
+    result = cursor.fetchall()[0]
+    print(result[0], str(result[1])[:-6])
+    node1.query("create table test_timezone ( ts DateTime, ts_z DateTime('America/New_York')) ENGINE PostgreSQL('postgres1:5432', 'clickhouse', 'test_timezone', 'postgres', 'mysecretpassword');")
+    assert(node1.query("select ts from test_timezone").strip() == str(result[0]))
+    # [:-6] because 2014-04-04 16:00:00+00:00 -> 2014-04-04 16:00:00
+    assert(node1.query("select ts_z from test_timezone").strip() == str(result[1])[:-6])
+    assert(node1.query("select * from test_timezone") == "2014-04-04 20:00:00\t2014-04-04 16:00:00\n")
+
+
 if __name__ == '__main__':
     cluster.start()
     input("Cluster created, press any key to destroy...")

View File

@@ -9,12 +9,13 @@ import time
 import helpers.client
 import pytest
-from helpers.cluster import ClickHouseCluster, ClickHouseInstance
+from helpers.cluster import ClickHouseCluster, ClickHouseInstance, get_instances_dir

 MINIO_INTERNAL_PORT = 9001

 SCRIPT_DIR = os.path.dirname(os.path.realpath(__file__))
-CONFIG_PATH = os.path.join(SCRIPT_DIR, './_instances/dummy/configs/config.d/defaultS3.xml')
+CONFIG_PATH = os.path.join(SCRIPT_DIR, './{}/dummy/configs/config.d/defaultS3.xml'.format(get_instances_dir()))


 # Creates S3 bucket for tests and allows anonymous read-write access to it.

View File

@@ -20,10 +20,6 @@ system_logs = [
     ('system.metric_log', 1),
 ]

-# Default timeout for flush is 60
-# decrease timeout for the test to show possible issues.
-timeout = pytest.mark.timeout(30)
-
 @pytest.fixture(scope='module', autouse=True)
 def start_cluster():
@@ -39,7 +35,6 @@ def flush_logs():
     node.query('SYSTEM FLUSH LOGS')


-@timeout
 @pytest.mark.parametrize('table,exists', system_logs)
 def test_system_logs(flush_logs, table, exists):
     q = 'SELECT * FROM {}'.format(table)
@@ -51,7 +46,6 @@ def test_system_logs(flush_logs, table, exists):
 # Logic is tricky, let's check that there is no hang in case of message queue
 # is not empty (this is another code path in the code).
-@timeout
 def test_system_logs_non_empty_queue():
     node.query('SELECT 1', settings={
         # right now defaults are the same,

View File

@@ -30,6 +30,7 @@ def started_cluster():

 def test_chroot_with_same_root(started_cluster):
     for i, node in enumerate([node1, node2]):
+        node.query('DROP TABLE IF EXISTS simple SYNC')
         node.query('''
             CREATE TABLE simple (date Date, id UInt32)
             ENGINE = ReplicatedMergeTree('/clickhouse/tables/0/simple', '{replica}', date, id, 8192);
@@ -44,6 +45,7 @@ def test_chroot_with_same_root(started_cluster):

 def test_chroot_with_different_root(started_cluster):
     for i, node in [(1, node1), (3, node3)]:
+        node.query('DROP TABLE IF EXISTS simple_different SYNC')
         node.query('''
             CREATE TABLE simple_different (date Date, id UInt32)
             ENGINE = ReplicatedMergeTree('/clickhouse/tables/0/simple_different', '{replica}', date, id, 8192);

View File

@@ -22,6 +22,8 @@ def started_cluster():
         cluster.shutdown()


 def test_identity(started_cluster):
+    node1.query('DROP TABLE IF EXISTS simple SYNC')
+
     node1.query('''
         CREATE TABLE simple (date Date, id UInt32)
         ENGINE = ReplicatedMergeTree('/clickhouse/tables/0/simple', '{replica}', date, id, 8192);
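The ZooKeeper-related tests above now issue DROP TABLE IF EXISTS ... SYNC before recreating the table. The commit does not spell out the motivation, but the usual reason for SYNC with ReplicatedMergeTree is that a plain drop can finish asynchronously, and an immediate re-CREATE of the same /clickhouse/tables/... path may then collide with leftovers from a previous run; SYNC waits until the removal completes. A condensed editorial sketch of the pattern (recreate_simple is a hypothetical helper; the queries mirror the hunks above):

# Sketch of the drop-then-recreate pattern added in the hunks above.
def recreate_simple(node):
    node.query('DROP TABLE IF EXISTS simple SYNC')   # wait until the old table is fully gone
    node.query('''
        CREATE TABLE simple (date Date, id UInt32)
        ENGINE = ReplicatedMergeTree('/clickhouse/tables/0/simple', '{replica}', date, id, 8192);
        ''')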

View File

@@ -7,3 +7,4 @@ SELECT h3kRing(0xFFFFFFFFFFFFFF, 1000) FORMAT Null;
 SELECT h3GetBaseCell(0xFFFFFFFFFFFFFF) FORMAT Null;
 SELECT h3GetResolution(0xFFFFFFFFFFFFFF) FORMAT Null;
 SELECT h3kRing(0xFFFFFFFFFFFFFF, 10) FORMAT Null;
+SELECT h3ToGeo(0xFFFFFFFFFFFFFF) FORMAT Null;

View File

@@ -0,0 +1,32 @@
(-173.6412167681162,-14.130272474941535)
(59.48137613600854,58.020407687755686)
(172.68095885060296,-83.6576608516349)
(-94.46556851304558,-69.1999982492279)
(-8.188263637093279,-55.856179102736284)
(77.25594891852249,47.39278564360122)
(135.11348004704536,36.60778126579667)
(39.28534828967223,49.07710003066973)
(124.71163478198051,-27.481172161567258)
(-147.4887686066785,76.73237945824442)
(86.63291906118863,-25.52526285188784)
(23.27751790712118,13.126101362212724)
(-70.40163237204142,-63.12562536833242)
(15.642428355535966,40.285813505163574)
(-76.53411447979884,54.5560449693637)
(8.19906334981474,67.69370966550179)
ok
ok
ok
ok
ok
ok
ok
ok
ok
ok
ok
ok
ok
ok
ok
ok

View File

@@ -0,0 +1,61 @@
DROP TABLE IF EXISTS h3_indexes;
CREATE TABLE h3_indexes (h3_index UInt64) ENGINE = Memory;
-- Random geo coordinates were generated using the H3 tool: https://github.com/ClickHouse-Extras/h3/blob/master/src/apps/testapps/mkRandGeo.c at various resolutions from 0 to 15.
-- Corresponding H3 index values were in turn generated with those geo coordinates using `geoToH3(lon, lat, res)` ClickHouse function for the following test.
INSERT INTO h3_indexes VALUES (579205133326352383);
INSERT INTO h3_indexes VALUES (581263419093549055);
INSERT INTO h3_indexes VALUES (589753847883235327);
INSERT INTO h3_indexes VALUES (594082350283882495);
INSERT INTO h3_indexes VALUES (598372386957426687);
INSERT INTO h3_indexes VALUES (599542359671177215);
INSERT INTO h3_indexes VALUES (604296355086598143);
INSERT INTO h3_indexes VALUES (608785214872748031);
INSERT INTO h3_indexes VALUES (615732192485572607);
INSERT INTO h3_indexes VALUES (617056794467368959);
INSERT INTO h3_indexes VALUES (624586477873168383);
INSERT INTO h3_indexes VALUES (627882919484481535);
INSERT INTO h3_indexes VALUES (634600058503392255);
INSERT INTO h3_indexes VALUES (635544851677385791);
INSERT INTO h3_indexes VALUES (639763125756281263);
INSERT INTO h3_indexes VALUES (644178757620501158);
SELECT h3ToGeo(h3_index) FROM h3_indexes ORDER BY h3_index;
DROP TABLE h3_indexes;
DROP TABLE IF EXISTS h3_geo;
-- compare if the results of h3ToGeo and geoToH3 are the same
CREATE TABLE h3_geo(lat Float64, lon Float64, res UInt8) ENGINE = Memory;
INSERT INTO h3_geo VALUES (-173.6412167681162, -14.130272474941535, 0);
INSERT INTO h3_geo VALUES (59.48137613600854, 58.020407687755686, 1);
INSERT INTO h3_geo VALUES (172.68095885060296, -83.6576608516349, 2);
INSERT INTO h3_geo VALUES (-94.46556851304558, -69.1999982492279, 3);
INSERT INTO h3_geo VALUES (-8.188263637093279, -55.856179102736284, 4);
INSERT INTO h3_geo VALUES (77.25594891852249, 47.39278564360122, 5);
INSERT INTO h3_geo VALUES (135.11348004704536, 36.60778126579667, 6);
INSERT INTO h3_geo VALUES (39.28534828967223, 49.07710003066973, 7);
INSERT INTO h3_geo VALUES (124.71163478198051, -27.481172161567258, 8);
INSERT INTO h3_geo VALUES (-147.4887686066785, 76.73237945824442, 9);
INSERT INTO h3_geo VALUES (86.63291906118863, -25.52526285188784, 10);
INSERT INTO h3_geo VALUES (23.27751790712118, 13.126101362212724, 11);
INSERT INTO h3_geo VALUES (-70.40163237204142, -63.12562536833242, 12);
INSERT INTO h3_geo VALUES (15.642428355535966, 40.285813505163574, 13);
INSERT INTO h3_geo VALUES (-76.53411447979884, 54.5560449693637, 14);
INSERT INTO h3_geo VALUES (8.19906334981474, 67.69370966550179, 15);
SELECT result FROM (
    SELECT
        (lat, lon) AS input_geo,
        h3ToGeo(geoToH3(lat, lon, res)) AS output_geo,
        if(input_geo = output_geo, 'ok', 'fail') AS result
    FROM h3_geo
);
DROP TABLE h3_geo;
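The second half of the new test feeds each stored coordinate through geoToH3 and back through h3ToGeo, and requires the round-trip to reproduce the input exactly (hence the sixteen 'ok' lines in the reference file above). The same check can be run ad hoc from Python; an editorial sketch assuming the clickhouse_driver package and a server reachable on localhost with default credentials (both assumptions, not part of the commit):

# Sketch: reproduce the h3ToGeo(geoToH3(...)) round-trip from the test above.
from clickhouse_driver import Client

client = Client('localhost')                                  # assumed local server, default port 9000
lat, lon, res = -173.6412167681162, -14.130272474941535, 0    # first row inserted into h3_geo
(row,) = client.execute(
    "SELECT h3ToGeo(geoToH3({lat}, {lon}, {res}))".format(lat=lat, lon=lon, res=res))
print(row[0])   # expected to print the same pair as the first line of the reference file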

View File

@@ -0,0 +1,9 @@
-- { echo }
SELECT 1 != (NOT 1);
1
SELECT 1 != NOT 1;
1
EXPLAIN SYNTAX SELECT 1 != (NOT 1);
SELECT 1 != NOT 1
EXPLAIN SYNTAX SELECT 1 != NOT 1;
SELECT 1 != NOT 1

View File

@@ -0,0 +1,5 @@
-- { echo }
SELECT 1 != (NOT 1);
SELECT 1 != NOT 1;
EXPLAIN SYNTAX SELECT 1 != (NOT 1);
EXPLAIN SYNTAX SELECT 1 != NOT 1;

View File

@@ -0,0 +1 @@
a c

View File

@@ -0,0 +1,10 @@
DROP TABLE IF EXISTS t_array_index;
CREATE TABLE t_array_index (n Nested(key String, value String))
ENGINE = MergeTree ORDER BY n.key;
INSERT INTO t_array_index VALUES (['a', 'b'], ['c', 'd']);
SELECT * FROM t_array_index ARRAY JOIN n WHERE n.key = 'a';
DROP TABLE IF EXISTS t_array_index;

View File

@@ -246,3 +246,4 @@
 01901_test_attach_partition_from
 01910_view_dictionary
 01824_prefer_global_in_and_join
+01576_alias_column_rewrite