Merge branch 'master' into test_multiple_nodes

This commit is contained in:
Alexander Tokmakov 2021-03-30 16:23:33 +03:00
commit 500a20f30d
47 changed files with 875 additions and 295 deletions

2
contrib/grpc vendored

@ -1 +1 @@
Subproject commit 7436366ceb341ba5c00ea29f1645e02a2b70bf93
Subproject commit 8d558f03fe370240081424fafa76cdc9301ea14b

View File

@ -71,6 +71,7 @@ function run_tests()
# Skip these tests, because they fail when we rerun them multiple times
if [ "$NUM_TRIES" -gt "1" ]; then
ADDITIONAL_OPTIONS+=('--order=random')
ADDITIONAL_OPTIONS+=('--skip')
ADDITIONAL_OPTIONS+=('00000_no_tests_to_skip')
ADDITIONAL_OPTIONS+=('--jobs')
@ -93,7 +94,13 @@ timeout "$MAX_RUN_TIME" bash -c run_tests ||:
./process_functional_tests_result.py || echo -e "failure\tCannot parse results" > /test_output/check_status.tsv
pigz < /var/log/clickhouse-server/clickhouse-server.log > /test_output/clickhouse-server.log.gz ||:
clickhouse-client -q "system flush logs" ||:
pigz < /var/log/clickhouse-server/clickhouse-server.log > /test_output/clickhouse-server.log.gz &
clickhouse-client -q "select * from system.query_log format TSVWithNamesAndTypes" | pigz > /test_output/query-log.tsv.gz &
clickhouse-client -q "select * from system.query_thread_log format TSVWithNamesAndTypes" | pigz > /test_output/query-thread-log.tsv.gz &
wait ||:
mv /var/log/clickhouse-server/stderr.log /test_output/ ||:
if [[ -n "$WITH_COVERAGE" ]] && [[ "$WITH_COVERAGE" -eq 1 ]]; then
tar -chf /test_output/clickhouse_coverage.tar.gz /profraw ||:

View File

@ -394,3 +394,55 @@ Result:
└──────────────────┴────────────────────┘
```
## isIPAddressInRange {#isipaddressinrange}
Determines if an IP address is contained in a network represented in the [CIDR](https://en.wikipedia.org/wiki/Classless_Inter-Domain_Routing) notation. Returns `1` if true, or `0` otherwise.
**Syntax**
``` sql
isIPAddressInRange(address, prefix)
```
This function accepts both IPv4 and IPv6 addresses (and networks) represented as strings. It returns `0` if the IP version of the address and the CIDR don't match.
**Arguments**
- `address` — An IPv4 or IPv6 address. [String](../../sql-reference/data-types/string.md).
- `prefix` — An IPv4 or IPv6 network prefix in CIDR. [String](../../sql-reference/data-types/string.md).
**Returned value**
- `1` or `0`.
Type: [UInt8](../../sql-reference/data-types/int-uint.md).
**Example**
Query:
``` sql
SELECT isIPAddressInRange('127.0.0.1', '127.0.0.0/8')
```
Result:
``` text
┌─isIPAddressInRange('127.0.0.1', '127.0.0.0/8')─┐
│ 1 │
└────────────────────────────────────────────────┘
```
Query:
``` sql
SELECT isIPAddressInRange('127.0.0.1', 'ffff::/16')
```
Result:
``` text
┌─isIPAddressInRange('127.0.0.1', 'ffff::/16')─┐
│ 0 │
└──────────────────────────────────────────────┘
```

View File

@ -395,3 +395,54 @@ SELECT addr, isIPv6String(addr) FROM ( SELECT ['::', '1111::ffff', '::ffff:127.0
└──────────────────┴────────────────────┘
```
## isIPAddressInRange {#isipaddressinrange}
Проверяет, попадает ли IP-адрес в интервал, заданный в нотации [CIDR](https://en.wikipedia.org/wiki/Classless_Inter-Domain_Routing). Возвращает `1`, если адрес входит в интервал, иначе `0`.
**Синтаксис**
``` sql
isIPAddressInRange(address, prefix)
```
Функция принимает IPv4 или IPv6 адрес (и подсеть) в виде строки. Возвращает `0`, если версии IP адреса и интервала не совпадают.
**Аргументы**
- `address` — IPv4 или IPv6 адрес. [String](../../sql-reference/data-types/string.md).
- `prefix` — IPv4 или IPv6 подсеть, заданная в CIDR нотации. [String](../../sql-reference/data-types/string.md).
**Возвращаемое значение**
- `1` или `0`.
Тип: [UInt8](../../sql-reference/data-types/int-uint.md).
**Примеры**
Запрос:
``` sql
SELECT isIPAddressInRange('127.0.0.1', '127.0.0.0/8')
```
Результат:
``` text
┌─isIPAddressInRange('127.0.0.1', '127.0.0.0/8')─┐
│ 1 │
└────────────────────────────────────────────────┘
```
Запрос:
``` sql
SELECT isIPAddressInRange('127.0.0.1', 'ffff::/16')
```
Результат:
``` text
┌─isIPAddressInRange('127.0.0.1', 'ffff::/16')─┐
│ 0 │
└──────────────────────────────────────────────┘
```

View File

@ -599,11 +599,13 @@ TaskStatus ClusterCopier::tryMoveAllPiecesToDestinationTable(const TaskTable & t
toString(current_piece_number));
Settings settings_push = task_cluster->settings_push;
/// It is important, ALTER ATTACH PARTITION must be done synchronously
/// And we will execute this ALTER query on each replica of a shard.
/// It is correct, because this query is idempotent.
settings_push.replication_alter_partitions_sync = 2;
ClusterExecutionMode execution_mode = ClusterExecutionMode::ON_EACH_NODE;
UInt64 max_successful_executions_per_shard = 0;
if (settings_push.replication_alter_partitions_sync == 1)
{
execution_mode = ClusterExecutionMode::ON_EACH_SHARD;
max_successful_executions_per_shard = 1;
}
query_alter_ast_string += " ALTER TABLE " + getQuotedTable(original_table) +
((partition_name == "'all'") ? " ATTACH PARTITION ID " : " ATTACH PARTITION ") + partition_name +
@ -613,15 +615,34 @@ TaskStatus ClusterCopier::tryMoveAllPiecesToDestinationTable(const TaskTable & t
try
{
size_t num_nodes = executeQueryOnCluster(
/// Try attach partition on each shard
UInt64 num_nodes = executeQueryOnCluster(
task_table.cluster_push,
query_alter_ast_string,
settings_push,
task_cluster->settings_push,
PoolMode::GET_MANY,
ClusterExecutionMode::ON_EACH_NODE);
execution_mode,
max_successful_executions_per_shard);
if (settings_push.replication_alter_partitions_sync == 1)
{
LOG_INFO(
log,
"Destination tables {} have been executed alter query successfully on {} shards of {}",
getQuotedTable(task_table.table_push),
num_nodes,
task_table.cluster_push->getShardCount());
if (num_nodes != task_table.cluster_push->getShardCount())
{
return TaskStatus::Error;
}
}
else
{
LOG_INFO(log, "Number of nodes that executed ALTER query successfully : {}", toString(num_nodes));
}
}
catch (...)
{
LOG_DEBUG(log, "Error while moving partition {} piece {} to original table", partition_name, toString(current_piece_number));
@ -856,6 +877,16 @@ bool ClusterCopier::tryDropPartitionPiece(
bool ClusterCopier::tryProcessTable(const ConnectionTimeouts & timeouts, TaskTable & task_table)
{
/// Create destination table
TaskStatus task_status = TaskStatus::Error;
task_status = tryCreateDestinationTable(timeouts, task_table);
/// Exit if success
if (task_status != TaskStatus::Finished)
{
LOG_WARNING(log, "Create destination Tale Failed ");
return false;
}
/// An heuristic: if previous shard is already done, then check next one without sleeps due to max_workers constraint
bool previous_shard_is_instantly_finished = false;
@ -932,7 +963,7 @@ bool ClusterCopier::tryProcessTable(const ConnectionTimeouts & timeouts, TaskTab
/// Do not sleep if there is a sequence of already processed shards to increase startup
bool is_unprioritized_task = !previous_shard_is_instantly_finished && shard->priority.is_remote;
TaskStatus task_status = TaskStatus::Error;
task_status = TaskStatus::Error;
bool was_error = false;
has_shard_to_process = true;
for (UInt64 try_num = 0; try_num < max_shard_partition_tries; ++try_num)
@ -1050,6 +1081,44 @@ bool ClusterCopier::tryProcessTable(const ConnectionTimeouts & timeouts, TaskTab
return table_is_done;
}
TaskStatus ClusterCopier::tryCreateDestinationTable(const ConnectionTimeouts & timeouts, TaskTable & task_table)
{
    /// Try create original table (if not exists) on each shard
    /// The first pull shard's table definition is used as the prototype for the
    /// destination table, and creation is issued once as an ON CLUSTER query.
    //TaskTable & task_table = task_shard.task_table;
    const TaskShardPtr task_shard = task_table.all_shards.at(0);
    /// We need to update table definitions for each part, it could be changed after ALTER
    task_shard->current_pull_table_create_query = getCreateTableForPullShard(timeouts, *task_shard);
    try
    {
        /// Rewrite the pulled CREATE query to target the push table/engine and
        /// make it tolerant to concurrently created tables.
        auto create_query_push_ast
            = rewriteCreateQueryStorage(task_shard->current_pull_table_create_query, task_table.table_push, task_table.engine_push_ast);
        auto & create = create_query_push_ast->as<ASTCreateQuery &>();
        create.if_not_exists = true;
        InterpreterCreateQuery::prepareOnClusterQuery(create, context, task_table.cluster_push_name);
        String query = queryToString(create_query_push_ast);

        LOG_DEBUG(log, "Create destination tables. Query: {}", query);
        UInt64 shards = executeQueryOnCluster(task_table.cluster_push, query, task_cluster->settings_push, PoolMode::GET_MANY);
        LOG_INFO(
            log,
            "Destination tables {} have been created on {} shards of {}",
            getQuotedTable(task_table.table_push),
            shards,
            task_table.cluster_push->getShardCount());
        /// The destination table must exist on every shard of the push cluster,
        /// otherwise copying cannot proceed.
        if (shards != task_table.cluster_push->getShardCount())
        {
            return TaskStatus::Error;
        }
    }
    catch (...)
    {
        /// NOTE(review): the exception is swallowed and Finished is still returned —
        /// presumably because a concurrent copier may have created the table first
        /// ("Maybe we are not first"); confirm this is the intended contract.
        tryLogCurrentException(log, "Error while creating original table. Maybe we are not first.");
    }

    return TaskStatus::Finished;
}
/// Job for copying partition from particular shard.
TaskStatus ClusterCopier::tryProcessPartitionTask(const ConnectionTimeouts & timeouts, ShardPartition & task_partition, bool is_unprioritized_task)
{
@ -1366,8 +1435,17 @@ TaskStatus ClusterCopier::processPartitionPieceTaskImpl(
LOG_DEBUG(log, "Create destination tables. Query: {}", query);
UInt64 shards = executeQueryOnCluster(task_table.cluster_push, query, task_cluster->settings_push, PoolMode::GET_MANY);
LOG_DEBUG(log, "Destination tables {} have been created on {} shards of {}",
getQuotedTable(task_table.table_push), shards, task_table.cluster_push->getShardCount());
LOG_INFO(
log,
"Destination tables {} have been created on {} shards of {}",
getQuotedTable(task_table.table_push),
shards,
task_table.cluster_push->getShardCount());
if (shards != task_table.cluster_push->getShardCount())
{
return TaskStatus::Error;
}
}
/// Do the copying
@ -1477,26 +1555,6 @@ TaskStatus ClusterCopier::processPartitionPieceTaskImpl(
LOG_INFO(log, "Partition {} piece {} copied. But not moved to original destination table.", task_partition.name, toString(current_piece_number));
/// Try create original table (if not exists) on each shard
try
{
auto create_query_push_ast = rewriteCreateQueryStorage(task_shard.current_pull_table_create_query,
task_table.table_push, task_table.engine_push_ast);
auto & create = create_query_push_ast->as<ASTCreateQuery &>();
create.if_not_exists = true;
InterpreterCreateQuery::prepareOnClusterQuery(create, context, task_table.cluster_push_name);
String query = queryToString(create_query_push_ast);
LOG_DEBUG(log, "Create destination tables. Query: {}", query);
UInt64 shards = executeQueryOnCluster(task_table.cluster_push, query, task_cluster->settings_push, PoolMode::GET_MANY);
LOG_DEBUG(log, "Destination tables {} have been created on {} shards of {}", getQuotedTable(task_table.table_push), shards, task_table.cluster_push->getShardCount());
}
catch (...)
{
tryLogCurrentException(log, "Error while creating original table. Maybe we are not first.");
}
/// Finalize the processing, change state of current partition task (and also check is_dirty flag)
{
String state_finished = TaskStateWithOwner::getData(TaskState::Finished, host_id);
@ -1538,14 +1596,13 @@ void ClusterCopier::dropLocalTableIfExists(const DatabaseAndTableName & table_na
interpreter.execute();
}
void ClusterCopier::dropHelpingTablesByPieceNumber(const TaskTable & task_table, size_t current_piece_number)
{
LOG_DEBUG(log, "Removing helping tables piece {}", current_piece_number);
void ClusterCopier::dropHelpingTables(const TaskTable & task_table)
{
LOG_DEBUG(log, "Removing helping tables");
for (size_t current_piece_number = 0; current_piece_number < task_table.number_of_splits; ++current_piece_number)
{
DatabaseAndTableName original_table = task_table.table_push;
DatabaseAndTableName helping_table = DatabaseAndTableName(original_table.first, original_table.second + "_piece_" + toString(current_piece_number));
DatabaseAndTableName helping_table
= DatabaseAndTableName(original_table.first, original_table.second + "_piece_" + toString(current_piece_number));
String query = "DROP TABLE IF EXISTS " + getQuotedTable(helping_table);
@ -1553,17 +1610,21 @@ void ClusterCopier::dropHelpingTables(const TaskTable & task_table)
Settings settings_push = task_cluster->settings_push;
LOG_DEBUG(log, "Execute distributed DROP TABLE: {}", query);
/// We have to drop partition_piece on each replica
UInt64 num_nodes = executeQueryOnCluster(
cluster_push, query,
settings_push,
PoolMode::GET_MANY,
ClusterExecutionMode::ON_EACH_NODE);
UInt64 num_nodes = executeQueryOnCluster(cluster_push, query, settings_push, PoolMode::GET_MANY, ClusterExecutionMode::ON_EACH_NODE);
LOG_DEBUG(log, "DROP TABLE query was successfully executed on {} nodes.", toString(num_nodes));
}
LOG_INFO(log, "DROP TABLE query was successfully executed on {} nodes.", toString(num_nodes));
}
/// Drop every per-piece helping table ("<table>_piece_N") of the task table,
/// one piece at a time, by delegating to dropHelpingTablesByPieceNumber().
void ClusterCopier::dropHelpingTables(const TaskTable & task_table)
{
    LOG_DEBUG(log, "Removing helping tables");
    for (size_t current_piece_number = 0; current_piece_number < task_table.number_of_splits; ++current_piece_number)
    {
        dropHelpingTablesByPieceNumber(task_table, current_piece_number);
    }
}
void ClusterCopier::dropParticularPartitionPieceFromAllHelpingTables(const TaskTable & task_table, const String & partition_name)
{
@ -1586,7 +1647,7 @@ void ClusterCopier::dropParticularPartitionPieceFromAllHelpingTables(const TaskT
PoolMode::GET_MANY,
ClusterExecutionMode::ON_EACH_NODE);
LOG_DEBUG(log, "DROP PARTITION query was successfully executed on {} nodes.", toString(num_nodes));
LOG_INFO(log, "DROP PARTITION query was successfully executed on {} nodes.", toString(num_nodes));
}
LOG_DEBUG(log, "All helping tables dropped partition {}", partition_name);
}

View File

@ -123,12 +123,13 @@ protected:
bool tryDropPartitionPiece(ShardPartition & task_partition, const size_t current_piece_number,
const zkutil::ZooKeeperPtr & zookeeper, const CleanStateClock & clean_state_clock);
static constexpr UInt64 max_table_tries = 1000;
static constexpr UInt64 max_shard_partition_tries = 600;
static constexpr UInt64 max_shard_partition_piece_tries_for_alter = 100;
static constexpr UInt64 max_table_tries = 3;
static constexpr UInt64 max_shard_partition_tries = 3;
static constexpr UInt64 max_shard_partition_piece_tries_for_alter = 3;
bool tryProcessTable(const ConnectionTimeouts & timeouts, TaskTable & task_table);
TaskStatus tryCreateDestinationTable(const ConnectionTimeouts & timeouts, TaskTable & task_table);
/// Job for copying partition from particular shard.
TaskStatus tryProcessPartitionTask(const ConnectionTimeouts & timeouts,
ShardPartition & task_partition,
@ -149,6 +150,8 @@ protected:
void dropHelpingTables(const TaskTable & task_table);
void dropHelpingTablesByPieceNumber(const TaskTable & task_table, size_t current_piece_number);
/// Is used for usage less disk space.
/// After all pieces were successfully moved to original destination
/// table we can get rid of partition pieces (partitions in helping tables).

View File

@ -98,6 +98,7 @@ inline void DB::TaskCluster::reloadSettings(const Poco::Util::AbstractConfigurat
set_default_value(settings_pull.max_block_size, 8192UL);
set_default_value(settings_pull.preferred_block_size_bytes, 0);
set_default_value(settings_push.insert_distributed_timeout, 0);
set_default_value(settings_push.replication_alter_partitions_sync, 2);
}
}

View File

@ -19,15 +19,6 @@ namespace ErrorCodes
extern const int BAD_ARGUMENTS;
}
struct ComparePair final
{
template <typename T1, typename T2>
bool operator()(const std::pair<T1, T2> & lhs, const std::pair<T1, T2> & rhs) const
{
return lhs.first == rhs.first ? lhs.second < rhs.second : lhs.first < rhs.first;
}
};
static constexpr auto max_events = 32;
template <typename T>
@ -35,7 +26,6 @@ struct AggregateFunctionWindowFunnelData
{
using TimestampEvent = std::pair<T, UInt8>;
using TimestampEvents = PODArrayWithStackMemory<TimestampEvent, 64>;
using Comparator = ComparePair;
bool sorted = true;
TimestampEvents events_list;
@ -69,7 +59,7 @@ struct AggregateFunctionWindowFunnelData
/// either sort whole container or do so partially merging ranges afterwards
if (!sorted && !other.sorted)
std::stable_sort(std::begin(events_list), std::end(events_list), Comparator{});
std::stable_sort(std::begin(events_list), std::end(events_list));
else
{
const auto begin = std::begin(events_list);
@ -77,12 +67,12 @@ struct AggregateFunctionWindowFunnelData
const auto end = std::end(events_list);
if (!sorted)
std::stable_sort(begin, middle, Comparator{});
std::stable_sort(begin, middle);
if (!other.sorted)
std::stable_sort(middle, end, Comparator{});
std::stable_sort(middle, end);
std::inplace_merge(begin, middle, end, Comparator{});
std::inplace_merge(begin, middle, end);
}
sorted = true;
@ -92,7 +82,7 @@ struct AggregateFunctionWindowFunnelData
{
if (!sorted)
{
std::stable_sort(std::begin(events_list), std::end(events_list), Comparator{});
std::stable_sort(std::begin(events_list), std::end(events_list));
sorted = true;
}
}

View File

@ -13,8 +13,7 @@ namespace DB
/// Result array could be indexed with all possible uint8 values without extra check.
/// For values greater than 128 we will store same value as for 128 (all bits set).
constexpr size_t IPV6_MASKS_COUNT = 256;
using RawMaskArray = std::array<uint8_t, IPV6_BINARY_LENGTH>;
using RawMaskArrayV6 = std::array<uint8_t, IPV6_BINARY_LENGTH>;
void IPv6ToRawBinary(const Poco::Net::IPAddress & address, char * res)
{
@ -41,33 +40,86 @@ std::array<char, 16> IPv6ToBinary(const Poco::Net::IPAddress & address)
return res;
}
static constexpr RawMaskArray generateBitMask(size_t prefix)
template <typename RawMaskArrayT>
static constexpr RawMaskArrayT generateBitMask(size_t prefix)
{
if (prefix >= 128)
prefix = 128;
RawMaskArray arr{0};
RawMaskArrayT arr{0};
if (prefix >= arr.size() * 8)
prefix = arr.size() * 8;
size_t i = 0;
for (; prefix >= 8; ++i, prefix -= 8)
arr[i] = 0xff;
if (prefix > 0)
arr[i++] = ~(0xff >> prefix);
while (i < 16)
while (i < arr.size())
arr[i++] = 0x00;
return arr;
}
static constexpr std::array<RawMaskArray, IPV6_MASKS_COUNT> generateBitMasks()
template <typename RawMaskArrayT, size_t masksCount>
static constexpr std::array<RawMaskArrayT, masksCount> generateBitMasks()
{
std::array<RawMaskArray, IPV6_MASKS_COUNT> arr{};
for (size_t i = 0; i < IPV6_MASKS_COUNT; ++i)
arr[i] = generateBitMask(i);
std::array<RawMaskArrayT, masksCount> arr{};
for (size_t i = 0; i < masksCount; ++i)
arr[i] = generateBitMask<RawMaskArrayT>(i);
return arr;
}
const uint8_t * getCIDRMaskIPv6(UInt8 prefix_len)
const std::array<uint8_t, 16> & getCIDRMaskIPv6(UInt8 prefix_len)
{
static constexpr std::array<RawMaskArray, IPV6_MASKS_COUNT> IPV6_RAW_MASK_ARRAY = generateBitMasks();
return IPV6_RAW_MASK_ARRAY[prefix_len].data();
static constexpr auto IPV6_RAW_MASK_ARRAY = generateBitMasks<RawMaskArrayV6, IPV6_MASKS_COUNT>();
return IPV6_RAW_MASK_ARRAY[prefix_len];
}
/// Check whether `addr` belongs to the IPv4 network `cidr_addr`/`prefix`.
/// A prefix of 32 or more requires an exact match; a prefix of 0 matches everything.
bool matchIPv4Subnet(UInt32 addr, UInt32 cidr_addr, UInt8 prefix)
{
    if (prefix >= 32)
        return addr == cidr_addr;

    /// Network mask with the top `prefix` bits set.
    const UInt32 network_mask = ~(0xffffffffu >> prefix);
    return ((addr ^ cidr_addr) & network_mask) == 0;
}
#if defined(__SSE2__)
#include <emmintrin.h>
/// Check whether the 16-byte IPv6 address `addr` belongs to the network
/// `cidr_addr`/`prefix` (SSE2 implementation).
bool matchIPv6Subnet(const uint8_t * addr, const uint8_t * cidr_addr, UInt8 prefix)
{
    /// Compare all 16 bytes at once: after _mm_cmpeq_epi8/_mm_movemask_epi8,
    /// bit i of `mask` is set where addr[i] == cidr_addr[i]; inverting it makes
    /// the set bits mark the differing bytes.
    uint16_t mask = _mm_movemask_epi8(_mm_cmpeq_epi8(
        _mm_loadu_si128(reinterpret_cast<const __m128i *>(addr)),
        _mm_loadu_si128(reinterpret_cast<const __m128i *>(cidr_addr))));
    mask = ~mask;

    if (mask)
    {
        /// Index of the first (lowest-address) differing byte.
        auto offset = __builtin_ctz(mask);

        /// Mismatch in a byte other than the one the prefix ends in:
        /// the match holds only when the mismatch lies entirely past the prefix.
        if (prefix / 8 != offset)
            return prefix / 8 < offset;

        /// Mismatch is inside the boundary byte: compare only its top `prefix % 8` bits.
        auto cmpmask = ~(0xff >> (prefix % 8));
        return (addr[offset] & cmpmask) == (cidr_addr[offset] & cmpmask);
    }
    return true;
}
# else
/// Check whether the 16-byte IPv6 address `addr` belongs to the network
/// `cidr_addr`/`prefix` (portable fallback). Prefixes above 128 are clamped to 128.
bool matchIPv6Subnet(const uint8_t * addr, const uint8_t * cidr_addr, UInt8 prefix)
{
    if (prefix > IPV6_BINARY_LENGTH * 8U)
        prefix = IPV6_BINARY_LENGTH * 8U;

    /// Whole bytes covered by the prefix must be equal.
    const size_t full_bytes = prefix / 8;
    for (size_t pos = 0; pos < full_bytes; ++pos)
        if (addr[pos] != cidr_addr[pos])
            return false;

    const unsigned remaining_bits = prefix % 8;
    if (remaining_bits == 0)
        return true;

    /// Compare only the top `remaining_bits` bits of the boundary byte.
    const auto boundary_mask = ~(0xff >> remaining_bits);
    return (addr[full_bytes] & boundary_mask) == (cidr_addr[full_bytes] & boundary_mask);
}
#endif // __SSE2__
}

View File

@ -14,9 +14,13 @@ void IPv6ToRawBinary(const Poco::Net::IPAddress & address, char * res);
/// Convert IP address to 16-byte array with IPv6 data (big endian). If it's an IPv4, map it to IPv6.
std::array<char, 16> IPv6ToBinary(const Poco::Net::IPAddress & address);
/// Returns pointer to 16-byte array containing mask with first `prefix_len` bits set to `1` and `128 - prefix_len` to `0`.
/// Pointer is valid during all program execution time and doesn't require freeing.
/// Returns a reference to 16-byte array containing mask with first `prefix_len` bits set to `1` and `128 - prefix_len` to `0`.
/// The reference is valid during all program execution time.
/// Values of prefix_len greater than 128 interpreted as 128 exactly.
const uint8_t * getCIDRMaskIPv6(UInt8 prefix_len);
const std::array<uint8_t, 16> & getCIDRMaskIPv6(UInt8 prefix_len);
/// Check that address contained in CIDR range
bool matchIPv4Subnet(UInt32 addr, UInt32 cidr_addr, UInt8 prefix);
bool matchIPv6Subnet(const uint8_t * addr, const uint8_t * cidr_addr, UInt8 prefix);
}

View File

@ -25,7 +25,7 @@ void formatIPv6(const unsigned char * src, char *& dst, uint8_t zeroed_tail_byte
/** Unsafe (no bounds-checking for src nor dst), optimized version of parsing IPv4 string.
*
* Parses the input string `src` and stores binary BE value into buffer pointed by `dst`,
* Parses the input string `src` and stores binary host-endian value into buffer pointed by `dst`,
* which should be long enough.
* That is "127.0.0.1" becomes 0x7f000001.
*
@ -63,7 +63,7 @@ inline bool parseIPv4(const char * src, unsigned char * dst)
/** Unsafe (no bounds-checking for src nor dst), optimized version of parsing IPv6 string.
*
* Slightly altered implementation from http://svn.apache.org/repos/asf/apr/apr/trunk/network_io/unix/inet_pton.c
* Parses the input string `src` and stores binary LE value into buffer pointed by `dst`,
* Parses the input string `src` and stores binary big-endian value into buffer pointed by `dst`,
* which should be long enough. In case of failure zeroes
* IPV6_BINARY_LENGTH bytes of buffer pointed by `dst`.
*

View File

@ -470,8 +470,8 @@ INSTANTIATE_TEST_SUITE_P(AllTimezones_Year2010,
::testing::ValuesIn(allTimezones()),
::testing::ValuesIn(std::initializer_list<TimeRangeParam>{
// Values from tests/date_lut3.cpp
{YYYYMMDDToDay(20101031), YYYYMMDDToDay(20101101), 15 * 60},
{YYYYMMDDToDay(20100328), YYYYMMDDToDay(20100330), 15 * 60}
{YYYYMMDDToDay(20101031), YYYYMMDDToDay(20101101), 10 * 15 * 60},
{YYYYMMDDToDay(20100328), YYYYMMDDToDay(20100330), 10 * 15 * 60}
}))
);
@ -481,7 +481,7 @@ INSTANTIATE_TEST_SUITE_P(AllTimezones_Year1970_WHOLE,
::testing::ValuesIn(allTimezones(false)),
::testing::ValuesIn(std::initializer_list<TimeRangeParam>{
// Values from tests/date_lut3.cpp
{YYYYMMDDToDay(19700101), YYYYMMDDToDay(19701231), 3191 /*53m 11s*/},
{YYYYMMDDToDay(19700101), YYYYMMDDToDay(19701231), 10 * 3191 /*53m 11s*/},
}))
);
@ -491,7 +491,7 @@ INSTANTIATE_TEST_SUITE_P(AllTimezones_Year2010_WHOLE,
::testing::ValuesIn(allTimezones(false)),
::testing::ValuesIn(std::initializer_list<TimeRangeParam>{
// Values from tests/date_lut3.cpp
{YYYYMMDDToDay(20100101), YYYYMMDDToDay(20101231), 3191 /*53m 11s*/},
{YYYYMMDDToDay(20100101), YYYYMMDDToDay(20101231), 10 * 3191 /*53m 11s*/},
}))
);
@ -501,7 +501,7 @@ INSTANTIATE_TEST_SUITE_P(AllTimezones_Year2020_WHOLE,
::testing::ValuesIn(allTimezones()),
::testing::ValuesIn(std::initializer_list<TimeRangeParam>{
// Values from tests/date_lut3.cpp
{YYYYMMDDToDay(20200101), YYYYMMDDToDay(20201231), 3191 /*53m 11s*/},
{YYYYMMDDToDay(20200101), YYYYMMDDToDay(20201231), 10 * 3191 /*53m 11s*/},
}))
);
@ -510,8 +510,8 @@ INSTANTIATE_TEST_SUITE_P(AllTimezones_PreEpoch,
::testing::Combine(
::testing::ValuesIn(allTimezones(false)),
::testing::ValuesIn(std::initializer_list<TimeRangeParam>{
{YYYYMMDDToDay(19500101), YYYYMMDDToDay(19600101), 15 * 60},
{YYYYMMDDToDay(19300101), YYYYMMDDToDay(19350101), 11 * 15 * 60}
{YYYYMMDDToDay(19500101), YYYYMMDDToDay(19600101), 10 * 15 * 60},
{YYYYMMDDToDay(19300101), YYYYMMDDToDay(19350101), 10 * 11 * 15 * 60}
}))
);
@ -520,8 +520,8 @@ INSTANTIATE_TEST_SUITE_P(AllTimezones_Year1970,
::testing::Combine(
::testing::ValuesIn(allTimezones(false)),
::testing::ValuesIn(std::initializer_list<TimeRangeParam>{
{YYYYMMDDToDay(19700101), YYYYMMDDToDay(19700201), 15 * 60},
{YYYYMMDDToDay(19700101), YYYYMMDDToDay(19701231), 11 * 13 * 17}
{YYYYMMDDToDay(19700101), YYYYMMDDToDay(19700201), 10 * 15 * 60},
{YYYYMMDDToDay(19700101), YYYYMMDDToDay(19701231), 10 * 11 * 13 * 17}
// // 11 was chosen as a number which can't divide product of 2-combinarions of (7, 24, 60),
// // to reduce likelehood of hitting same hour/minute/second values for different days.
// // + 12 is just to make sure that last day is covered fully.

View File

@ -4,19 +4,17 @@
#include <Common/assert_cast.h>
#include <Common/IPv6ToBinary.h>
#include <Common/memcmpSmall.h>
#include <Common/memcpySmall.h>
#include <Common/typeid_cast.h>
#include <DataTypes/DataTypeFixedString.h>
#include <DataTypes/DataTypeString.h>
#include <DataTypes/DataTypesDecimal.h>
#include <IO/WriteIntText.h>
#include <Poco/ByteOrder.h>
#include <Common/formatIPv6.h>
#include <common/itoa.h>
#include <ext/map.h>
#include <ext/range.h>
#include "DictionaryBlockInputStream.h"
#include "DictionaryFactory.h"
#include <Dictionaries/DictionaryBlockInputStream.h>
#include <Dictionaries/DictionaryFactory.h>
#include <Functions/FunctionHelpers.h>
namespace DB
@ -191,57 +189,6 @@ inline static void mapIPv4ToIPv6(UInt32 addr, uint8_t * buf)
memcpy(&buf[12], &addr, 4);
}
static bool matchIPv4Subnet(UInt32 target, UInt32 addr, UInt8 prefix)
{
UInt32 mask = (prefix >= 32) ? 0xffffffffu : ~(0xffffffffu >> prefix);
return (target & mask) == addr;
}
#if defined(__SSE2__)
#include <emmintrin.h>
static bool matchIPv6Subnet(const uint8_t * target, const uint8_t * addr, UInt8 prefix)
{
uint16_t mask = _mm_movemask_epi8(_mm_cmpeq_epi8(
_mm_loadu_si128(reinterpret_cast<const __m128i *>(target)),
_mm_loadu_si128(reinterpret_cast<const __m128i *>(addr))));
mask = ~mask;
if (mask)
{
auto offset = __builtin_ctz(mask);
if (prefix / 8 != offset)
return prefix / 8 < offset;
auto cmpmask = ~(0xff >> (prefix % 8));
return (target[offset] & cmpmask) == addr[offset];
}
return true;
}
# else
static bool matchIPv6Subnet(const uint8_t * target, const uint8_t * addr, UInt8 prefix)
{
if (prefix > IPV6_BINARY_LENGTH * 8U)
prefix = IPV6_BINARY_LENGTH * 8U;
size_t i = 0;
for (; prefix >= 8; ++i, prefix -= 8)
{
if (target[i] != addr[i])
return false;
}
if (prefix == 0)
return true;
auto mask = ~(0xff >> prefix);
return (target[i] & mask) == addr[i];
}
#endif // __SSE2__
IPAddressDictionary::IPAddressDictionary(
const StorageID & dict_id_,
const DictionaryStructure & dict_struct_,

View File

@ -271,7 +271,7 @@ std::vector<IPolygonDictionary::Point> IPolygonDictionary::extractPoints(const C
throw Exception(ErrorCodes::BAD_ARGUMENTS,
"PolygonDictionary input point component must not be NaN");
if (isinf(x) || isinf(y))
if (std::isinf(x) || std::isinf(y))
throw Exception(ErrorCodes::BAD_ARGUMENTS,
"PolygonDictionary input point component must not be infinite");

View File

@ -1645,7 +1645,7 @@ private:
static inline void applyCIDRMask(const UInt8 * __restrict src, UInt8 * __restrict dst_lower, UInt8 * __restrict dst_upper, UInt8 bits_to_keep)
{
__m128i mask = _mm_loadu_si128(reinterpret_cast<const __m128i *>(getCIDRMaskIPv6(bits_to_keep)));
__m128i mask = _mm_loadu_si128(reinterpret_cast<const __m128i *>(getCIDRMaskIPv6(bits_to_keep).data()));
__m128i lower = _mm_and_si128(_mm_loadu_si128(reinterpret_cast<const __m128i *>(src)), mask);
_mm_storeu_si128(reinterpret_cast<__m128i *>(dst_lower), lower);
@ -1659,7 +1659,7 @@ private:
/// NOTE IPv6 is stored in memory in big endian format that makes some difficulties.
static void applyCIDRMask(const UInt8 * __restrict src, UInt8 * __restrict dst_lower, UInt8 * __restrict dst_upper, UInt8 bits_to_keep)
{
const auto * mask = getCIDRMaskIPv6(bits_to_keep);
const auto & mask = getCIDRMaskIPv6(bits_to_keep);
for (size_t i = 0; i < 16; ++i)
{

View File

@ -76,7 +76,7 @@ struct ColumnToPointsConverter
if (isNaN(first) || isNaN(second))
throw Exception("Point's component must not be NaN", ErrorCodes::ILLEGAL_TYPE_OF_ARGUMENT);
if (isinf(first) || isinf(second))
if (std::isinf(first) || std::isinf(second))
throw Exception("Point's component must not be infinite", ErrorCodes::ILLEGAL_TYPE_OF_ARGUMENT);
answer[i] = Point(first, second);

View File

@ -0,0 +1,259 @@
#include <Columns/ColumnConst.h>
#include <Columns/ColumnString.h>
#include <Columns/ColumnsNumber.h>
#include <Common/IPv6ToBinary.h>
#include <Common/formatIPv6.h>
#include <DataTypes/DataTypeNullable.h>
#include <DataTypes/DataTypesNumber.h>
#include <Functions/IFunctionImpl.h>
#include <Functions/FunctionFactory.h>
#include <Functions/FunctionHelpers.h>
#include <variant>
#include <charconv>
#include <common/logger_useful.h>
namespace DB::ErrorCodes
{
extern const int CANNOT_PARSE_TEXT;
extern const int ILLEGAL_TYPE_OF_ARGUMENT;
extern const int NUMBER_OF_ARGUMENTS_DOESNT_MATCH;
}
namespace
{
/// A parsed IPv4 or IPv6 address. IPv4 is stored as a UInt32,
/// IPv6 as a 16-byte binary array.
class IPAddressVariant
{
public:
    /// Parses `address_str` as IPv4 first, then as IPv6.
    /// Throws CANNOT_PARSE_TEXT if the string is neither.
    explicit IPAddressVariant(const StringRef & address_str)
    {
        /// IP address parser functions require that the input is
        /// NULL-terminated so we need to copy it.
        const auto address_str_copy = std::string(address_str);

        UInt32 v4;
        if (DB::parseIPv4(address_str_copy.c_str(), reinterpret_cast<unsigned char *>(&v4)))
        {
            addr = v4;
        }
        else
        {
            addr = IPv6AddrType();
            bool success = DB::parseIPv6(address_str_copy.c_str(), std::get<IPv6AddrType>(addr).data());
            if (!success)
                throw DB::Exception("Neither IPv4 nor IPv6 address: '" + address_str_copy + "'",
                                    DB::ErrorCodes::CANNOT_PARSE_TEXT);
        }
    }

    /// Returns the IPv4 value, or 0 when this variant holds an IPv6 address.
    /// NOTE(review): 0 is also a valid IPv4 address (0.0.0.0); callers are
    /// expected to check asV6() first to disambiguate.
    UInt32 asV4() const
    {
        if (const auto * val = std::get_if<IPv4AddrType>(&addr))
            return *val;
        return 0;
    }

    /// Returns a pointer to the 16 bytes of IPv6 data, or nullptr for IPv4.
    const uint8_t * asV6() const
    {
        if (const auto * val = std::get_if<IPv6AddrType>(&addr))
            return val->data();
        return nullptr;
    }

private:
    using IPv4AddrType = UInt32;
    using IPv6AddrType = std::array<uint8_t, IPV6_BINARY_LENGTH>;

    std::variant<IPv4AddrType, IPv6AddrType> addr;
};
/// An IP network in CIDR form: base address plus prefix length in bits.
struct IPAddressCIDR
{
    IPAddressVariant address;
    UInt8 prefix;
};
/// Parses "address/prefix" CIDR notation (IPv4 or IPv6).
/// Throws CANNOT_PARSE_TEXT when the '/' is missing, the address part is empty
/// or invalid, or the prefix is not a number within the family's bit width.
IPAddressCIDR parseIPWithCIDR(const StringRef cidr_str)
{
    std::string_view cidr_str_view(cidr_str);
    size_t pos_slash = cidr_str_view.find('/');

    /// An empty address part ("/24") is reported before the missing-slash case.
    if (pos_slash == 0)
        throw DB::Exception("Error parsing IP address with prefix: " + std::string(cidr_str), DB::ErrorCodes::CANNOT_PARSE_TEXT);
    if (pos_slash == std::string_view::npos)
        throw DB::Exception("The text does not contain '/': " + std::string(cidr_str), DB::ErrorCodes::CANNOT_PARSE_TEXT);

    std::string_view addr_str = cidr_str_view.substr(0, pos_slash);
    IPAddressVariant addr(StringRef{addr_str.data(), addr_str.size()});

    uint8_t prefix = 0;
    auto prefix_str = cidr_str_view.substr(pos_slash+1);

    const auto * prefix_str_end = prefix_str.data() + prefix_str.size();
    auto [parse_end, parse_error] = std::from_chars(prefix_str.data(), prefix_str_end, prefix);
    /// IPv6 allows prefixes up to 128 bits, IPv4 up to 32.
    uint8_t max_prefix = (addr.asV6() ? IPV6_BINARY_LENGTH : IPV4_BINARY_LENGTH) * 8;
    /// The whole prefix string must be consumed and the value must fit the family.
    bool has_error = parse_error != std::errc() || parse_end != prefix_str_end || prefix > max_prefix;
    if (has_error)
        throw DB::Exception("The CIDR has a malformed prefix bits: " + std::string(cidr_str), DB::ErrorCodes::CANNOT_PARSE_TEXT);

    return {addr, static_cast<UInt8>(prefix)};
}
/// Returns true when `address` belongs to `cidr`.
/// Addresses of different IP versions never match.
inline bool isAddressInRange(const IPAddressVariant & address, const IPAddressCIDR & cidr)
{
    const auto * cidr_v6 = cidr.address.asV6();
    const auto * addr_v6 = address.asV6();

    if (cidr_v6)
        return addr_v6 != nullptr && DB::matchIPv6Subnet(addr_v6, cidr_v6, cidr.prefix);

    return addr_v6 == nullptr && DB::matchIPv4Subnet(address.asV4(), cidr.address.asV4(), cidr.prefix);
}
}
namespace DB
{
/// Implements the SQL function isIPAddressInRange(address, prefix):
/// checks whether an IP address (IPv4 or IPv6, passed as a String) belongs to
/// a network given in CIDR notation (also a String). Returns UInt8.
/// Note: both arguments are parsed per row from strings; constant arguments
/// are parsed only once thanks to the four executeImpl overloads below.
class FunctionIsIPAddressContainedIn : public IFunction
{
public:
    static constexpr auto name = "isIPAddressInRange";

    String getName() const override { return name; }

    static FunctionPtr create(const Context &) { return std::make_shared<FunctionIsIPAddressContainedIn>(); }

    /// Dispatches to one of four private overloads depending on which of the
    /// two argument columns are constant, so that a constant address and/or a
    /// constant CIDR is parsed exactly once instead of once per row.
    ColumnPtr executeImpl(const ColumnsWithTypeAndName & arguments, const DataTypePtr & /* return_type */, size_t input_rows_count) const override
    {
        const IColumn * col_addr = arguments[0].column.get();
        const IColumn * col_cidr = arguments[1].column.get();

        if (const auto * col_addr_const = checkAndGetAnyColumnConst(col_addr))
        {
            if (const auto * col_cidr_const = checkAndGetAnyColumnConst(col_cidr))
                return executeImpl(*col_addr_const, *col_cidr_const, input_rows_count);
            else
                return executeImpl(*col_addr_const, *col_cidr, input_rows_count);
        }
        else
        {
            if (const auto * col_cidr_const = checkAndGetAnyColumnConst(col_cidr))
                return executeImpl(*col_addr, *col_cidr_const, input_rows_count);
            else
                return executeImpl(*col_addr, *col_cidr, input_rows_count);
        }
    }

    /// Both arguments must be String; the result type is UInt8 (0 or 1).
    virtual DataTypePtr getReturnTypeImpl(const DataTypes & arguments) const override
    {
        if (arguments.size() != 2)
            throw Exception(
                "Number of arguments for function " + getName() + " doesn't match: passed " + toString(arguments.size()) + ", should be 2",
                ErrorCodes::NUMBER_OF_ARGUMENTS_DOESNT_MATCH);

        const DataTypePtr & addr_type   = arguments[0];
        const DataTypePtr & prefix_type = arguments[1];

        if (!isString(addr_type) || !isString(prefix_type))
            throw Exception("The arguments of function " + getName() + " must be String",
            ErrorCodes::ILLEGAL_TYPE_OF_ARGUMENT);

        return std::make_shared<DataTypeUInt8>();
    }

    virtual size_t getNumberOfArguments() const override { return 2; }

    /// Nullable arguments are not unwrapped automatically; a Nullable(String)
    /// argument therefore fails the isString() check in getReturnTypeImpl.
    bool useDefaultImplementationForNulls() const override { return false; }

private:
    /// Like checkAndGetColumnConst() but this function doesn't
    /// care about the type of data column.
    static const ColumnConst * checkAndGetAnyColumnConst(const IColumn * column)
    {
        if (!column || !isColumnConst(*column))
            return nullptr;

        return assert_cast<const ColumnConst *>(column);
    }

    /// Both columns are constant.
    /// The single address/CIDR pair is evaluated once and wrapped back into a
    /// ColumnConst of input_rows_count rows.
    static ColumnPtr executeImpl(
        const ColumnConst & col_addr_const,
        const ColumnConst & col_cidr_const,
        size_t input_rows_count)
    {
        const auto & col_addr = col_addr_const.getDataColumn();
        const auto & col_cidr = col_cidr_const.getDataColumn();

        const auto addr = IPAddressVariant(col_addr.getDataAt(0));
        const auto cidr = parseIPWithCIDR(col_cidr.getDataAt(0));

        ColumnUInt8::MutablePtr col_res = ColumnUInt8::create(1);
        ColumnUInt8::Container & vec_res = col_res->getData();

        vec_res[0] = isAddressInRange(addr, cidr) ? 1 : 0;

        return ColumnConst::create(std::move(col_res), input_rows_count);
    }

    /// Address is constant: parse the address once, parse the CIDR per row.
    static ColumnPtr executeImpl(const ColumnConst & col_addr_const, const IColumn & col_cidr, size_t input_rows_count)
    {
        const auto & col_addr = col_addr_const.getDataColumn();
        const auto addr = IPAddressVariant(col_addr.getDataAt (0));

        ColumnUInt8::MutablePtr col_res = ColumnUInt8::create(input_rows_count);
        ColumnUInt8::Container & vec_res = col_res->getData();

        for (size_t i = 0; i < input_rows_count; i++)
        {
            const auto cidr = parseIPWithCIDR(col_cidr.getDataAt(i));
            vec_res[i] = isAddressInRange(addr, cidr) ? 1 : 0;
        }

        return col_res;
    }

    /// CIDR is constant: parse the CIDR once, parse the address per row.
    static ColumnPtr executeImpl(const IColumn & col_addr, const ColumnConst & col_cidr_const, size_t input_rows_count)
    {
        const auto & col_cidr = col_cidr_const.getDataColumn();
        const auto cidr = parseIPWithCIDR(col_cidr.getDataAt(0));

        ColumnUInt8::MutablePtr col_res = ColumnUInt8::create(input_rows_count);
        ColumnUInt8::Container & vec_res = col_res->getData();

        for (size_t i = 0; i < input_rows_count; i++)
        {
            const auto addr = IPAddressVariant(col_addr.getDataAt(i));
            vec_res[i] = isAddressInRange(addr, cidr) ? 1 : 0;
        }

        return col_res;
    }

    /// Neither are constant: parse both the address and the CIDR on every row.
    static ColumnPtr executeImpl(const IColumn & col_addr, const IColumn & col_cidr, size_t input_rows_count)
    {
        ColumnUInt8::MutablePtr col_res = ColumnUInt8::create(input_rows_count);
        ColumnUInt8::Container & vec_res = col_res->getData();

        for (size_t i = 0; i < input_rows_count; i++)
        {
            const auto addr = IPAddressVariant(col_addr.getDataAt(i));
            const auto cidr = parseIPWithCIDR(col_cidr.getDataAt(i));
            vec_res[i] = isAddressInRange(addr, cidr) ? 1 : 0;
        }

        return col_res;
    }
};
/// Registers isIPAddressInRange in the function factory so it is available from SQL.
void registerFunctionIsIPAddressContainedIn(FunctionFactory & factory)
{
    factory.registerFunction<FunctionIsIPAddressContainedIn>();
}
}

View File

@ -71,6 +71,7 @@ void registerFunctionTcpPort(FunctionFactory &);
void registerFunctionByteSize(FunctionFactory &);
void registerFunctionFile(FunctionFactory & factory);
void registerFunctionConnectionID(FunctionFactory & factory);
void registerFunctionIsIPAddressContainedIn(FunctionFactory &);
#if USE_ICU
void registerFunctionConvertCharset(FunctionFactory &);
@ -142,6 +143,7 @@ void registerFunctionsMiscellaneous(FunctionFactory & factory)
registerFunctionByteSize(factory);
registerFunctionFile(factory);
registerFunctionConnectionID(factory);
registerFunctionIsIPAddressContainedIn(factory);
#if USE_ICU
registerFunctionConvertCharset(factory);

View File

@ -309,6 +309,7 @@ SRCS(
isConstant.cpp
isDecimalOverflow.cpp
isFinite.cpp
isIPAddressContainedIn.cpp
isInfinite.cpp
isNaN.cpp
isNotNull.cpp

View File

@ -7,6 +7,7 @@
#include <Parsers/ASTAlterQuery.h>
#include <Parsers/ASTCreateQuery.h>
#include <Parsers/ASTColumnDeclaration.h>
#include <Parsers/ASTIndexDeclaration.h>
#include <Parsers/MySQL/ASTCreateQuery.h>
#include <Parsers/MySQL/ASTAlterCommand.h>
#include <Parsers/MySQL/ASTDeclareColumn.h>
@ -411,13 +412,26 @@ ASTs InterpreterCreateImpl::getRewrittenQueries(
return column_declaration;
};
/// Add _sign and _version column.
/// Add _sign and _version columns.
String sign_column_name = getUniqueColumnName(columns_name_and_type, "_sign");
String version_column_name = getUniqueColumnName(columns_name_and_type, "_version");
columns->set(columns->columns, InterpreterCreateQuery::formatColumns(columns_name_and_type));
columns->columns->children.emplace_back(create_materialized_column_declaration(sign_column_name, "Int8", UInt64(1)));
columns->columns->children.emplace_back(create_materialized_column_declaration(version_column_name, "UInt64", UInt64(1)));
/// Add minmax skipping index for _version column.
auto version_index = std::make_shared<ASTIndexDeclaration>();
version_index->name = version_column_name;
auto index_expr = std::make_shared<ASTIdentifier>(version_column_name);
auto index_type = makeASTFunction("minmax");
index_type->no_empty_args = true;
version_index->set(version_index->expr, index_expr);
version_index->set(version_index->type, index_type);
version_index->granularity = 1;
ASTPtr indices = std::make_shared<ASTExpressionList>();
indices->children.push_back(version_index);
columns->set(columns->indices, indices);
auto storage = std::make_shared<ASTStorage>();
/// The `partition by` expression must use primary keys, otherwise the primary keys will not be merge.

View File

@ -28,6 +28,10 @@ static inline ASTPtr tryRewrittenCreateQuery(const String & query, const Context
context, "test_database", "test_database")[0];
}
static const char MATERIALIZEMYSQL_TABLE_COLUMNS[] = ", `_sign` Int8() MATERIALIZED 1"
", `_version` UInt64() MATERIALIZED 1"
", INDEX _version _version TYPE minmax GRANULARITY 1";
TEST(MySQLCreateRewritten, ColumnsDataType)
{
tryRegisterFunctions();
@ -45,46 +49,46 @@ TEST(MySQLCreateRewritten, ColumnsDataType)
{
EXPECT_EQ(queryToString(tryRewrittenCreateQuery(
"CREATE TABLE `test_database`.`test_table_1`(`key` INT NOT NULL PRIMARY KEY, test " + test_type + ")", context_holder.context)),
"CREATE TABLE test_database.test_table_1 (`key` Int32, `test` Nullable(" + mapped_type + ")"
", `_sign` Int8() MATERIALIZED 1, `_version` UInt64() MATERIALIZED 1) ENGINE = "
"CREATE TABLE test_database.test_table_1 (`key` Int32, `test` Nullable(" + mapped_type + ")" +
MATERIALIZEMYSQL_TABLE_COLUMNS + ") ENGINE = "
"ReplacingMergeTree(_version) PARTITION BY intDiv(key, 4294967) ORDER BY tuple(key)");
EXPECT_EQ(queryToString(tryRewrittenCreateQuery(
"CREATE TABLE `test_database`.`test_table_1`(`key` INT NOT NULL PRIMARY KEY, test " + test_type + " NOT NULL)", context_holder.context)),
"CREATE TABLE test_database.test_table_1 (`key` Int32, `test` " + mapped_type +
", `_sign` Int8() MATERIALIZED 1, `_version` UInt64() MATERIALIZED 1) ENGINE = "
MATERIALIZEMYSQL_TABLE_COLUMNS + ") ENGINE = "
"ReplacingMergeTree(_version) PARTITION BY intDiv(key, 4294967) ORDER BY tuple(key)");
EXPECT_EQ(queryToString(tryRewrittenCreateQuery(
"CREATE TABLE `test_database`.`test_table_1`(`key` INT NOT NULL PRIMARY KEY, test " + test_type + " COMMENT 'test_comment' NOT NULL)", context_holder.context)),
"CREATE TABLE test_database.test_table_1 (`key` Int32, `test` " + mapped_type +
", `_sign` Int8() MATERIALIZED 1, `_version` UInt64() MATERIALIZED 1) ENGINE = "
MATERIALIZEMYSQL_TABLE_COLUMNS + ") ENGINE = "
"ReplacingMergeTree(_version) PARTITION BY intDiv(key, 4294967) ORDER BY tuple(key)");
if (Poco::toUpper(test_type).find("INT") != std::string::npos)
{
EXPECT_EQ(queryToString(tryRewrittenCreateQuery(
"CREATE TABLE `test_database`.`test_table_1`(`key` INT NOT NULL PRIMARY KEY, test " + test_type + " UNSIGNED)", context_holder.context)),
"CREATE TABLE test_database.test_table_1 (`key` Int32, `test` Nullable(U" + mapped_type + ")"
", `_sign` Int8() MATERIALIZED 1, `_version` UInt64() MATERIALIZED 1) ENGINE = "
"CREATE TABLE test_database.test_table_1 (`key` Int32, `test` Nullable(U" + mapped_type + ")" +
MATERIALIZEMYSQL_TABLE_COLUMNS + ") ENGINE = "
"ReplacingMergeTree(_version) PARTITION BY intDiv(key, 4294967) ORDER BY tuple(key)");
EXPECT_EQ(queryToString(tryRewrittenCreateQuery(
"CREATE TABLE `test_database`.`test_table_1`(`key` INT NOT NULL PRIMARY KEY, test " + test_type + " COMMENT 'test_comment' UNSIGNED)", context_holder.context)),
"CREATE TABLE test_database.test_table_1 (`key` Int32, `test` Nullable(U" + mapped_type + ")"
", `_sign` Int8() MATERIALIZED 1, `_version` UInt64() MATERIALIZED 1) ENGINE = "
"CREATE TABLE test_database.test_table_1 (`key` Int32, `test` Nullable(U" + mapped_type + ")" +
MATERIALIZEMYSQL_TABLE_COLUMNS + ") ENGINE = "
"ReplacingMergeTree(_version) PARTITION BY intDiv(key, 4294967) ORDER BY tuple(key)");
EXPECT_EQ(queryToString(tryRewrittenCreateQuery(
"CREATE TABLE `test_database`.`test_table_1`(`key` INT NOT NULL PRIMARY KEY, test " + test_type + " NOT NULL UNSIGNED)", context_holder.context)),
"CREATE TABLE test_database.test_table_1 (`key` Int32, `test` U" + mapped_type +
", `_sign` Int8() MATERIALIZED 1, `_version` UInt64() MATERIALIZED 1) ENGINE = "
MATERIALIZEMYSQL_TABLE_COLUMNS + ") ENGINE = "
"ReplacingMergeTree(_version) PARTITION BY intDiv(key, 4294967) ORDER BY tuple(key)");
EXPECT_EQ(queryToString(tryRewrittenCreateQuery(
"CREATE TABLE `test_database`.`test_table_1`(`key` INT NOT NULL PRIMARY KEY, test " + test_type + " COMMENT 'test_comment' UNSIGNED NOT NULL)", context_holder.context)),
"CREATE TABLE test_database.test_table_1 (`key` Int32, `test` U" + mapped_type +
", `_sign` Int8() MATERIALIZED 1, `_version` UInt64() MATERIALIZED 1) ENGINE = "
MATERIALIZEMYSQL_TABLE_COLUMNS + ") ENGINE = "
"ReplacingMergeTree(_version) PARTITION BY intDiv(key, 4294967) ORDER BY tuple(key)");
}
}
@ -109,13 +113,15 @@ TEST(MySQLCreateRewritten, PartitionPolicy)
{
EXPECT_EQ(queryToString(tryRewrittenCreateQuery(
"CREATE TABLE `test_database`.`test_table_1` (`key` " + test_type + " PRIMARY KEY)", context_holder.context)),
"CREATE TABLE test_database.test_table_1 (`key` " + mapped_type + ", `_sign` Int8() MATERIALIZED 1, "
"`_version` UInt64() MATERIALIZED 1) ENGINE = ReplacingMergeTree(_version)" + partition_policy + " ORDER BY tuple(key)");
"CREATE TABLE test_database.test_table_1 (`key` " + mapped_type +
MATERIALIZEMYSQL_TABLE_COLUMNS +
") ENGINE = ReplacingMergeTree(_version)" + partition_policy + " ORDER BY tuple(key)");
EXPECT_EQ(queryToString(tryRewrittenCreateQuery(
"CREATE TABLE `test_database`.`test_table_1` (`key` " + test_type + " NOT NULL PRIMARY KEY)", context_holder.context)),
"CREATE TABLE test_database.test_table_1 (`key` " + mapped_type + ", `_sign` Int8() MATERIALIZED 1, "
"`_version` UInt64() MATERIALIZED 1) ENGINE = ReplacingMergeTree(_version)" + partition_policy + " ORDER BY tuple(key)");
"CREATE TABLE test_database.test_table_1 (`key` " + mapped_type +
MATERIALIZEMYSQL_TABLE_COLUMNS +
") ENGINE = ReplacingMergeTree(_version)" + partition_policy + " ORDER BY tuple(key)");
}
}
@ -138,23 +144,27 @@ TEST(MySQLCreateRewritten, OrderbyPolicy)
{
EXPECT_EQ(queryToString(tryRewrittenCreateQuery(
"CREATE TABLE `test_database`.`test_table_1` (`key` " + test_type + " PRIMARY KEY, `key2` " + test_type + " UNIQUE KEY)", context_holder.context)),
"CREATE TABLE test_database.test_table_1 (`key` " + mapped_type + ", `key2` Nullable(" + mapped_type + "), `_sign` Int8() MATERIALIZED 1, "
"`_version` UInt64() MATERIALIZED 1) ENGINE = ReplacingMergeTree(_version)" + partition_policy + " ORDER BY (key, assumeNotNull(key2))");
"CREATE TABLE test_database.test_table_1 (`key` " + mapped_type + ", `key2` Nullable(" + mapped_type + ")" +
MATERIALIZEMYSQL_TABLE_COLUMNS +
") ENGINE = ReplacingMergeTree(_version)" + partition_policy + " ORDER BY (key, assumeNotNull(key2))");
EXPECT_EQ(queryToString(tryRewrittenCreateQuery(
"CREATE TABLE `test_database`.`test_table_1` (`key` " + test_type + " NOT NULL PRIMARY KEY, `key2` " + test_type + " NOT NULL UNIQUE KEY)", context_holder.context)),
"CREATE TABLE test_database.test_table_1 (`key` " + mapped_type + ", `key2` " + mapped_type + ", `_sign` Int8() MATERIALIZED 1, "
"`_version` UInt64() MATERIALIZED 1) ENGINE = ReplacingMergeTree(_version)" + partition_policy + " ORDER BY (key, key2)");
"CREATE TABLE test_database.test_table_1 (`key` " + mapped_type + ", `key2` " + mapped_type +
MATERIALIZEMYSQL_TABLE_COLUMNS +
") ENGINE = ReplacingMergeTree(_version)" + partition_policy + " ORDER BY (key, key2)");
EXPECT_EQ(queryToString(tryRewrittenCreateQuery(
"CREATE TABLE `test_database`.`test_table_1` (`key` " + test_type + " KEY UNIQUE KEY)", context_holder.context)),
"CREATE TABLE test_database.test_table_1 (`key` " + mapped_type + ", `_sign` Int8() MATERIALIZED 1, "
"`_version` UInt64() MATERIALIZED 1) ENGINE = ReplacingMergeTree(_version)" + partition_policy + " ORDER BY tuple(key)");
"CREATE TABLE test_database.test_table_1 (`key` " + mapped_type +
MATERIALIZEMYSQL_TABLE_COLUMNS +
") ENGINE = ReplacingMergeTree(_version)" + partition_policy + " ORDER BY tuple(key)");
EXPECT_EQ(queryToString(tryRewrittenCreateQuery(
"CREATE TABLE `test_database`.`test_table_1` (`key` " + test_type + ", `key2` " + test_type + " UNIQUE KEY, PRIMARY KEY(`key`, `key2`))", context_holder.context)),
"CREATE TABLE test_database.test_table_1 (`key` " + mapped_type + ", `key2` " + mapped_type + ", `_sign` Int8() MATERIALIZED 1, "
"`_version` UInt64() MATERIALIZED 1) ENGINE = ReplacingMergeTree(_version)" + partition_policy + " ORDER BY (key, key2)");
"CREATE TABLE test_database.test_table_1 (`key` " + mapped_type + ", `key2` " + mapped_type +
MATERIALIZEMYSQL_TABLE_COLUMNS +
") ENGINE = ReplacingMergeTree(_version)" + partition_policy + " ORDER BY (key, key2)");
}
}
@ -165,23 +175,27 @@ TEST(MySQLCreateRewritten, RewrittenQueryWithPrimaryKey)
EXPECT_EQ(queryToString(tryRewrittenCreateQuery(
"CREATE TABLE `test_database`.`test_table_1` (`key` int NOT NULL PRIMARY KEY) ENGINE=InnoDB DEFAULT CHARSET=utf8", context_holder.context)),
"CREATE TABLE test_database.test_table_1 (`key` Int32, `_sign` Int8() MATERIALIZED 1, `_version` UInt64() MATERIALIZED 1) ENGINE = ReplacingMergeTree(_version) "
"PARTITION BY intDiv(key, 4294967) ORDER BY tuple(key)");
"CREATE TABLE test_database.test_table_1 (`key` Int32" +
std::string(MATERIALIZEMYSQL_TABLE_COLUMNS) +
") ENGINE = ReplacingMergeTree(_version) PARTITION BY intDiv(key, 4294967) ORDER BY tuple(key)");
EXPECT_EQ(queryToString(tryRewrittenCreateQuery(
"CREATE TABLE `test_database`.`test_table_1` (`key` int NOT NULL, PRIMARY KEY (`key`)) ENGINE=InnoDB DEFAULT CHARSET=utf8", context_holder.context)),
"CREATE TABLE test_database.test_table_1 (`key` Int32, `_sign` Int8() MATERIALIZED 1, `_version` UInt64() MATERIALIZED 1) ENGINE = ReplacingMergeTree(_version) "
"PARTITION BY intDiv(key, 4294967) ORDER BY tuple(key)");
"CREATE TABLE test_database.test_table_1 (`key` Int32" +
std::string(MATERIALIZEMYSQL_TABLE_COLUMNS) +
") ENGINE = ReplacingMergeTree(_version) PARTITION BY intDiv(key, 4294967) ORDER BY tuple(key)");
EXPECT_EQ(queryToString(tryRewrittenCreateQuery(
"CREATE TABLE `test_database`.`test_table_1` (`key_1` int NOT NULL, key_2 INT NOT NULL, PRIMARY KEY (`key_1`, `key_2`)) ENGINE=InnoDB DEFAULT CHARSET=utf8", context_holder.context)),
"CREATE TABLE test_database.test_table_1 (`key_1` Int32, `key_2` Int32, `_sign` Int8() MATERIALIZED 1, `_version` UInt64() MATERIALIZED 1) ENGINE = "
"ReplacingMergeTree(_version) PARTITION BY intDiv(key_1, 4294967) ORDER BY (key_1, key_2)");
"CREATE TABLE test_database.test_table_1 (`key_1` Int32, `key_2` Int32" +
std::string(MATERIALIZEMYSQL_TABLE_COLUMNS) +
") ENGINE = ReplacingMergeTree(_version) PARTITION BY intDiv(key_1, 4294967) ORDER BY (key_1, key_2)");
EXPECT_EQ(queryToString(tryRewrittenCreateQuery(
"CREATE TABLE `test_database`.`test_table_1` (`key_1` BIGINT NOT NULL, key_2 INT NOT NULL, PRIMARY KEY (`key_1`, `key_2`)) ENGINE=InnoDB DEFAULT CHARSET=utf8", context_holder.context)),
"CREATE TABLE test_database.test_table_1 (`key_1` Int64, `key_2` Int32, `_sign` Int8() MATERIALIZED 1, `_version` UInt64() MATERIALIZED 1) ENGINE = "
"ReplacingMergeTree(_version) PARTITION BY intDiv(key_2, 4294967) ORDER BY (key_1, key_2)");
"CREATE TABLE test_database.test_table_1 (`key_1` Int64, `key_2` Int32" +
std::string(MATERIALIZEMYSQL_TABLE_COLUMNS) +
") ENGINE = ReplacingMergeTree(_version) PARTITION BY intDiv(key_2, 4294967) ORDER BY (key_1, key_2)");
}
TEST(MySQLCreateRewritten, RewrittenQueryWithPrefixKey)
@ -191,7 +205,8 @@ TEST(MySQLCreateRewritten, RewrittenQueryWithPrefixKey)
EXPECT_EQ(queryToString(tryRewrittenCreateQuery(
"CREATE TABLE `test_database`.`test_table_1` (`key` int NOT NULL PRIMARY KEY, `prefix_key` varchar(200) NOT NULL, KEY prefix_key_index(prefix_key(2))) ENGINE=InnoDB DEFAULT CHARSET=utf8", context_holder.context)),
"CREATE TABLE test_database.test_table_1 (`key` Int32, `prefix_key` String, `_sign` Int8() MATERIALIZED 1, `_version` UInt64() MATERIALIZED 1) ENGINE = "
"CREATE TABLE test_database.test_table_1 (`key` Int32, `prefix_key` String" +
std::string(MATERIALIZEMYSQL_TABLE_COLUMNS) + ") ENGINE = "
"ReplacingMergeTree(_version) PARTITION BY intDiv(key, 4294967) ORDER BY (key, prefix_key)");
}
@ -204,6 +219,7 @@ TEST(MySQLCreateRewritten, UniqueKeysConvert)
"CREATE TABLE `test_database`.`test_table_1` (code varchar(255) CHARACTER SET utf8 COLLATE utf8_bin NOT NULL,name varchar(255) CHARACTER SET utf8 COLLATE utf8_bin NOT NULL,"
" id bigint NOT NULL AUTO_INCREMENT, tenant_id bigint NOT NULL, PRIMARY KEY (id), UNIQUE KEY code_id (code, tenant_id), UNIQUE KEY name_id (name, tenant_id))"
" ENGINE=InnoDB AUTO_INCREMENT=100 DEFAULT CHARSET=utf8 COLLATE=utf8_bin;", context_holder.context)),
"CREATE TABLE test_database.test_table_1 (`code` String, `name` String, `id` Int64, `tenant_id` Int64, `_sign` Int8() MATERIALIZED 1, `_version` UInt64() MATERIALIZED 1)"
" ENGINE = ReplacingMergeTree(_version) PARTITION BY intDiv(id, 18446744073709551) ORDER BY (code, name, tenant_id, id)");
"CREATE TABLE test_database.test_table_1 (`code` String, `name` String, `id` Int64, `tenant_id` Int64" +
std::string(MATERIALIZEMYSQL_TABLE_COLUMNS) +
") ENGINE = ReplacingMergeTree(_version) PARTITION BY intDiv(id, 18446744073709551) ORDER BY (code, name, tenant_id, id)");
}

View File

@ -3,6 +3,7 @@
#include <Core/Types.h>
#include <Poco/Util/AbstractConfiguration.h>
#include "PostgreSQLConnectionPool.h"
#include <mutex>
namespace DB

View File

@ -1663,9 +1663,10 @@ bool StorageReplicatedMergeTree::tryExecutePartMutation(const StorageReplicatedM
if (source_part->name != source_part_name)
{
throw Exception("Part " + source_part_name + " is covered by " + source_part->name
+ " but should be mutated to " + entry.new_part_name + ". This is a bug.",
ErrorCodes::LOGICAL_ERROR);
LOG_WARNING(log, "Part " + source_part_name + " is covered by " + source_part->name
+ " but should be mutated to " + entry.new_part_name + ". "
+ "Possibly the mutation of this part is not needed and will be skipped. This shouldn't happen often.");
return false;
}
/// TODO - some better heuristic?

View File

@ -1,5 +1,6 @@
#!/usr/bin/env python3
import shutil
import sys
import os
import os.path
@ -112,13 +113,14 @@ def get_db_engine(args, database_name):
return " ENGINE=" + args.db_engine
return "" # Will use default engine
def run_single_test(args, ext, server_logs_level, client_options, case_file, stdout_file, stderr_file):
def run_single_test(args, ext, server_logs_level, client_options, case_file, stdout_file, stderr_file, suite_tmp_dir):
# print(client_options)
start_time = datetime.now()
if args.database:
database = args.database
os.environ.setdefault("CLICKHOUSE_DATABASE", database)
os.environ.setdefault("CLICKHOUSE_TMP", suite_tmp_dir)
else:
# If --database is not specified, we will create temporary database with unique name
@ -136,6 +138,12 @@ def run_single_test(args, ext, server_logs_level, client_options, case_file, std
return clickhouse_proc_create, "", "Timeout creating database {} before test".format(database), total_time
os.environ["CLICKHOUSE_DATABASE"] = database
# Set temporary directory to match the randomly generated database,
# because .sh tests also use it for temporary files and we want to avoid
# collisions.
test_tmp_dir = os.path.join(suite_tmp_dir, database)
os.mkdir(test_tmp_dir)
os.environ.setdefault("CLICKHOUSE_TMP", test_tmp_dir)
# This is for .sh tests
os.environ["CLICKHOUSE_LOG_COMMENT"] = case_file
@ -170,7 +178,7 @@ def run_single_test(args, ext, server_logs_level, client_options, case_file, std
if need_drop_database:
clickhouse_proc_create = Popen(shlex.split(args.client), stdin=PIPE, stdout=PIPE, stderr=PIPE, universal_newlines=True)
seconds_left = max(args.timeout - (datetime.now() - start_time).total_seconds(), 10)
seconds_left = max(args.timeout - (datetime.now() - start_time).total_seconds(), 20)
try:
drop_database_query = "DROP DATABASE " + database
if args.replicated_database:
@ -188,6 +196,8 @@ def run_single_test(args, ext, server_logs_level, client_options, case_file, std
total_time = (datetime.now() - start_time).total_seconds()
return clickhouse_proc_create, "", "Timeout dropping database {} after test".format(database), total_time
shutil.rmtree(test_tmp_dir)
total_time = (datetime.now() - start_time).total_seconds()
# Normalize randomized database names in stdout, stderr files.
@ -217,7 +227,7 @@ def get_processlist(args):
query = b"SELECT materialize((hostName(), tcpPort())) as host, * " \
b"FROM clusterAllReplicas('r', system.processes) WHERE query NOT LIKE '%system.processes%' FORMAT Vertical"
clickhouse_proc = Popen(shlex.split(args.client), stdin=PIPE, stdout=PIPE, stderr=PIPE)
(stdout, _) = clickhouse_proc.communicate(query, timeout=10)
(stdout, _) = clickhouse_proc.communicate((b"SHOW PROCESSLIST FORMAT Vertical"), timeout=20)
return False, stdout.decode('utf-8')
except Exception as ex:
print("Exception", ex)
@ -362,7 +372,7 @@ def run_tests_array(all_tests_with_params):
clickhouse_proc = Popen(shlex.split(args.client), stdin=PIPE, stdout=PIPE, stderr=PIPE, universal_newlines=True)
failed_to_check = False
try:
clickhouse_proc.communicate(("SELECT 'Running test {suite}/{case} from pid={pid}';".format(pid = os.getpid(), case = case, suite = suite)), timeout=10)
clickhouse_proc.communicate(("SELECT 'Running test {suite}/{case} from pid={pid}';".format(pid = os.getpid(), case = case, suite = suite)), timeout=20)
except:
failed_to_check = True
@ -377,7 +387,7 @@ def run_tests_array(all_tests_with_params):
stdout_file = os.path.join(suite_tmp_dir, name) + file_suffix + '.stdout'
stderr_file = os.path.join(suite_tmp_dir, name) + file_suffix + '.stderr'
proc, stdout, stderr, total_time = run_single_test(args, ext, server_logs_level, client_options, case_file, stdout_file, stderr_file)
proc, stdout, stderr, total_time = run_single_test(args, ext, server_logs_level, client_options, case_file, stdout_file, stderr_file, suite_tmp_dir)
if proc.returncode is None:
try:
@ -395,7 +405,7 @@ def run_tests_array(all_tests_with_params):
else:
counter = 1
while proc.returncode != 0 and need_retry(stderr):
proc, stdout, stderr, total_time = run_single_test(args, ext, server_logs_level, client_options, case_file, stdout_file, stderr_file)
proc, stdout, stderr, total_time = run_single_test(args, ext, server_logs_level, client_options, case_file, stdout_file, stderr_file, suite_tmp_dir)
sleep(2**counter)
counter += 1
if counter > 6:
@ -458,7 +468,7 @@ def run_tests_array(all_tests_with_params):
failures_chain += 1
status += MSG_FAIL
status += print_test_time(total_time)
status += " - Long test not marked as 'long'"
status += " - Test runs too long (> 30s). Make it faster."
else:
passed_total += 1
failures_chain = 0
@ -659,7 +669,6 @@ def main(args):
os.environ.setdefault("CLICKHOUSE_CONFIG", args.configserver)
if args.configclient:
os.environ.setdefault("CLICKHOUSE_CONFIG_CLIENT", args.configclient)
os.environ.setdefault("CLICKHOUSE_TMP", tmp_dir)
# Force to print server warnings in stderr
# Shell scripts could change logging level

View File

@ -9,7 +9,6 @@ fun:tolower
# Ideally, we should report these upstream.
src:*/contrib/zlib-ng/*
src:*/contrib/simdjson/*
src:*/contrib/lz4/*
# Hyperscan
fun:roseRunProgram

View File

@ -11,7 +11,7 @@
<fill_query>INSERT INTO hits_none SELECT WatchID FROM test.hits</fill_query>
<fill_query>OPTIMIZE TABLE hits_none FINAL</fill_query>
<query><![CDATA[SELECT sum(WatchID) FROM hits_none]]></query>
<query short="1"><![CDATA[SELECT sum(WatchID) FROM hits_none]]></query>
<drop_query>DROP TABLE hits_none</drop_query>
</test>

View File

@ -48,7 +48,7 @@ SELECT
threads_realtime >= threads_time_user_system_io,
any(length(thread_ids)) >= 1
FROM
(SELECT * FROM system.query_log PREWHERE query='$heavy_cpu_query' WHERE event_date >= today()-1 AND current_database = currentDatabase() AND type=2 ORDER BY event_time DESC LIMIT 1)
(SELECT * FROM system.query_log PREWHERE query='$heavy_cpu_query' WHERE event_date >= today()-2 AND current_database = currentDatabase() AND type=2 ORDER BY event_time DESC LIMIT 1)
ARRAY JOIN ProfileEvents.Names AS PN, ProfileEvents.Values AS PV"
# Clean

View File

@ -97,7 +97,7 @@ echo 7
# and finally querylog
$CLICKHOUSE_CLIENT \
--server_logs_file=/dev/null \
--query="select * from system.query_log where current_database = currentDatabase() AND event_time > now() - 10 and query like '%TOPSECRET%';"
--query="select * from system.query_log where current_database = currentDatabase() AND event_date >= yesterday() and query like '%TOPSECRET%';"
rm -f "$tmp_file" >/dev/null 2>&1

View File

@ -2,26 +2,33 @@ set log_queries=1;
select '01231_log_queries_min_type/QUERY_START';
system flush logs;
select count() from system.query_log where current_database = currentDatabase() and query like '%01231_log_queries_min_type/QUERY_START%' and query not like '%system.query_log%' and event_date = today() and event_time >= now() - interval 1 minute;
select count() from system.query_log where current_database = currentDatabase()
and query like 'select \'01231_log_queries_min_type/QUERY_START%'
and event_date >= yesterday();
set log_queries_min_type='EXCEPTION_BEFORE_START';
select '01231_log_queries_min_type/EXCEPTION_BEFORE_START';
system flush logs;
select count() from system.query_log where current_database = currentDatabase() and query like '%01231_log_queries_min_type/EXCEPTION_BEFORE_START%' and query not like '%system.query_log%' and event_date = today() and event_time >= now() - interval 1 minute;
select count() from system.query_log where current_database = currentDatabase()
and query like 'select \'01231_log_queries_min_type/EXCEPTION_BEFORE_START%'
and event_date >= yesterday();
set max_rows_to_read='100K';
set log_queries_min_type='EXCEPTION_WHILE_PROCESSING';
select '01231_log_queries_min_type/EXCEPTION_WHILE_PROCESSING', max(number) from system.numbers limit 1e6; -- { serverError 158; }
set max_rows_to_read=0;
system flush logs;
select count() from system.query_log where current_database = currentDatabase() and query like '%01231_log_queries_min_type/EXCEPTION_WHILE_PROCESSING%' and query not like '%system.query_log%' and event_date = today() and event_time >= now() - interval 1 minute and type = 'ExceptionWhileProcessing';
select count() from system.query_log where current_database = currentDatabase()
and query like 'select \'01231_log_queries_min_type/EXCEPTION_WHILE_PROCESSING%'
and event_date >= yesterday() and type = 'ExceptionWhileProcessing';
set max_rows_to_read='100K';
select '01231_log_queries_min_type w/ Settings/EXCEPTION_WHILE_PROCESSING', max(number) from system.numbers limit 1e6; -- { serverError 158; }
system flush logs;
set max_rows_to_read=0;
select count() from system.query_log where
current_database = currentDatabase() and
query like '%01231_log_queries_min_type w/ Settings/EXCEPTION_WHILE_PROCESSING%' and
query not like '%system.query_log%' and
event_date = today() and
event_time >= now() - interval 1 minute and
query like 'select \'01231_log_queries_min_type w/ Settings/EXCEPTION_WHILE_PROCESSING%' and
event_date >= yesterday() and
type = 'ExceptionWhileProcessing' and
has(Settings.Names, 'max_rows_to_read');

View File

@ -9,12 +9,12 @@ $CLICKHOUSE_CLIENT --multiquery --query "
CREATE TABLE bug (UserID UInt64, Date Date) ENGINE = MergeTree ORDER BY Date;
INSERT INTO bug SELECT rand64(), '2020-06-07' FROM numbers(50000000);
OPTIMIZE TABLE bug FINAL;"
LOG="$CLICKHOUSE_TMP/err-$CLICKHOUSE_DATABASE"
$CLICKHOUSE_BENCHMARK --iterations 10 --max_threads 100 --min_bytes_to_use_direct_io 1 <<< "SELECT sum(UserID) FROM bug PREWHERE NOT ignore(Date)" 1>/dev/null 2>"$LOG"
cat "$LOG" | grep Exception
cat "$LOG" | grep Loaded
$CLICKHOUSE_BENCHMARK --iterations 10 --max_threads 100 --min_bytes_to_use_direct_io 1 <<< "SELECT sum(UserID) FROM bug PREWHERE NOT ignore(Date)" 1>/dev/null 2>"$CLICKHOUSE_TMP"/err
cat "$CLICKHOUSE_TMP"/err | grep Exception
cat "$CLICKHOUSE_TMP"/err | grep Loaded
rm "$CLICKHOUSE_TMP"/err
rm "$LOG"
$CLICKHOUSE_CLIENT --multiquery --query "
DROP TABLE bug;"

View File

@ -6,6 +6,6 @@ SET min_bytes_to_use_mmap_io = 1;
SELECT * FROM test_01344 WHERE x = 'Hello, world';
SYSTEM FLUSH LOGS;
SELECT PE.Values FROM system.query_log ARRAY JOIN ProfileEvents AS PE WHERE current_database = currentDatabase() AND event_date >= yesterday() AND event_time >= now() - 300 AND query LIKE 'SELECT * FROM test_01344 WHERE x = ''Hello, world''%' AND PE.Names = 'CreatedReadBufferMMap' AND type = 2 ORDER BY event_time DESC LIMIT 1;
SELECT PE.Values FROM system.query_log ARRAY JOIN ProfileEvents AS PE WHERE current_database = currentDatabase() AND event_date >= yesterday() AND query LIKE 'SELECT * FROM test_01344 WHERE x = ''Hello, world''%' AND PE.Names = 'CreatedReadBufferMMap' AND type = 2 ORDER BY event_time DESC LIMIT 1;
DROP TABLE test_01344;

View File

@ -17,7 +17,7 @@ CREATE MATERIALIZED VIEW slow_log Engine=Memory AS
extract(query,'/\\*\\s*QUERY_GROUP_ID:(.*?)\\s*\\*/') as QUERY_GROUP_ID,
*
FROM system.query_log
WHERE type<>1 and event_date >= yesterday() and event_time > now() - 120
WHERE type<>1 and event_date >= yesterday()
) as ql
INNER JOIN expected_times USING (QUERY_GROUP_ID)
WHERE query_duration_ms > max_query_duration_ms
@ -38,7 +38,7 @@ SELECT
extract(query,'/\\*\\s*QUERY_GROUP_ID:(.*?)\\s*\\*/') as QUERY_GROUP_ID,
count()
FROM system.query_log
WHERE current_database = currentDatabase() AND type<>1 and event_date >= yesterday() and event_time > now() - 20 and QUERY_GROUP_ID<>''
WHERE current_database = currentDatabase() AND type<>1 and event_date >= yesterday() and QUERY_GROUP_ID<>''
GROUP BY QUERY_GROUP_ID
ORDER BY QUERY_GROUP_ID;

View File

@ -10,7 +10,7 @@ SYSTEM FLUSH LOGS;
SELECT ProfileEvents.Values[indexOf(ProfileEvents.Names, 'FileOpen')]
FROM system.query_log
WHERE (type = 'QueryFinish') AND (lower(query) LIKE lower('SELECT a.size0 FROM %t_arr%'))
AND event_time > now() - INTERVAL 10 SECOND AND current_database = currentDatabase();
AND current_database = currentDatabase();
SELECT '====tuple====';
DROP TABLE IF EXISTS t_tup;
@ -27,7 +27,7 @@ SYSTEM FLUSH LOGS;
SELECT ProfileEvents.Values[indexOf(ProfileEvents.Names, 'FileOpen')]
FROM system.query_log
WHERE (type = 'QueryFinish') AND (lower(query) LIKE lower('SELECT t._ FROM %t_tup%'))
AND event_time > now() - INTERVAL 10 SECOND AND current_database = currentDatabase();
AND current_database = currentDatabase();
SELECT '====nullable====';
DROP TABLE IF EXISTS t_nul;
@ -41,7 +41,7 @@ SYSTEM FLUSH LOGS;
SELECT ProfileEvents.Values[indexOf(ProfileEvents.Names, 'FileOpen')]
FROM system.query_log
WHERE (type = 'QueryFinish') AND (lower(query) LIKE lower('SELECT n.null FROM %t_nul%'))
AND event_time > now() - INTERVAL 10 SECOND AND current_database = currentDatabase();
AND current_database = currentDatabase();
SELECT '====map====';
SET allow_experimental_map_type = 1;
@ -60,7 +60,7 @@ SYSTEM FLUSH LOGS;
SELECT ProfileEvents.Values[indexOf(ProfileEvents.Names, 'FileOpen')]
FROM system.query_log
WHERE (type = 'QueryFinish') AND (lower(query) LIKE lower('SELECT m.% FROM %t_map%'))
AND event_time > now() - INTERVAL 10 SECOND AND current_database = currentDatabase();
AND current_database = currentDatabase();
DROP TABLE t_arr;
DROP TABLE t_nul;

View File

@ -1,20 +1,20 @@
set log_queries=1;
set log_queries_min_type='QUERY_FINISH';
set enable_global_with_statement=1;
set enable_global_with_statement=0;
select /* test=01531, enable_global_with_statement=0 */ 2;
system flush logs;
select count() from system.query_log
where event_time >= now() - interval 5 minute
and query like '%select /* test=01531, enable_global_with_statement=0 */ 2%'
where event_date >= yesterday()
and query like 'select /* test=01531, enable_global_with_statement=0 */ 2%'
and current_database = currentDatabase()
;
set enable_global_with_statement=1;
select /* test=01531 enable_global_with_statement=1 */ 2;
select /* test=01531, enable_global_with_statement=1 */ 2;
system flush logs;
select count() from system.query_log
where event_time >= now() - interval 5 minute
and query like '%select /* test=01531 enable_global_with_statement=1 */ 2%'
where event_date >= yesterday()
and query like 'select /* test=01531, enable_global_with_statement=1 */ 2%'
and current_database = currentDatabase()
;

View File

@ -13,16 +13,16 @@ col3
read files
4
6
0 899984 7199412
1 899987 7199877
2 899990 7200255
3 899993 7199883
4 899996 7199798
5 899999 7200306
6 900002 7200064
7 900005 7199429
8 900008 7200067
9 899992 7199993
0 89982 719752
1 89988 720017
2 89994 720152
3 90000 720157
4 90006 720100
5 90012 720168
6 90018 720106
7 90005 719891
8 89992 719854
9 89979 719706
0 []
0 [0]
1 [0,2]

View File

@ -36,7 +36,7 @@ SYSTEM FLUSH LOGS;
SELECT ProfileEvents.Values[indexOf(ProfileEvents.Names, 'FileOpen')]
FROM system.query_log
WHERE (type = 'QueryFinish') AND (lower(query) LIKE lower('SELECT col1.a FROM %nested%'))
AND event_time > now() - INTERVAL 10 SECOND AND current_database = currentDatabase();
AND event_date >= yesterday() AND current_database = currentDatabase();
SYSTEM DROP MARK CACHE;
SELECT col3.n2.s FROM nested FORMAT Null;
@ -46,7 +46,7 @@ SYSTEM FLUSH LOGS;
SELECT ProfileEvents.Values[indexOf(ProfileEvents.Names, 'FileOpen')]
FROM system.query_log
WHERE (type = 'QueryFinish') AND (lower(query) LIKE lower('SELECT col3.n2.s FROM %nested%'))
AND event_time > now() - INTERVAL 10 SECOND AND current_database = currentDatabase();
AND event_date >= yesterday() AND current_database = currentDatabase();
DROP TABLE nested;
@ -59,7 +59,7 @@ ENGINE = MergeTree
ORDER BY id
SETTINGS min_bytes_for_wide_part = 0;
INSERT INTO nested SELECT number, arrayMap(x -> (x, arrayMap(y -> (toString(y * x), y + x), range(number % 17))), range(number % 19)) FROM numbers(1000000);
INSERT INTO nested SELECT number, arrayMap(x -> (x, arrayMap(y -> (toString(y * x), y + x), range(number % 17))), range(number % 19)) FROM numbers(100000);
SELECT id % 10, sum(length(col1)), sumArray(arrayMap(x -> length(x), col1.n.b)) FROM nested GROUP BY id % 10;
SELECT arraySum(col1.a), arrayMap(x -> x * x * 2, col1.a) FROM nested ORDER BY id LIMIT 5;

View File

@ -12,19 +12,15 @@ system flush logs;
select count()
from system.query_log
where
query like '%01546_log_queries_min_query_duration_ms-fast%'
and query not like '%system.query_log%'
query like 'select \'01546_log_queries_min_query_duration_ms-fast%'
and current_database = currentDatabase()
and event_date = today()
and event_time >= now() - interval 1 minute;
and event_date >= yesterday();
select count()
from system.query_thread_log
where
query like '%01546_log_queries_min_query_duration_ms-fast%'
and query not like '%system.query_thread_log%'
query like 'select \'01546_log_queries_min_query_duration_ms-fast%'
and current_database = currentDatabase()
and event_date = today()
and event_time >= now() - interval 1 minute;
and event_date >= yesterday();
--
-- slow -- query logged
@ -37,18 +33,14 @@ system flush logs;
select count()
from system.query_log
where
query like '%01546_log_queries_min_query_duration_ms-slow%'
and query not like '%system.query_log%'
query like 'select \'01546_log_queries_min_query_duration_ms-slow%'
and current_database = currentDatabase()
and event_date = today()
and event_time >= now() - interval 1 minute;
and event_date >= yesterday();
-- There at least two threads involved in a simple query
-- (one thread just waits another, sigh)
select count() == 2
select if(count() == 2, 'OK', 'Fail: ' || toString(count()))
from system.query_thread_log
where
query like '%01546_log_queries_min_query_duration_ms-slow%'
and query not like '%system.query_thread_log%'
query like 'select \'01546_log_queries_min_query_duration_ms-slow%'
and current_database = currentDatabase()
and event_date = today()
and event_time >= now() - interval 1 minute;
and event_date >= yesterday();

View File

@ -21,17 +21,15 @@ system flush logs;
select count()
from system.query_log
where
query like '%01547_query_log_current_database%'
query like 'select \'01547_query_log_current_database%'
and current_database = currentDatabase()
and event_date = today()
and event_time >= now() - interval 1 minute;
and event_date >= yesterday();
-- at least two threads for processing
-- (but one just waits for another, sigh)
select count() == 2
from system.query_thread_log
where
query like '%01547_query_log_current_database%'
query like 'select \'01547_query_log_current_database%'
and current_database = currentDatabase()
and event_date = today()
and event_time >= now() - interval 1 minute;
and event_date >= yesterday()

View File

@ -13,8 +13,7 @@ where
query like '%01548_query_log_query_execution_ms%'
and current_database = currentDatabase()
and query_duration_ms between 100 and 800
and event_date = today()
and event_time >= now() - interval 1 minute;
and event_date >= yesterday();
-- at least two threads for processing
-- (but one just waits for another, sigh)

View File

@ -4,9 +4,10 @@ CURDIR=$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)
# shellcheck source=../shell_config.sh
. "$CURDIR"/../shell_config.sh
$CLICKHOUSE_BENCHMARK --iterations 10 --query "SELECT 1" 1>/dev/null 2>"$CLICKHOUSE_TMP"/err
LOG="$CLICKHOUSE_TMP/err-$CLICKHOUSE_DATABASE"
$CLICKHOUSE_BENCHMARK --iterations 10 --query "SELECT 1" 1>/dev/null 2>"$LOG"
cat "$CLICKHOUSE_TMP"/err | grep Exception
cat "$CLICKHOUSE_TMP"/err | grep Loaded
cat "$LOG" | grep Exception
cat "$LOG" | grep Loaded
rm "$CLICKHOUSE_TMP"/err
rm "$LOG"

View File

@ -8,13 +8,15 @@ CREATE TABLE table_with_single_pk
ENGINE = MergeTree
ORDER BY key;
INSERT INTO table_with_single_pk SELECT number, toString(number % 10) FROM numbers(10000000);
INSERT INTO table_with_single_pk SELECT number, toString(number % 10) FROM numbers(1000000);
SYSTEM FLUSH LOGS;
WITH (
SELECT (event_time, event_time_microseconds)
FROM system.part_log
WHERE "table" = 'table_with_single_pk'
AND "database" = currentDatabase()
ORDER BY event_time DESC
LIMIT 1
) AS time

View File

@ -136,7 +136,7 @@ SELECT 'ACTUAL LOG CONTENT:';
-- Try to filter out all possible previous junk events by excluding old log entries,
SELECT query_kind, query FROM system.query_log
WHERE
log_comment LIKE '%system.query_log%' AND type == 'QueryStart' AND event_time >= now() - 10
log_comment LIKE '%system.query_log%' AND type == 'QueryStart' AND event_date >= yesterday()
AND current_database == currentDatabase()
ORDER BY event_time_microseconds;

View File

@ -0,0 +1,46 @@
# Invocation with constants
1
0
1
0
# Invocation with non-constant addresses
192.168.99.255 192.168.100.0/22 0
192.168.100.1 192.168.100.0/22 1
192.168.103.255 192.168.100.0/22 1
192.168.104.0 192.168.100.0/22 0
::192.168.99.255 ::192.168.100.0/118 0
::192.168.100.1 ::192.168.100.0/118 1
::192.168.103.255 ::192.168.100.0/118 1
::192.168.104.0 ::192.168.100.0/118 0
# Invocation with non-constant prefixes
192.168.100.1 192.168.100.0/22 1
192.168.100.1 192.168.100.0/24 1
192.168.100.1 192.168.100.0/32 0
::192.168.100.1 ::192.168.100.0/118 1
::192.168.100.1 ::192.168.100.0/120 1
::192.168.100.1 ::192.168.100.0/128 0
# Invocation with non-constants
192.168.100.1 192.168.100.0/22 1
192.168.100.1 192.168.100.0/24 1
192.168.103.255 192.168.100.0/22 1
192.168.103.255 192.168.100.0/24 0
::192.168.100.1 ::192.168.100.0/118 1
::192.168.100.1 ::192.168.100.0/120 1
::192.168.103.255 ::192.168.100.0/118 1
::192.168.103.255 ::192.168.100.0/120 0
# Check with dense table
1
1
1
1
1
1
1
1
# Mismatching IP versions is not an error.
0
0
0
0
# Unparsable arguments
# Wrong argument types

View File

@ -0,0 +1,64 @@
SELECT '# Invocation with constants';
SELECT isIPAddressInRange('127.0.0.1', '127.0.0.0/8');
SELECT isIPAddressInRange('128.0.0.1', '127.0.0.0/8');
SELECT isIPAddressInRange('ffff::1', 'ffff::/16');
SELECT isIPAddressInRange('fffe::1', 'ffff::/16');
SELECT '# Invocation with non-constant addresses';
WITH arrayJoin(['192.168.99.255', '192.168.100.1', '192.168.103.255', '192.168.104.0']) as addr, '192.168.100.0/22' as prefix SELECT addr, prefix, isIPAddressInRange(addr, prefix);
WITH arrayJoin(['::192.168.99.255', '::192.168.100.1', '::192.168.103.255', '::192.168.104.0']) as addr, '::192.168.100.0/118' as prefix SELECT addr, prefix, isIPAddressInRange(addr, prefix);
SELECT '# Invocation with non-constant prefixes';
WITH '192.168.100.1' as addr, arrayJoin(['192.168.100.0/22', '192.168.100.0/24', '192.168.100.0/32']) as prefix SELECT addr, prefix, isIPAddressInRange(addr, prefix);
WITH '::192.168.100.1' as addr, arrayJoin(['::192.168.100.0/118', '::192.168.100.0/120', '::192.168.100.0/128']) as prefix SELECT addr, prefix, isIPAddressInRange(addr, prefix);
SELECT '# Invocation with non-constants';
WITH arrayJoin(['192.168.100.1', '192.168.103.255']) as addr, arrayJoin(['192.168.100.0/22', '192.168.100.0/24']) as prefix SELECT addr, prefix, isIPAddressInRange(addr, prefix);
WITH arrayJoin(['::192.168.100.1', '::192.168.103.255']) as addr, arrayJoin(['::192.168.100.0/118', '::192.168.100.0/120']) as prefix SELECT addr, prefix, isIPAddressInRange(addr, prefix);
SELECT '# Check with dense table';
DROP TABLE IF EXISTS test_data;
CREATE TABLE test_data (cidr String) ENGINE = Memory;
INSERT INTO test_data
SELECT
IPv4NumToString(IPv4CIDRToRange(IPv4StringToNum('255.255.255.255'), toUInt8(number)).1) || '/' || toString(number) AS cidr
FROM system.numbers LIMIT 33;
SELECT sum(isIPAddressInRange('0.0.0.0', cidr)) == 1 FROM test_data;
SELECT sum(isIPAddressInRange('127.0.0.0', cidr)) == 1 FROM test_data;
SELECT sum(isIPAddressInRange('128.0.0.0', cidr)) == 2 FROM test_data;
SELECT sum(isIPAddressInRange('255.0.0.0', cidr)) == 9 FROM test_data;
SELECT sum(isIPAddressInRange('255.0.0.1', cidr)) == 9 FROM test_data;
SELECT sum(isIPAddressInRange('255.0.0.255', cidr)) == 9 FROM test_data;
SELECT sum(isIPAddressInRange('255.255.255.255', cidr)) == 33 FROM test_data;
SELECT sum(isIPAddressInRange('255.255.255.254', cidr)) == 32 FROM test_data;
DROP TABLE IF EXISTS test_data;
SELECT '# Mismatching IP versions is not an error.';
SELECT isIPAddressInRange('127.0.0.1', 'ffff::/16');
SELECT isIPAddressInRange('127.0.0.1', '::127.0.0.1/128');
SELECT isIPAddressInRange('::1', '127.0.0.0/8');
SELECT isIPAddressInRange('::127.0.0.1', '127.0.0.1/32');
SELECT '# Unparsable arguments';
SELECT isIPAddressInRange('unparsable', '127.0.0.0/8'); -- { serverError 6 }
SELECT isIPAddressInRange('127.0.0.1', 'unparsable'); -- { serverError 6 }
SELECT '# Wrong argument types';
SELECT isIPAddressInRange(100, '127.0.0.0/8'); -- { serverError 43 }
SELECT isIPAddressInRange(NULL, '127.0.0.0/8'); -- { serverError 43 }
SELECT isIPAddressInRange(CAST(NULL, 'Nullable(String)'), '127.0.0.0/8'); -- { serverError 43 }
SELECT isIPAddressInRange('127.0.0.1', 100); -- { serverError 43 }
SELECT isIPAddressInRange(100, NULL); -- { serverError 43 }
WITH arrayJoin([NULL, NULL, NULL, NULL]) AS prefix SELECT isIPAddressInRange([NULL, NULL, 0, 255, 0], prefix); -- { serverError 43 }

View File

@ -423,8 +423,8 @@
"00571_non_exist_database_when_create_materializ_view",
"00575_illegal_column_exception_when_drop_depen_column",
"00599_create_view_with_subquery",
"00604_show_create_database",
"00600_replace_running_query",
"00604_show_create_database",
"00612_http_max_query_size",
"00619_union_highlite",
"00620_optimize_on_nonleader_replica_zookeeper",
@ -475,6 +475,7 @@
"00933_test_fix_extra_seek_on_compressed_cache",
"00933_ttl_replicated_zookeeper",
"00933_ttl_with_default",
"00950_dict_get",
"00955_test_final_mark",
"00976_ttl_with_old_parts",
"00980_merge_alter_settings",
@ -638,8 +639,8 @@
"01530_drop_database_atomic_sync",
"01541_max_memory_usage_for_user_long",
"01542_dictionary_load_exception_race",
"01560_optimize_on_insert_zookeeper",
"01545_system_errors", // looks at the difference of values in system.errors
"01560_optimize_on_insert_zookeeper",
"01575_disable_detach_table_of_dictionary",
"01593_concurrent_alter_mutations_kill",
"01593_concurrent_alter_mutations_kill_many_replicas",
@ -653,11 +654,23 @@
"01666_blns",
"01646_system_restart_replicas_smoke", // system restart replicas is a global query
"01656_test_query_log_factories_info",
"01658_read_file_to_stringcolumn",
"01669_columns_declaration_serde",
"01676_dictget_in_default_expression",
"01681_cache_dictionary_simple_key",
"01682_cache_dictionary_complex_key",
"01683_flat_dictionary",
"01684_ssd_cache_dictionary_simple_key",
"01685_ssd_cache_dictionary_complex_key",
"01700_system_zookeeper_path_in",
"01702_system_query_log", // It's ok to execute in parallel with oter tests but not several instances of the same test.
"01702_system_query_log", // Runs many global system queries
"01715_background_checker_blather_zookeeper",
"01721_engine_file_truncate_on_insert", // It's ok to execute in parallel but not several instances of the same test.
"01747_alter_partition_key_enum_zookeeper",
"01748_dictionary_table_dot", // creates database
"01760_polygon_dictionaries",
"01760_system_dictionaries",
"01761_alter_decimal_zookeeper",
"attach",
"ddl_dictionaries",
@ -666,18 +679,6 @@
"live_view",
"memory_leak",
"memory_limit",
"polygon_dicts", // they use an explicitly specified database
"01658_read_file_to_stringcolumn",
"01721_engine_file_truncate_on_insert", // It's ok to execute in parallel but not several instances of the same test.
"01702_system_query_log", // It's ok to execute in parallel with oter tests but not several instances of the same test.
"01748_dictionary_table_dot", // creates database
"00950_dict_get",
"01683_flat_dictionary",
"01681_cache_dictionary_simple_key",
"01682_cache_dictionary_complex_key",
"01684_ssd_cache_dictionary_simple_key",
"01685_ssd_cache_dictionary_complex_key",
"01760_system_dictionaries",
"01760_polygon_dictionaries"
"polygon_dicts" // they use an explicitly specified database
]
}