Merge branch 'master' into url_engine_problem

Mikhail Korotov 2019-11-06 12:35:30 +03:00
commit cc96c511c3
59 changed files with 980 additions and 862 deletions

View File

@ -248,6 +248,47 @@ fix comments to make obvious that it may throw.
[#7350](https://github.com/ClickHouse/ClickHouse/pull/7350)
([tavplubix](https://github.com/tavplubix))
## ClickHouse release 19.15.4.10, 2019-10-31
### Bug Fix
* Added handling of SQL_TINYINT and SQL_BIGINT, and fixed handling of SQL_FLOAT data source types in ODBC Bridge.
[#7491](https://github.com/ClickHouse/ClickHouse/pull/7491) ([Denis Glazachev](https://github.com/traceon))
* Allowed MOVE PARTITION to proceed when some parts already exist on the destination disk or volume.
[#7434](https://github.com/ClickHouse/ClickHouse/pull/7434) ([Vladimir Chebotarev](https://github.com/excitoon))
* Fixed handling of NULL values in Nullable columns through the ODBC bridge.
[#7402](https://github.com/ClickHouse/ClickHouse/pull/7402) ([Vasily Nemkov](https://github.com/Enmk))
* Fixed INSERT into a non-local node of a Distributed table with MATERIALIZED columns.
[#7377](https://github.com/ClickHouse/ClickHouse/pull/7377) ([Azat Khuzhin](https://github.com/azat))
* Fixed function getMultipleValuesFromConfig.
[#7374](https://github.com/ClickHouse/ClickHouse/pull/7374) ([Mikhail Korotov](https://github.com/millb))
* Fixed an issue where the HTTP keep-alive timeout was used instead of the TCP keep-alive timeout.
[#7351](https://github.com/ClickHouse/ClickHouse/pull/7351) ([Vasily Nemkov](https://github.com/Enmk))
* Wait for all jobs to finish on exception (fixes rare segfaults).
[#7350](https://github.com/ClickHouse/ClickHouse/pull/7350) ([tavplubix](https://github.com/tavplubix))
* Don't push to materialized views when inserting into a Kafka table.
[#7265](https://github.com/ClickHouse/ClickHouse/pull/7265) ([Ivan](https://github.com/abyss7))
* Disable memory tracker for exception stack.
[#7264](https://github.com/ClickHouse/ClickHouse/pull/7264) ([Nikolai Kochetov](https://github.com/KochetovNicolai))
* Fixed bad code in transforming query for external database.
[#7252](https://github.com/ClickHouse/ClickHouse/pull/7252) ([alexey-milovidov](https://github.com/alexey-milovidov))
* Avoid use of uninitialized values in MetricsTransmitter.
[#7158](https://github.com/ClickHouse/ClickHouse/pull/7158) ([Azat Khuzhin](https://github.com/azat))
* Added example config with macros for tests ([alexey-milovidov](https://github.com/alexey-milovidov))
## ClickHouse release 19.15.3.6, 2019-10-09
### Bug Fix
* Fixed bad_variant in hashed dictionary.
([alesapin](https://github.com/alesapin))
* Fixed a bug causing a segmentation fault in the ATTACH PART query.
([alesapin](https://github.com/alesapin))
* Fixed time calculation in `MergeTreeData`.
([Vladimir Chebotarev](https://github.com/excitoon))
* Commit to Kafka explicitly after the writing is finalized.
[#7175](https://github.com/ClickHouse/ClickHouse/pull/7175) ([Ivan](https://github.com/abyss7))
* Serialize NULL values correctly in min/max indexes of MergeTree parts.
[#7234](https://github.com/ClickHouse/ClickHouse/pull/7234) ([Alexander Kuzmenkov](https://github.com/akuzm))
## ClickHouse release 19.15.2.2, 2019-10-01
### New Feature

View File

@ -193,7 +193,7 @@ DiskSelector::DiskSelector(const Poco::Util::AbstractConfiguration & config, con
if (has_space_ratio)
{
auto ratio = config.getDouble(config_prefix + ".keep_free_space_ratio");
auto ratio = config.getDouble(disk_config_prefix + ".keep_free_space_ratio");
if (ratio < 0 || ratio > 1)
throw Exception("'keep_free_space_ratio' have to be between 0 and 1",
ErrorCodes::EXCESSIVE_ELEMENT_IN_CONFIG);
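The hunk above fixes the lookup to use the per-disk prefix (`disk_config_prefix`) rather than the selector-wide `config_prefix` when reading `keep_free_space_ratio`, and rejects values outside [0, 1]. A minimal standalone sketch of that per-disk validation, with a plain `std::map` standing in for the Poco configuration (hypothetical helper, not the actual `DiskSelector` code):

```cpp
#include <iostream>
#include <map>
#include <stdexcept>
#include <string>

// Hypothetical flat key/value store standing in for Poco::Util::AbstractConfiguration.
using Config = std::map<std::string, std::string>;

// Validate keep_free_space_ratio for one disk, addressed by its own prefix
// (e.g. "storage_configuration.disks.disk1"), not by the shared selector prefix.
double getKeepFreeSpaceRatio(const Config & config, const std::string & disk_config_prefix)
{
    auto it = config.find(disk_config_prefix + ".keep_free_space_ratio");
    if (it == config.end())
        return 0.0;                      /// The ratio is optional; default to reserving nothing.

    double ratio = std::stod(it->second);
    if (ratio < 0 || ratio > 1)
        throw std::invalid_argument("'keep_free_space_ratio' has to be between 0 and 1");
    return ratio;
}

int main()
{
    Config config{{"storage_configuration.disks.disk1.keep_free_space_ratio", "0.2"}};
    std::cout << getKeepFreeSpaceRatio(config, "storage_configuration.disks.disk1") << '\n'; // 0.2
}
```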

View File

@ -1,6 +1,7 @@
#include <Poco/Util/AbstractConfiguration.h>
#include <Common/Macros.h>
#include <Common/Exception.h>
#include <IO/WriteHelpers.h>
namespace DB
@ -66,7 +67,9 @@ String Macros::expand(const String & s, size_t level, const String & database_na
else if (macro_name == "table" && !table_name.empty())
res += table_name;
else
throw Exception("No macro " + macro_name + " in config", ErrorCodes::SYNTAX_ERROR);
throw Exception("No macro '" + macro_name +
"' in config while processing substitutions in '" + s + "' at "
+ toString(begin), ErrorCodes::SYNTAX_ERROR);
pos = end + 1;
}
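The reworked exception now reports the unknown macro name, the string being expanded, and the position at which expansion stopped. Below is a rough standalone sketch of such an expansion loop, with `std::string` and a `std::map` of macro values in place of the real `Macros` class and server config (the `expandMacros` helper is an assumption for illustration):

```cpp
#include <iostream>
#include <map>
#include <stdexcept>
#include <string>

// Expand {name} placeholders from a macro map; report unknown macros with context.
std::string expandMacros(const std::string & s, const std::map<std::string, std::string> & macros)
{
    std::string res;
    size_t pos = 0;
    while (true)
    {
        size_t begin = s.find('{', pos);
        if (begin == std::string::npos)
        {
            res += s.substr(pos);        /// No more substitutions, copy the tail.
            break;
        }
        res += s.substr(pos, begin - pos);

        size_t end = s.find('}', begin);
        if (end == std::string::npos)
            throw std::runtime_error("Unbalanced '{' in '" + s + "'");

        std::string macro_name = s.substr(begin + 1, end - begin - 1);
        auto it = macros.find(macro_name);
        if (it == macros.end())
            throw std::runtime_error("No macro '" + macro_name
                + "' in config while processing substitutions in '" + s
                + "' at " + std::to_string(begin));

        res += it->second;
        pos = end + 1;
    }
    return res;
}

int main()
{
    std::map<std::string, std::string> macros{{"shard", "01"}, {"replica", "r1"}};
    std::cout << expandMacros("/clickhouse/tables/{shard}/{replica}", macros) << '\n';
    /// expandMacros("/{unknown}", macros) would throw:
    /// No macro 'unknown' in config while processing substitutions in '/{unknown}' at 1
}
```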

View File

@ -1,96 +1,149 @@
#include <DataTypes/DataTypesNumber.h>
#include <Columns/ColumnsNumber.h>
#include "FunctionArrayMapped.h"
#include <Functions/array/FunctionArrayMapped.h>
#include <Functions/FunctionFactory.h>
namespace DB
{
/// arrayCompact(['a', 'a', 'b', 'b', 'a']) = ['a', 'b', 'a'] - compact arrays
namespace ErrorCodes
/// arrayCompact(['a', 'a', 'b', 'b', 'a']) = ['a', 'b', 'a'] - compact arrays
namespace ErrorCodes
{
extern const int ILLEGAL_COLUMN;
}
struct ArrayCompactImpl
{
static bool useDefaultImplementationForConstants() { return true; }
static bool needBoolean() { return false; }
static bool needExpression() { return false; }
static bool needOneArray() { return false; }
static DataTypePtr getReturnType(const DataTypePtr & nested_type, const DataTypePtr &)
{
extern const int ILLEGAL_COLUMN;
return std::make_shared<DataTypeArray>(nested_type);
}
struct ArrayCompactImpl
template <typename T>
static bool executeType(const ColumnPtr & mapped, const ColumnArray & array, ColumnPtr & res_ptr)
{
static bool useDefaultImplementationForConstants() { return true; }
static bool needBoolean() { return false; }
static bool needExpression() { return false; }
static bool needOneArray() { return false; }
const ColumnVector<T> * src_values_column = checkAndGetColumn<ColumnVector<T>>(mapped.get());
static DataTypePtr getReturnType(const DataTypePtr & nested_type, const DataTypePtr & /*nested_type*/)
if (!src_values_column)
return false;
const IColumn::Offsets & src_offsets = array.getOffsets();
const typename ColumnVector<T>::Container & src_values = src_values_column->getData();
auto res_values_column = ColumnVector<T>::create(src_values.size());
typename ColumnVector<T>::Container & res_values = res_values_column->getData();
size_t src_offsets_size = src_offsets.size();
auto res_offsets_column = ColumnArray::ColumnOffsets::create(src_offsets_size);
IColumn::Offsets & res_offsets = res_offsets_column->getData();
size_t res_pos = 0;
size_t src_pos = 0;
for (size_t i = 0; i < src_offsets_size; ++i)
{
return std::make_shared<DataTypeArray>(nested_type);
}
auto src_offset = src_offsets[i];
template <typename T>
static bool executeType(const ColumnPtr & mapped, const ColumnArray & array, ColumnPtr & res_ptr)
{
const ColumnVector<T> * column = checkAndGetColumn<ColumnVector<T>>(&*mapped);
if (!column)
return false;
const IColumn::Offsets & offsets = array.getOffsets();
const typename ColumnVector<T>::Container & data = column->getData();
auto column_data = ColumnVector<T>::create(data.size());
typename ColumnVector<T>::Container & res_values = column_data->getData();
auto column_offsets = ColumnArray::ColumnOffsets::create(offsets.size());
IColumn::Offsets & res_offsets = column_offsets->getData();
size_t res_pos = 0;
size_t pos = 0;
for (size_t i = 0; i < offsets.size(); ++i)
/// If array is not empty.
if (src_pos < src_offset)
{
if (pos < offsets[i])
/// Insert first element unconditionally.
res_values[res_pos] = src_values[src_pos];
/// For the rest of elements, insert if the element is different from the previous.
++src_pos;
++res_pos;
for (; src_pos < src_offset; ++src_pos)
{
res_values[res_pos] = data[pos];
for (++pos, ++res_pos; pos < offsets[i]; ++pos)
if (src_values[src_pos] != src_values[src_pos - 1])
{
if (data[pos] != data[pos - 1])
{
res_values[res_pos++] = data[pos];
}
res_values[res_pos] = src_values[src_pos];
++res_pos;
}
}
res_offsets[i] = res_pos;
}
for (size_t i = 0; i < data.size() - res_pos; ++i)
{
res_values.pop_back();
}
res_ptr = ColumnArray::create(std::move(column_data), std::move(column_offsets));
return true;
res_offsets[i] = res_pos;
}
res_values.resize(res_pos);
static ColumnPtr execute(const ColumnArray & array, ColumnPtr mapped)
{
ColumnPtr res;
if (executeType< UInt8 >(mapped, array, res) ||
executeType< UInt16>(mapped, array, res) ||
executeType< UInt32>(mapped, array, res) ||
executeType< UInt64>(mapped, array, res) ||
executeType< Int8 >(mapped, array, res) ||
executeType< Int16 >(mapped, array, res) ||
executeType< Int32 >(mapped, array, res) ||
executeType< Int64 >(mapped, array, res) ||
executeType<Float32>(mapped, array, res) ||
executeType<Float64>(mapped, array, res))
return res;
else
throw Exception("Unexpected column for arrayCompact: " + mapped->getName(), ErrorCodes::ILLEGAL_COLUMN);
}
};
struct NameArrayCompact { static constexpr auto name = "arrayCompact"; };
using FunctionArrayCompact = FunctionArrayMapped<ArrayCompactImpl, NameArrayCompact>;
void registerFunctionArrayCompact(FunctionFactory & factory)
{
factory.registerFunction<FunctionArrayCompact>();
res_ptr = ColumnArray::create(std::move(res_values_column), std::move(res_offsets_column));
return true;
}
static void executeGeneric(const ColumnPtr & mapped, const ColumnArray & array, ColumnPtr & res_ptr)
{
const IColumn::Offsets & src_offsets = array.getOffsets();
auto res_values_column = mapped->cloneEmpty();
res_values_column->reserve(mapped->size());
size_t src_offsets_size = src_offsets.size();
auto res_offsets_column = ColumnArray::ColumnOffsets::create(src_offsets_size);
IColumn::Offsets & res_offsets = res_offsets_column->getData();
size_t res_pos = 0;
size_t src_pos = 0;
for (size_t i = 0; i < src_offsets_size; ++i)
{
auto src_offset = src_offsets[i];
/// If array is not empty.
if (src_pos < src_offset)
{
/// Insert first element unconditionally.
res_values_column->insertFrom(*mapped, src_pos);
/// For the rest of elements, insert if the element is different from the previous.
++src_pos;
++res_pos;
for (; src_pos < src_offset; ++src_pos)
{
if (mapped->compareAt(src_pos - 1, src_pos, *mapped, 1))
{
res_values_column->insertFrom(*mapped, src_pos);
++res_pos;
}
}
}
res_offsets[i] = res_pos;
}
res_ptr = ColumnArray::create(std::move(res_values_column), std::move(res_offsets_column));
}
static ColumnPtr execute(const ColumnArray & array, ColumnPtr mapped)
{
ColumnPtr res;
if (!(executeType< UInt8 >(mapped, array, res) ||
executeType< UInt16>(mapped, array, res) ||
executeType< UInt32>(mapped, array, res) ||
executeType< UInt64>(mapped, array, res) ||
executeType< Int8 >(mapped, array, res) ||
executeType< Int16 >(mapped, array, res) ||
executeType< Int32 >(mapped, array, res) ||
executeType< Int64 >(mapped, array, res) ||
executeType<Float32>(mapped, array, res) ||
executeType<Float64>(mapped, array, res)))
{
executeGeneric(mapped, array, res);
}
return res;
}
};
struct NameArrayCompact { static constexpr auto name = "arrayCompact"; };
using FunctionArrayCompact = FunctionArrayMapped<ArrayCompactImpl, NameArrayCompact>;
void registerFunctionArrayCompact(FunctionFactory & factory)
{
factory.registerFunction<FunctionArrayCompact>();
}
}
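The rewrite keeps the typed fast path and adds `executeGeneric`, which keeps the first element of every array unconditionally and then keeps an element only when it differs from its immediate predecessor, building the new offsets as it goes. A small standalone sketch of that adjacent-duplicate removal over a flat values vector with ClickHouse-style cumulative offsets (plain `std::vector`, not the actual `IColumn` API):

```cpp
#include <cstddef>
#include <iostream>
#include <vector>

/// values: concatenated elements of all arrays; offsets[i]: end position of array i (cumulative).
template <typename T>
void arrayCompact(const std::vector<T> & values, const std::vector<size_t> & offsets,
                  std::vector<T> & res_values, std::vector<size_t> & res_offsets)
{
    res_values.clear();
    res_offsets.clear();

    size_t src_pos = 0;
    for (size_t offset : offsets)
    {
        /// If the array is not empty.
        if (src_pos < offset)
        {
            /// Insert the first element unconditionally.
            res_values.push_back(values[src_pos]);
            /// Keep only elements that differ from the previous source element.
            for (++src_pos; src_pos < offset; ++src_pos)
                if (values[src_pos] != values[src_pos - 1])
                    res_values.push_back(values[src_pos]);
        }
        res_offsets.push_back(res_values.size());
    }
}

int main()
{
    /// Two arrays: ['a','a','b','b','a'] and ['x','x'].
    std::vector<char> values{'a', 'a', 'b', 'b', 'a', 'x', 'x'};
    std::vector<size_t> offsets{5, 7};

    std::vector<char> res_values;
    std::vector<size_t> res_offsets;
    arrayCompact(values, offsets, res_values, res_offsets);

    for (char c : res_values)
        std::cout << c;                  /// abax
    std::cout << '\n';
    for (size_t o : res_offsets)
        std::cout << o << ' ';           /// 3 4
    std::cout << '\n';
}
```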

View File

@ -214,7 +214,8 @@ protected:
+ ", should be at least 2.",
ErrorCodes::NUMBER_OF_ARGUMENTS_DOESNT_MATCH);
return getLeastSupertype(arguments);
/// We always return Strings from concat, even if arguments were fixed strings.
return std::make_shared<DataTypeString>();
}
private:

View File

@ -15,6 +15,7 @@
#include <DataStreams/DistinctBlockInputStream.h>
#include <DataStreams/NullBlockInputStream.h>
#include <DataStreams/TotalsHavingBlockInputStream.h>
#include <DataStreams/OneBlockInputStream.h>
#include <DataStreams/copyData.h>
#include <DataStreams/CreatingSetsBlockInputStream.h>
#include <DataStreams/MaterializingBlockInputStream.h>
@ -26,7 +27,6 @@
#include <DataStreams/ReverseBlockInputStream.h>
#include <DataStreams/FillingBlockInputStream.h>
#include <DataStreams/SquashingBlockInputStream.h>
#include <DataStreams/OneBlockInputStream.h>
#include <Parsers/ASTFunction.h>
#include <Parsers/ASTIdentifier.h>
@ -1179,7 +1179,10 @@ void InterpreterSelectQuery::executeImpl(TPipeline & pipeline, const BlockInputS
if (JoinPtr join = expressions.before_join->getTableJoinAlgo())
{
if (auto stream = join->createStreamWithNonJoinedRows(header_before_join, settings.max_block_size))
Block join_result_sample = ExpressionBlockInputStream(
std::make_shared<OneBlockInputStream>(header_before_join), expressions.before_join).getHeader();
if (auto stream = join->createStreamWithNonJoinedRows(join_result_sample, settings.max_block_size))
{
if constexpr (pipeline_with_processors)
{

View File

@ -1066,9 +1066,10 @@ struct AdderNonJoined<ASTTableJoin::Strictness::Asof, Mapped>
class NonJoinedBlockInputStream : public IBlockInputStream
{
public:
NonJoinedBlockInputStream(const Join & parent_, const Block & left_sample_block, UInt64 max_block_size_)
NonJoinedBlockInputStream(const Join & parent_, const Block & result_sample_block_, UInt64 max_block_size_)
: parent(parent_)
, max_block_size(max_block_size_)
, result_sample_block(materializeBlock(result_sample_block_))
{
bool remap_keys = parent.table_join->hasUsing();
std::unordered_map<size_t, size_t> left_to_right_key_remap;
@ -1078,16 +1079,18 @@ public:
const String & left_key_name = parent.table_join->keyNamesLeft()[i];
const String & right_key_name = parent.table_join->keyNamesRight()[i];
size_t left_key_pos = left_sample_block.getPositionByName(left_key_name);
size_t left_key_pos = result_sample_block.getPositionByName(left_key_name);
size_t right_key_pos = parent.saved_block_sample.getPositionByName(right_key_name);
if (remap_keys && !parent.required_right_keys.has(right_key_name))
left_to_right_key_remap[left_key_pos] = right_key_pos;
}
makeResultSampleBlock(left_sample_block);
/// result_sample_block: left_sample_block + left expressions, right not key columns, required right keys
size_t left_columns_count = result_sample_block.columns() -
parent.sample_block_with_columns_to_add.columns() - parent.required_right_keys.columns();
for (size_t left_pos = 0; left_pos < left_sample_block.columns(); ++left_pos)
for (size_t left_pos = 0; left_pos < left_columns_count; ++left_pos)
{
/// We need right 'x' for 'RIGHT JOIN ... USING(x)'.
if (left_to_right_key_remap.count(left_pos))
@ -1108,7 +1111,7 @@ public:
size_t result_position = result_sample_block.getPositionByName(name);
/// Don't remap left keys twice. We need only qualified right keys here
if (result_position < left_sample_block.columns())
if (result_position < left_columns_count)
continue;
setRightIndex(right_pos, result_position);
@ -1140,7 +1143,7 @@ private:
UInt64 max_block_size;
Block result_sample_block;
/// Indices of columns in result_sample_block that come from the left-side table: left_pos == result_pos
/// Indices of columns in result_sample_block that should be generated
std::vector<size_t> column_indices_left;
/// Indices of columns that come from the right-side table: right_pos -> result_pos
std::unordered_map<size_t, size_t> column_indices_right;
@ -1152,27 +1155,6 @@ private:
std::any position;
std::optional<Join::BlockNullmapList::const_iterator> nulls_position;
/// "left" columns, "right" not key columns, some "right keys"
void makeResultSampleBlock(const Block & left_sample_block)
{
result_sample_block = materializeBlock(left_sample_block);
if (parent.nullable_left_side)
JoinCommon::convertColumnsToNullable(result_sample_block);
for (const ColumnWithTypeAndName & column : parent.sample_block_with_columns_to_add)
{
bool is_nullable = parent.nullable_right_side || column.column->isNullable();
result_sample_block.insert(correctNullability({column.column, column.type, column.name}, is_nullable));
}
for (const ColumnWithTypeAndName & right_key : parent.required_right_keys)
{
bool is_nullable = parent.nullable_right_side || right_key.column->isNullable();
result_sample_block.insert(correctNullability({right_key.column, right_key.type, right_key.name}, is_nullable));
}
}
void setRightIndex(size_t right_pos, size_t result_position)
{
if (!column_indices_right.count(right_pos))
@ -1328,10 +1310,10 @@ private:
};
BlockInputStreamPtr Join::createStreamWithNonJoinedRows(const Block & left_sample_block, UInt64 max_block_size) const
BlockInputStreamPtr Join::createStreamWithNonJoinedRows(const Block & result_sample_block, UInt64 max_block_size) const
{
if (isRightOrFull(table_join->kind()))
return std::make_shared<NonJoinedBlockInputStream>(*this, left_sample_block, max_block_size);
return std::make_shared<NonJoinedBlockInputStream>(*this, result_sample_block, max_block_size);
return {};
}

View File

@ -156,7 +156,7 @@ public:
* Use only after all calls to joinBlock was done.
* left_sample_block is passed without account of 'use_nulls' setting (columns will be converted to Nullable inside).
*/
BlockInputStreamPtr createStreamWithNonJoinedRows(const Block & left_sample_block, UInt64 max_block_size) const override;
BlockInputStreamPtr createStreamWithNonJoinedRows(const Block & result_sample_block, UInt64 max_block_size) const override;
/// Number of keys in all built JOIN maps.
size_t getTotalRowCount() const final;

View File

@ -6,6 +6,7 @@
#include <Common/StringUtils/StringUtils.h>
#include <Common/ClickHouseRevision.h>
#include <Common/SipHash.h>
#include <Common/quoteString.h>
#include <Interpreters/Context.h>
#include <Storages/Distributed/DirectoryMonitor.h>
#include <IO/ReadBufferFromFile.h>
@ -355,8 +356,19 @@ struct StorageDistributedDirectoryMonitor::Batch
/// we must try to re-send exactly the same batches.
/// So we save contents of the current batch into the current_batch_file_path file
/// and truncate it afterwards if all went well.
WriteBufferFromFile out{parent.current_batch_file_path};
writeText(out);
/// Temporary file is required for atomicity.
String tmp_file{parent.current_batch_file_path + ".tmp"};
if (Poco::File{tmp_file}.exists())
LOG_ERROR(parent.log, "Temporary file " << backQuote(tmp_file) << " exists. Unclean shutdown?");
{
WriteBufferFromFile out{tmp_file, O_WRONLY | O_TRUNC | O_CREAT};
writeText(out);
}
Poco::File{tmp_file}.renameTo(parent.current_batch_file_path);
}
auto timeouts = ConnectionTimeouts::getTCPTimeoutsWithFailover(parent.storage.global_context.getSettingsRef());
auto connection = parent.pool->get(timeouts);
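The batch file is now written to a `.tmp` sibling and then renamed onto `current_batch_file_path`, so a reader never observes a partially written batch; a rename within one directory replaces the target atomically on POSIX filesystems. A minimal sketch of the same write-then-rename pattern using only the standard library (`std::ofstream` and `std::filesystem` instead of the `WriteBufferFromFile`/`Poco::File` calls in the actual code; fsync for durability is omitted here):

```cpp
#include <filesystem>
#include <fstream>
#include <iostream>
#include <stdexcept>
#include <string>

/// Replace the contents of `path` atomically: write a sibling .tmp file, then rename it over the target.
void writeFileAtomically(const std::string & path, const std::string & contents)
{
    const std::string tmp_path = path + ".tmp";

    if (std::filesystem::exists(tmp_path))
        std::cerr << "Temporary file " << tmp_path << " exists. Unclean shutdown?\n";

    {
        std::ofstream out(tmp_path, std::ios::binary | std::ios::trunc);
        if (!out)
            throw std::runtime_error("Cannot open " + tmp_path);
        out << contents;
        out.flush();
        if (!out)
            throw std::runtime_error("Cannot write " + tmp_path);
    }   /// Close the temporary file before renaming it.

    /// On POSIX, renaming within the same directory atomically replaces the destination.
    /// A fully durable variant would also fsync the file and its directory.
    std::filesystem::rename(tmp_path, path);
}

int main()
{
    writeFileAtomically("current_batch.txt", "file1.bin\nfile2.bin\n");
    std::cout << "batch written\n";
}
```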

View File

@ -546,7 +546,15 @@ Pipes MergeTreeDataSelectExecutor::readFromParts(
if (data.hasPrimaryKey())
ranges.ranges = markRangesFromPKRange(part, key_condition, settings);
else
ranges.ranges = MarkRanges{MarkRange{0, part->getMarksCount()}};
{
size_t total_marks_count = part->getMarksCount();
if (total_marks_count)
{
if (part->index_granularity.hasFinalMark())
--total_marks_count;
ranges.ranges = MarkRanges{MarkRange{0, total_marks_count}};
}
}
for (const auto & index_and_condition : useful_indices)
ranges.ranges = filterMarksUsingIndex(
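When there is no primary key, the part is read as a single mark range; the new code also skips the trailing sentinel ("final") mark written by adaptive granularity, since it carries no rows. A tiny sketch of that range computation, where `marks_count` and `has_final_mark` stand in for `part->getMarksCount()` and `part->index_granularity.hasFinalMark()` from the actual code:

```cpp
#include <cstddef>
#include <iostream>
#include <optional>

struct MarkRange { size_t begin = 0; size_t end = 0; };

/// Build the whole-part range, dropping the trailing sentinel mark if present.
std::optional<MarkRange> wholePartRange(size_t marks_count, bool has_final_mark)
{
    if (marks_count == 0)
        return std::nullopt;             /// Empty part: nothing to read.
    if (has_final_mark)
        --marks_count;                   /// The final mark carries no rows.
    return MarkRange{0, marks_count};
}

int main()
{
    if (auto range = wholePartRange(11, /*has_final_mark=*/ true))
        std::cout << "[" << range->begin << ", " << range->end << ")\n";   /// [0, 10)
}
```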

View File

@ -227,7 +227,7 @@ void registerStorageHDFS(StorageFactory & factory)
{
ASTs & engine_args = args.engine_args;
if (!(engine_args.size() == 1 || engine_args.size() == 2))
if (engine_args.size() != 2)
throw Exception(
"Storage HDFS requires exactly 2 arguments: url and name of used format.", ErrorCodes::NUMBER_OF_ARGUMENTS_DOESNT_MATCH);

View File

@ -183,7 +183,7 @@ void registerStorageS3(StorageFactory & factory)
{
ASTs & engine_args = args.engine_args;
if (!(engine_args.size() == 1 || engine_args.size() == 2))
if (engine_args.size() != 2)
throw Exception(
"Storage S3 requires exactly 2 arguments: url and name of used format.", ErrorCodes::NUMBER_OF_ARGUMENTS_DOESNT_MATCH);

View File

@ -215,7 +215,7 @@ void registerStorageURL(StorageFactory & factory)
{
ASTs & engine_args = args.engine_args;
if (!(engine_args.size() == 1 || engine_args.size() == 2))
if (engine_args.size() != 2)
throw Exception(
"Storage URL requires exactly 2 arguments: url and name of used format.", ErrorCodes::NUMBER_OF_ARGUMENTS_DOESNT_MATCH);

View File

@ -1,4 +1,4 @@
[pytest]
python_files = test.py
python_files = test*.py
norecursedirs = _instances
timeout = 600

View File

@ -0,0 +1,109 @@
import os
import os.path as p
import sys
sys.path.insert(0, os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
from helpers.cluster import ClickHouseCluster
from helpers.network import PartitionManager
from helpers.test_tools import TSV
class ClickHouseClusterWithDDLHelpers(ClickHouseCluster):
def __init__(self, base_path, config_dir):
ClickHouseCluster.__init__(self, base_path)
self.test_config_dir = config_dir
def prepare(self, replace_hostnames_with_ips=True):
try:
for i in xrange(4):
self.add_instance(
'ch{}'.format(i+1),
config_dir=self.test_config_dir,
macros={"layer": 0, "shard": i/2 + 1, "replica": i%2 + 1},
with_zookeeper=True)
self.start()
# Replace config files for testing ability to set host in DNS and IP formats
if replace_hostnames_with_ips:
self.replace_domains_to_ip_addresses_in_cluster_config(['ch1', 'ch3'])
# Select sacrifice instance to test CONNECTION_LOSS and server fail on it
sacrifice = self.instances['ch4']
self.pm_random_drops = PartitionManager()
self.pm_random_drops._add_rule({'probability': 0.01, 'destination': sacrifice.ip_address, 'source_port': 2181, 'action': 'REJECT --reject-with tcp-reset'})
self.pm_random_drops._add_rule({'probability': 0.01, 'source': sacrifice.ip_address, 'destination_port': 2181, 'action': 'REJECT --reject-with tcp-reset'})
# Initialize databases and service tables
instance = self.instances['ch1']
self.ddl_check_query(instance, """
CREATE TABLE IF NOT EXISTS all_tables ON CLUSTER 'cluster_no_replicas'
(database String, name String, engine String, metadata_modification_time DateTime)
ENGINE = Distributed('cluster_no_replicas', 'system', 'tables')
""")
self.ddl_check_query(instance, "CREATE DATABASE IF NOT EXISTS test ON CLUSTER 'cluster'")
except Exception as e:
print e
raise
def sync_replicas(self, table, timeout=5):
for instance in self.instances.values():
instance.query("SYSTEM SYNC REPLICA {}".format(table), timeout=timeout)
def check_all_hosts_successfully_executed(self, tsv_content, num_hosts=None):
if num_hosts is None:
num_hosts = len(self.instances)
M = TSV.toMat(tsv_content)
hosts = [(l[0], l[1]) for l in M] # (host, port)
codes = [l[2] for l in M]
messages = [l[3] for l in M]
assert len(hosts) == num_hosts and len(set(hosts)) == num_hosts, "\n" + tsv_content
assert len(set(codes)) == 1, "\n" + tsv_content
assert codes[0] == "0", "\n" + tsv_content
def ddl_check_query(self, instance, query, num_hosts=None):
contents = instance.query(query)
self.check_all_hosts_successfully_executed(contents, num_hosts)
return contents
def replace_domains_to_ip_addresses_in_cluster_config(self, instances_to_replace):
clusters_config = open(p.join(self.base_dir, '{}/config.d/clusters.xml'.format(self.test_config_dir))).read()
for inst_name, inst in self.instances.items():
clusters_config = clusters_config.replace(inst_name, str(inst.ip_address))
for inst_name in instances_to_replace:
inst = self.instances[inst_name]
self.instances[inst_name].exec_in_container(['bash', '-c', 'echo "$NEW_CONFIG" > /etc/clickhouse-server/config.d/clusters.xml'], environment={"NEW_CONFIG": clusters_config}, privileged=True)
# print cluster.instances[inst_name].exec_in_container(['cat', "/etc/clickhouse-server/config.d/clusters.xml"])
@staticmethod
def ddl_check_there_are_no_dublicates(instance):
query = "SELECT max(c), argMax(q, c) FROM (SELECT lower(query) AS q, count() AS c FROM system.query_log WHERE type=2 AND q LIKE '/* ddl_entry=query-%' GROUP BY query)"
rows = instance.query(query)
assert len(rows) > 0 and rows[0][0] == "1", "dublicates on {} {}, query {}".format(instance.name, instance.ip_address, query)
@staticmethod
def insert_reliable(instance, query_insert):
"""
Make retries in case of UNKNOWN_STATUS_OF_INSERT or zkutil::KeeperException errors
"""
for i in xrange(100):
try:
instance.query(query_insert)
return
except Exception as e:
last_exception = e
s = str(e)
if not (s.find('Unknown status, client must retry') >= 0 or s.find('zkutil::KeeperException')):
raise e
raise last_exception

View File

@ -1,122 +1,34 @@
import os
import os.path as p
import sys
import time
import datetime
import pytest
from contextlib import contextmanager
import pytest
sys.path.insert(0, os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
from helpers.cluster import ClickHouseCluster
from helpers.network import PartitionManager, PartitionManagerDisbaler
from helpers.network import PartitionManager
from helpers.test_tools import TSV
from .cluster import ClickHouseClusterWithDDLHelpers
def check_all_hosts_sucesfully_executed(tsv_content, num_hosts=None):
if num_hosts is None:
num_hosts = len(cluster.instances)
@pytest.fixture(scope="module", params=["configs", "configs_secure"])
def test_cluster(request):
cluster = ClickHouseClusterWithDDLHelpers(__file__, request.param)
M = TSV.toMat(tsv_content)
hosts = [(l[0], l[1]) for l in M] # (host, port)
codes = [l[2] for l in M]
messages = [l[3] for l in M]
assert len(hosts) == num_hosts and len(set(hosts)) == num_hosts, "\n" + tsv_content
assert len(set(codes)) == 1, "\n" + tsv_content
assert codes[0] == "0", "\n" + tsv_content
def ddl_check_query(instance, query, num_hosts=None):
contents = instance.query(query)
check_all_hosts_sucesfully_executed(contents, num_hosts)
return contents
def ddl_check_there_are_no_dublicates(instance):
rows = instance.query("SELECT max(c), argMax(q, c) FROM (SELECT lower(query) AS q, count() AS c FROM system.query_log WHERE type=2 AND q LIKE '/* ddl_entry=query-%' GROUP BY query)")
assert len(rows) > 0 and rows[0][0] == "1", "dublicates on {} {}, query {}".format(instance.name, instance.ip_address)
# Make retries in case of UNKNOWN_STATUS_OF_INSERT or zkutil::KeeperException errors
def insert_reliable(instance, query_insert):
for i in xrange(100):
try:
instance.query(query_insert)
return
except Exception as e:
last_exception = e
s = str(e)
if not (s.find('Unknown status, client must retry') >= 0 or s.find('zkutil::KeeperException')):
raise e
raise last_exception
TEST_REPLICATED_ALTERS=False # TODO: Check code and turn on
cluster = ClickHouseCluster(__file__)
def replace_domains_to_ip_addresses_in_cluster_config(instances_to_replace):
clusters_config = open(p.join(cluster.base_dir, 'configs/config.d/clusters.xml')).read()
for inst_name, inst in cluster.instances.items():
clusters_config = clusters_config.replace(inst_name, str(inst.ip_address))
for inst_name in instances_to_replace:
inst = cluster.instances[inst_name]
cluster.instances[inst_name].exec_in_container(['bash', '-c', 'echo "$NEW_CONFIG" > /etc/clickhouse-server/config.d/clusters.xml'], environment={"NEW_CONFIG": clusters_config}, privileged=True)
# print cluster.instances[inst_name].exec_in_container(['cat', "/etc/clickhouse-server/config.d/clusters.xml"])
def init_cluster(cluster):
try:
for i in xrange(4):
cluster.add_instance(
'ch{}'.format(i+1),
config_dir="configs",
macros={"layer": 0, "shard": i/2 + 1, "replica": i%2 + 1},
with_zookeeper=True)
cluster.start()
# Replace config files for testing ability to set host in DNS and IP formats
replace_domains_to_ip_addresses_in_cluster_config(['ch1', 'ch3'])
# Select sacrifice instance to test CONNECTION_LOSS and server fail on it
sacrifice = cluster.instances['ch4']
cluster.pm_random_drops = PartitionManager()
cluster.pm_random_drops._add_rule({'probability': 0.01, 'destination': sacrifice.ip_address, 'source_port': 2181, 'action': 'REJECT --reject-with tcp-reset'})
cluster.pm_random_drops._add_rule({'probability': 0.01, 'source': sacrifice.ip_address, 'destination_port': 2181, 'action': 'REJECT --reject-with tcp-reset'})
# Initialize databases and service tables
instance = cluster.instances['ch1']
ddl_check_query(instance, """
CREATE TABLE IF NOT EXISTS all_tables ON CLUSTER 'cluster_no_replicas'
(database String, name String, engine String, metadata_modification_time DateTime)
ENGINE = Distributed('cluster_no_replicas', 'system', 'tables')
""")
ddl_check_query(instance, "CREATE DATABASE IF NOT EXISTS test ON CLUSTER 'cluster'")
except Exception as e:
print e
raise
@pytest.fixture(scope="module")
def started_cluster():
try:
init_cluster(cluster)
cluster.prepare()
yield cluster
instance = cluster.instances['ch1']
ddl_check_query(instance, "DROP DATABASE test ON CLUSTER 'cluster'")
ddl_check_query(instance, "DROP DATABASE IF EXISTS test2 ON CLUSTER 'cluster'")
cluster.ddl_check_query(instance, "DROP DATABASE test ON CLUSTER 'cluster'")
cluster.ddl_check_query(instance, "DROP DATABASE IF EXISTS test2 ON CLUSTER 'cluster'")
# Check query log to ensure that DDL queries are not executed twice
time.sleep(1.5)
for instance in cluster.instances.values():
ddl_check_there_are_no_dublicates(instance)
cluster.ddl_check_there_are_no_dublicates(instance)
cluster.pm_random_drops.heal_all()
@ -124,57 +36,57 @@ def started_cluster():
cluster.shutdown()
def test_default_database(started_cluster):
instance = cluster.instances['ch3']
def test_default_database(test_cluster):
instance = test_cluster.instances['ch3']
ddl_check_query(instance, "CREATE DATABASE IF NOT EXISTS test2 ON CLUSTER 'cluster' FORMAT TSV")
ddl_check_query(instance, "DROP TABLE IF EXISTS null ON CLUSTER 'cluster' FORMAT TSV")
ddl_check_query(instance, "CREATE TABLE null ON CLUSTER 'cluster2' (s String DEFAULT 'escape\t\nme') ENGINE = Null")
test_cluster.ddl_check_query(instance, "CREATE DATABASE IF NOT EXISTS test2 ON CLUSTER 'cluster' FORMAT TSV")
test_cluster.ddl_check_query(instance, "DROP TABLE IF EXISTS null ON CLUSTER 'cluster' FORMAT TSV")
test_cluster.ddl_check_query(instance, "CREATE TABLE null ON CLUSTER 'cluster2' (s String DEFAULT 'escape\t\nme') ENGINE = Null")
contents = instance.query("SELECT hostName() AS h, database FROM all_tables WHERE name = 'null' ORDER BY h")
assert TSV(contents) == TSV("ch1\tdefault\nch2\ttest2\nch3\tdefault\nch4\ttest2\n")
ddl_check_query(instance, "DROP TABLE IF EXISTS null ON CLUSTER cluster2")
ddl_check_query(instance, "DROP DATABASE IF EXISTS test2 ON CLUSTER 'cluster'")
test_cluster.ddl_check_query(instance, "DROP TABLE IF EXISTS null ON CLUSTER cluster2")
test_cluster.ddl_check_query(instance, "DROP DATABASE IF EXISTS test2 ON CLUSTER 'cluster'")
def test_create_view(started_cluster):
instance = cluster.instances['ch3']
ddl_check_query(instance, "CREATE VIEW test.super_simple_view ON CLUSTER 'cluster' AS SELECT * FROM system.numbers FORMAT TSV")
ddl_check_query(instance, "CREATE MATERIALIZED VIEW test.simple_mat_view ON CLUSTER 'cluster' ENGINE = Memory AS SELECT * FROM system.numbers FORMAT TSV")
ddl_check_query(instance, "DROP TABLE test.simple_mat_view ON CLUSTER 'cluster' FORMAT TSV")
ddl_check_query(instance, "DROP TABLE IF EXISTS test.super_simple_view2 ON CLUSTER 'cluster' FORMAT TSV")
def test_create_view(test_cluster):
instance = test_cluster.instances['ch3']
test_cluster.ddl_check_query(instance, "CREATE VIEW test.super_simple_view ON CLUSTER 'cluster' AS SELECT * FROM system.numbers FORMAT TSV")
test_cluster.ddl_check_query(instance, "CREATE MATERIALIZED VIEW test.simple_mat_view ON CLUSTER 'cluster' ENGINE = Memory AS SELECT * FROM system.numbers FORMAT TSV")
test_cluster.ddl_check_query(instance, "DROP TABLE test.simple_mat_view ON CLUSTER 'cluster' FORMAT TSV")
test_cluster.ddl_check_query(instance, "DROP TABLE IF EXISTS test.super_simple_view2 ON CLUSTER 'cluster' FORMAT TSV")
ddl_check_query(instance, "CREATE TABLE test.super_simple ON CLUSTER 'cluster' (i Int8) ENGINE = Memory")
ddl_check_query(instance, "RENAME TABLE test.super_simple TO test.super_simple2 ON CLUSTER 'cluster' FORMAT TSV")
ddl_check_query(instance, "DROP TABLE test.super_simple2 ON CLUSTER 'cluster'")
test_cluster.ddl_check_query(instance, "CREATE TABLE test.super_simple ON CLUSTER 'cluster' (i Int8) ENGINE = Memory")
test_cluster.ddl_check_query(instance, "RENAME TABLE test.super_simple TO test.super_simple2 ON CLUSTER 'cluster' FORMAT TSV")
test_cluster.ddl_check_query(instance, "DROP TABLE test.super_simple2 ON CLUSTER 'cluster'")
def test_on_server_fail(started_cluster):
instance = cluster.instances['ch1']
kill_instance = cluster.instances['ch2']
def test_on_server_fail(test_cluster):
instance = test_cluster.instances['ch1']
kill_instance = test_cluster.instances['ch2']
ddl_check_query(instance, "DROP TABLE IF EXISTS test.test_server_fail ON CLUSTER 'cluster'")
test_cluster.ddl_check_query(instance, "DROP TABLE IF EXISTS test.test_server_fail ON CLUSTER 'cluster'")
kill_instance.get_docker_handle().stop()
request = instance.get_query_request("CREATE TABLE test.test_server_fail ON CLUSTER 'cluster' (i Int8) ENGINE=Null", timeout=30)
kill_instance.get_docker_handle().start()
ddl_check_query(instance, "DROP TABLE IF EXISTS test.__nope__ ON CLUSTER 'cluster'")
test_cluster.ddl_check_query(instance, "DROP TABLE IF EXISTS test.__nope__ ON CLUSTER 'cluster'")
# Check query itself
check_all_hosts_sucesfully_executed(request.get_answer())
test_cluster.check_all_hosts_successfully_executed(request.get_answer())
# And check query artefacts
contents = instance.query("SELECT hostName() AS h FROM all_tables WHERE database='test' AND name='test_server_fail' ORDER BY h")
assert TSV(contents) == TSV("ch1\nch2\nch3\nch4\n")
ddl_check_query(instance, "DROP TABLE test.test_server_fail ON CLUSTER 'cluster'")
test_cluster.ddl_check_query(instance, "DROP TABLE test.test_server_fail ON CLUSTER 'cluster'")
def _test_on_connection_losses(cluster, zk_timeout):
instance = cluster.instances['ch1']
kill_instance = cluster.instances['ch2']
def _test_on_connection_losses(test_cluster, zk_timeout):
instance = test_cluster.instances['ch1']
kill_instance = test_cluster.instances['ch2']
with PartitionManager() as pm:
pm.drop_instance_zk_connections(kill_instance)
@ -182,170 +94,111 @@ def _test_on_connection_losses(cluster, zk_timeout):
time.sleep(zk_timeout)
pm.restore_instance_zk_connections(kill_instance)
check_all_hosts_sucesfully_executed(request.get_answer())
test_cluster.check_all_hosts_successfully_executed(request.get_answer())
def test_on_connection_loss(started_cluster):
_test_on_connection_losses(cluster, 1.5) # connection loss will occur only (3 sec ZK timeout in config)
def test_on_connection_loss(test_cluster):
_test_on_connection_losses(test_cluster, 1.5) # connection loss will occur only (3 sec ZK timeout in config)
def test_on_session_expired(started_cluster):
_test_on_connection_losses(cluster, 4) # session should be expired (3 sec ZK timeout in config)
def test_on_session_expired(test_cluster):
_test_on_connection_losses(test_cluster, 4) # session should be expired (3 sec ZK timeout in config)
def test_replicated_alters(started_cluster):
instance = cluster.instances['ch2']
def test_simple_alters(test_cluster):
instance = test_cluster.instances['ch2']
ddl_check_query(instance, "DROP TABLE IF EXISTS merge_for_alter ON CLUSTER cluster")
ddl_check_query(instance, "DROP TABLE IF EXISTS all_merge_32 ON CLUSTER cluster")
ddl_check_query(instance, "DROP TABLE IF EXISTS all_merge_64 ON CLUSTER cluster")
test_cluster.ddl_check_query(instance, "DROP TABLE IF EXISTS merge ON CLUSTER cluster_without_replication")
test_cluster.ddl_check_query(instance, "DROP TABLE IF EXISTS all_merge_32 ON CLUSTER cluster_without_replication")
test_cluster.ddl_check_query(instance, "DROP TABLE IF EXISTS all_merge_64 ON CLUSTER cluster_without_replication")
if not TEST_REPLICATED_ALTERS:
return
# Temporarily disable random ZK packet drops, they might broke creation if ReplicatedMergeTree replicas
firewall_drops_rules = cluster.pm_random_drops.pop_rules()
ddl_check_query(instance, """
CREATE TABLE IF NOT EXISTS merge_for_alter ON CLUSTER cluster (p Date, i Int32)
ENGINE = ReplicatedMergeTree('/clickhouse/tables/{layer}-{shard}/hits', '{replica}', p, p, 1)
""")
ddl_check_query(instance, """
CREATE TABLE IF NOT EXISTS all_merge_32 ON CLUSTER cluster (p Date, i Int32)
ENGINE = Distributed(cluster, default, merge_for_alter, i)
""")
ddl_check_query(instance, """
CREATE TABLE IF NOT EXISTS all_merge_64 ON CLUSTER cluster (p Date, i Int64, s String)
ENGINE = Distributed(cluster, default, merge_for_alter, i)
""")
for i in xrange(4):
k = (i / 2) * 2
insert_reliable(cluster.instances['ch{}'.format(i + 1)], "INSERT INTO merge_for_alter (i) VALUES ({})({})".format(k, k+1))
assert TSV(instance.query("SELECT i FROM all_merge_32 ORDER BY i")) == TSV(''.join(['{}\n'.format(x) for x in xrange(4)]))
ddl_check_query(instance, "ALTER TABLE merge_for_alter ON CLUSTER cluster MODIFY COLUMN i Int64")
ddl_check_query(instance, "ALTER TABLE merge_for_alter ON CLUSTER cluster ADD COLUMN s DEFAULT toString(i)")
assert TSV(instance.query("SELECT i, s FROM all_merge_64 ORDER BY i")) == TSV(''.join(['{}\t{}\n'.format(x,x) for x in xrange(4)]))
for i in xrange(4):
k = (i / 2) * 2 + 4
insert_reliable(cluster.instances['ch{}'.format(i + 1)], "INSERT INTO merge_for_alter (p, i) VALUES (31, {})(31, {})".format(k, k+1))
assert TSV(instance.query("SELECT i, s FROM all_merge_64 ORDER BY i")) == TSV(''.join(['{}\t{}\n'.format(x,x) for x in xrange(8)]))
ddl_check_query(instance, "ALTER TABLE merge_for_alter ON CLUSTER cluster DETACH PARTITION 197002")
assert TSV(instance.query("SELECT i, s FROM all_merge_64 ORDER BY i")) == TSV(''.join(['{}\t{}\n'.format(x,x) for x in xrange(4)]))
ddl_check_query(instance, "DROP TABLE merge_for_alter ON CLUSTER cluster")
# Enable random ZK packet drops
cluster.pm_random_drops.push_rules(firewall_drops_rules)
ddl_check_query(instance, "DROP TABLE all_merge_32 ON CLUSTER cluster")
ddl_check_query(instance, "DROP TABLE all_merge_64 ON CLUSTER cluster")
def test_simple_alters(started_cluster):
instance = cluster.instances['ch2']
ddl_check_query(instance, "DROP TABLE IF EXISTS merge ON CLUSTER cluster_without_replication")
ddl_check_query(instance, "DROP TABLE IF EXISTS all_merge_32 ON CLUSTER cluster_without_replication")
ddl_check_query(instance, "DROP TABLE IF EXISTS all_merge_64 ON CLUSTER cluster_without_replication")
ddl_check_query(instance, """
test_cluster.ddl_check_query(instance, """
CREATE TABLE IF NOT EXISTS merge ON CLUSTER cluster_without_replication (p Date, i Int32)
ENGINE = MergeTree(p, p, 1)
""")
ddl_check_query(instance, """
test_cluster.ddl_check_query(instance, """
CREATE TABLE IF NOT EXISTS all_merge_32 ON CLUSTER cluster_without_replication (p Date, i Int32)
ENGINE = Distributed(cluster_without_replication, default, merge, i)
""")
ddl_check_query(instance, """
test_cluster.ddl_check_query(instance, """
CREATE TABLE IF NOT EXISTS all_merge_64 ON CLUSTER cluster_without_replication (p Date, i Int64, s String)
ENGINE = Distributed(cluster_without_replication, default, merge, i)
""")
for i in xrange(4):
k = (i / 2) * 2
cluster.instances['ch{}'.format(i + 1)].query("INSERT INTO merge (i) VALUES ({})({})".format(k, k+1))
test_cluster.instances['ch{}'.format(i + 1)].query("INSERT INTO merge (i) VALUES ({})({})".format(k, k+1))
assert TSV(instance.query("SELECT i FROM all_merge_32 ORDER BY i")) == TSV(''.join(['{}\n'.format(x) for x in xrange(4)]))
time.sleep(5)
ddl_check_query(instance, "ALTER TABLE merge ON CLUSTER cluster_without_replication MODIFY COLUMN i Int64")
test_cluster.ddl_check_query(instance, "ALTER TABLE merge ON CLUSTER cluster_without_replication MODIFY COLUMN i Int64")
time.sleep(5)
ddl_check_query(instance, "ALTER TABLE merge ON CLUSTER cluster_without_replication ADD COLUMN s DEFAULT toString(i) FORMAT TSV")
test_cluster.ddl_check_query(instance, "ALTER TABLE merge ON CLUSTER cluster_without_replication ADD COLUMN s DEFAULT toString(i) FORMAT TSV")
assert TSV(instance.query("SELECT i, s FROM all_merge_64 ORDER BY i")) == TSV(''.join(['{}\t{}\n'.format(x,x) for x in xrange(4)]))
for i in xrange(4):
k = (i / 2) * 2 + 4
cluster.instances['ch{}'.format(i + 1)].query("INSERT INTO merge (p, i) VALUES (31, {})(31, {})".format(k, k+1))
test_cluster.instances['ch{}'.format(i + 1)].query("INSERT INTO merge (p, i) VALUES (31, {})(31, {})".format(k, k+1))
assert TSV(instance.query("SELECT i, s FROM all_merge_64 ORDER BY i")) == TSV(''.join(['{}\t{}\n'.format(x,x) for x in xrange(8)]))
ddl_check_query(instance, "ALTER TABLE merge ON CLUSTER cluster_without_replication DETACH PARTITION 197002")
test_cluster.ddl_check_query(instance, "ALTER TABLE merge ON CLUSTER cluster_without_replication DETACH PARTITION 197002")
assert TSV(instance.query("SELECT i, s FROM all_merge_64 ORDER BY i")) == TSV(''.join(['{}\t{}\n'.format(x,x) for x in xrange(4)]))
ddl_check_query(instance, "DROP TABLE merge ON CLUSTER cluster_without_replication")
ddl_check_query(instance, "DROP TABLE all_merge_32 ON CLUSTER cluster_without_replication")
ddl_check_query(instance, "DROP TABLE all_merge_64 ON CLUSTER cluster_without_replication")
test_cluster.ddl_check_query(instance, "DROP TABLE merge ON CLUSTER cluster_without_replication")
test_cluster.ddl_check_query(instance, "DROP TABLE all_merge_32 ON CLUSTER cluster_without_replication")
test_cluster.ddl_check_query(instance, "DROP TABLE all_merge_64 ON CLUSTER cluster_without_replication")
def test_macro(started_cluster):
instance = cluster.instances['ch2']
ddl_check_query(instance, "CREATE TABLE tab ON CLUSTER '{cluster}' (value UInt8) ENGINE = Memory")
def test_macro(test_cluster):
instance = test_cluster.instances['ch2']
test_cluster.ddl_check_query(instance, "CREATE TABLE tab ON CLUSTER '{cluster}' (value UInt8) ENGINE = Memory")
for i in xrange(4):
insert_reliable(cluster.instances['ch{}'.format(i + 1)], "INSERT INTO tab VALUES ({})".format(i))
test_cluster.insert_reliable(test_cluster.instances['ch{}'.format(i + 1)], "INSERT INTO tab VALUES ({})".format(i))
ddl_check_query(instance, "CREATE TABLE distr ON CLUSTER '{cluster}' (value UInt8) ENGINE = Distributed('{cluster}', 'default', 'tab', value % 4)")
test_cluster.ddl_check_query(instance, "CREATE TABLE distr ON CLUSTER '{cluster}' (value UInt8) ENGINE = Distributed('{cluster}', 'default', 'tab', value % 4)")
assert TSV(instance.query("SELECT value FROM distr ORDER BY value")) == TSV('0\n1\n2\n3\n')
assert TSV( cluster.instances['ch3'].query("SELECT value FROM distr ORDER BY value")) == TSV('0\n1\n2\n3\n')
assert TSV(test_cluster.instances['ch3'].query("SELECT value FROM distr ORDER BY value")) == TSV('0\n1\n2\n3\n')
ddl_check_query(instance, "DROP TABLE IF EXISTS distr ON CLUSTER '{cluster}'")
ddl_check_query(instance, "DROP TABLE IF EXISTS tab ON CLUSTER '{cluster}'")
test_cluster.ddl_check_query(instance, "DROP TABLE IF EXISTS distr ON CLUSTER '{cluster}'")
test_cluster.ddl_check_query(instance, "DROP TABLE IF EXISTS tab ON CLUSTER '{cluster}'")
def test_implicit_macros(started_cluster):
def test_implicit_macros(test_cluster):
# Temporarily disable random ZK packet drops, they might broke creation if ReplicatedMergeTree replicas
firewall_drops_rules = cluster.pm_random_drops.pop_rules()
firewall_drops_rules = test_cluster.pm_random_drops.pop_rules()
instance = cluster.instances['ch2']
instance = test_cluster.instances['ch2']
ddl_check_query(instance, "DROP DATABASE IF EXISTS test_db ON CLUSTER '{cluster}'")
ddl_check_query(instance, "CREATE DATABASE IF NOT EXISTS test_db ON CLUSTER '{cluster}'")
ddl_check_query(instance, """
test_cluster.ddl_check_query(instance, "DROP DATABASE IF EXISTS test_db ON CLUSTER '{cluster}'")
test_cluster.ddl_check_query(instance, "CREATE DATABASE IF NOT EXISTS test_db ON CLUSTER '{cluster}'")
test_cluster.ddl_check_query(instance, """
CREATE TABLE IF NOT EXISTS test_db.test_macro ON CLUSTER '{cluster}' (p Date, i Int32)
ENGINE = ReplicatedMergeTree('/clickhouse/tables/{database}/{layer}-{shard}/{table}', '{replica}', p, p, 1)
""")
# Check that table was created at correct path in zookeeper
assert cluster.get_kazoo_client('zoo1').exists('/clickhouse/tables/test_db/0-1/test_macro') is not None
assert test_cluster.get_kazoo_client('zoo1').exists('/clickhouse/tables/test_db/0-1/test_macro') is not None
# Enable random ZK packet drops
cluster.pm_random_drops.push_rules(firewall_drops_rules)
test_cluster.pm_random_drops.push_rules(firewall_drops_rules)
def test_allowed_databases(started_cluster):
instance = cluster.instances['ch2']
def test_allowed_databases(test_cluster):
instance = test_cluster.instances['ch2']
instance.query("CREATE DATABASE IF NOT EXISTS db1 ON CLUSTER cluster")
instance.query("CREATE DATABASE IF NOT EXISTS db2 ON CLUSTER cluster")
instance.query("CREATE TABLE db1.t1 ON CLUSTER cluster (i Int8) ENGINE = Memory", settings={"user" : "restricted_user"})
with pytest.raises(Exception):
instance.query("CREATE TABLE db2.t2 ON CLUSTER cluster (i Int8) ENGINE = Memory", settings={"user" : "restricted_user"})
with pytest.raises(Exception):
@ -355,45 +208,45 @@ def test_allowed_databases(started_cluster):
instance.query("DROP DATABASE db1 ON CLUSTER cluster", settings={"user" : "restricted_user"})
def test_kill_query(started_cluster):
instance = cluster.instances['ch3']
def test_kill_query(test_cluster):
instance = test_cluster.instances['ch3']
ddl_check_query(instance, "KILL QUERY ON CLUSTER 'cluster' WHERE NOT elapsed FORMAT TSV")
test_cluster.ddl_check_query(instance, "KILL QUERY ON CLUSTER 'cluster' WHERE NOT elapsed FORMAT TSV")
def test_detach_query(started_cluster):
instance = cluster.instances['ch3']
def test_detach_query(test_cluster):
instance = test_cluster.instances['ch3']
ddl_check_query(instance, "DROP TABLE IF EXISTS test_attach ON CLUSTER cluster FORMAT TSV")
ddl_check_query(instance, "CREATE TABLE test_attach ON CLUSTER cluster (i Int8)ENGINE = Log")
ddl_check_query(instance, "DETACH TABLE test_attach ON CLUSTER cluster FORMAT TSV")
ddl_check_query(instance, "ATTACH TABLE test_attach ON CLUSTER cluster")
test_cluster.ddl_check_query(instance, "DROP TABLE IF EXISTS test_attach ON CLUSTER cluster FORMAT TSV")
test_cluster.ddl_check_query(instance, "CREATE TABLE test_attach ON CLUSTER cluster (i Int8)ENGINE = Log")
test_cluster.ddl_check_query(instance, "DETACH TABLE test_attach ON CLUSTER cluster FORMAT TSV")
test_cluster.ddl_check_query(instance, "ATTACH TABLE test_attach ON CLUSTER cluster")
def test_optimize_query(started_cluster):
instance = cluster.instances['ch3']
def test_optimize_query(test_cluster):
instance = test_cluster.instances['ch3']
ddl_check_query(instance, "DROP TABLE IF EXISTS test_optimize ON CLUSTER cluster FORMAT TSV")
ddl_check_query(instance, "CREATE TABLE test_optimize ON CLUSTER cluster (p Date, i Int32) ENGINE = MergeTree(p, p, 8192)")
ddl_check_query(instance, "OPTIMIZE TABLE test_optimize ON CLUSTER cluster FORMAT TSV")
test_cluster.ddl_check_query(instance, "DROP TABLE IF EXISTS test_optimize ON CLUSTER cluster FORMAT TSV")
test_cluster.ddl_check_query(instance, "CREATE TABLE test_optimize ON CLUSTER cluster (p Date, i Int32) ENGINE = MergeTree(p, p, 8192)")
test_cluster.ddl_check_query(instance, "OPTIMIZE TABLE test_optimize ON CLUSTER cluster FORMAT TSV")
def test_create_as_select(started_cluster):
instance = cluster.instances['ch2']
ddl_check_query(instance, "CREATE TABLE test_as_select ON CLUSTER cluster ENGINE = Memory AS (SELECT 1 AS x UNION ALL SELECT 2 AS x)")
def test_create_as_select(test_cluster):
instance = test_cluster.instances['ch2']
test_cluster.ddl_check_query(instance, "CREATE TABLE test_as_select ON CLUSTER cluster ENGINE = Memory AS (SELECT 1 AS x UNION ALL SELECT 2 AS x)")
assert TSV(instance.query("SELECT x FROM test_as_select ORDER BY x")) == TSV("1\n2\n")
ddl_check_query(instance, "DROP TABLE IF EXISTS test_as_select ON CLUSTER cluster")
test_cluster.ddl_check_query(instance, "DROP TABLE IF EXISTS test_as_select ON CLUSTER cluster")
def test_create_reserved(started_cluster):
instance = cluster.instances['ch2']
ddl_check_query(instance, "CREATE TABLE test_reserved ON CLUSTER cluster (`p` Date, `image` Nullable(String), `index` Nullable(Float64), `invalidate` Nullable(Int64)) ENGINE = MergeTree(`p`, `p`, 8192)")
ddl_check_query(instance, "CREATE TABLE test_as_reserved ON CLUSTER cluster ENGINE = Memory AS (SELECT * from test_reserved)")
ddl_check_query(instance, "DROP TABLE IF EXISTS test_reserved ON CLUSTER cluster")
ddl_check_query(instance, "DROP TABLE IF EXISTS test_as_reserved ON CLUSTER cluster")
def test_create_reserved(test_cluster):
instance = test_cluster.instances['ch2']
test_cluster.ddl_check_query(instance, "CREATE TABLE test_reserved ON CLUSTER cluster (`p` Date, `image` Nullable(String), `index` Nullable(Float64), `invalidate` Nullable(Int64)) ENGINE = MergeTree(`p`, `p`, 8192)")
test_cluster.ddl_check_query(instance, "CREATE TABLE test_as_reserved ON CLUSTER cluster ENGINE = Memory AS (SELECT * from test_reserved)")
test_cluster.ddl_check_query(instance, "DROP TABLE IF EXISTS test_reserved ON CLUSTER cluster")
test_cluster.ddl_check_query(instance, "DROP TABLE IF EXISTS test_as_reserved ON CLUSTER cluster")
if __name__ == '__main__':
with contextmanager(started_cluster)() as cluster:
for name, instance in cluster.instances.items():
with contextmanager(test_cluster)() as ctx_cluster:
for name, instance in ctx_cluster.instances.items():
print name, instance.ip_address
raw_input("Cluster created, press any key to destroy...")

View File

@ -0,0 +1,93 @@
import os
import sys
import time
import pytest
sys.path.insert(0, os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
from helpers.test_tools import TSV
from .cluster import ClickHouseClusterWithDDLHelpers
@pytest.fixture(scope="module", params=["configs", "configs_secure"])
def test_cluster(request):
cluster = ClickHouseClusterWithDDLHelpers(__file__, request.param)
try:
# TODO: Fix ON CLUSTER alters when nodes have different configs. Need to canonicalize node identity.
cluster.prepare(replace_hostnames_with_ips=False)
yield cluster
instance = cluster.instances['ch1']
cluster.ddl_check_query(instance, "DROP DATABASE test ON CLUSTER 'cluster'")
cluster.ddl_check_query(instance, "DROP DATABASE IF EXISTS test2 ON CLUSTER 'cluster'")
# Check query log to ensure that DDL queries are not executed twice
time.sleep(1.5)
for instance in cluster.instances.values():
cluster.ddl_check_there_are_no_dublicates(instance)
cluster.pm_random_drops.heal_all()
finally:
cluster.shutdown()
def test_replicated_alters(test_cluster):
instance = test_cluster.instances['ch2']
test_cluster.ddl_check_query(instance, "DROP TABLE IF EXISTS merge_for_alter ON CLUSTER cluster")
test_cluster.ddl_check_query(instance, "DROP TABLE IF EXISTS all_merge_32 ON CLUSTER cluster")
test_cluster.ddl_check_query(instance, "DROP TABLE IF EXISTS all_merge_64 ON CLUSTER cluster")
# Temporarily disable random ZK packet drops, they might broke creation if ReplicatedMergeTree replicas
firewall_drops_rules = test_cluster.pm_random_drops.pop_rules()
test_cluster.ddl_check_query(instance, """
CREATE TABLE IF NOT EXISTS merge_for_alter ON CLUSTER cluster (p Date, i Int32)
ENGINE = ReplicatedMergeTree('/clickhouse/tables/{layer}-{shard}/hits', '{replica}', p, p, 1)
""")
test_cluster.ddl_check_query(instance, """
CREATE TABLE IF NOT EXISTS all_merge_32 ON CLUSTER cluster (p Date, i Int32)
ENGINE = Distributed(cluster, default, merge_for_alter, i)
""")
test_cluster.ddl_check_query(instance, """
CREATE TABLE IF NOT EXISTS all_merge_64 ON CLUSTER cluster (p Date, i Int64, s String)
ENGINE = Distributed(cluster, default, merge_for_alter, i)
""")
for i in xrange(4):
k = (i / 2) * 2
test_cluster.insert_reliable(test_cluster.instances['ch{}'.format(i + 1)], "INSERT INTO merge_for_alter (i) VALUES ({})({})".format(k, k+1))
test_cluster.sync_replicas("merge_for_alter")
assert TSV(instance.query("SELECT i FROM all_merge_32 ORDER BY i")) == TSV(''.join(['{}\n'.format(x) for x in xrange(4)]))
test_cluster.ddl_check_query(instance, "ALTER TABLE merge_for_alter ON CLUSTER cluster MODIFY COLUMN i Int64")
test_cluster.ddl_check_query(instance, "ALTER TABLE merge_for_alter ON CLUSTER cluster ADD COLUMN s DEFAULT toString(i)")
assert TSV(instance.query("SELECT i, s FROM all_merge_64 ORDER BY i")) == TSV(''.join(['{}\t{}\n'.format(x,x) for x in xrange(4)]))
for i in xrange(4):
k = (i / 2) * 2 + 4
test_cluster.insert_reliable(test_cluster.instances['ch{}'.format(i + 1)], "INSERT INTO merge_for_alter (p, i) VALUES (31, {})(31, {})".format(k, k+1))
test_cluster.sync_replicas("merge_for_alter")
assert TSV(instance.query("SELECT i, s FROM all_merge_64 ORDER BY i")) == TSV(''.join(['{}\t{}\n'.format(x,x) for x in xrange(8)]))
test_cluster.ddl_check_query(instance, "ALTER TABLE merge_for_alter ON CLUSTER cluster DETACH PARTITION 197002")
assert TSV(instance.query("SELECT i, s FROM all_merge_64 ORDER BY i")) == TSV(''.join(['{}\t{}\n'.format(x,x) for x in xrange(4)]))
test_cluster.ddl_check_query(instance, "DROP TABLE merge_for_alter ON CLUSTER cluster")
# Enable random ZK packet drops
test_cluster.pm_random_drops.push_rules(firewall_drops_rules)
test_cluster.ddl_check_query(instance, "DROP TABLE all_merge_32 ON CLUSTER cluster")
test_cluster.ddl_check_query(instance, "DROP TABLE all_merge_64 ON CLUSTER cluster")

View File

@ -1,391 +0,0 @@
import os
import os.path as p
import sys
import time
import datetime
import pytest
from contextlib import contextmanager
sys.path.insert(0, os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
from helpers.cluster import ClickHouseCluster
from helpers.network import PartitionManager, PartitionManagerDisbaler
from helpers.test_tools import TSV
def check_all_hosts_sucesfully_executed(tsv_content, num_hosts=None):
if num_hosts is None:
num_hosts = len(cluster.instances)
M = TSV.toMat(tsv_content)
hosts = [(l[0], l[1]) for l in M] # (host, port)
codes = [l[2] for l in M]
messages = [l[3] for l in M]
assert len(hosts) == num_hosts and len(set(hosts)) == num_hosts, "\n" + tsv_content
assert len(set(codes)) == 1, "\n" + tsv_content
assert codes[0] == "0", "\n" + tsv_content
def ddl_check_query(instance, query, num_hosts=None):
contents = instance.query(query)
check_all_hosts_sucesfully_executed(contents, num_hosts)
return contents
def ddl_check_there_are_no_dublicates(instance):
rows = instance.query("SELECT max(c), argMax(q, c) FROM (SELECT lower(query) AS q, count() AS c FROM system.query_log WHERE type=2 AND q LIKE '/* ddl_entry=query-%' GROUP BY query)")
assert len(rows) > 0 and rows[0][0] == "1", "dublicates on {} {}, query {}".format(instance.name, instance.ip_address)
# Make retries in case of UNKNOWN_STATUS_OF_INSERT or zkutil::KeeperException errors
def insert_reliable(instance, query_insert):
for i in xrange(100):
try:
instance.query(query_insert)
return
except Exception as e:
last_exception = e
s = str(e)
if not (s.find('Unknown status, client must retry') >= 0 or s.find('zkutil::KeeperException')):
raise e
raise last_exception
TEST_REPLICATED_ALTERS=False # TODO: Check code and turn on
cluster = ClickHouseCluster(__file__)
def replace_domains_to_ip_addresses_in_cluster_config(instances_to_replace):
clusters_config = open(p.join(cluster.base_dir, 'configs/config.d/clusters.xml')).read()
for inst_name, inst in cluster.instances.items():
clusters_config = clusters_config.replace(inst_name, str(inst.ip_address))
for inst_name in instances_to_replace:
inst = cluster.instances[inst_name]
cluster.instances[inst_name].exec_in_container(['bash', '-c', 'echo "$NEW_CONFIG" > /etc/clickhouse-server/config.d/clusters.xml'], environment={"NEW_CONFIG": clusters_config}, privileged=True)
# print cluster.instances[inst_name].exec_in_container(['cat', "/etc/clickhouse-server/config.d/clusters.xml"])
def init_cluster(cluster):
try:
for i in xrange(4):
cluster.add_instance(
'ch{}'.format(i+1),
config_dir="configs",
macros={"layer": 0, "shard": i/2 + 1, "replica": i%2 + 1},
with_zookeeper=True)
cluster.start()
# Replace config files for testing ability to set host in DNS and IP formats
replace_domains_to_ip_addresses_in_cluster_config(['ch1', 'ch3'])
# Select sacrifice instance to test CONNECTION_LOSS and server fail on it
sacrifice = cluster.instances['ch4']
cluster.pm_random_drops = PartitionManager()
cluster.pm_random_drops._add_rule({'probability': 0.01, 'destination': sacrifice.ip_address, 'source_port': 2181, 'action': 'REJECT --reject-with tcp-reset'})
cluster.pm_random_drops._add_rule({'probability': 0.01, 'source': sacrifice.ip_address, 'destination_port': 2181, 'action': 'REJECT --reject-with tcp-reset'})
# Initialize databases and service tables
instance = cluster.instances['ch1']
ddl_check_query(instance, """
CREATE TABLE IF NOT EXISTS all_tables ON CLUSTER 'cluster_no_replicas'
(database String, name String, engine String, metadata_modification_time DateTime)
ENGINE = Distributed('cluster_no_replicas', 'system', 'tables')
""")
ddl_check_query(instance, "CREATE DATABASE IF NOT EXISTS test ON CLUSTER 'cluster'")
except Exception as e:
print e
raise
@pytest.fixture(scope="module")
def started_cluster():
try:
init_cluster(cluster)
yield cluster
instance = cluster.instances['ch1']
ddl_check_query(instance, "DROP DATABASE test ON CLUSTER 'cluster'")
ddl_check_query(instance, "DROP DATABASE IF EXISTS test2 ON CLUSTER 'cluster'")
# Check query log to ensure that DDL queries are not executed twice
time.sleep(1.5)
for instance in cluster.instances.values():
ddl_check_there_are_no_dublicates(instance)
cluster.pm_random_drops.heal_all()
finally:
cluster.shutdown()
def test_default_database(started_cluster):
instance = cluster.instances['ch3']
ddl_check_query(instance, "CREATE DATABASE IF NOT EXISTS test2 ON CLUSTER 'cluster' FORMAT TSV")
ddl_check_query(instance, "DROP TABLE IF EXISTS null ON CLUSTER 'cluster' FORMAT TSV")
ddl_check_query(instance, "CREATE TABLE null ON CLUSTER 'cluster2' (s String DEFAULT 'escape\t\nme') ENGINE = Null")
contents = instance.query("SELECT hostName() AS h, database FROM all_tables WHERE name = 'null' ORDER BY h")
assert TSV(contents) == TSV("ch1\tdefault\nch2\ttest2\nch3\tdefault\nch4\ttest2\n")
ddl_check_query(instance, "DROP TABLE IF EXISTS null ON CLUSTER cluster2")
ddl_check_query(instance, "DROP DATABASE IF EXISTS test2 ON CLUSTER 'cluster'")
def test_create_view(started_cluster):
instance = cluster.instances['ch3']
ddl_check_query(instance, "CREATE VIEW test.super_simple_view ON CLUSTER 'cluster' AS SELECT * FROM system.numbers FORMAT TSV")
ddl_check_query(instance, "CREATE MATERIALIZED VIEW test.simple_mat_view ON CLUSTER 'cluster' ENGINE = Memory AS SELECT * FROM system.numbers FORMAT TSV")
ddl_check_query(instance, "DROP TABLE test.simple_mat_view ON CLUSTER 'cluster' FORMAT TSV")
ddl_check_query(instance, "DROP TABLE IF EXISTS test.super_simple_view2 ON CLUSTER 'cluster' FORMAT TSV")
ddl_check_query(instance, "CREATE TABLE test.super_simple ON CLUSTER 'cluster' (i Int8) ENGINE = Memory")
ddl_check_query(instance, "RENAME TABLE test.super_simple TO test.super_simple2 ON CLUSTER 'cluster' FORMAT TSV")
ddl_check_query(instance, "DROP TABLE test.super_simple2 ON CLUSTER 'cluster'")
def test_on_server_fail(started_cluster):
instance = cluster.instances['ch1']
kill_instance = cluster.instances['ch2']
ddl_check_query(instance, "DROP TABLE IF EXISTS test.test_server_fail ON CLUSTER 'cluster'")
kill_instance.get_docker_handle().stop()
request = instance.get_query_request("CREATE TABLE test.test_server_fail ON CLUSTER 'cluster' (i Int8) ENGINE=Null", timeout=30)
kill_instance.get_docker_handle().start()
ddl_check_query(instance, "DROP TABLE IF EXISTS test.__nope__ ON CLUSTER 'cluster'")
# Check query itself
check_all_hosts_sucesfully_executed(request.get_answer())
# And check query artefacts
contents = instance.query("SELECT hostName() AS h FROM all_tables WHERE database='test' AND name='test_server_fail' ORDER BY h")
assert TSV(contents) == TSV("ch1\nch2\nch3\nch4\n")
ddl_check_query(instance, "DROP TABLE test.test_server_fail ON CLUSTER 'cluster'")
def _test_on_connection_losses(cluster, zk_timeout):
instance = cluster.instances['ch1']
kill_instance = cluster.instances['ch2']
with PartitionManager() as pm:
pm.drop_instance_zk_connections(kill_instance)
request = instance.get_query_request("DROP TABLE IF EXISTS test.__nope__ ON CLUSTER 'cluster'", timeout=10)
time.sleep(zk_timeout)
pm.restore_instance_zk_connections(kill_instance)
check_all_hosts_sucesfully_executed(request.get_answer())
def test_on_connection_loss(started_cluster):
_test_on_connection_losses(cluster, 1.5) # only a connection loss should occur, not session expiration (3 sec ZK timeout in config)
def test_on_session_expired(started_cluster):
_test_on_connection_losses(cluster, 4) # session should be expired (3 sec ZK timeout in config)
def test_replicated_alters(started_cluster):
instance = cluster.instances['ch2']
ddl_check_query(instance, "DROP TABLE IF EXISTS merge_for_alter ON CLUSTER cluster")
ddl_check_query(instance, "DROP TABLE IF EXISTS all_merge_32 ON CLUSTER cluster")
ddl_check_query(instance, "DROP TABLE IF EXISTS all_merge_64 ON CLUSTER cluster")
if not TEST_REPLICATED_ALTERS:
return
# Temporarily disable random ZK packet drops, they might break creation of ReplicatedMergeTree replicas
firewall_drops_rules = cluster.pm_random_drops.pop_rules()
ddl_check_query(instance, """
CREATE TABLE IF NOT EXISTS merge_for_alter ON CLUSTER cluster (p Date, i Int32)
ENGINE = ReplicatedMergeTree('/clickhouse/tables/{layer}-{shard}/hits', '{replica}', p, p, 1)
""")
ddl_check_query(instance, """
CREATE TABLE IF NOT EXISTS all_merge_32 ON CLUSTER cluster (p Date, i Int32)
ENGINE = Distributed(cluster, default, merge_for_alter, i)
""")
ddl_check_query(instance, """
CREATE TABLE IF NOT EXISTS all_merge_64 ON CLUSTER cluster (p Date, i Int64, s String)
ENGINE = Distributed(cluster, default, merge_for_alter, i)
""")
for i in xrange(4):
k = (i / 2) * 2
insert_reliable(cluster.instances['ch{}'.format(i + 1)], "INSERT INTO merge_for_alter (i) VALUES ({})({})".format(k, k+1))
assert TSV(instance.query("SELECT i FROM all_merge_32 ORDER BY i")) == TSV(''.join(['{}\n'.format(x) for x in xrange(4)]))
ddl_check_query(instance, "ALTER TABLE merge_for_alter ON CLUSTER cluster MODIFY COLUMN i Int64")
ddl_check_query(instance, "ALTER TABLE merge_for_alter ON CLUSTER cluster ADD COLUMN s DEFAULT toString(i)")
assert TSV(instance.query("SELECT i, s FROM all_merge_64 ORDER BY i")) == TSV(''.join(['{}\t{}\n'.format(x,x) for x in xrange(4)]))
for i in xrange(4):
k = (i / 2) * 2 + 4
insert_reliable(cluster.instances['ch{}'.format(i + 1)], "INSERT INTO merge_for_alter (p, i) VALUES (31, {})(31, {})".format(k, k+1))
assert TSV(instance.query("SELECT i, s FROM all_merge_64 ORDER BY i")) == TSV(''.join(['{}\t{}\n'.format(x,x) for x in xrange(8)]))
ddl_check_query(instance, "ALTER TABLE merge_for_alter ON CLUSTER cluster DETACH PARTITION 197002")
assert TSV(instance.query("SELECT i, s FROM all_merge_64 ORDER BY i")) == TSV(''.join(['{}\t{}\n'.format(x,x) for x in xrange(4)]))
ddl_check_query(instance, "DROP TABLE merge_for_alter ON CLUSTER cluster")
# Enable random ZK packet drops
cluster.pm_random_drops.push_rules(firewall_drops_rules)
ddl_check_query(instance, "DROP TABLE all_merge_32 ON CLUSTER cluster")
ddl_check_query(instance, "DROP TABLE all_merge_64 ON CLUSTER cluster")
def test_simple_alters(started_cluster):
instance = cluster.instances['ch2']
ddl_check_query(instance, "DROP TABLE IF EXISTS merge ON CLUSTER cluster_without_replication")
ddl_check_query(instance, "DROP TABLE IF EXISTS all_merge_32 ON CLUSTER cluster_without_replication")
ddl_check_query(instance, "DROP TABLE IF EXISTS all_merge_64 ON CLUSTER cluster_without_replication")
ddl_check_query(instance, """
CREATE TABLE IF NOT EXISTS merge ON CLUSTER cluster_without_replication (p Date, i Int32)
ENGINE = MergeTree(p, p, 1)
""")
ddl_check_query(instance, """
CREATE TABLE IF NOT EXISTS all_merge_32 ON CLUSTER cluster_without_replication (p Date, i Int32)
ENGINE = Distributed(cluster_without_replication, default, merge, i)
""")
ddl_check_query(instance, """
CREATE TABLE IF NOT EXISTS all_merge_64 ON CLUSTER cluster_without_replication (p Date, i Int64, s String)
ENGINE = Distributed(cluster_without_replication, default, merge, i)
""")
for i in xrange(4):
k = (i / 2) * 2
cluster.instances['ch{}'.format(i + 1)].query("INSERT INTO merge (i) VALUES ({})({})".format(k, k+1))
assert TSV(instance.query("SELECT i FROM all_merge_32 ORDER BY i")) == TSV(''.join(['{}\n'.format(x) for x in xrange(4)]))
time.sleep(5)
ddl_check_query(instance, "ALTER TABLE merge ON CLUSTER cluster_without_replication MODIFY COLUMN i Int64")
time.sleep(5)
ddl_check_query(instance, "ALTER TABLE merge ON CLUSTER cluster_without_replication ADD COLUMN s DEFAULT toString(i) FORMAT TSV")
assert TSV(instance.query("SELECT i, s FROM all_merge_64 ORDER BY i")) == TSV(''.join(['{}\t{}\n'.format(x,x) for x in xrange(4)]))
for i in xrange(4):
k = (i / 2) * 2 + 4
cluster.instances['ch{}'.format(i + 1)].query("INSERT INTO merge (p, i) VALUES (31, {})(31, {})".format(k, k+1))
assert TSV(instance.query("SELECT i, s FROM all_merge_64 ORDER BY i")) == TSV(''.join(['{}\t{}\n'.format(x,x) for x in xrange(8)]))
ddl_check_query(instance, "ALTER TABLE merge ON CLUSTER cluster_without_replication DETACH PARTITION 197002")
assert TSV(instance.query("SELECT i, s FROM all_merge_64 ORDER BY i")) == TSV(''.join(['{}\t{}\n'.format(x,x) for x in xrange(4)]))
ddl_check_query(instance, "DROP TABLE merge ON CLUSTER cluster_without_replication")
ddl_check_query(instance, "DROP TABLE all_merge_32 ON CLUSTER cluster_without_replication")
ddl_check_query(instance, "DROP TABLE all_merge_64 ON CLUSTER cluster_without_replication")
def test_macro(started_cluster):
instance = cluster.instances['ch2']
ddl_check_query(instance, "CREATE TABLE tab ON CLUSTER '{cluster}' (value UInt8) ENGINE = Memory")
for i in xrange(4):
insert_reliable(cluster.instances['ch{}'.format(i + 1)], "INSERT INTO tab VALUES ({})".format(i))
ddl_check_query(instance, "CREATE TABLE distr ON CLUSTER '{cluster}' (value UInt8) ENGINE = Distributed('{cluster}', 'default', 'tab', value % 4)")
assert TSV(instance.query("SELECT value FROM distr ORDER BY value")) == TSV('0\n1\n2\n3\n')
assert TSV( cluster.instances['ch3'].query("SELECT value FROM distr ORDER BY value")) == TSV('0\n1\n2\n3\n')
ddl_check_query(instance, "DROP TABLE IF EXISTS distr ON CLUSTER '{cluster}'")
ddl_check_query(instance, "DROP TABLE IF EXISTS tab ON CLUSTER '{cluster}'")
def test_implicit_macros(started_cluster):
# Temporarily disable random ZK packet drops, they might break creation of ReplicatedMergeTree replicas
firewall_drops_rules = cluster.pm_random_drops.pop_rules()
instance = cluster.instances['ch2']
ddl_check_query(instance, "DROP DATABASE IF EXISTS test_db ON CLUSTER '{cluster}'")
ddl_check_query(instance, "CREATE DATABASE IF NOT EXISTS test_db ON CLUSTER '{cluster}'")
ddl_check_query(instance, """
CREATE TABLE IF NOT EXISTS test_db.test_macro ON CLUSTER '{cluster}' (p Date, i Int32)
ENGINE = ReplicatedMergeTree('/clickhouse/tables/{database}/{layer}-{shard}/{table}', '{replica}', p, p, 1)
""")
# Check that table was created at correct path in zookeeper
assert cluster.get_kazoo_client('zoo1').exists('/clickhouse/tables/test_db/0-1/test_macro') is not None
# Enable random ZK packet drops
cluster.pm_random_drops.push_rules(firewall_drops_rules)
def test_allowed_databases(started_cluster):
instance = cluster.instances['ch2']
instance.query("CREATE DATABASE IF NOT EXISTS db1 ON CLUSTER cluster")
instance.query("CREATE DATABASE IF NOT EXISTS db2 ON CLUSTER cluster")
instance.query("CREATE TABLE db1.t1 ON CLUSTER cluster (i Int8) ENGINE = Memory", settings={"user" : "restricted_user"})
with pytest.raises(Exception):
instance.query("CREATE TABLE db2.t2 ON CLUSTER cluster (i Int8) ENGINE = Memory", settings={"user" : "restricted_user"})
with pytest.raises(Exception):
instance.query("CREATE TABLE t3 ON CLUSTER cluster (i Int8) ENGINE = Memory", settings={"user" : "restricted_user"})
with pytest.raises(Exception):
instance.query("DROP DATABASE db2 ON CLUSTER cluster", settings={"user" : "restricted_user"})
instance.query("DROP DATABASE db1 ON CLUSTER cluster", settings={"user" : "restricted_user"})
def test_kill_query(started_cluster):
instance = cluster.instances['ch3']
ddl_check_query(instance, "KILL QUERY ON CLUSTER 'cluster' WHERE NOT elapsed FORMAT TSV")
def test_detach_query(started_cluster):
instance = cluster.instances['ch3']
ddl_check_query(instance, "DROP TABLE IF EXISTS test_attach ON CLUSTER cluster FORMAT TSV")
ddl_check_query(instance, "CREATE TABLE test_attach ON CLUSTER cluster (i Int8)ENGINE = Log")
ddl_check_query(instance, "DETACH TABLE test_attach ON CLUSTER cluster FORMAT TSV")
ddl_check_query(instance, "ATTACH TABLE test_attach ON CLUSTER cluster")
def test_optimize_query(started_cluster):
instance = cluster.instances['ch3']
ddl_check_query(instance, "DROP TABLE IF EXISTS test_optimize ON CLUSTER cluster FORMAT TSV")
ddl_check_query(instance, "CREATE TABLE test_optimize ON CLUSTER cluster (p Date, i Int32) ENGINE = MergeTree(p, p, 8192)")
ddl_check_query(instance, "OPTIMIZE TABLE test_optimize ON CLUSTER cluster FORMAT TSV")
def test_create_as_select(started_cluster):
instance = cluster.instances['ch2']
ddl_check_query(instance, "CREATE TABLE test_as_select ON CLUSTER cluster ENGINE = Memory AS (SELECT 1 AS x UNION ALL SELECT 2 AS x)")
assert TSV(instance.query("SELECT x FROM test_as_select ORDER BY x")) == TSV("1\n2\n")
ddl_check_query(instance, "DROP TABLE IF EXISTS test_as_select ON CLUSTER cluster")
if __name__ == '__main__':
with contextmanager(started_cluster)() as cluster:
for name, instance in cluster.instances.items():
print name, instance.ip_address
raw_input("Cluster created, press any key to destroy...")

View File

@ -45,7 +45,7 @@
</volumes>
</jbods_with_external>
<!-- Moving all parts jbod1 if accuired more than 70% -->
<!-- Moving all parts jbod1 if acquired more than 70% -->
<moving_jbod_with_external>
<volumes>
<main>

View File

@ -0,0 +1,36 @@
0 2
-
0 2
1 0
-
1 2
-
1 2
-
1 2
-
1 2
-
0 2
-
0 2
1 0
----
\N 2
-
1 \N
\N 2
-
1 2
-
1 2
-
1 2
-
1 2
-
\N 2
-
1 \N
\N 2
-

View File

@ -0,0 +1,54 @@
drop table if exists X;
drop table if exists Y;
create table X (id Int64) Engine = Memory;
create table Y (id Int64) Engine = Memory;
insert into X (id) values (1);
insert into Y (id) values (2);
set join_use_nulls = 0;
select X.id, Y.id from X right join Y on X.id = Y.id order by X.id, Y.id;
select '-';
select X.id, Y.id from X full join Y on Y.id = X.id order by X.id, Y.id;
select '-';
select X.id, Y.id from X right join Y on X.id = (Y.id - 1) order by X.id, Y.id;
select '-';
select X.id, Y.id from X full join Y on (Y.id - 1) = X.id order by X.id, Y.id;
select '-';
select X.id, Y.id from X right join Y on (X.id + 1) = Y.id order by X.id, Y.id;
select '-';
select X.id, Y.id from X full join Y on Y.id = (X.id + 1) order by X.id, Y.id;
select '-';
select X.id, Y.id from X right join Y on (X.id + 1) = (Y.id + 1) order by X.id, Y.id;
select '-';
select X.id, Y.id from X full join Y on (Y.id + 1) = (X.id + 1) order by X.id, Y.id;
select '----';
set join_use_nulls = 1;
select X.id, Y.id from X right join Y on X.id = Y.id order by X.id, Y.id;
select '-';
select X.id, Y.id from X full join Y on Y.id = X.id order by X.id, Y.id;
select '-';
select X.id, Y.id from X right join Y on X.id = (Y.id - 1) order by X.id, Y.id;
select '-';
select X.id, Y.id from X full join Y on (Y.id - 1) = X.id order by X.id, Y.id;
select '-';
select X.id, Y.id from X right join Y on (X.id + 1) = Y.id order by X.id, Y.id;
select '-';
select X.id, Y.id from X full join Y on Y.id = (X.id + 1) order by X.id, Y.id;
select '-';
select X.id, Y.id from X right join Y on (X.id + 1) = (Y.id + 1) order by X.id, Y.id;
select '-';
select X.id, Y.id from X full join Y on (Y.id + 1) = (X.id + 1) order by X.id, Y.id;
select '-';
drop table X;
drop table Y;

View File

@ -0,0 +1,16 @@
[]
[1,nan,nan,2]
[1,nan,nan,nan,2]
[1,NULL,2]
['hello','','world']
[[[]],[[],[]],[[]]]
[]
['0']
['0']
['0']
['0','1']
['0','1']
['0','1']
['0','1','2']
['0','1','2']
['0','1','2']

View File

@ -0,0 +1,7 @@
SELECT arrayCompact([]);
SELECT arrayCompact([1, 1, nan, nan, 2, 2, 2]);
SELECT arrayCompact([1, 1, nan, nan, -nan, 2, 2, 2]);
SELECT arrayCompact([1, 1, NULL, NULL, 2, 2, 2]);
SELECT arrayCompact(['hello', '', '', '', 'world', 'world']);
SELECT arrayCompact([[[]], [[], []], [[], []], [[]]]);
SELECT arrayCompact(x -> toString(intDiv(x, 3)), range(number)) FROM numbers(10);

View File

@ -0,0 +1,19 @@
привет
привет
аривет
бривет
вривет
гривет
дривет
еривет
жривет
зривет
иривет
йривет
кривет
лривет
мривет
нривет
оривет
привет
你好

View File

@ -0,0 +1,4 @@
SELECT char(0xD0, 0xBF, 0xD1, 0x80, 0xD0, 0xB8, 0xD0, 0xB2, 0xD0, 0xB5, 0xD1, 0x82) AS hello;
SELECT char(-48,-65,-47,-128,-48,-72,-48,-78,-48,-75,-47,-126) AS hello;
SELECT char(-48, 0xB0 + number,-47,-128,-48,-72,-48,-78,-48,-75,-47,-126) AS hello FROM numbers(16);
SELECT char(0xe4, 0xbd, 0xa0, 0xe5, 0xa5, 0xbd) AS hello;

View File

@ -0,0 +1,3 @@
aa aaaa
aa 4
aa String

View File

@ -0,0 +1,3 @@
SELECT toFixedString('aa' , 2 ) as a, concat(a, a);
SELECT toFixedString('aa' , 2 ) as a, length(concat(a, a));
SELECT toFixedString('aa' , 2 ) as a, toTypeName(concat(a, a));

View File

@ -0,0 +1 @@
4999950000

View File

@ -0,0 +1,8 @@
DROP TABLE IF EXISTS empty_pk;
CREATE TABLE empty_pk (x UInt64) ENGINE = MergeTree ORDER BY tuple() SETTINGS index_granularity = 256;
INSERT INTO empty_pk SELECT number FROM numbers(100000);
SELECT sum(x) from empty_pk;
DROP TABLE empty_pk;

View File

@ -0,0 +1,8 @@
drop table if exists test_table_hdfs_syntax
;
create table test_table_hdfs_syntax (id UInt32) ENGINE = HDFS('')
; -- { serverError 42 }
create table test_table_hdfs_syntax (id UInt32) ENGINE = HDFS('','','')
; -- { serverError 42 }
drop table if exists test_table_hdfs_syntax
;

View File

@ -0,0 +1,8 @@
drop table if exists test_table_s3_syntax
;
create table test_table_s3_syntax (id UInt32) ENGINE = S3('')
; -- { serverError 42 }
create table test_table_s3_syntax (id UInt32) ENGINE = S3('','','')
; -- { serverError 42 }
drop table if exists test_table_s3_syntax
;

View File

@ -0,0 +1,8 @@
drop table if exists test_table_url_syntax
;
create table test_table_url_syntax (id UInt32) ENGINE = URL('')
; -- { serverError 42 }
create table test_table_url_syntax (id UInt32) ENGINE = URL('','','')
; -- { serverError 42 }
drop table if exists test_table_url_syntax
;

View File

@ -424,7 +424,7 @@ Default value: 163840.
## merge_tree_min_bytes_for_concurrent_read {#setting-merge_tree_min_bytes_for_concurrent_read}
If a number of bytes to read from one file of a [MergeTree*](../table_engines/mergetree.md)-engine table exceeds `merge_tree_min_bytes_for_concurrent_read` then ClickHouse tries to perform a concurrent reading from this file on several threads.
If the number of bytes to read from one file of a [MergeTree*](../table_engines/mergetree.md)-engine table exceeds `merge_tree_min_bytes_for_concurrent_read`, then ClickHouse tries to concurrently read from this file in several threads.
Possible values:
@ -445,7 +445,7 @@ Default value: 0.
## merge_tree_min_bytes_for_seek {#setting-merge_tree_min_bytes_for_seek}
If the distance between two data blocks to be read in one file is less than `merge_tree_min_bytes_for_seek` rows, then ClickHouse does not seek through the file, but reads the data sequentially.
If the distance between two data blocks to be read in one file is less than `merge_tree_min_bytes_for_seek` bytes, then ClickHouse sequentially reads the range of the file that contains both blocks, thus avoiding an extra seek.
Possible values:
@ -466,9 +466,9 @@ Default value: 8.
## merge_tree_max_rows_to_use_cache {#setting-merge_tree_max_rows_to_use_cache}
If ClickHouse should read more than `merge_tree_max_rows_to_use_cache` rows in one query, it does not use the cache of uncompressed blocks. The [uncompressed_cache_size](../server_settings/settings.md#server-settings-uncompressed_cache_size) server setting defines the size of the cache of uncompressed blocks.
If ClickHouse should read more than `merge_tree_max_rows_to_use_cache` rows in one query, it doesn't use the cache of uncompressed blocks.
The cache of uncompressed blocks stores data extracted for queries. ClickHouse uses this cache to speed up responses to repeated small queries. This setting protects the cache from trashing by queries reading a large amount of data.
The cache of uncompressed blocks stores data extracted for queries. ClickHouse uses this cache to speed up responses to repeated small queries. This setting protects the cache from thrashing by queries that read a large amount of data. The [uncompressed_cache_size](../server_settings/settings.md#server-settings-uncompressed_cache_size) server setting defines the size of the cache of uncompressed blocks.
Possible values:
@ -479,9 +479,9 @@ Default value: 128 ✕ 8192.
## merge_tree_max_bytes_to_use_cache {#setting-merge_tree_max_bytes_to_use_cache}
If ClickHouse should read more than `merge_tree_max_bytes_to_use_cache` bytes in one query, it does not use the cache of uncompressed blocks. The [uncompressed_cache_size](../server_settings/settings.md#server-settings-uncompressed_cache_size) server setting defines the size of the cache of uncompressed blocks.
If ClickHouse should read more than `merge_tree_max_bytes_to_use_cache` bytes in one query, it doesn't use the cache of uncompressed blocks.
The cache of uncompressed blocks stores data extracted for queries. ClickHouse uses this cache to speed up responses to repeated small queries. This setting protects the cache from trashing by queries reading a large amount of data.
The cache of uncompressed blocks stores data extracted for queries. ClickHouse uses this cache to speed up responses to repeated small queries. This setting protects the cache from thrashing by queries that read a large amount of data. The [uncompressed_cache_size](../server_settings/settings.md#server-settings-uncompressed_cache_size) server setting defines the size of the cache of uncompressed blocks.
Possible values:
@ -912,25 +912,32 @@ Default value: `uniqExact`.
## skip_unavailable_shards {#settings-skip_unavailable_shards}
Enables or disables silent skipping of:
Enables or disables silent skipping of unavailable shards.
- Node, if its name cannot be resolved through DNS.
Shard is considered unavailable if all its replicas are unavailable. A replica is unavailable in the following cases:
When skipping is disabled, ClickHouse requires that all the nodes in the [cluster configuration](../server_settings/settings.md#server_settings_remote_servers) can be resolvable through DNS. Otherwise, ClickHouse throws an exception when trying to perform a query on the cluster.
- ClickHouse can't connect to replica for any reason.
If skipping is enabled, ClickHouse considers unresolved nodes as unavailable and tries to resolve them at every connection attempt. Such behavior creates the risk of wrong cluster configuration because a user can specify the wrong node name, and ClickHouse doesn't report about it. However, this can be useful in systems with dynamic DNS, for example, [Kubernetes](https://kubernetes.io), where nodes can be unresolvable during downtime, and this is not an error.
When connecting to a replica, ClickHouse performs several attempts. If all these attempts fail, the replica is considered unavailable.
- Shard, if there are no available replicas of the shard.
- Replica can't be resolved through DNS.
When skipping is disabled, ClickHouse throws an exception.
If replica's hostname can't be resolved through DNS, it can indicate the following situations:
When skipping is enabled, ClickHouse returns a partial answer and doesn't report about issues with nodes availability.
- Replica's host has no DNS record. It can occur in systems with dynamic DNS, for example, [Kubernetes](https://kubernetes.io), where nodes can be unresolvable during downtime, and this is not an error.
- Configuration error. ClickHouse configuration file contains a wrong hostname.
Possible values:
- 1 — skipping enabled.
If a shard is unavailable, ClickHouse returns a result based on partial data and doesn't report node availability issues.
- 0 — skipping disabled.
If a shard is unavailable, ClickHouse throws an exception.
Default value: 0.
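As a minimal illustration (the `dist_hits` Distributed table name is made up), the setting can be toggled per query:

```sql
-- With skipping enabled, unavailable shards are silently ignored and the
-- query returns a result based only on data from the reachable shards.
SELECT count() FROM dist_hits SETTINGS skip_unavailable_shards = 1;

-- With the default value (0), the same query throws an exception
-- if any shard has no available replicas.
SELECT count() FROM dist_hits SETTINGS skip_unavailable_shards = 0;
```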
## optimize_throw_if_noop {#setting-optimize_throw_if_noop}

View File

@ -10,7 +10,7 @@ Engine parameters:
- `database` Database name. Instead of the database name, you can use a constant expression that returns a string.
- `table` Table to flush data to.
- `num_layers` Parallelism layer. Physically, the table will be represented as 'num_layers' of independent buffers. Recommended value: 16.
- `num_layers` Parallelism layer. Physically, the table will be represented as `num_layers` of independent buffers. Recommended value: 16.
- `min_time`, `max_time`, `min_rows`, `max_rows`, `min_bytes`, and `max_bytes` Conditions for flushing data from the buffer.
Data is flushed from the buffer and written to the destination table if all the `min*` conditions or at least one `max*` condition are met.
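As a rough sketch, assuming a destination table `merge.hits` already exists (the names follow the engine's usual example), a buffer table could be declared like this:

```sql
-- 16 independent buffers; each is flushed to merge.hits when all min* conditions
-- or at least one max* condition (time in seconds, rows, bytes) is met.
CREATE TABLE merge.hits_buffer AS merge.hits
ENGINE = Buffer(merge, hits, 16, 10, 100, 10000, 1000000, 10000000, 100000000);
```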

View File

@ -79,13 +79,13 @@ For a description of parameters, see the [CREATE query description](../../query_
- `SETTINGS` — Additional parameters that control the behavior of the `MergeTree`:
- `index_granularity` — Maximum number of data rows between the marks of an index. Default value: 8192. See [Data Storage](#mergetree-data-storage).
- `index_granularity_bytes` — Maximum size of data granule in bytes. Default value: 10Mb. To restrict the size of granule only by number of rows set 0 (not recommended). See [Data Storage](#mergetree-data-storage).
- `enable_mixed_granularity_parts` — Enables or disables transition to controlling the granule size with the `index_granularity_bytes` setting. Before the version 19.11 there was the only `index_granularity` setting for the granule size restriction. The `index_granularity_bytes` setting improves ClickHouse performance when selecting data from the tables with big rows (tens and hundreds of megabytes). So if you have tables with big rows, you can turn the setting on for the tables to get better efficiency of your `SELECT` queries.
- `index_granularity_bytes` — Maximum size of data granules in bytes. Default value: 10Mb. To restrict the granule size only by number of rows, set to 0 (not recommended). See [Data Storage](#mergetree-data-storage).
- `enable_mixed_granularity_parts` — Enables or disables transitioning to control the granule size with the `index_granularity_bytes` setting. Before version 19.11, there was only the `index_granularity` setting for restricting granule size. The `index_granularity_bytes` setting improves ClickHouse performance when selecting data from tables with big rows (tens and hundreds of megabytes). If you have tables with big rows, you can enable this setting for the tables to improve the efficiency of `SELECT` queries.
- `use_minimalistic_part_header_in_zookeeper` — Storage method of the data parts headers in ZooKeeper. If `use_minimalistic_part_header_in_zookeeper=1`, then ZooKeeper stores less data. For more information, see the [setting description](../server_settings/settings.md#server-settings-use_minimalistic_part_header_in_zookeeper) in "Server configuration parameters".
- `min_merge_bytes_to_use_direct_io` — The minimum data volume for merge operation that is required for using direct I/O access to the storage disk. When merging data parts, ClickHouse calculates the total storage volume of all the data to be merged. If the volume exceeds `min_merge_bytes_to_use_direct_io` bytes, ClickHouse reads and writes the data to the storage disk using the direct I/O interface (`O_DIRECT` option). If `min_merge_bytes_to_use_direct_io = 0`, then direct I/O is disabled. Default value: `10 * 1024 * 1024 * 1024` bytes.
<a name="mergetree_setting-merge_with_ttl_timeout"></a>
- `merge_with_ttl_timeout` — Minimum delay in seconds before repeating a merge with TTL. Default value: 86400 (1 day).
- `write_final_mark` — Enables or disables writing the final index mark at the end of data part. Default value: 1. Don't turn it off.
- `write_final_mark` — Enables or disables writing the final index mark at the end of the data part. Default value: 1. Don't turn it off.
**Example of Sections Setting**
@ -137,9 +137,9 @@ When data is inserted in a table, separate data parts are created and each of th
Data belonging to different partitions are separated into different parts. In the background, ClickHouse merges data parts for more efficient storage. Parts belonging to different partitions are not merged. The merge mechanism does not guarantee that all rows with the same primary key will be in the same data part.
Each data part is logically divided by granules. A granule is the smallest indivisible data set that ClickHouse reads when selecting data. ClickHouse doesn't split rows or values, so each granule always contains an integer number of rows. The first row of a granule is marked with the value of the primary key for this row. For each data part, ClickHouse creates an index file that stores the marks. For each column, whether it is in the primary key or not, ClickHouse also stores the same marks. These marks allow finding the data directly in the columns.
Each data part is logically divided into granules. A granule is the smallest indivisible data set that ClickHouse reads when selecting data. ClickHouse doesn't split rows or values, so each granule always contains an integer number of rows. The first row of a granule is marked with the value of the primary key for the row. For each data part, ClickHouse creates an index file that stores the marks. For each column, whether it's in the primary key or not, ClickHouse also stores the same marks. These marks let you find data directly in column files.
The size of a granule is restricted by the `index_granularity` and `index_granularity_bytes` settings of the table engine. The number of rows in granule lays in the `[1, index_granularity]` range, depending on the size of rows. The size of a granule can exceed `index_granularity_bytes` if the size of the single row is greater than the value of the setting. In this case, the size of the granule equals the size of the row.
The granule size is restricted by the `index_granularity` and `index_granularity_bytes` settings of the table engine. The number of rows in a granule lies in the `[1, index_granularity]` range, depending on the size of the rows. The size of a granule can exceed `index_granularity_bytes` if the size of a single row is greater than the value of the setting. In this case, the size of the granule equals the size of the row.
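For illustration only (table and column names are invented), both granule limits are ordinary table-level settings:

```sql
-- index_granularity caps the number of rows per granule,
-- index_granularity_bytes caps its size in bytes (10 MB here).
CREATE TABLE example_events
(
    event_date Date,
    user_id UInt64,
    payload String
)
ENGINE = MergeTree()
ORDER BY (event_date, user_id)
SETTINGS index_granularity = 8192, index_granularity_bytes = 10485760;
```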
## Primary Keys and Indexes in Queries {#primary-keys-and-indexes-in-queries}
@ -164,7 +164,7 @@ The examples above show that it is always more effective to use an index than a
A sparse index allows extra data to be read. When reading a single range of the primary key, up to `index_granularity * 2` extra rows in each data block can be read.
Sparse indexes allow you to work with a very large number of table rows, because such indexes fit the computer's RAM in the very most cases.
Sparse indexes allow you to work with a very large number of table rows, because in most cases, such indexes fit in the computer's RAM.
ClickHouse does not require a unique primary key. You can insert multiple rows with the same primary key.
@ -175,7 +175,7 @@ The number of columns in the primary key is not explicitly limited. Depending on
- Improve the performance of an index.
If the primary key is `(a, b)`, then adding another column `c` will improve the performance if the following conditions are met:
- There are queries with a condition on column `c`.
- Long data ranges (several times longer than the `index_granularity`) with identical values for `(a, b)` are common. In other words, adding another column allows you to skip quite long data ranges (see the sketch after this list).
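A hypothetical sketch of this situation (columns `a`, `b`, `c` are placeholders):

```sql
-- Extending the sorting key with c helps when queries filter on c and
-- long runs of rows share the same (a, b) values.
CREATE TABLE example_ab
(
    a UInt32,
    b UInt32,
    c UInt32,
    value String
)
ENGINE = MergeTree()
ORDER BY (a, b, c);
```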

View File

@ -73,7 +73,7 @@ In this case, you should remember that you don't know the histogram bin borders.
## sequenceMatch(pattern)(timestamp, cond1, cond2, ...) {#function-sequencematch}
Checks whether the sequence contains the event chain that matches the pattern.
Checks whether the sequence contains an event chain that matches the pattern.
```sql
sequenceMatch(pattern)(timestamp, cond1, cond2, ...)
@ -87,9 +87,9 @@ sequenceMatch(pattern)(timestamp, cond1, cond2, ...)
- `pattern` — Pattern string. See [Pattern syntax](#sequence-function-pattern-syntax).
- `timestamp` — Column that considered to contain time data. Typical data types are `Date`, and `DateTime`. You can use also any of the supported [UInt](../../data_types/int_uint.md) data types.
- `timestamp` — Column considered to contain time data. Typical data types are `Date` and `DateTime`. You can also use any of the supported [UInt](../../data_types/int_uint.md) data types.
- `cond1`, `cond2` — Conditions that describe the chain of events. Data type: `UInt8`. You can pass up to 32 condition arguments. The function takes into account only the events described in these conditions. If the sequence contains data that are not described with conditions the function skips them.
- `cond1`, `cond2` — Conditions that describe the chain of events. Data type: `UInt8`. You can pass up to 32 condition arguments. The function takes only the events described in these conditions into account. If the sequence contains data that isn't described in a condition, the function skips them.
**Returned values**
@ -104,11 +104,11 @@ Type: `UInt8`.
<a name="sequence-function-pattern-syntax"></a>
**Pattern syntax**
- `(?N)` — Matches the condition argument at the position `N`. Conditions are numbered in the `[1, 32]` range. For example, `(?1)` matches the argument passed to the `cond1` parameter.
- `(?N)` — Matches the condition argument at position `N`. Conditions are numbered in the `[1, 32]` range. For example, `(?1)` matches the argument passed to the `cond1` parameter.
- `.*` — Matches any number of any events. You don't need the conditional arguments to match this element of the pattern.
- `.*` — Matches any number of events. You don't need conditional arguments to match this element of the pattern.
- `(?t operator value)` — Sets the time in seconds that should separate two events. For example, pattern `(?1)(?t>1800)(?2)` matches events that distanced from each other for more than 1800 seconds. An arbitrary number of any events can lay between these events. You can use the `>=`, `>`, `<`, `<=` operators.
- `(?t operator value)` — Sets the time in seconds that should separate two events. For example, pattern `(?1)(?t>1800)(?2)` matches events that occur more than 1800 seconds from each other. An arbitrary number of any events can lay between these events. You can use the `>=`, `>`, `<`, `<=` operators.
**Examples**
@ -133,7 +133,7 @@ SELECT sequenceMatch('(?1)(?2)')(time, number = 1, number = 2) FROM t
└───────────────────────────────────────────────────────────────────────┘
```
The function has found the event chain where number 2 follows number 1. It skipped number 3 between them, because the number is not described as an event. If we want to take this number into account when searching for the event chain, showed in the example, we should make a condition for it.
The function found the event chain where number 2 follows number 1. It skipped number 3 between them, because the number is not described as an event. If we want to take this number into account when searching for the event chain given in the example, we should make a condition for it.
```sql
SELECT sequenceMatch('(?1)(?2)')(time, number = 1, number = 2, number = 3) FROM t
@ -144,7 +144,7 @@ SELECT sequenceMatch('(?1)(?2)')(time, number = 1, number = 2, number = 3) FROM
└──────────────────────────────────────────────────────────────────────────────────────────┘
```
In this case the function couldn't find the event chain matching the pattern, because there is the event for number 3 occured between 1 and 2. If in the same case we checked the condition for number 4, the sequence would match the pattern.
In this case, the function couldn't find the event chain matching the pattern, because the event for number 3 occurred between 1 and 2. If in the same case we checked the condition for number 4, the sequence would match the pattern.
```sql
SELECT sequenceMatch('(?1)(?2)')(time, number = 1, number = 2, number = 4) FROM t
@ -163,7 +163,7 @@ SELECT sequenceMatch('(?1)(?2)')(time, number = 1, number = 2, number = 4) FROM
## sequenceCount(pattern)(time, cond1, cond2, ...) {#function-sequencecount}
Counts the number of event chains that matched the pattern. The function searches event chains that not overlap. It starts to search for the next chain after the current chain is matched.
Counts the number of event chains that matched the pattern. The function searches event chains that don't overlap. It starts to search for the next chain after the current chain is matched.
!!! warning "Warning"
Events that occur at the same second may lay in the sequence in an undefined order affecting the result.
@ -176,14 +176,14 @@ sequenceCount(pattern)(timestamp, cond1, cond2, ...)
- `pattern` — Pattern string. See [Pattern syntax](#sequence-function-pattern-syntax).
- `timestamp` — Column that considered to contain time data. Typical data types are `Date`, and `DateTime`. You can also use any of the supported [UInt](../../data_types/int_uint.md) data types.
- `timestamp` — Column considered to contain time data. Typical data types are `Date` and `DateTime`. You can also use any of the supported [UInt](../../data_types/int_uint.md) data types.
- `cond1`, `cond2` — Conditions that describe the chain of events. Data type: `UInt8`. You can pass up to 32 condition arguments. The function takes into account only the events described in these conditions. If the sequence contains data that are not described with conditions the function skips them.
- `cond1`, `cond2` — Conditions that describe the chain of events. Data type: `UInt8`. You can pass up to 32 condition arguments. The function takes only the events described in these conditions into account. If the sequence contains data that isn't described in a condition, the function skips them.
**Returned values**
- Number of non-overlapping event chains that are matched
- Number of non-overlapping event chains that are matched.
Type: `UInt64`.
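As a hedged illustration, reusing the toy table `t` from the `sequenceMatch` examples above:

```sql
-- Counts non-overlapping chains where an event with number = 2
-- follows an event with number = 1.
SELECT sequenceCount('(?1)(?2)')(time, number = 1, number = 2) FROM t
```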
@ -230,7 +230,7 @@ windowFunnel(window)(timestamp, cond1, cond2, cond3, ...)
**Parameters:**
- `window` — Length of the sliding window in seconds.
- `timestamp` — Name of the column containing the timestamp. Data type support: `Date`,`DateTime`, and other unsigned integer types (note that though timestamp support `UInt64` type, there is a limitation it's value can't overflow maximum of Int64, which is 2^63 - 1).
- `timestamp` — Name of the column containing the timestamp. Data types supported: `Date`, `DateTime`, and other unsigned integer types (note that even though timestamp supports the `UInt64` type, its value can't exceed the Int64 maximum, which is 2^63 - 1).
- `cond1`, `cond2`... — Conditions or data describing the chain of events. Data type: `UInt8`. Values can be 0 or 1.
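A sketch of a typical use, with an invented `events` table (`user_id`, `event_time`, `event`):

```sql
-- For each user, how far they advanced through the chain A -> B -> C
-- within a 3600-second window, then the distribution of funnel levels.
SELECT
    level,
    count() AS users
FROM
(
    SELECT
        user_id,
        windowFunnel(3600)(event_time, event = 'A', event = 'B', event = 'C') AS level
    FROM events
    GROUP BY user_id
)
GROUP BY level
ORDER BY level;
```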
**Algorithm**

View File

@ -225,7 +225,6 @@ CREATE TABLE IF NOT EXISTS all_hits ON CLUSTER cluster (p Date, i Int32) ENGINE
In order to run these queries correctly, each host must have the same cluster definition (to simplify syncing configs, you can use substitutions from ZooKeeper). They must also connect to the ZooKeeper servers.
The local version of the query will eventually be executed on each host in the cluster, even if some hosts are currently not available. The order for executing queries within a single host is guaranteed.
`ALTER` queries are not yet supported for replicated tables.
## CREATE VIEW

View File

@ -1,7 +1,7 @@
# Encoding functions
## char
Accepts multiple arguments of `Number` types. Returns a string, each char of the results corresponds to the ascii character of the input numbers. It'll cast the first byte from the number, if the byte overflows the range of ascii(which is 127), it returns an unrecognized character(<28>).
Accepts multiple arguments of numeric types. Returns a string whose length equals the number of passed arguments, where each byte has the value of the corresponding argument.
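For example (a small sketch; the bytes below are plain ASCII):

```sql
SELECT char(0x68, 0x65, 0x6C, 0x6C, 0x6F) AS hello; -- a 5-byte string: 'hello'
```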
## hex

View File

@ -580,11 +580,9 @@ ClickHouse проверит условия `min_part_size` и `min_part_size_rat
```
## remote_servers
## remote_servers {#server_settings_remote_servers}
Конфигурация кластеров, которые использует движок таблиц Distributed.
Пример настройки смотрите в разделе "[Движки таблиц/Distributed](../../operations/table_engines/distributed.md)".
Конфигурация кластеров, которые использует движок таблиц [Distributed](../../operations/table_engines/distributed.md) и табличная функция `cluster`.
**Пример**
@ -595,6 +593,9 @@ ClickHouse проверит условия `min_part_size` и `min_part_size_rat
Значение атрибута `incl` смотрите в разделе "[Конфигурационные файлы](../configuration_files.md#configuration_files)".
**Смотрите также**
- [skip_unavailable_shards](../settings/settings.md#settings-skip_unavailable_shards)
## timezone

View File

@ -384,52 +384,86 @@ Ok.
При чтении из таблиц [MergeTree*](../table_engines/mergetree.md) ClickHouse использует несколько потоков. Этот параметр включает/выключает равномерное распределение заданий по рабочим потокам. Алгоритм равномерного распределения стремится сделать время выполнения всех потоков примерно равным для одного запроса `SELECT`.
**Возможные значения**
Возможные значения:
- 0 — не использовать равномерное распределение заданий на чтение.
- 1 — использовать равномерное распределение заданий на чтение.
**Значение по умолчанию**: 1.
Значение по умолчанию — 1.
## merge_tree_min_rows_for_concurrent_read {#setting-merge_tree_min_rows_for_concurrent_read}
Если количество строк, считываемых из файла таблицы [MergeTree*](../table_engines/mergetree.md) превышает `merge_tree_min_rows_for_concurrent_read`, то ClickHouse пытается выполнить одновременное чтение из этого файла в несколько потоков.
**Возможные значения**
Возможные значения:
Любое положительное целое число.
- Любое положительное целое число.
**Значение по умолчанию**: 163840.
Значение по умолчанию — 163840.
## merge_tree_min_bytes_for_concurrent_read {#setting-merge_tree_min_bytes_for_concurrent_read}
Если число байтов, которые должны быть прочитаны из одного файла таблицы с движком [MergeTree*](../table_engines/mergetree.md) превышает `merge_tree_min_bytes_for_concurrent_read`, то ClickHouse пытается выполнить конкурентное чтение в несколько потоков из этого файла.
Возможные значения:
- Положительное целое число.
Значение по умолчанию — 240 ✕ 1024 ✕ 1024.
## merge_tree_min_rows_for_seek {#setting-merge_tree_min_rows_for_seek}
Если расстояние между двумя блоками данных для чтения в одном файле меньше, чем `merge_tree_min_rows_for_seek` строк, то ClickHouse не перескакивает через блоки, а считывает данные последовательно.
Если расстояние между двумя блоками данных для чтения в одном файле меньше, чем `merge_tree_min_rows_for_seek` строк, то ClickHouse не перескакивает (seek) через блоки, а считывает данные последовательно.
**Возможные значения**
Возможные значения:
Любое положительное целое число.
- Положительное целое число.
**Значение по умолчанию**: 0.
Значение по умолчанию — 0.
## merge_tree_min_bytes_for_seek {#setting-merge_tree_min_bytes_for_seek}
Если расстояние между двумя блоками данных для чтения в одном файле меньше, чем `merge_tree_min_bytes_for_seek` байтов, то ClickHouse не перескакивает (seek) через блоки, а считывает данные последовательно.
Возможные значения:
- Положительное целое число.
Значение по умолчанию — 0.
## merge_tree_coarse_index_granularity {#setting-merge_tree_coarse_index_granularity}
При поиске данных ClickHouse проверяет засечки данных в файле индекса. Если ClickHouse обнаруживает, что требуемые ключи находятся в некотором диапазоне, он делит этот диапазон на `merge_tree_coarse_index_granularity` поддиапазонов и выполняет в них рекурсивный поиск нужных ключей.
**Возможные значения**
Возможные значения:
Любое положительное целое число.
- Положительное целое число.
**Значение по умолчанию**: 8.
Значение по умолчанию — 8.
## merge_tree_max_rows_to_use_cache {#setting-merge_tree_max_rows_to_use_cache}
Если требуется прочитать более, чем `merge_tree_max_rows_to_use_cache` строк в одном запросе, ClickHouse не используют кэш несжатых блоков. Настройка сервера [uncompressed_cache_size](../server_settings/settings.md#server-settings-uncompressed_cache_size) определяет размер кэша несжатых блоков.
Если требуется прочитать более, чем `merge_tree_max_rows_to_use_cache` строк в одном запросе, ClickHouse не используют кэш несжатых блоков.
**Возможные значения**
Кэш несжатых блоков хранит данные, извлечённые при выполнении запросов. ClickHouse использует этот кэш для ускорения ответов на повторяющиеся небольшие запросы. Настройка защищает кэш от замусоривания запросами, для выполнения которых необходимо извлечь большое количество данных. Настройка сервера [uncompressed_cache_size](../server_settings/settings.md#server-settings-uncompressed_cache_size) определяет размер кэша несжатых блоков.
Любое положительное целое число.
Возможные значения:
**Значение по умолчанию**: 1048576.
- Положительное целое число.
Значение по умолчанию — 128 ✕ 8192.
## merge_tree_max_bytes_to_use_cache {#setting-merge_tree_max_bytes_to_use_cache}
Если требуется прочитать более, чем `merge_tree_max_bytes_to_use_cache` байтов в одном запросе, ClickHouse не используют кэш несжатых блоков.
Кэш несжатых блоков хранит данные, извлечённые при выполнении запросов. ClickHouse использует этот кэш для ускорения ответов на повторяющиеся небольшие запросы. Настройка защищает кэш от замусоривания запросами, для выполнения которых необходимо извлечь большое количество данных. Настройка сервера [uncompressed_cache_size](../server_settings/settings.md#server-settings-uncompressed_cache_size) определяет размер кэша несжатых блоков.
Возможные значения:
- Положительное целое число.
Значение по умолчанию — 1920 ✕ 1024 ✕ 1024.
## min_bytes_to_use_direct_io {#settings-min_bytes_to_use_direct_io}
@ -853,6 +887,36 @@ load_balancing = first_or_random
- [Множественный JOIN](../../query_language/select.md#select-join)
## skip_unavailable_shards {#settings-skip_unavailable_shards}
Включает или отключает тихий пропуск недоступных шардов.
Шард считается недоступным, если все его реплики недоступны. Реплика недоступна в следующих случаях:
- ClickHouse не может установить соединение с репликой по любой причине.
ClickHouse предпринимает несколько попыток подключиться к реплике. Если все попытки оказались неудачными, реплика считается недоступной.
- Реплика не может быть разрешена с помощью DNS.
Если имя хоста реплики не может быть разрешено с помощью DNS, это может указывать на следующие ситуации:
- Нет записи DNS для хоста. Это может происходить в системах с динамическим DNS, например, [Kubernetes](https://kubernetes.io), где отключенные ноды не разрешаются с помощью DNS, и это не ошибка.
- Ошибка конфигурации. Конфигурационный файл ClickHouse может содержать неправильное имя хоста.
Возможные значения:
- 1 — пропуск включен.
Если шард недоступен, то ClickHouse возвращает результат, основанный на неполных данных и не оповещает о проблемах с доступностью хостов.
- 0 — пропуск выключен.
Если шард недоступен, то ClickHouse генерирует исключение.
Значение по умолчанию — 0.
## optimize_throw_if_noop {#setting-optimize_throw_if_noop}
Включает или отключает генерирование исключения в случаях, когда запрос [OPTIMIZE](../../query_language/misc.md#misc_operations-optimize) не выполняет мёрж.
@ -866,6 +930,7 @@ load_balancing = first_or_random
Значение по умолчанию — 0.
## os_thread_priority {#setting-os_thread_priority}
Устанавливает приоритет ([nice](https://en.wikipedia.org/wiki/Nice_(Unix))) для потоков, исполняющих запросы. Планировщик ОС учитывает эти приоритеты при выборе следующего потока для исполнения на доступном ядре CPU.

View File

@ -7,18 +7,21 @@ Buffer(database, table, num_layers, min_time, max_time, min_rows, max_rows, min_
```
Параметры движка:
database, table - таблица, в которую сбрасывать данные. Вместо имени базы данных может использоваться константное выражение, возвращающее строку.
num_layers - уровень параллелизма. Физически таблица будет представлена в виде num_layers независимых буферов. Рекомендуемое значение - 16.
min_time, max_time, min_rows, max_rows, min_bytes, max_bytes - условия для сброса данных из буфера.
Данные сбрасываются из буфера и записываются в таблицу назначения, если выполнены все min-условия или хотя бы одно max-условие.
min_time, max_time - условие на время в секундах от момента первой записи в буфер;
min_rows, max_rows - условие на количество строк в буфере;
min_bytes, max_bytes - условие на количество байт в буфере.
`database` — имя базы данных. Вместо имени базы данных может использоваться константное выражение, возвращающее строку.
`table` — таблица, в которую сбрасывать данные.
`num_layers` — уровень параллелизма. Физически таблица будет представлена в виде `num_layers` независимых буферов. Рекомендуемое значение — 16.
`min_time`, `max_time`, `min_rows`, `max_rows`, `min_bytes`, `max_bytes` — условия для сброса данных из буфера.
При записи, данные вставляются в случайный из num_layers буферов. Или, если размер куска вставляемых данных достаточно большой (больше max_rows или max_bytes), то он записывается в таблицу назначения минуя буфер.
Данные сбрасываются из буфера и записываются в таблицу назначения, если выполнены все `min`-условия или хотя бы одно `max`-условие.
Условия для сброса данных учитываются отдельно для каждого из num_layers буферов. Например, если num_layers = 16 и max_bytes = 100000000, то максимальный расход оперативки будет 1.6 GB.
- `min_time`, `max_time` — условие на время в секундах от момента первой записи в буфер.
- `min_rows`, `max_rows` — условие на количество строк в буфере.
- `min_bytes`, `max_bytes` — условие на количество байт в буфере.
При записи, данные вставляются в случайный из `num_layers` буферов. Или, если размер куска вставляемых данных достаточно большой (больше `max_rows` или `max_bytes`), то он записывается в таблицу назначения минуя буфер.
Условия для сброса данных учитываются отдельно для каждого из `num_layers` буферов. Например, если `num_layers = 16` и `max_bytes = 100000000`, то максимальный расход оперативки будет 1.6 GB.
Пример:

View File

@ -78,10 +78,14 @@ CREATE TABLE [IF NOT EXISTS] [db.]table_name [ON CLUSTER cluster]
- `SETTINGS` — дополнительные параметры, регулирующие поведение `MergeTree`:
- `index_granularity` — гранулярность индекса. Число строк данных между «засечками» индекса. По умолчанию — 8192. Список всех доступных параметров можно посмотреть в [MergeTreeSettings.h](https://github.com/ClickHouse/ClickHouse/blob/master/dbms/src/Storages/MergeTree/MergeTreeSettings.h).
- `min_merge_bytes_to_use_direct_io` — минимальный объем данных, необходимый для прямого (небуферизованного) чтения/записи (direct I/O) на диск. При слиянии частей данных ClickHouse вычисляет общий объем хранения всех данных, подлежащих слиянию. Если общий объем хранения всех данных для чтения превышает `min_bytes_to_use_direct_io` байт, тогда ClickHouse использует флаг `O_DIRECT` при чтении данных с диска. Если `min_merge_bytes_to_use_direct_io = 0`, тогда прямой ввод-вывод отключен. Значение по умолчанию: `10 * 1024 * 1024 * 1024` байт.
- `index_granularity` — максимальное количество строк данных между засечками индекса. По умолчанию — 8192. Смотрите [Хранение данных](#mergetree-data-storage).
- `index_granularity_bytes` — максимальный размер гранул данных в байтах. По умолчанию — 10Mb. Чтобы ограничить размер гранул только количеством строк, установите значение 0 (не рекомендовано). Смотрите [Хранение данных](#mergetree-data-storage).
- `enable_mixed_granularity_parts` — включает или выключает переход к ограничению размера гранул с помощью настройки `index_granularity_bytes`. До версии 19.11, размер гранул ограничивался только настройкой `index_granularity`. Настройка `index_granularity_bytes` улучшает производительность ClickHouse при выборке данных из таблиц с большими (десятки и сотни мегабайтов) строками. Если у вас есть таблицы с большими строками, можно включить эту настройку, чтобы повысить эффективность запросов `SELECT`.
- `use_minimalistic_part_header_in_zookeeper` — Способ хранения заголовков кусков данных в ZooKeeper. Если `use_minimalistic_part_header_in_zookeeper = 1`, то ZooKeeper хранит меньше данных. Подробнее читайте в [описании настройки](../server_settings/settings.md#server-settings-use_minimalistic_part_header_in_zookeeper) в разделе "Конфигурационные параметры сервера".
- `min_merge_bytes_to_use_direct_io` — минимальный объем данных при слиянии, необходимый для прямого (небуферизованного) чтения/записи (direct I/O) на диск. При слиянии частей данных ClickHouse вычисляет общий объем хранения всех данных, подлежащих слиянию. Если общий объем хранения всех данных для чтения превышает `min_bytes_to_use_direct_io` байт, тогда ClickHouse использует флаг `O_DIRECT` при чтении данных с диска. Если `min_merge_bytes_to_use_direct_io = 0`, тогда прямой ввод-вывод отключен. Значение по умолчанию: `10 * 1024 * 1024 * 1024` байтов.
<a name="mergetree_setting-merge_with_ttl_timeout"></a>
- `merge_with_ttl_timeout` - Минимальное время в секундах для повторного выполнения слияний с TTL. По умолчанию - 86400 (1 день).
- `merge_with_ttl_timeout` — минимальное время в секундах перед повторным слиянием с TTL. По умолчанию — 86400 (1 день).
- `write_final_mark` — включает или отключает запись последней засечки индекса в конце куска данных. По умолчанию — 1. Не отключайте её.
**Пример задания секций**
@ -126,7 +130,7 @@ MergeTree(EventDate, intHash32(UserID), (CounterID, EventDate, intHash32(UserID)
</details>
## Хранение данных
## Хранение данных {#mergetree-data-storage}
Таблица состоит из *кусков* данных (data parts), отсортированных по первичному ключу.
@ -134,9 +138,10 @@ MergeTree(EventDate, intHash32(UserID), (CounterID, EventDate, intHash32(UserID)
Данные, относящиеся к разным партициям, разбиваются на разные куски. В фоновом режиме ClickHouse выполняет слияния (merge) кусков данных для более эффективного хранения. Куски, относящиеся к разным партициям не объединяются. Механизм слияния не гарантирует, что все строки с одинаковым первичным ключом окажутся в одном куске.
Для каждого куска данных ClickHouse создаёт индексный файл, который содержит значение первичного ключа для каждой индексной строки («засечка»). Номера строк индекса определяются как `n * index_granularity`. Максимальное значение `n` равно целой части деления общего числа строк на `index_granularity`. Для каждого столбца "засечки" также записываются для тех же строк индекса, что и первичный ключ. Эти "засечки" позволяют находить данные непосредственно в столбцах.
Каждый кусок данных логически делится на гранулы. Гранула — это минимальный неделимый набор данных, который ClickHouse считывает при выборке данных. ClickHouse не разбивает строки и значения и гранула всегда содержит целое число строк. Первая строка гранулы помечается значением первичного ключа для этой строки (засечка). Для каждого куска данных ClickHouse создаёт файл с засечками (индексный файл). Для каждого столбца, независимо от того, входит он в первичный ключ или нет, ClickHouse также сохраняет эти же засечки. Засечки используются для поиска данных напрямую в файлах столбцов.
Размер гранул ограничен настройками движка `index_granularity` и `index_granularity_bytes`. Количество строк в грануле лежит в диапазоне `[1, index_granularity]`, в зависимости от размера строк. Размер гранулы может превышать `index_granularity_bytes` в том случае, когда размер единственной строки в грануле превышает значение настройки. В этом случае, размер гранулы равен размеру строки.
Вы можете использовать одну большую таблицу, постоянно добавляя в неё данные пачками, именно для этого предназначен движок `MergeTree`.
## Первичные ключи и индексы в запросах {#primary-keys-and-indexes-in-queries}
@ -159,9 +164,9 @@ Marks numbers: 0 1 2 3 4 5 6 7 8
Примеры выше показывают, что использование индекса всегда эффективнее, чем full scan.
Разреженный индекс допускает чтение лишних строк. При чтении одного диапазона первичного ключа, может быть прочитано до `index_granularity * 2` лишних строк в каждом блоке данных. В большинстве случаев ClickHouse не теряет производительности при `index_granularity = 8192`.
Разреженный индекс допускает чтение лишних строк. При чтении одного диапазона первичного ключа, может быть прочитано до `index_granularity * 2` лишних строк в каждом блоке данных.
Разреженность индекса позволяет работать даже с очень большим количеством строк в таблицах, поскольку такой индекс всегда помещается в оперативную память компьютера.
Разреженный индекс почти всегда помещается в оперативную память и позволяет работать с очень большим количеством строк в таблицах.
ClickHouse не требует уникального первичного ключа. Можно вставить много строк с одинаковым первичным ключом.

View File

@ -71,51 +71,148 @@ FROM
В этом случае необходимо помнить, что границы корзин гистограммы не известны.
## sequenceMatch(pattern)(time, cond1, cond2, ...)
## sequenceMatch(pattern)(timestamp, cond1, cond2, ...) {#function-sequencematch}
Pattern matching for a chain of events.
`pattern` - a string containing the pattern to match. The pattern is similar to a regular expression.
`time` - the event time, of type DateTime.
`cond1`, `cond2` ... - from one to 32 arguments of type UInt8: flags indicating whether a certain condition was met for the event.
The function collects the sequence of events in RAM and then checks whether this sequence matches the pattern.
It returns UInt8: 0 if the pattern does not match, 1 if it does.
Example: `sequenceMatch('(?1).*(?2)')(EventTime, URL LIKE '%company%', URL LIKE '%cart%')`
- whether there was a chain of events in which a visit to a page whose URL contains 'company' happened earlier than a visit to a page whose URL contains 'cart'.
This is a degenerate example. It can also be written using other aggregate functions:
Checks whether the sequence of events contains a chain that matches the specified pattern.
```sql
minIf(EventTime, URL LIKE '%company%') < maxIf(EventTime, URL LIKE '%cart%')
sequenceMatch(pattern)(timestamp, cond1, cond2, ...)
```
But in more complex cases there is no such solution.
!!! warning "Warning"
    Events that occur during the same second may appear in the sequence in an undefined order, which can affect the result of the function.
Pattern syntax:
`(?1)` - a reference to a condition (any number from 1 to 32 can be used instead of 1);
`.*` - any number of any events;
`(?t>=1800)` - a condition on the time: within the specified time, any number of any events is allowed; the operators `<`, `>`, `<=` can be used instead of `>=`, and any number can be used instead of 1800;
Events that occur during the same second may end up in the chain in an arbitrary order, which can affect the result of the function.
**Parameters**
- `pattern` - the pattern string. See [Pattern syntax](#sequence-function-pattern-syntax).
- `timestamp` - the column containing timestamps. Typical data types are `Date` and `DateTime`. Any of the supported [UInt](../../data_types/int_uint.md) data types can also be used.
- `cond1`, `cond2` - conditions describing the chain of events. Data type: `UInt8`. Up to 32 conditions can be used. The function only considers events listed in the conditions; data from the sequence that is not described by any condition is skipped.
**Returned values**
- 1, if a chain of events matching the pattern was found.
- 0, if no chain of events matching the pattern was found.
## sequenceCount(pattern)(time, cond1, cond2, ...)
Type: `UInt8`.
Similar to the sequenceMatch function, but instead of reporting whether a chain of events exists, it returns UInt64: the number of chains found.
Chains are searched for without overlapping; that is, the next chain can start only after the previous one ends.
<a name="sequence-function-pattern-syntax"></a>
**Pattern syntax**
- `(?N)` - matches the condition at position `N`. Conditions are numbered in the range `[1, 32]`. For example, `(?1)` matches the condition passed as the `cond1` parameter.
- `.*` - matches any number of events. Conditions are not needed to match this element of the pattern.
- `(?t operator value)` - sets the time in seconds that should separate two events. For example, the pattern `(?1)(?t>1800)(?2)` matches events that occurred more than 1800 seconds apart; any number of any events can occur between them. The operators `>=`, `>`, `<`, `<=` can be used.
**Examples**
Consider the table `t` with the following data:
```text
┌─time─┬─number─┐
│ 1 │ 1 │
│ 2 │ 3 │
│ 3 │ 2 │
└──────┴────────┘
```
Run the query:
```sql
SELECT sequenceMatch('(?1)(?2)')(time, number = 1, number = 2) FROM t
```
```text
┌─sequenceMatch('(?1)(?2)')(time, equals(number, 1), equals(number, 2))─┐
│ 1 │
└───────────────────────────────────────────────────────────────────────┘
```
The function found the chain of events in which the number 2 follows the number 1. The number 3 between them was skipped, because it is not used in any of the conditions.
```sql
SELECT sequenceMatch('(?1)(?2)')(time, number = 1, number = 2, number = 3) FROM t
```
```text
┌─sequenceMatch('(?1)(?2)')(time, equals(number, 1), equals(number, 2), equals(number, 3))─┐
│ 0 │
└──────────────────────────────────────────────────────────────────────────────────────────┘
```
In this case the function could not find a chain of events matching the pattern, because the event for the number 3 occurred between 1 and 2. If we had checked a condition for the number 4 in the same situation, the chain would have matched the pattern.
```sql
SELECT sequenceMatch('(?1)(?2)')(time, number = 1, number = 2, number = 4) FROM t
```
```text
┌─sequenceMatch('(?1)(?2)')(time, equals(number, 1), equals(number, 2), equals(number, 4))─┐
│ 1 │
└──────────────────────────────────────────────────────────────────────────────────────────┘
```
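The time condition from the pattern syntax can be sketched on the same data. The query below is an assumption based on the documented `(?t operator value)` element, treating the `time` column as seconds; since the matching events are two seconds apart, the first pattern should return 1 and the second 0.

```sql
SELECT
    sequenceMatch('(?1)(?t>1)(?2)')(time, number = 1, number = 2) AS more_than_1s_apart,
    sequenceMatch('(?1)(?t>5)(?2)')(time, number = 1, number = 2) AS more_than_5s_apart
FROM t
```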
**See also**
- [sequenceCount](#function-sequencecount)
## sequenceCount(pattern)(time, cond1, cond2, ...) {#function-sequencecount}
Counts the number of event chains that match the pattern. The function only detects non-overlapping chains of events. It starts looking for the next chain only after the current chain has been fully matched.
!!! warning "Warning"
    Events that occur during the same second may appear in the sequence in an undefined order, which can affect the result of the function.
```sql
sequenceCount(pattern)(timestamp, cond1, cond2, ...)
```
**Parameters**
- `pattern` - the pattern string. See [Pattern syntax](#sequence-function-pattern-syntax).
- `timestamp` - the column containing timestamps. Typical data types are `Date` and `DateTime`. Any of the supported [UInt](../../data_types/int_uint.md) data types can also be used.
- `cond1`, `cond2` - conditions describing the chain of events. Data type: `UInt8`. Up to 32 conditions can be used. The function only considers events listed in the conditions; data from the sequence that is not described by any condition is skipped.
**Returned value**
- The number of non-overlapping event chains that match the pattern.
Type: `UInt64`.
**Example**
Consider the table `t` with the following data:
```text
┌─time─┬─number─┐
│ 1 │ 1 │
│ 2 │ 3 │
│ 3 │ 2 │
│ 4 │ 1 │
│ 5 │ 3 │
│ 6 │ 2 │
└──────┴────────┘
```
Let's count how many times the number 2 occurs after the number 1, with any other numbers allowed between them:
```sql
SELECT sequenceCount('(?1).*(?2)')(time, number = 1, number = 2) FROM t
```
```text
┌─sequenceCount('(?1).*(?2)')(time, equals(number, 1), equals(number, 2))─┐
│ 2 │
└─────────────────────────────────────────────────────────────────────────┘
```
**See also**
- [sequenceMatch](#function-sequencematch)
## windowFunnel(window)(timestamp, cond1, cond2, cond3, ...)

View File

@ -223,7 +223,7 @@ CREATE TABLE IF NOT EXISTS all_hits ON CLUSTER cluster (p Date, i Int32) ENGINE
```
For such queries to be executed correctly, every host must have the same cluster definition (to simplify config synchronization, you can use substitutions from ZooKeeper). The hosts must also be able to connect to the ZooKeeper servers.
The local version of the query will eventually be executed on every host of the cluster, even if some hosts are currently unavailable. The order in which queries are executed on a single host is guaranteed. `ALTER` queries are not supported for replicated tables.
The local version of the query will eventually be executed on every host of the cluster, even if some hosts are currently unavailable. The order in which queries are executed on a single host is guaranteed.
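As an illustration, a distributed DDL query over the cluster from the example above might look like the sketch below (the `all_hits` table and the `cluster` name are taken from that example; this is an assumption, not a definitive recipe). Each host runs the local version of the statement, and hosts that are temporarily unavailable execute it once they come back.

```sql
-- Drop the table on every host defined in the 'cluster' section of the server config.
DROP TABLE IF EXISTS all_hits ON CLUSTER cluster
```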
## CREATE VIEW

View File

@ -202,7 +202,6 @@ CREATE TABLE IF NOT EXISTS all_hits ON CLUSTER cluster (p Date, i Int32) ENGINE
For this kind of query to run correctly, every host must have the same cluster declaration (to simplify config synchronization, you can use substitutions from ZooKeeper). The hosts must also be connected to the ZooKeeper servers.
This query will eventually be executed on every host of the cluster, even if some hosts are currently unavailable. It also guarantees the execution order of queries within a single host.
Replicated tables do not yet support `ALTER` queries.
## CREATE VIEW