Merge remote-tracking branch 'origin/master' into revive-sqlancer

Yatsishin Ilya 2022-10-28 07:21:39 +00:00
commit 3ab23ef3f2
72 changed files with 1665 additions and 992 deletions


@ -88,7 +88,6 @@
* Allow to use `Date32` arguments for `dateName` function. [#42554](https://github.com/ClickHouse/ClickHouse/pull/42554) ([Roman Vasin](https://github.com/rvasin)).
* Now filters with NULL literals will be used during index analysis. [#34063](https://github.com/ClickHouse/ClickHouse/issues/34063). [#41842](https://github.com/ClickHouse/ClickHouse/pull/41842) ([Amos Bird](https://github.com/amosbird)).
* Merge parts if every part in the range is older than a certain threshold. The threshold can be set by using `min_age_to_force_merge_seconds`. This closes [#35836](https://github.com/ClickHouse/ClickHouse/issues/35836). [#42423](https://github.com/ClickHouse/ClickHouse/pull/42423) ([Antonio Andelic](https://github.com/antonio2368)). This is a continuation of [#39550](https://github.com/ClickHouse/ClickHouse/pull/39550) by [@fastio](https://github.com/fastio), who implemented most of the logic (see the example after this list).
* Added new infrastructure for query analysis and planning under `allow_experimental_analyzer` setting. [#31796](https://github.com/ClickHouse/ClickHouse/pull/31796) ([Maksim Kita](https://github.com/kitaisreal)).
* Improve the time to recover lost keeper connections. [#42541](https://github.com/ClickHouse/ClickHouse/pull/42541) ([Raúl Marín](https://github.com/Algunenano)).
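For illustration, a minimal sketch of the merge threshold, the `Date32` support, and the analyzer setting described in the entries above; `merge_demo` is a hypothetical table name:
``` sql
-- Hypothetical table: parts older than one hour become merge candidates
-- even when there is nothing else to gain from merging them.
CREATE TABLE merge_demo (d Date32, v String)
ENGINE = MergeTree
ORDER BY v
SETTINGS min_age_to_force_merge_seconds = 3600;

-- dateName now accepts Date32 arguments (dates outside the Date range).
SELECT dateName('weekday', toDate32('1969-12-29'));

-- Opt in to the new experimental query analyzer.
SET allow_experimental_analyzer = 1;
```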
#### Build/Testing/Packaging Improvement
@ -143,7 +142,6 @@
* Fix bad_cast assert during INSERT into `Annoy` indexes over non-Float32 columns. `Annoy` indices are an experimental feature. [#42485](https://github.com/ClickHouse/ClickHouse/pull/42485) ([Robert Schulze](https://github.com/rschu1ze)).
* Arithmetic operator with Date or DateTime and 128 or 256-bit integer was referencing uninitialized memory. [#42453](https://github.com/ClickHouse/ClickHouse/issues/42453). [#42573](https://github.com/ClickHouse/ClickHouse/pull/42573) ([Alexey Milovidov](https://github.com/alexey-milovidov)).
* Fix unexpected table loading error when partition key contains alias function names during server upgrade. [#36379](https://github.com/ClickHouse/ClickHouse/pull/36379) ([Amos Bird](https://github.com/amosbird)).
* Fixes a crash in `JSONExtract` with `LowCardinality`. [#42633](https://github.com/ClickHouse/ClickHouse/pull/42633) ([Anton Popov](https://github.com/CurtizJ)).
### <a id="229"></a> ClickHouse release 22.9, 2022-09-22

contrib/libcxx (vendored)

@ -1 +1 @@
Subproject commit 172b2ae074f6755145b91c53a95c8540c1468239
Subproject commit 4db7f838afd3139eb3761694b04d31275df45d2d


@ -25,6 +25,7 @@ set(SRCS
"${LIBCXX_SOURCE_DIR}/src/ios.cpp"
"${LIBCXX_SOURCE_DIR}/src/ios.instantiations.cpp"
"${LIBCXX_SOURCE_DIR}/src/iostream.cpp"
"${LIBCXX_SOURCE_DIR}/src/legacy_debug_handler.cpp"
"${LIBCXX_SOURCE_DIR}/src/legacy_pointer_safety.cpp"
"${LIBCXX_SOURCE_DIR}/src/locale.cpp"
"${LIBCXX_SOURCE_DIR}/src/memory.cpp"
@ -49,6 +50,7 @@ set(SRCS
"${LIBCXX_SOURCE_DIR}/src/valarray.cpp"
"${LIBCXX_SOURCE_DIR}/src/variant.cpp"
"${LIBCXX_SOURCE_DIR}/src/vector.cpp"
"${LIBCXX_SOURCE_DIR}/src/verbose_abort.cpp"
)
add_library(cxx ${SRCS})

contrib/libcxxabi (vendored)

@ -1 +1 @@
Subproject commit 6eb7cc7a7bdd779e6734d1b9fb451df2274462d7
Subproject commit a736a6b3c6a7b8aae2ebad629ca21b2c55b4820e


@ -9,6 +9,7 @@ set(SRCS
"${LIBCXXABI_SOURCE_DIR}/src/cxa_exception_storage.cpp"
"${LIBCXXABI_SOURCE_DIR}/src/cxa_guard.cpp"
"${LIBCXXABI_SOURCE_DIR}/src/cxa_handlers.cpp"
# "${LIBCXXABI_SOURCE_DIR}/src/cxa_noexception.cpp"
"${LIBCXXABI_SOURCE_DIR}/src/cxa_personality.cpp"
"${LIBCXXABI_SOURCE_DIR}/src/cxa_thread_atexit.cpp"
"${LIBCXXABI_SOURCE_DIR}/src/cxa_vector.cpp"

contrib/rocksdb (vendored)

@ -1 +1 @@
Subproject commit e7c2b2f7bcf3b4b33892a1a6d25c32a93edfbdb9
Subproject commit 2c8998e26c6d46b27c710d7829c3a15e34959f70

contrib/zlib-ng (vendored)

@ -1 +1 @@
Subproject commit bffad6f6fe74d6a2f92e2668390664a926c68733
Subproject commit 50f0eae1a411764cd6d1e85b3ce471438acd3c1c


@ -27,9 +27,14 @@ RUN apt-get update \
tar \
tzdata \
unixodbc \
python3-pip \
libcurl4-openssl-dev \
libssl-dev \
&& apt-get clean \
&& rm -rf /var/lib/apt/lists/* /var/cache/debconf /tmp/*
RUN pip3 install pycurl
# Architecture of the image when BuildKit/buildx is used
ARG TARGETARCH


@ -0,0 +1,13 @@
---
sidebar_position: 1
sidebar_label: 2022
---
# 2022 Changelog
### ClickHouse release v22.8.8.3-lts (ac5a6cababc) FIXME as compared to v22.8.7.34-lts (3c38e5e8ab9)
#### Bug Fix (user-visible misbehavior in official stable or prestable release)
* Backported in [#42677](https://github.com/ClickHouse/ClickHouse/issues/42677): keeper-fix: fix race in accessing logs while snapshot is being installed. [#40627](https://github.com/ClickHouse/ClickHouse/pull/40627) ([Antonio Andelic](https://github.com/antonio2368)).


@ -87,14 +87,15 @@ SETTINGS
<summary>Deprecated method for creating a table</summary>
:::note "Attention"
Do not use this method in new projects. If possible, switch old projects to the method described above.
:::
``` sql
Kafka(kafka_broker_list, kafka_topic_list, kafka_group_name, kafka_format
[, kafka_row_delimiter, kafka_schema, kafka_num_consumers, kafka_skip_broken_messages])
```
</details>
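For contrast, a minimal sketch of the current (non-deprecated) syntax that the note above refers to; the broker, topic, group, and format values here are hypothetical:
``` sql
CREATE TABLE queue (timestamp UInt64, message String)
ENGINE = Kafka
SETTINGS kafka_broker_list = 'localhost:9092',
         kafka_topic_list = 'my_topic',
         kafka_group_name = 'my_group',
         kafka_format = 'JSONEachRow';
```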
## Description {#opisanie}


@ -39,9 +39,10 @@ CREATE TABLE [IF NOT EXISTS] [db.]table_name [ON CLUSTER cluster]
<summary>Deprecated method for creating a table</summary>
:::note "Attention"
Do not use this method in new projects and, if possible, switch old projects to the method described above.
:::
``` sql
CREATE TABLE [IF NOT EXISTS] [db.]table_name [ON CLUSTER cluster]
(


@ -43,9 +43,10 @@ CREATE TABLE [IF NOT EXISTS] [db.]table_name [ON CLUSTER cluster]
<summary>Deprecated method for creating a table</summary>
:::note "Attention"
Do not use this method in new projects and, if possible, switch old projects to the method described above.
:::
``` sql
CREATE TABLE [IF NOT EXISTS] [db.]table_name [ON CLUSTER cluster]
(
@ -59,7 +60,7 @@ CREATE TABLE [IF NOT EXISTS] [db.]table_name [ON CLUSTER cluster]
- `sign` — Name of the column with the row type: `1` is a "state" row, `-1` is a "cancel" row.
    The column data type is `Int8`.
</details>


@ -55,9 +55,10 @@ CREATE TABLE [IF NOT EXISTS] [db.]table_name [ON CLUSTER cluster]
<summary>Deprecated method for creating a table</summary>
:::note "Attention"
Do not use this method in new projects and, if possible, switch old projects to the method described above.
:::
``` sql
CREATE TABLE [IF NOT EXISTS] [db.]table_name [ON CLUSTER cluster]
(


@ -115,9 +115,10 @@ ENGINE MergeTree() PARTITION BY toYYYYMM(EventDate) ORDER BY (CounterID, EventDa
<summary>Deprecated method for creating a table</summary>
:::note "Attention"
Do not use this method in new projects and, if possible, switch old projects to the method described above.
:::
``` sql
CREATE TABLE [IF NOT EXISTS] [db.]table_name [ON CLUSTER cluster]
(


@ -42,9 +42,10 @@ CREATE TABLE [IF NOT EXISTS] [db.]table_name [ON CLUSTER cluster]
<summary>Deprecated method for creating a table</summary>
:::note "Attention"
Do not use this method in new projects and, if possible, switch old projects to the method described above.
:::
``` sql
CREATE TABLE [IF NOT EXISTS] [db.]table_name [ON CLUSTER cluster]
(


@ -316,9 +316,9 @@ SELECT toStartOfISOYear(toDate('2017-01-01')) AS ISOYear20170101;
Returns a date.
:::note "Attention"
The return value for an incorrect date is implementation-specific. ClickHouse may return a zero date, throw an exception, or perform a "natural" carry-over of dates between months.
:::
## toMonday {#tomonday}
Rounds a date or date-with-time down to the nearest Monday.
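A minimal example of the rounding (2017-01-05 is an arbitrary date; it falls on a Thursday):
``` sql
SELECT toMonday(toDate('2017-01-05')); -- 2017-01-02, the preceding Monday
```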


@ -122,9 +122,9 @@ FROM t_null
There are two variants of IN with subqueries (and similarly for JOIN): the regular `IN` / `JOIN` and `GLOBAL IN` / `GLOBAL JOIN`. They differ in how they are executed during distributed query processing.
:::note "Attention"
Keep in mind that the algorithms described below may work differently depending on the [setting](../../operations/settings/settings.md) `distributed_product_mode`.
:::
With a regular IN, the query is sent to remote servers, and each of them runs the subqueries in the `IN` / `JOIN` clauses.
With `GLOBAL IN` / `GLOBAL JOIN`, all the subqueries for the `GLOBAL IN` / `GLOBAL JOIN` clauses are run first, and the results are collected into temporary tables. The temporary tables are then sent to each remote server, and the queries are run there using this transferred temporary data.
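A sketch of the difference, assuming hypothetical distributed tables `distributed_hits` and `distributed_visits`:
``` sql
-- Regular IN: the subquery runs independently on every remote server.
SELECT count() FROM distributed_hits
WHERE UserID IN (SELECT UserID FROM distributed_visits);

-- GLOBAL IN: the subquery runs once; its result is put into a temporary
-- table that is shipped to every remote server.
SELECT count() FROM distributed_hits
WHERE UserID GLOBAL IN (SELECT UserID FROM distributed_visits);
```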


@ -1,6 +1,10 @@
#pragma once
#include <Interpreters/Cluster.h>
#include <base/types.h>
#include <Poco/Util/AbstractConfiguration.h>
#include <utility>
namespace DB
{
@ -8,21 +12,4 @@ namespace DB
using DatabaseAndTableName = std::pair<String, String>;
using ListOfDatabasesAndTableNames = std::vector<DatabaseAndTableName>;
/// Hierarchical description of the tasks
struct ShardPartitionPiece;
struct ShardPartition;
struct TaskShard;
struct TaskTable;
struct TaskCluster;
struct ClusterPartition;
using PartitionPieces = std::vector<ShardPartitionPiece>;
using TasksPartition = std::map<String, ShardPartition, std::greater<>>;
using ShardInfo = Cluster::ShardInfo;
using TaskShardPtr = std::shared_ptr<TaskShard>;
using TasksShard = std::vector<TaskShardPtr>;
using TasksTable = std::list<TaskTable>;
using ClusterPartitions = std::map<String, ClusterPartition, std::greater<>>;
}


@ -1,7 +1,13 @@
set(CLICKHOUSE_COPIER_SOURCES
"${CMAKE_CURRENT_SOURCE_DIR}/ClusterCopierApp.cpp"
"${CMAKE_CURRENT_SOURCE_DIR}/ClusterCopier.cpp"
"${CMAKE_CURRENT_SOURCE_DIR}/Internals.cpp")
"${CMAKE_CURRENT_SOURCE_DIR}/Internals.cpp"
"${CMAKE_CURRENT_SOURCE_DIR}/ShardPartition.cpp"
"${CMAKE_CURRENT_SOURCE_DIR}/ShardPartitionPiece.cpp"
"${CMAKE_CURRENT_SOURCE_DIR}/StatusAccumulator.cpp"
"${CMAKE_CURRENT_SOURCE_DIR}/TaskCluster.cpp"
"${CMAKE_CURRENT_SOURCE_DIR}/TaskShard.cpp"
"${CMAKE_CURRENT_SOURCE_DIR}/TaskTable.cpp")
set (CLICKHOUSE_COPIER_LINK
PRIVATE


@ -3,7 +3,8 @@
#include "Aliases.h"
#include "Internals.h"
#include "TaskCluster.h"
#include "TaskTableAndShard.h"
#include "TaskShard.h"
#include "TaskTable.h"
#include "ShardPartition.h"
#include "ShardPartitionPiece.h"
#include "ZooKeeperStaff.h"


@ -1,17 +1,22 @@
#pragma once
#include "Aliases.h"
#include <base/types.h>
#include <map>
namespace DB
{
/// Contains info about all shards that contain a partition
struct ClusterPartition
{
double elapsed_time_seconds = 0;
UInt64 bytes_copied = 0;
UInt64 rows_copied = 0;
UInt64 blocks_copied = 0;
UInt64 total_tries = 0;
};
using ClusterPartitions = std::map<String, ClusterPartition, std::greater<>>;
}


@ -0,0 +1,70 @@
#include "ShardPartition.h"
#include "TaskShard.h"
#include "TaskTable.h"
namespace DB
{
ShardPartition::ShardPartition(TaskShard & parent, String name_quoted_, size_t number_of_splits)
: task_shard(parent)
, name(std::move(name_quoted_))
{
pieces.reserve(number_of_splits);
}
String ShardPartition::getPartitionCleanStartPath() const
{
return getPartitionPath() + "/clean_start";
}
String ShardPartition::getPartitionPieceCleanStartPath(size_t current_piece_number) const
{
assert(current_piece_number < task_shard.task_table.number_of_splits);
return getPartitionPiecePath(current_piece_number) + "/clean_start";
}
String ShardPartition::getPartitionPath() const
{
return task_shard.task_table.getPartitionPath(name);
}
String ShardPartition::getPartitionPiecePath(size_t current_piece_number) const
{
assert(current_piece_number < task_shard.task_table.number_of_splits);
return task_shard.task_table.getPartitionPiecePath(name, current_piece_number);
}
String ShardPartition::getShardStatusPath() const
{
// schema: /<root...>/tables/<table>/<partition>/shards/<shard>
// e.g. /root/table_test.hits/201701/shards/1
return getPartitionShardsPath() + "/" + toString(task_shard.numberInCluster());
}
String ShardPartition::getPartitionShardsPath() const
{
return getPartitionPath() + "/shards";
}
String ShardPartition::getPartitionActiveWorkersPath() const
{
return getPartitionPath() + "/partition_active_workers";
}
String ShardPartition::getActiveWorkerPath() const
{
return getPartitionActiveWorkersPath() + "/" + toString(task_shard.numberInCluster());
}
String ShardPartition::getCommonPartitionIsDirtyPath() const
{
return getPartitionPath() + "/is_dirty";
}
String ShardPartition::getCommonPartitionIsCleanedPath() const
{
return getCommonPartitionIsDirtyPath() + "/cleaned";
}
}


@ -1,19 +1,23 @@
#pragma once
#include "Aliases.h"
#include "TaskTableAndShard.h"
#include "ShardPartitionPiece.h"
#include <base/types.h>
#include <map>
namespace DB
{
struct TaskShard;
/// Just destination partition of a shard
/// I don't know what this comment means.
/// In short, when we discovered what shards contain currently processing partition,
/// This class describes a partition (name) that is stored on the shard (parent).
struct ShardPartition
{
ShardPartition(TaskShard &parent, String name_quoted_, size_t number_of_splits = 10)
: task_shard(parent), name(std::move(name_quoted_)) { pieces.reserve(number_of_splits); }
ShardPartition(TaskShard &parent, String name_quoted_, size_t number_of_splits = 10);
String getPartitionPath() const;
@ -45,58 +49,6 @@ struct ShardPartition
String name;
};
inline String ShardPartition::getPartitionCleanStartPath() const
{
return getPartitionPath() + "/clean_start";
}
inline String ShardPartition::getPartitionPieceCleanStartPath(size_t current_piece_number) const
{
assert(current_piece_number < task_shard.task_table.number_of_splits);
return getPartitionPiecePath(current_piece_number) + "/clean_start";
}
inline String ShardPartition::getPartitionPath() const
{
return task_shard.task_table.getPartitionPath(name);
}
inline String ShardPartition::getPartitionPiecePath(size_t current_piece_number) const
{
assert(current_piece_number < task_shard.task_table.number_of_splits);
return task_shard.task_table.getPartitionPiecePath(name, current_piece_number);
}
inline String ShardPartition::getShardStatusPath() const
{
// schema: /<root...>/tables/<table>/<partition>/shards/<shard>
// e.g. /root/table_test.hits/201701/shards/1
return getPartitionShardsPath() + "/" + toString(task_shard.numberInCluster());
}
inline String ShardPartition::getPartitionShardsPath() const
{
return getPartitionPath() + "/shards";
}
inline String ShardPartition::getPartitionActiveWorkersPath() const
{
return getPartitionPath() + "/partition_active_workers";
}
inline String ShardPartition::getActiveWorkerPath() const
{
return getPartitionActiveWorkersPath() + "/" + toString(task_shard.numberInCluster());
}
inline String ShardPartition::getCommonPartitionIsDirtyPath() const
{
return getPartitionPath() + "/is_dirty";
}
inline String ShardPartition::getCommonPartitionIsCleanedPath() const
{
return getCommonPartitionIsDirtyPath() + "/cleaned";
}
using TasksPartition = std::map<String, ShardPartition, std::greater<>>;
}


@ -0,0 +1,64 @@
#include "ShardPartitionPiece.h"
#include "ShardPartition.h"
#include "TaskShard.h"
#include <IO/WriteHelpers.h>
namespace DB
{
ShardPartitionPiece::ShardPartitionPiece(ShardPartition & parent, size_t current_piece_number_, bool is_present_piece_)
: is_absent_piece(!is_present_piece_)
, current_piece_number(current_piece_number_)
, shard_partition(parent)
{
}
String ShardPartitionPiece::getPartitionPiecePath() const
{
return shard_partition.getPartitionPath() + "/piece_" + toString(current_piece_number);
}
String ShardPartitionPiece::getPartitionPieceCleanStartPath() const
{
return getPartitionPiecePath() + "/clean_start";
}
String ShardPartitionPiece::getPartitionPieceIsDirtyPath() const
{
return getPartitionPiecePath() + "/is_dirty";
}
String ShardPartitionPiece::getPartitionPieceIsCleanedPath() const
{
return getPartitionPieceIsDirtyPath() + "/cleaned";
}
String ShardPartitionPiece::getPartitionPieceActiveWorkersPath() const
{
return getPartitionPiecePath() + "/partition_piece_active_workers";
}
String ShardPartitionPiece::getActiveWorkerPath() const
{
return getPartitionPieceActiveWorkersPath() + "/" + toString(shard_partition.task_shard.numberInCluster());
}
/// On what shards do we have current partition.
String ShardPartitionPiece::getPartitionPieceShardsPath() const
{
return getPartitionPiecePath() + "/shards";
}
String ShardPartitionPiece::getShardStatusPath() const
{
return getPartitionPieceShardsPath() + "/" + toString(shard_partition.task_shard.numberInCluster());
}
String ShardPartitionPiece::getPartitionPieceCleanerPath() const
{
return getPartitionPieceIsDirtyPath() + "/cleaner";
}
}


@ -1,16 +1,15 @@
#pragma once
#include "Internals.h"
#include <base/types.h>
namespace DB
{
struct ShardPartition;
struct ShardPartitionPiece
{
ShardPartitionPiece(ShardPartition &parent, size_t current_piece_number_, bool is_present_piece_)
: is_absent_piece(!is_present_piece_), current_piece_number(current_piece_number_),
shard_partition(parent) {}
ShardPartitionPiece(ShardPartition & parent, size_t current_piece_number_, bool is_present_piece_);
String getPartitionPiecePath() const;
@ -37,52 +36,6 @@ struct ShardPartitionPiece
ShardPartition & shard_partition;
};
inline String ShardPartitionPiece::getPartitionPiecePath() const
{
return shard_partition.getPartitionPath() + "/piece_" + toString(current_piece_number);
}
inline String ShardPartitionPiece::getPartitionPieceCleanStartPath() const
{
return getPartitionPiecePath() + "/clean_start";
}
inline String ShardPartitionPiece::getPartitionPieceIsDirtyPath() const
{
return getPartitionPiecePath() + "/is_dirty";
}
inline String ShardPartitionPiece::getPartitionPieceIsCleanedPath() const
{
return getPartitionPieceIsDirtyPath() + "/cleaned";
}
inline String ShardPartitionPiece::getPartitionPieceActiveWorkersPath() const
{
return getPartitionPiecePath() + "/partition_piece_active_workers";
}
inline String ShardPartitionPiece::getActiveWorkerPath() const
{
return getPartitionPieceActiveWorkersPath() + "/" + toString(shard_partition.task_shard.numberInCluster());
}
/// On what shards do we have current partition.
inline String ShardPartitionPiece::getPartitionPieceShardsPath() const
{
return getPartitionPiecePath() + "/shards";
}
inline String ShardPartitionPiece::getShardStatusPath() const
{
return getPartitionPieceShardsPath() + "/" + toString(shard_partition.task_shard.numberInCluster());
}
inline String ShardPartitionPiece::getPartitionPieceCleanerPath() const
{
return getPartitionPieceIsDirtyPath() + "/cleaner";
}
using PartitionPieces = std::vector<ShardPartitionPiece>;
}


@ -0,0 +1,48 @@
#include "StatusAccumulator.h"
#include <Poco/JSON/Parser.h>
#include <Poco/JSON/JSON.h>
#include <Poco/JSON/Object.h>
#include <Poco/JSON/Stringifier.h>
#include <iostream>
namespace DB
{
StatusAccumulator::MapPtr StatusAccumulator::fromJSON(String state_json)
{
Poco::JSON::Parser parser;
auto state = parser.parse(state_json).extract<Poco::JSON::Object::Ptr>();
MapPtr result_ptr = std::make_shared<Map>();
for (const auto & table_name : state->getNames())
{
auto table_status_json = state->getValue<String>(table_name);
auto table_status = parser.parse(table_status_json).extract<Poco::JSON::Object::Ptr>();
/// Map entry will be created if it is absent
auto & map_table_status = (*result_ptr)[table_name];
map_table_status.all_partitions_count += table_status->getValue<size_t>("all_partitions_count");
map_table_status.processed_partitions_count += table_status->getValue<size_t>("processed_partitions_count");
}
return result_ptr;
}
String StatusAccumulator::serializeToJSON(MapPtr statuses)
{
Poco::JSON::Object result_json;
for (const auto & [table_name, table_status] : *statuses)
{
Poco::JSON::Object status_json;
status_json.set("all_partitions_count", table_status.all_partitions_count);
status_json.set("processed_partitions_count", table_status.processed_partitions_count);
result_json.set(table_name, status_json);
}
std::ostringstream oss; // STYLE_CHECK_ALLOW_STD_STRING_STREAM
oss.exceptions(std::ios::failbit);
Poco::JSON::Stringifier::stringify(result_json, oss);
auto result = oss.str();
return result;
}
}


@ -1,65 +1,27 @@
#pragma once
#include <base/types.h>
#include <Poco/JSON/Parser.h>
#include <Poco/JSON/JSON.h>
#include <Poco/JSON/Object.h>
#include <Poco/JSON/Stringifier.h>
#include <unordered_map>
#include <memory>
#include <string>
#include <iostream>
#include <unordered_map>
namespace DB
{
class StatusAccumulator
{
public:
struct TableStatus
{
size_t all_partitions_count;
size_t processed_partitions_count;
};
using Map = std::unordered_map<std::string, TableStatus>;
using MapPtr = std::shared_ptr<Map>;
using Map = std::unordered_map<String, TableStatus>;
using MapPtr = std::shared_ptr<Map>;
static MapPtr fromJSON(std::string state_json)
{
Poco::JSON::Parser parser;
auto state = parser.parse(state_json).extract<Poco::JSON::Object::Ptr>();
MapPtr result_ptr = std::make_shared<Map>();
for (const auto & table_name : state->getNames())
{
auto table_status_json = state->getValue<std::string>(table_name);
auto table_status = parser.parse(table_status_json).extract<Poco::JSON::Object::Ptr>();
/// Map entry will be created if it is absent
auto & map_table_status = (*result_ptr)[table_name];
map_table_status.all_partitions_count += table_status->getValue<size_t>("all_partitions_count");
map_table_status.processed_partitions_count += table_status->getValue<size_t>("processed_partitions_count");
}
return result_ptr;
}
static std::string serializeToJSON(MapPtr statuses)
{
Poco::JSON::Object result_json;
for (const auto & [table_name, table_status] : *statuses)
{
Poco::JSON::Object status_json;
status_json.set("all_partitions_count", table_status.all_partitions_count);
status_json.set("processed_partitions_count", table_status.processed_partitions_count);
result_json.set(table_name, status_json);
}
std::ostringstream oss; // STYLE_CHECK_ALLOW_STD_STRING_STREAM
oss.exceptions(std::ios::failbit);
Poco::JSON::Stringifier::stringify(result_json, oss);
auto result = oss.str();
return result;
}
static MapPtr fromJSON(String state_json);
static String serializeToJSON(MapPtr statuses);
};
}


@ -0,0 +1,74 @@
#include "TaskCluster.h"
namespace DB
{
namespace ErrorCodes
{
extern const int BAD_ARGUMENTS;
}
TaskCluster::TaskCluster(const String & task_zookeeper_path_, const String & default_local_database_)
: task_zookeeper_path(task_zookeeper_path_)
, default_local_database(default_local_database_)
{}
void DB::TaskCluster::loadTasks(const Poco::Util::AbstractConfiguration & config, const String & base_key)
{
String prefix = base_key.empty() ? "" : base_key + ".";
clusters_prefix = prefix + "remote_servers";
if (!config.has(clusters_prefix))
throw Exception("You should specify list of clusters in " + clusters_prefix, ErrorCodes::BAD_ARGUMENTS);
Poco::Util::AbstractConfiguration::Keys tables_keys;
config.keys(prefix + "tables", tables_keys);
for (const auto & table_key : tables_keys)
{
table_tasks.emplace_back(*this, config, prefix + "tables", table_key);
}
}
void DB::TaskCluster::reloadSettings(const Poco::Util::AbstractConfiguration & config, const String & base_key)
{
String prefix = base_key.empty() ? "" : base_key + ".";
max_workers = config.getUInt64(prefix + "max_workers");
settings_common = Settings();
if (config.has(prefix + "settings"))
settings_common.loadSettingsFromConfig(prefix + "settings", config);
settings_common.prefer_localhost_replica = false;
settings_pull = settings_common;
if (config.has(prefix + "settings_pull"))
settings_pull.loadSettingsFromConfig(prefix + "settings_pull", config);
settings_push = settings_common;
if (config.has(prefix + "settings_push"))
settings_push.loadSettingsFromConfig(prefix + "settings_push", config);
auto set_default_value = [] (auto && setting, auto && default_value)
{
setting = setting.changed ? setting.value : default_value;
};
/// Override important settings
settings_pull.readonly = 1;
settings_pull.prefer_localhost_replica = false;
settings_push.insert_distributed_sync = true;
settings_push.prefer_localhost_replica = false;
set_default_value(settings_pull.load_balancing, LoadBalancing::NEAREST_HOSTNAME);
set_default_value(settings_pull.max_threads, 1);
set_default_value(settings_pull.max_block_size, 8192UL);
set_default_value(settings_pull.preferred_block_size_bytes, 0);
set_default_value(settings_push.insert_distributed_timeout, 0);
set_default_value(settings_push.replication_alter_partitions_sync, 2);
}
}


@ -1,21 +1,20 @@
#pragma once
#include "Aliases.h"
#include "TaskTable.h"
#include <Core/Settings.h>
#include <base/types.h>
#include <Poco/Util/AbstractConfiguration.h>
#include <random>
namespace DB
{
namespace ErrorCodes
{
extern const int BAD_ARGUMENTS;
}
struct TaskCluster
{
TaskCluster(const String & task_zookeeper_path_, const String & default_local_database_)
: task_zookeeper_path(task_zookeeper_path_)
, default_local_database(default_local_database_)
{}
TaskCluster(const String & task_zookeeper_path_, const String & default_local_database_);
void loadTasks(const Poco::Util::AbstractConfiguration & config, const String & base_key = "");
@ -50,61 +49,4 @@ struct TaskCluster
pcg64 random_engine;
};
inline void DB::TaskCluster::loadTasks(const Poco::Util::AbstractConfiguration & config, const String & base_key)
{
String prefix = base_key.empty() ? "" : base_key + ".";
clusters_prefix = prefix + "remote_servers";
if (!config.has(clusters_prefix))
throw Exception("You should specify list of clusters in " + clusters_prefix, ErrorCodes::BAD_ARGUMENTS);
Poco::Util::AbstractConfiguration::Keys tables_keys;
config.keys(prefix + "tables", tables_keys);
for (const auto & table_key : tables_keys)
{
table_tasks.emplace_back(*this, config, prefix + "tables", table_key);
}
}
inline void DB::TaskCluster::reloadSettings(const Poco::Util::AbstractConfiguration & config, const String & base_key)
{
String prefix = base_key.empty() ? "" : base_key + ".";
max_workers = config.getUInt64(prefix + "max_workers");
settings_common = Settings();
if (config.has(prefix + "settings"))
settings_common.loadSettingsFromConfig(prefix + "settings", config);
settings_common.prefer_localhost_replica = 0;
settings_pull = settings_common;
if (config.has(prefix + "settings_pull"))
settings_pull.loadSettingsFromConfig(prefix + "settings_pull", config);
settings_push = settings_common;
if (config.has(prefix + "settings_push"))
settings_push.loadSettingsFromConfig(prefix + "settings_push", config);
auto set_default_value = [] (auto && setting, auto && default_value)
{
setting = setting.changed ? setting.value : default_value;
};
/// Override important settings
settings_pull.readonly = 1;
settings_pull.prefer_localhost_replica = false;
settings_push.insert_distributed_sync = true;
settings_push.prefer_localhost_replica = false;
set_default_value(settings_pull.load_balancing, LoadBalancing::NEAREST_HOSTNAME);
set_default_value(settings_pull.max_threads, 1);
set_default_value(settings_pull.max_block_size, 8192UL);
set_default_value(settings_pull.preferred_block_size_bytes, 0);
set_default_value(settings_push.insert_distributed_timeout, 0);
set_default_value(settings_push.replication_alter_partitions_sync, 2);
}
}


@ -0,0 +1,37 @@
#include "TaskShard.h"
#include "TaskTable.h"
namespace DB
{
TaskShard::TaskShard(TaskTable & parent, const Cluster::ShardInfo & info_)
: task_table(parent)
, info(info_)
{
list_of_split_tables_on_shard.assign(task_table.number_of_splits, DatabaseAndTableName());
}
UInt32 TaskShard::numberInCluster() const
{
return info.shard_num;
}
UInt32 TaskShard::indexInCluster() const
{
return info.shard_num - 1;
}
String DB::TaskShard::getDescription() const
{
return fmt::format("N{} (having a replica {}, pull table {} of cluster {}",
numberInCluster(), getHostNameExample(), getQuotedTable(task_table.table_pull), task_table.cluster_pull_name);
}
String DB::TaskShard::getHostNameExample() const
{
const auto & replicas = task_table.cluster_pull->getShardsAddresses().at(indexInCluster());
return replicas.at(0).readableString();
}
}


@ -0,0 +1,56 @@
#pragma once
#include "Aliases.h"
#include "Internals.h"
#include "ClusterPartition.h"
#include "ShardPartition.h"
namespace DB
{
struct TaskTable;
struct TaskShard
{
TaskShard(TaskTable & parent, const Cluster::ShardInfo & info_);
TaskTable & task_table;
Cluster::ShardInfo info;
UInt32 numberInCluster() const;
UInt32 indexInCluster() const;
String getDescription() const;
String getHostNameExample() const;
/// Used to sort clusters by their proximity
ShardPriority priority;
/// Column with unique destination partitions (computed from engine_push_partition_key expr.) in the shard
ColumnWithTypeAndName partition_key_column;
/// There is a task for each destination partition
TasksPartition partition_tasks;
/// Which partitions have been checked for existence
/// If some partition from this list exists, it is in partition_tasks
std::set<String> checked_partitions;
/// Last CREATE TABLE query of the table of the shard
ASTPtr current_pull_table_create_query;
ASTPtr current_push_table_create_query;
/// Internal distributed tables
DatabaseAndTableName table_read_shard;
DatabaseAndTableName main_table_split_shard;
ListOfDatabasesAndTableNames list_of_split_tables_on_shard;
};
using TaskShardPtr = std::shared_ptr<TaskShard>;
using TasksShard = std::vector<TaskShardPtr>;
}


@ -0,0 +1,221 @@
#include "TaskTable.h"
#include "ClusterPartition.h"
#include "TaskCluster.h"
#include <Parsers/ASTFunction.h>
#include <boost/algorithm/string/join.hpp>
namespace DB
{
namespace ErrorCodes
{
extern const int UNKNOWN_ELEMENT_IN_CONFIG;
extern const int LOGICAL_ERROR;
}
TaskTable::TaskTable(TaskCluster & parent, const Poco::Util::AbstractConfiguration & config,
const String & prefix_, const String & table_key)
: task_cluster(parent)
{
String table_prefix = prefix_ + "." + table_key + ".";
name_in_config = table_key;
number_of_splits = config.getUInt64(table_prefix + "number_of_splits", 3);
allow_to_copy_alias_and_materialized_columns = config.getBool(table_prefix + "allow_to_copy_alias_and_materialized_columns", false);
allow_to_drop_target_partitions = config.getBool(table_prefix + "allow_to_drop_target_partitions", false);
cluster_pull_name = config.getString(table_prefix + "cluster_pull");
cluster_push_name = config.getString(table_prefix + "cluster_push");
table_pull.first = config.getString(table_prefix + "database_pull");
table_pull.second = config.getString(table_prefix + "table_pull");
table_push.first = config.getString(table_prefix + "database_push");
table_push.second = config.getString(table_prefix + "table_push");
/// Used as node name in ZooKeeper
table_id = escapeForFileName(cluster_push_name)
+ "." + escapeForFileName(table_push.first)
+ "." + escapeForFileName(table_push.second);
engine_push_str = config.getString(table_prefix + "engine", "rand()");
{
ParserStorage parser_storage;
engine_push_ast = parseQuery(parser_storage, engine_push_str, 0, DBMS_DEFAULT_MAX_PARSER_DEPTH);
engine_push_partition_key_ast = extractPartitionKey(engine_push_ast);
primary_key_comma_separated = boost::algorithm::join(extractPrimaryKeyColumnNames(engine_push_ast), ", ");
is_replicated_table = isReplicatedTableEngine(engine_push_ast);
}
sharding_key_str = config.getString(table_prefix + "sharding_key");
auxiliary_engine_split_asts.reserve(number_of_splits);
{
ParserExpressionWithOptionalAlias parser_expression(false);
sharding_key_ast = parseQuery(parser_expression, sharding_key_str, 0, DBMS_DEFAULT_MAX_PARSER_DEPTH);
main_engine_split_ast = createASTStorageDistributed(cluster_push_name, table_push.first, table_push.second,
sharding_key_ast);
for (const auto piece_number : collections::range(0, number_of_splits))
{
auxiliary_engine_split_asts.emplace_back
(
createASTStorageDistributed(cluster_push_name, table_push.first,
table_push.second + "_piece_" + toString(piece_number), sharding_key_ast)
);
}
}
where_condition_str = config.getString(table_prefix + "where_condition", "");
if (!where_condition_str.empty())
{
ParserExpressionWithOptionalAlias parser_expression(false);
where_condition_ast = parseQuery(parser_expression, where_condition_str, 0, DBMS_DEFAULT_MAX_PARSER_DEPTH);
// Will use canonical expression form
where_condition_str = queryToString(where_condition_ast);
}
String enabled_partitions_prefix = table_prefix + "enabled_partitions";
has_enabled_partitions = config.has(enabled_partitions_prefix);
if (has_enabled_partitions)
{
Strings keys;
config.keys(enabled_partitions_prefix, keys);
if (keys.empty())
{
/// Parse the list of partitions from a space-separated string
String partitions_str = config.getString(table_prefix + "enabled_partitions");
boost::trim_if(partitions_str, isWhitespaceASCII);
boost::split(enabled_partitions, partitions_str, isWhitespaceASCII, boost::token_compress_on);
}
else
{
/// Parse sequence of <partition>...</partition>
for (const String &key : keys)
{
if (!startsWith(key, "partition"))
throw Exception("Unknown key " + key + " in " + enabled_partitions_prefix, ErrorCodes::UNKNOWN_ELEMENT_IN_CONFIG);
enabled_partitions.emplace_back(config.getString(enabled_partitions_prefix + "." + key));
}
}
std::copy(enabled_partitions.begin(), enabled_partitions.end(), std::inserter(enabled_partitions_set, enabled_partitions_set.begin()));
}
}
String TaskTable::getPartitionPath(const String & partition_name) const
{
return task_cluster.task_zookeeper_path // root
+ "/tables/" + table_id // tables/dst_cluster.merge.hits
+ "/" + escapeForFileName(partition_name); // 201701
}
String TaskTable::getPartitionAttachIsActivePath(const String & partition_name) const
{
return getPartitionPath(partition_name) + "/attach_active";
}
String TaskTable::getPartitionAttachIsDonePath(const String & partition_name) const
{
return getPartitionPath(partition_name) + "/attach_is_done";
}
String TaskTable::getPartitionPiecePath(const String & partition_name, size_t piece_number) const
{
assert(piece_number < number_of_splits);
return getPartitionPath(partition_name) + "/piece_" + toString(piece_number); // 1...number_of_splits
}
String TaskTable::getCertainPartitionIsDirtyPath(const String &partition_name) const
{
return getPartitionPath(partition_name) + "/is_dirty";
}
String TaskTable::getCertainPartitionPieceIsDirtyPath(const String & partition_name, const size_t piece_number) const
{
return getPartitionPiecePath(partition_name, piece_number) + "/is_dirty";
}
String TaskTable::getCertainPartitionIsCleanedPath(const String & partition_name) const
{
return getCertainPartitionIsDirtyPath(partition_name) + "/cleaned";
}
String TaskTable::getCertainPartitionPieceIsCleanedPath(const String & partition_name, const size_t piece_number) const
{
return getCertainPartitionPieceIsDirtyPath(partition_name, piece_number) + "/cleaned";
}
String TaskTable::getCertainPartitionTaskStatusPath(const String & partition_name) const
{
return getPartitionPath(partition_name) + "/shards";
}
String TaskTable::getCertainPartitionPieceTaskStatusPath(const String & partition_name, const size_t piece_number) const
{
return getPartitionPiecePath(partition_name, piece_number) + "/shards";
}
bool TaskTable::isReplicatedTable() const
{
return is_replicated_table;
}
String TaskTable::getStatusAllPartitionCount() const
{
return task_cluster.task_zookeeper_path + "/status/all_partitions_count";
}
String TaskTable::getStatusProcessedPartitionsCount() const
{
return task_cluster.task_zookeeper_path + "/status/processed_partitions_count";
}
ASTPtr TaskTable::rewriteReplicatedCreateQueryToPlain() const
{
ASTPtr prev_engine_push_ast = engine_push_ast->clone();
auto & new_storage_ast = prev_engine_push_ast->as<ASTStorage &>();
auto & new_engine_ast = new_storage_ast.engine->as<ASTFunction &>();
/// Remove "Replicated" from name
new_engine_ast.name = new_engine_ast.name.substr(10);
if (new_engine_ast.arguments)
{
auto & replicated_table_arguments = new_engine_ast.arguments->children;
/// In some cases of Atomic database engine usage ReplicatedMergeTree tables
/// could be created without arguments.
if (!replicated_table_arguments.empty())
{
/// Delete first two arguments of Replicated...MergeTree() table.
replicated_table_arguments.erase(replicated_table_arguments.begin());
replicated_table_arguments.erase(replicated_table_arguments.begin());
}
}
return new_storage_ast.clone();
}
ClusterPartition & TaskTable::getClusterPartition(const String & partition_name)
{
auto it = cluster_partitions.find(partition_name);
if (it == cluster_partitions.end())
throw Exception("There are no cluster partition " + partition_name + " in " + table_id,
ErrorCodes::LOGICAL_ERROR);
return it->second;
}
}

programs/copier/TaskTable.h (new file)

@ -0,0 +1,173 @@
#pragma once
#include "Aliases.h"
#include "TaskShard.h"
namespace DB
{
struct ClusterPartition;
struct TaskCluster;
struct TaskTable
{
TaskTable(TaskCluster & parent, const Poco::Util::AbstractConfiguration & config, const String & prefix, const String & table_key);
TaskCluster & task_cluster;
/// These functions are used in checkPartitionIsDone() or checkPartitionPieceIsDone()
/// They are implemented here to avoid calling task_table.tasks_shard[partition_name].second.pieces[current_piece_number] and the like everywhere.
String getPartitionPath(const String & partition_name) const;
String getPartitionAttachIsActivePath(const String & partition_name) const;
String getPartitionAttachIsDonePath(const String & partition_name) const;
String getPartitionPiecePath(const String & partition_name, size_t piece_number) const;
String getCertainPartitionIsDirtyPath(const String & partition_name) const;
String getCertainPartitionPieceIsDirtyPath(const String & partition_name, size_t piece_number) const;
String getCertainPartitionIsCleanedPath(const String & partition_name) const;
String getCertainPartitionPieceIsCleanedPath(const String & partition_name, size_t piece_number) const;
String getCertainPartitionTaskStatusPath(const String & partition_name) const;
String getCertainPartitionPieceTaskStatusPath(const String & partition_name, size_t piece_number) const;
bool isReplicatedTable() const;
/// These nodes are used for check-status option
String getStatusAllPartitionCount() const;
String getStatusProcessedPartitionsCount() const;
/// Partitions will be split into number-of-splits pieces.
/// Each piece will be copied independently. (10 by default)
size_t number_of_splits;
bool allow_to_copy_alias_and_materialized_columns{false};
bool allow_to_drop_target_partitions{false};
String name_in_config;
/// Used as task ID
String table_id;
/// Column names in primary key
String primary_key_comma_separated;
/// Source cluster and table
String cluster_pull_name;
DatabaseAndTableName table_pull;
/// Destination cluster and table
String cluster_push_name;
DatabaseAndTableName table_push;
/// Storage of destination table
/// (tables that are stored on each shard of target cluster)
String engine_push_str;
ASTPtr engine_push_ast;
ASTPtr engine_push_partition_key_ast;
/// First argument of Replicated...MergeTree()
String engine_push_zk_path;
bool is_replicated_table;
ASTPtr rewriteReplicatedCreateQueryToPlain() const;
/*
* A Distributed table definition used to split data.
* A Distributed table will be created on each shard of the default
* cluster to perform data copying and resharding.
*/
String sharding_key_str;
ASTPtr sharding_key_ast;
ASTPtr main_engine_split_ast;
/*
* To copy a partition piece from one cluster to another we have to use a Distributed table.
* Since a separate table (engine_push) is used for each partition piece,
* we have to use many Distributed tables.
*/
ASTs auxiliary_engine_split_asts;
/// Additional WHERE expression to filter input data
String where_condition_str;
ASTPtr where_condition_ast;
/// Resolved clusters
ClusterPtr cluster_pull;
ClusterPtr cluster_push;
/// Filter partitions that should be copied
bool has_enabled_partitions = false;
Strings enabled_partitions;
NameSet enabled_partitions_set;
/**
* Prioritized list of shards.
* all_shards contains information about all shards in the table.
* So we have to check whether a particular shard has the current partition while processing.
*/
TasksShard all_shards;
TasksShard local_shards;
/// All partitions of the current table.
ClusterPartitions cluster_partitions;
NameSet finished_cluster_partitions;
/// Partition names to process in user-specified order
Strings ordered_partition_names;
ClusterPartition & getClusterPartition(const String & partition_name);
Stopwatch watch;
UInt64 bytes_copied = 0;
UInt64 rows_copied = 0;
template <typename RandomEngine>
void initShards(RandomEngine &&random_engine);
};
using TasksTable = std::list<TaskTable>;
template<typename RandomEngine>
inline void TaskTable::initShards(RandomEngine && random_engine)
{
const String & fqdn_name = getFQDNOrHostName();
std::uniform_int_distribution<uint8_t> get_urand(0, std::numeric_limits<UInt8>::max());
// Compute the priority
for (const auto & shard_info : cluster_pull->getShardsInfo())
{
TaskShardPtr task_shard = std::make_shared<TaskShard>(*this, shard_info);
const auto & replicas = cluster_pull->getShardsAddresses().at(task_shard->indexInCluster());
task_shard->priority = getReplicasPriority(replicas, fqdn_name, get_urand(random_engine));
all_shards.emplace_back(task_shard);
}
// Sort by priority
std::sort(all_shards.begin(), all_shards.end(),
[](const TaskShardPtr & lhs, const TaskShardPtr & rhs)
{
return ShardPriority::greaterPriority(lhs->priority, rhs->priority);
});
// Cut local shards
auto it_first_remote = std::lower_bound(all_shards.begin(), all_shards.end(), 1,
[](const TaskShardPtr & lhs, UInt8 is_remote)
{
return lhs->priority.is_remote < is_remote;
});
local_shards.assign(all_shards.begin(), it_first_remote);
}
}


@ -1,434 +0,0 @@
#pragma once
#include "Aliases.h"
#include "Internals.h"
#include "ClusterPartition.h"
#include <Core/Defines.h>
#include <Parsers/ASTFunction.h>
#include <base/map.h>
#include <boost/algorithm/string/join.hpp>
namespace DB
{
namespace ErrorCodes
{
extern const int UNKNOWN_ELEMENT_IN_CONFIG;
extern const int LOGICAL_ERROR;
}
struct TaskShard;
struct TaskTable
{
TaskTable(TaskCluster & parent, const Poco::Util::AbstractConfiguration & config, const String & prefix,
const String & table_key);
TaskCluster & task_cluster;
/// These functions used in checkPartitionIsDone() or checkPartitionPieceIsDone()
/// They are implemented here not to call task_table.tasks_shard[partition_name].second.pieces[current_piece_number] etc.
String getPartitionPath(const String & partition_name) const;
String getPartitionAttachIsActivePath(const String & partition_name) const;
String getPartitionAttachIsDonePath(const String & partition_name) const;
String getPartitionPiecePath(const String & partition_name, size_t piece_number) const;
String getCertainPartitionIsDirtyPath(const String & partition_name) const;
String getCertainPartitionPieceIsDirtyPath(const String & partition_name, size_t piece_number) const;
String getCertainPartitionIsCleanedPath(const String & partition_name) const;
String getCertainPartitionPieceIsCleanedPath(const String & partition_name, size_t piece_number) const;
String getCertainPartitionTaskStatusPath(const String & partition_name) const;
String getCertainPartitionPieceTaskStatusPath(const String & partition_name, size_t piece_number) const;
bool isReplicatedTable() const { return is_replicated_table; }
/// These nodes are used for check-status option
String getStatusAllPartitionCount() const;
String getStatusProcessedPartitionsCount() const;
/// Partitions will be split into number-of-splits pieces.
/// Each piece will be copied independently. (10 by default)
size_t number_of_splits;
bool allow_to_copy_alias_and_materialized_columns{false};
bool allow_to_drop_target_partitions{false};
String name_in_config;
/// Used as task ID
String table_id;
/// Column names in primary key
String primary_key_comma_separated;
/// Source cluster and table
String cluster_pull_name;
DatabaseAndTableName table_pull;
/// Destination cluster and table
String cluster_push_name;
DatabaseAndTableName table_push;
/// Storage of destination table
/// (tables that are stored on each shard of target cluster)
String engine_push_str;
ASTPtr engine_push_ast;
ASTPtr engine_push_partition_key_ast;
/// First argument of Replicated...MergeTree()
String engine_push_zk_path;
bool is_replicated_table;
ASTPtr rewriteReplicatedCreateQueryToPlain() const;
/*
* A Distributed table definition used to split data
* Distributed table will be created on each shard of default
* cluster to perform data copying and resharding
* */
String sharding_key_str;
ASTPtr sharding_key_ast;
ASTPtr main_engine_split_ast;
/*
* To copy partition piece form one cluster to another we have to use Distributed table.
* In case of usage separate table (engine_push) for each partition piece,
* we have to use many Distributed tables.
* */
ASTs auxiliary_engine_split_asts;
/// Additional WHERE expression to filter input data
String where_condition_str;
ASTPtr where_condition_ast;
/// Resolved clusters
ClusterPtr cluster_pull;
ClusterPtr cluster_push;
/// Filter partitions that should be copied
bool has_enabled_partitions = false;
Strings enabled_partitions;
NameSet enabled_partitions_set;
/**
* Prioritized list of shards
* all_shards contains information about all shards in the table.
* So we have to check whether particular shard have current partition or not while processing.
*/
TasksShard all_shards;
TasksShard local_shards;
/// All partitions of the current table.
ClusterPartitions cluster_partitions;
NameSet finished_cluster_partitions;
/// Partition names to process in user-specified order
Strings ordered_partition_names;
ClusterPartition & getClusterPartition(const String & partition_name)
{
auto it = cluster_partitions.find(partition_name);
if (it == cluster_partitions.end())
throw Exception("There are no cluster partition " + partition_name + " in " + table_id,
ErrorCodes::LOGICAL_ERROR);
return it->second;
}
Stopwatch watch;
UInt64 bytes_copied = 0;
UInt64 rows_copied = 0;
template <typename RandomEngine>
void initShards(RandomEngine &&random_engine);
};
struct TaskShard
{
TaskShard(TaskTable & parent, const ShardInfo & info_) : task_table(parent), info(info_)
{
list_of_split_tables_on_shard.assign(task_table.number_of_splits, DatabaseAndTableName());
}
TaskTable & task_table;
ShardInfo info;
UInt32 numberInCluster() const { return info.shard_num; }
UInt32 indexInCluster() const { return info.shard_num - 1; }
String getDescription() const;
String getHostNameExample() const;
/// Used to sort clusters by their proximity
ShardPriority priority;
/// Column with unique destination partitions (computed from engine_push_partition_key expr.) in the shard
ColumnWithTypeAndName partition_key_column;
/// There is a task for each destination partition
TasksPartition partition_tasks;
/// Which partitions have been checked for existence
/// If some partition from this lists is exists, it is in partition_tasks
std::set<String> checked_partitions;
/// Last CREATE TABLE query of the table of the shard
ASTPtr current_pull_table_create_query;
ASTPtr current_push_table_create_query;
/// Internal distributed tables
DatabaseAndTableName table_read_shard;
DatabaseAndTableName main_table_split_shard;
ListOfDatabasesAndTableNames list_of_split_tables_on_shard;
};
inline String TaskTable::getPartitionPath(const String & partition_name) const
{
return task_cluster.task_zookeeper_path // root
+ "/tables/" + table_id // tables/dst_cluster.merge.hits
+ "/" + escapeForFileName(partition_name); // 201701
}
inline String TaskTable::getPartitionAttachIsActivePath(const String & partition_name) const
{
return getPartitionPath(partition_name) + "/attach_active";
}
inline String TaskTable::getPartitionAttachIsDonePath(const String & partition_name) const
{
return getPartitionPath(partition_name) + "/attach_is_done";
}
inline String TaskTable::getPartitionPiecePath(const String & partition_name, size_t piece_number) const
{
assert(piece_number < number_of_splits);
return getPartitionPath(partition_name) + "/piece_" + toString(piece_number); // 1...number_of_splits
}
inline String TaskTable::getCertainPartitionIsDirtyPath(const String &partition_name) const
{
return getPartitionPath(partition_name) + "/is_dirty";
}
inline String TaskTable::getCertainPartitionPieceIsDirtyPath(const String & partition_name, const size_t piece_number) const
{
return getPartitionPiecePath(partition_name, piece_number) + "/is_dirty";
}
inline String TaskTable::getCertainPartitionIsCleanedPath(const String & partition_name) const
{
return getCertainPartitionIsDirtyPath(partition_name) + "/cleaned";
}
inline String TaskTable::getCertainPartitionPieceIsCleanedPath(const String & partition_name, const size_t piece_number) const
{
return getCertainPartitionPieceIsDirtyPath(partition_name, piece_number) + "/cleaned";
}
inline String TaskTable::getCertainPartitionTaskStatusPath(const String & partition_name) const
{
return getPartitionPath(partition_name) + "/shards";
}
inline String TaskTable::getCertainPartitionPieceTaskStatusPath(const String & partition_name, const size_t piece_number) const
{
return getPartitionPiecePath(partition_name, piece_number) + "/shards";
}
inline String TaskTable::getStatusAllPartitionCount() const
{
return task_cluster.task_zookeeper_path + "/status/all_partitions_count";
}
inline String TaskTable::getStatusProcessedPartitionsCount() const
{
return task_cluster.task_zookeeper_path + "/status/processed_partitions_count";
}
inline TaskTable::TaskTable(TaskCluster & parent, const Poco::Util::AbstractConfiguration & config,
const String & prefix_, const String & table_key)
: task_cluster(parent)
{
String table_prefix = prefix_ + "." + table_key + ".";
name_in_config = table_key;
number_of_splits = config.getUInt64(table_prefix + "number_of_splits", 3);
allow_to_copy_alias_and_materialized_columns = config.getBool(table_prefix + "allow_to_copy_alias_and_materialized_columns", false);
allow_to_drop_target_partitions = config.getBool(table_prefix + "allow_to_drop_target_partitions", false);
cluster_pull_name = config.getString(table_prefix + "cluster_pull");
cluster_push_name = config.getString(table_prefix + "cluster_push");
table_pull.first = config.getString(table_prefix + "database_pull");
table_pull.second = config.getString(table_prefix + "table_pull");
table_push.first = config.getString(table_prefix + "database_push");
table_push.second = config.getString(table_prefix + "table_push");
/// Used as node name in ZooKeeper
table_id = escapeForFileName(cluster_push_name)
+ "." + escapeForFileName(table_push.first)
+ "." + escapeForFileName(table_push.second);
engine_push_str = config.getString(table_prefix + "engine", "rand()");
{
ParserStorage parser_storage;
engine_push_ast = parseQuery(parser_storage, engine_push_str, 0, DBMS_DEFAULT_MAX_PARSER_DEPTH);
engine_push_partition_key_ast = extractPartitionKey(engine_push_ast);
primary_key_comma_separated = boost::algorithm::join(extractPrimaryKeyColumnNames(engine_push_ast), ", ");
is_replicated_table = isReplicatedTableEngine(engine_push_ast);
}
sharding_key_str = config.getString(table_prefix + "sharding_key");
auxiliary_engine_split_asts.reserve(number_of_splits);
{
ParserExpressionWithOptionalAlias parser_expression(false);
sharding_key_ast = parseQuery(parser_expression, sharding_key_str, 0, DBMS_DEFAULT_MAX_PARSER_DEPTH);
main_engine_split_ast = createASTStorageDistributed(cluster_push_name, table_push.first, table_push.second,
sharding_key_ast);
for (const auto piece_number : collections::range(0, number_of_splits))
{
auxiliary_engine_split_asts.emplace_back
(
createASTStorageDistributed(cluster_push_name, table_push.first,
table_push.second + "_piece_" + toString(piece_number), sharding_key_ast)
);
}
}
where_condition_str = config.getString(table_prefix + "where_condition", "");
if (!where_condition_str.empty())
{
ParserExpressionWithOptionalAlias parser_expression(false);
where_condition_ast = parseQuery(parser_expression, where_condition_str, 0, DBMS_DEFAULT_MAX_PARSER_DEPTH);
// Will use canonical expression form
where_condition_str = queryToString(where_condition_ast);
}
String enabled_partitions_prefix = table_prefix + "enabled_partitions";
has_enabled_partitions = config.has(enabled_partitions_prefix);
if (has_enabled_partitions)
{
Strings keys;
config.keys(enabled_partitions_prefix, keys);
if (keys.empty())
{
/// Parse list of partition from space-separated string
String partitions_str = config.getString(table_prefix + "enabled_partitions");
boost::trim_if(partitions_str, isWhitespaceASCII);
boost::split(enabled_partitions, partitions_str, isWhitespaceASCII, boost::token_compress_on);
}
else
{
/// Parse sequence of <partition>...</partition>
for (const String &key : keys)
{
if (!startsWith(key, "partition"))
throw Exception("Unknown key " + key + " in " + enabled_partitions_prefix, ErrorCodes::UNKNOWN_ELEMENT_IN_CONFIG);
enabled_partitions.emplace_back(config.getString(enabled_partitions_prefix + "." + key));
}
}
std::copy(enabled_partitions.begin(), enabled_partitions.end(), std::inserter(enabled_partitions_set, enabled_partitions_set.begin()));
}
}
template<typename RandomEngine>
inline void TaskTable::initShards(RandomEngine && random_engine)
{
const String & fqdn_name = getFQDNOrHostName();
std::uniform_int_distribution<UInt8> get_urand(0, std::numeric_limits<UInt8>::max());
// Compute the priority
for (const auto & shard_info : cluster_pull->getShardsInfo())
{
TaskShardPtr task_shard = std::make_shared<TaskShard>(*this, shard_info);
const auto & replicas = cluster_pull->getShardsAddresses().at(task_shard->indexInCluster());
task_shard->priority = getReplicasPriority(replicas, fqdn_name, get_urand(random_engine));
all_shards.emplace_back(task_shard);
}
// Sort by priority
std::sort(all_shards.begin(), all_shards.end(),
[](const TaskShardPtr & lhs, const TaskShardPtr & rhs)
{
return ShardPriority::greaterPriority(lhs->priority, rhs->priority);
});
// Cut local shards
auto it_first_remote = std::lower_bound(all_shards.begin(), all_shards.end(), 1,
[](const TaskShardPtr & lhs, UInt8 is_remote)
{
return lhs->priority.is_remote < is_remote;
});
local_shards.assign(all_shards.begin(), it_first_remote);
}
inline ASTPtr TaskTable::rewriteReplicatedCreateQueryToPlain() const
{
ASTPtr prev_engine_push_ast = engine_push_ast->clone();
auto & new_storage_ast = prev_engine_push_ast->as<ASTStorage &>();
auto & new_engine_ast = new_storage_ast.engine->as<ASTFunction &>();
/// Remove "Replicated" from name
new_engine_ast.name = new_engine_ast.name.substr(10);
if (new_engine_ast.arguments)
{
auto & replicated_table_arguments = new_engine_ast.arguments->children;
/// In some cases of Atomic database engine usage ReplicatedMergeTree tables
/// could be created without arguments.
if (!replicated_table_arguments.empty())
{
/// Delete first two arguments of Replicated...MergeTree() table.
replicated_table_arguments.erase(replicated_table_arguments.begin());
replicated_table_arguments.erase(replicated_table_arguments.begin());
}
}
return new_storage_ast.clone();
}
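The magic number in substr(10) above is simply the length of the "Replicated" prefix; a tiny standalone check:

#include <cassert>
#include <string>

int main()
{
    std::string engine = "ReplicatedMergeTree";
    /// "Replicated" is exactly 10 characters, hence substr(10).
    assert(engine.substr(10) == "MergeTree");
}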
inline String DB::TaskShard::getDescription() const
{
return fmt::format("N{} (having a replica {}, pull table {} of cluster {}",
numberInCluster(), getHostNameExample(), getQuotedTable(task_table.table_pull), task_table.cluster_pull_name);
}
inline String DB::TaskShard::getHostNameExample() const
{
const auto & replicas = task_table.cluster_pull->getShardsAddresses().at(indexInCluster());
return replicas.at(0).readableString();
}
}

View File

@ -2,6 +2,7 @@
#include <arpa/inet.h>
#include <sys/select.h>
#include <Common/Exception.h>
#include <Common/logger_useful.h>
#include "ares.h"
#include "netdb.h"
@ -40,6 +41,8 @@ namespace DB
}
}
std::mutex CaresPTRResolver::mutex;
CaresPTRResolver::CaresPTRResolver(CaresPTRResolver::provider_token) : channel(nullptr)
{
/*
@ -73,6 +76,8 @@ namespace DB
std::unordered_set<std::string> CaresPTRResolver::resolve(const std::string & ip)
{
std::lock_guard guard(mutex);
std::unordered_set<std::string> ptr_records;
resolve(ip, ptr_records);
@ -83,6 +88,8 @@ namespace DB
std::unordered_set<std::string> CaresPTRResolver::resolve_v6(const std::string & ip)
{
std::lock_guard guard(mutex);
std::unordered_set<std::string> ptr_records;
resolve_v6(ip, ptr_records);
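The static mutex introduced above serializes all PTR lookups across instances, since a single ares channel must not be driven from several threads at once. A minimal sketch of the pattern, with a hypothetical Resolver class and the resolution body elided:

#include <mutex>
#include <string>
#include <unordered_set>

class Resolver
{
public:
    std::unordered_set<std::string> resolve(const std::string & ip)
    {
        /// One mutex shared by every Resolver instance: the underlying
        /// channel must never be processed by two threads at once.
        std::lock_guard guard(mutex);
        std::unordered_set<std::string> records;
        /// ... drive the non-thread-safe channel for `ip` here ...
        return records;
    }

private:
    static std::mutex mutex;
};

std::mutex Resolver::mutex;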
@ -110,23 +117,83 @@ namespace DB
void CaresPTRResolver::wait()
{
timeval * tvp, tv;
fd_set read_fds;
fd_set write_fds;
int nfds;
for (;;)
{
FD_ZERO(&read_fds);
FD_ZERO(&write_fds);
nfds = ares_fds(channel, &read_fds, &write_fds);
if (nfds == 0)
break;
tvp = ares_timeout(channel, nullptr, &tv);
select(nfds, &read_fds, &write_fds, nullptr, tvp);
ares_process(channel, &read_fds, &write_fds);
}
int sockets[ARES_GETSOCK_MAXNUM];
pollfd pollfd[ARES_GETSOCK_MAXNUM];
while (true)
{
auto readable_sockets = get_readable_sockets(sockets, pollfd);
auto timeout = calculate_timeout();
int number_of_fds_ready = 0;
if (!readable_sockets.empty())
{
number_of_fds_ready = poll(readable_sockets.data(), readable_sockets.size(), timeout);
}
if (number_of_fds_ready > 0)
{
process_readable_sockets(readable_sockets);
}
else
{
process_possible_timeout();
break;
}
}
}
std::span<pollfd> CaresPTRResolver::get_readable_sockets(int * sockets, pollfd * pollfd)
{
int sockets_bitmask = ares_getsock(channel, sockets, ARES_GETSOCK_MAXNUM);
int number_of_sockets_to_poll = 0;
for (int i = 0; i < ARES_GETSOCK_MAXNUM; i++, number_of_sockets_to_poll++)
{
pollfd[i].events = 0;
pollfd[i].revents = 0;
if (ARES_GETSOCK_READABLE(sockets_bitmask, i))
{
pollfd[i].fd = sockets[i];
pollfd[i].events = POLLIN;
}
else
{
break;
}
}
return std::span<struct pollfd>(pollfd, number_of_sockets_to_poll);
}
int64_t CaresPTRResolver::calculate_timeout()
{
timeval tv;
if (auto * tvp = ares_timeout(channel, nullptr, &tv))
{
auto timeout = tvp->tv_sec * 1000 + tvp->tv_usec / 1000;
return timeout;
}
return 0;
}
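For reference, calculate_timeout() converts ares_timeout()'s timeval into whole milliseconds for poll(), truncating any sub-millisecond remainder; a tiny standalone check:

#include <cassert>
#include <sys/time.h>

int main()
{
    timeval tv;
    tv.tv_sec = 2;
    tv.tv_usec = 345678;
    auto ms = tv.tv_sec * 1000 + tv.tv_usec / 1000;
    assert(ms == 2345); /// the 678 us remainder is truncated
}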
void CaresPTRResolver::process_possible_timeout()
{
/* Call ares_process_fd() unconditionally here, even if we simply timed out
above; otherwise the c-ares name resolution won't time out! */
ares_process_fd(channel, ARES_SOCKET_BAD, ARES_SOCKET_BAD);
}
void CaresPTRResolver::process_readable_sockets(std::span<pollfd> readable_sockets)
{
for (auto readable_socket : readable_sockets)
{
auto fd = readable_socket.revents & POLLIN ? readable_socket.fd : ARES_SOCKET_BAD;
ares_process_fd(channel, fd, ARES_SOCKET_BAD);
}
}
}
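Taken together, the rework swaps the select()-based loop for poll(), which is not limited by FD_SETSIZE. A self-contained sketch of the poll() readiness/timeout behavior the new wait() relies on, using a pipe in place of ares sockets:

#include <poll.h>
#include <unistd.h>
#include <cstdio>

int main()
{
    int fds[2];
    if (pipe(fds) != 0)
        return 1;

    pollfd watched[1] = {{fds[0], POLLIN, 0}};

    /// Nothing written yet: poll() returns 0 once the 100 ms timeout expires.
    int ready = poll(watched, 1, 100);
    std::printf("before write: %d ready\n", ready);

    /// After a write the read end becomes readable: poll() returns 1
    /// and sets POLLIN in revents.
    (void)write(fds[1], "x", 1);
    ready = poll(watched, 1, 100);
    std::printf("after write: %d ready, POLLIN=%d\n", ready, int(watched[0].revents & POLLIN));

    close(fds[0]);
    close(fds[1]);
}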

View File

@ -1,5 +1,8 @@
#pragma once
#include <span>
#include <poll.h>
#include <mutex>
#include "DNSPTRResolver.h"
using ares_channel = struct ares_channeldata *;
@ -20,7 +23,6 @@ namespace DB
* Allow only DNSPTRProvider to instantiate this class
* */
struct provider_token {};
public:
explicit CaresPTRResolver(provider_token);
~CaresPTRResolver() override;
@ -36,7 +38,17 @@ namespace DB
void resolve_v6(const std::string & ip, std::unordered_set<std::string> & response);
std::span<pollfd> get_readable_sockets(int * sockets, pollfd * pollfd);
int64_t calculate_timeout();
void process_possible_timeout();
void process_readable_sockets(std::span<pollfd> readable_sockets);
ares_channel channel;
static std::mutex mutex;
};
}

View File

@ -5,8 +5,10 @@ namespace DB
{
std::shared_ptr<DNSPTRResolver> DNSPTRResolverProvider::get()
{
return std::make_shared<CaresPTRResolver>(
static auto resolver = std::make_shared<CaresPTRResolver>(
CaresPTRResolver::provider_token {}
);
return resolver;
}
}
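The change above memoizes the resolver in a function-local static (a Meyers singleton), so all callers share one instance and one ares channel instead of constructing a new one per call. A minimal sketch with hypothetical names:

#include <memory>

struct Resolver
{
    /// Stands in for an expensive, process-wide resource.
};

std::shared_ptr<Resolver> get_resolver()
{
    /// Constructed on the first call only; since C++11 this
    /// initialization is thread-safe under concurrent first calls.
    static auto resolver = std::make_shared<Resolver>();
    return resolver;
}

int main()
{
    return get_resolver() == get_resolver() ? 0 : 1; /// same instance every time
}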

View File

@ -1049,7 +1049,7 @@ INSTANTIATE_TEST_SUITE_P(RandomInt,
::testing::Combine(
DefaultCodecsToTest,
::testing::Values(
generateSeq<UInt8 >(G(RandomGenerator<UInt8>(0))),
generateSeq<UInt8 >(G(RandomGenerator<uint8_t>(0))),
generateSeq<UInt16>(G(RandomGenerator<UInt16>(0))),
generateSeq<UInt32>(G(RandomGenerator<UInt32>(0, 0, 1000'000'000))),
generateSeq<UInt64>(G(RandomGenerator<UInt64>(0, 0, 1000'000'000)))

View File

@ -69,7 +69,7 @@ static std::ostream & operator<<(std::ostream & ostr, const JSONPathAndValue & p
bool first = true;
for (const auto & part : path_and_value.path.getParts())
{
ostr << (first ? "{" : ", {") << part.key << ", " << part.is_nested << ", " << part.anonymous_array_level << "}";
ostr << (first ? "{" : ", {") << part.key << ", " << part.is_nested << ", " << static_cast<uint8_t>(part.anonymous_array_level) << "}";
first = false;
}

View File

@ -43,10 +43,34 @@ using FunctionCutToFirstSignificantSubdomainWithWWWRFC = FunctionStringToString<
REGISTER_FUNCTION(CutToFirstSignificantSubdomain)
{
factory.registerFunction<FunctionCutToFirstSignificantSubdomain>();
factory.registerFunction<FunctionCutToFirstSignificantSubdomainWithWWW>();
factory.registerFunction<FunctionCutToFirstSignificantSubdomainRFC>();
factory.registerFunction<FunctionCutToFirstSignificantSubdomainWithWWWRFC>();
factory.registerFunction<FunctionCutToFirstSignificantSubdomain>(
{
R"(Returns the part of the domain that includes top-level subdomains up to the "first significant subdomain" (see documentation of the `firstSignificantSubdomain`).)",
Documentation::Examples{
{"cutToFirstSignificantSubdomain1", "SELECT cutToFirstSignificantSubdomain('https://news.clickhouse.com.tr/')"},
{"cutToFirstSignificantSubdomain2", "SELECT cutToFirstSignificantSubdomain('www.tr')"},
{"cutToFirstSignificantSubdomain3", "SELECT cutToFirstSignificantSubdomain('tr')"},
},
Documentation::Categories{"URL"}
});
factory.registerFunction<FunctionCutToFirstSignificantSubdomainWithWWW>(
{
R"(Returns the part of the domain that includes top-level subdomains up to the "first significant subdomain", without stripping "www".)",
Documentation::Examples{},
Documentation::Categories{"URL"}
});
factory.registerFunction<FunctionCutToFirstSignificantSubdomainRFC>(
{
R"(Similar to `cutToFirstSignificantSubdomain` but follows stricter rules to be compatible with RFC 3986 and less performant.)",
Documentation::Examples{},
Documentation::Categories{"URL"}
});
factory.registerFunction<FunctionCutToFirstSignificantSubdomainWithWWWRFC>(
{
R"(Similar to `cutToFirstSignificantSubdomainWithWWW` but follows stricter rules to be compatible with RFC 3986 and less performant.)",
Documentation::Examples{},
Documentation::Categories{"URL"}
});
}
}

View File

@ -42,10 +42,41 @@ using FunctionCutToFirstSignificantSubdomainCustomWithWWWRFC = FunctionCutToFirs
REGISTER_FUNCTION(CutToFirstSignificantSubdomainCustom)
{
factory.registerFunction<FunctionCutToFirstSignificantSubdomainCustom>();
factory.registerFunction<FunctionCutToFirstSignificantSubdomainCustomWithWWW>();
factory.registerFunction<FunctionCutToFirstSignificantSubdomainCustomRFC>();
factory.registerFunction<FunctionCutToFirstSignificantSubdomainCustomWithWWWRFC>();
factory.registerFunction<FunctionCutToFirstSignificantSubdomainCustom>(
{
R"(
Returns the part of the domain that includes top-level subdomains up to the first significant subdomain. Accepts a custom TLD list name.
Can be useful if you need a fresh TLD list or have a custom one.
)",
Documentation::Examples{
{"cutToFirstSignificantSubdomainCustom", "SELECT cutToFirstSignificantSubdomainCustom('bar.foo.there-is-no-such-domain', 'public_suffix_list');"},
},
Documentation::Categories{"URL"}
});
factory.registerFunction<FunctionCutToFirstSignificantSubdomainCustomWithWWW>(
{
R"(
Returns the part of the domain that includes top-level subdomains up to the first significant subdomain, without stripping `www`.
Accepts a custom TLD list name from the config.
Can be useful if you need a fresh TLD list or have a custom one.
)",
Documentation::Examples{{"cutToFirstSignificantSubdomainCustomWithWWW", "SELECT cutToFirstSignificantSubdomainCustomWithWWW('www.foo', 'public_suffix_list')"}},
Documentation::Categories{"URL"}
});
factory.registerFunction<FunctionCutToFirstSignificantSubdomainCustomRFC>(
{
R"(Similar to `cutToFirstSignificantSubdomainCustom` but follows stricter rules according to RFC 3986.)",
Documentation::Examples{},
Documentation::Categories{"URL"}
});
factory.registerFunction<FunctionCutToFirstSignificantSubdomainCustomWithWWWRFC>(
{
R"(Similar to `cutToFirstSignificantSubdomainCustomWithWWW` but follows stricter rules according to RFC 3986.)",
Documentation::Examples{},
Documentation::Categories{"URL"}
});
}
}

View File

@ -14,8 +14,24 @@ using FunctionDomainRFC = FunctionStringToString<ExtractSubstringImpl<ExtractDom
REGISTER_FUNCTION(Domain)
{
factory.registerFunction<FunctionDomain>();
factory.registerFunction<FunctionDomainRFC>();
factory.registerFunction<FunctionDomain>(
{
R"(
Extracts the hostname from a URL.
The URL can be specified with or without a scheme.
If the argument can't be parsed as a URL, the function returns an empty string.
)",
Documentation::Examples{{"domain", "SELECT domain('svn+ssh://some.svn-hosting.com:80/repo/trunk')"}},
Documentation::Categories{"URL"}
});
factory.registerFunction<FunctionDomainRFC>(
{
R"(Similar to `domain` but follows stricter rules to be compatible with RFC 3986 and less performant.)",
Documentation::Examples{},
Documentation::Categories{"URL"}
});
}
}

View File

@ -14,8 +14,23 @@ using FunctionDomainWithoutWWWRFC = FunctionStringToString<ExtractSubstringImpl<
REGISTER_FUNCTION(DomainWithoutWWW)
{
factory.registerFunction<FunctionDomainWithoutWWW>();
factory.registerFunction<FunctionDomainWithoutWWWRFC>();
factory.registerFunction<FunctionDomainWithoutWWW>(
{
R"(
Extracts the hostname from a URL, removing the leading "www." if present.
The URL can be specified with or without a scheme.
If the argument can't be parsed as a URL, the function returns an empty string.
)",
Documentation::Examples{{"domainWithoutWWW", "SELECT domainWithoutWWW('https://www.clickhouse.com')"}},
Documentation::Categories{"URL"}
});
factory.registerFunction<FunctionDomainWithoutWWWRFC>(
{
R"(Similar to `domainWithoutWWW` but follows stricter rules to be compatible with RFC 3986 and less performant.)",
Documentation::Examples{},
Documentation::Categories{"URL"}
});
}
}

View File

@ -14,8 +14,28 @@ using FunctionFirstSignificantSubdomainRFC = FunctionStringToString<ExtractSubst
REGISTER_FUNCTION(FirstSignificantSubdomain)
{
factory.registerFunction<FunctionFirstSignificantSubdomain>();
factory.registerFunction<FunctionFirstSignificantSubdomainRFC>();
factory.registerFunction<FunctionFirstSignificantSubdomain>(
{
R"(
Returns the "first significant subdomain".
If the second-level domain is 'com', 'net', 'org', or 'co', the first significant subdomain is the third-level domain; otherwise it is the second-level domain.
For example, firstSignificantSubdomain('https://news.clickhouse.com/') = 'clickhouse', firstSignificantSubdomain('https://news.clickhouse.com.tr/') = 'clickhouse'.
The list of "insignificant" second-level domains and other implementation details may change in the future.
)",
Documentation::Examples{{"firstSignificantSubdomain", "SELECT firstSignificantSubdomain('https://news.clickhouse.com/')"}},
Documentation::Categories{"URL"}
});
factory.registerFunction<FunctionFirstSignificantSubdomainRFC>(
{
R"(Returns the "first significant subdomain" according to RFC 1034.)",
Documentation::Examples{},
Documentation::Categories{"URL"}
});
}
}

View File

@ -139,8 +139,18 @@ struct FunctionPortRFC : public FunctionPortImpl<true>
REGISTER_FUNCTION(Port)
{
factory.registerFunction<FunctionPort>();
factory.registerFunction<FunctionPortRFC>();
factory.registerFunction<FunctionPort>(
{
R"(Returns the port or `default_port` if there is no port in the URL (or in case of validation error).)",
Documentation::Examples{},
Documentation::Categories{"URL"}
});
factory.registerFunction<FunctionPortRFC>(
{
R"(Similar to `port`, but conforms to RFC 3986.)",
Documentation::Examples{},
Documentation::Categories{"URL"}
});
}
}

View File

@ -53,8 +53,23 @@ using FunctionTopLevelDomainRFC = FunctionStringToString<ExtractSubstringImpl<Ex
REGISTER_FUNCTION(TopLevelDomain)
{
factory.registerFunction<FunctionTopLevelDomain>();
factory.registerFunction<FunctionTopLevelDomainRFC>();
factory.registerFunction<FunctionTopLevelDomain>(
{
R"(
Extracts the top-level domain from a URL.
Returns an empty string if the argument cannot be parsed as a URL or does not contain a top-level domain.
)",
Documentation::Examples{{"topLevelDomain", "SELECT topLevelDomain('svn+ssh://www.some.svn-hosting.com:80/repo/trunk')"}},
Documentation::Categories{"URL"}
});
factory.registerFunction<FunctionTopLevelDomainRFC>(
{
R"(Similar to topLevelDomain, but conforms to RFC 3986.)",
Documentation::Examples{},
Documentation::Categories{"URL"}
});
}
}

View File

@ -1,7 +1,5 @@
#include <Processors/Merges/Algorithms/AggregatingSortedAlgorithm.h>
#include <Columns/ColumnAggregateFunction.h>
#include <Common/AlignedBuffer.h>
#include <DataTypes/DataTypeAggregateFunction.h>
#include <DataTypes/DataTypeCustomSimpleAggregateFunction.h>
#include <DataTypes/DataTypeLowCardinality.h>
@ -18,70 +16,6 @@ AggregatingSortedAlgorithm::ColumnsDefinition::ColumnsDefinition() = default;
AggregatingSortedAlgorithm::ColumnsDefinition::ColumnsDefinition(ColumnsDefinition &&) noexcept = default;
AggregatingSortedAlgorithm::ColumnsDefinition::~ColumnsDefinition() = default;
/// Stores information for aggregation of AggregateFunction columns
struct AggregatingSortedAlgorithm::AggregateDescription
{
ColumnAggregateFunction * column = nullptr;
const size_t column_number = 0; /// Position in header.
AggregateDescription() = default;
explicit AggregateDescription(size_t col_number) : column_number(col_number) {}
};
/// Stores information for aggregation of SimpleAggregateFunction columns
struct AggregatingSortedAlgorithm::SimpleAggregateDescription
{
/// An aggregate function 'anyLast', 'sum'...
AggregateFunctionPtr function;
IAggregateFunction::AddFunc add_function = nullptr;
size_t column_number = 0;
IColumn * column = nullptr;
/// For LowCardinality, the column is converted to its nested type. nested_type is nullptr if no conversion is needed.
const DataTypePtr nested_type; /// Nested type for LowCardinality, if any.
const DataTypePtr real_type; /// Type in header.
AlignedBuffer state;
bool created = false;
SimpleAggregateDescription(
AggregateFunctionPtr function_, const size_t column_number_,
DataTypePtr nested_type_, DataTypePtr real_type_)
: function(std::move(function_)), column_number(column_number_)
, nested_type(std::move(nested_type_)), real_type(std::move(real_type_))
{
add_function = function->getAddressOfAddFunction();
state.reset(function->sizeOfData(), function->alignOfData());
}
void createState()
{
if (created)
return;
function->create(state.data());
created = true;
}
void destroyState()
{
if (!created)
return;
function->destroy(state.data());
created = false;
}
/// Explicitly destroy aggregation state if the stream is terminated
~SimpleAggregateDescription()
{
destroyState();
}
SimpleAggregateDescription() = default;
SimpleAggregateDescription(SimpleAggregateDescription &&) = default;
SimpleAggregateDescription(const SimpleAggregateDescription &) = delete;
};
static AggregatingSortedAlgorithm::ColumnsDefinition defineColumns(
const Block & header, const SortDescription & description)
{
@ -191,6 +125,39 @@ static void postprocessChunk(Chunk & chunk, const AggregatingSortedAlgorithm::Co
}
AggregatingSortedAlgorithm::SimpleAggregateDescription::SimpleAggregateDescription(
AggregateFunctionPtr function_, const size_t column_number_,
DataTypePtr nested_type_, DataTypePtr real_type_)
: function(std::move(function_)), column_number(column_number_)
, nested_type(std::move(nested_type_)), real_type(std::move(real_type_))
{
add_function = function->getAddressOfAddFunction();
state.reset(function->sizeOfData(), function->alignOfData());
}
void AggregatingSortedAlgorithm::SimpleAggregateDescription::createState()
{
if (created)
return;
function->create(state.data());
created = true;
}
void AggregatingSortedAlgorithm::SimpleAggregateDescription::destroyState()
{
if (!created)
return;
function->destroy(state.data());
created = false;
}
/// Explicitly destroy aggregation state if the stream is terminated
AggregatingSortedAlgorithm::SimpleAggregateDescription::~SimpleAggregateDescription()
{
destroyState();
}
AggregatingSortedAlgorithm::AggregatingMergedData::AggregatingMergedData(
MutableColumns columns_, UInt64 max_block_size_, ColumnsDefinition & def_)
: MergedData(std::move(columns_), false, max_block_size_), def(def_)

View File

@ -1,5 +1,7 @@
#pragma once
#include <Columns/ColumnAggregateFunction.h>
#include <Common/AlignedBuffer.h>
#include <Processors/Merges/Algorithms/IMergingAlgorithmWithDelayedChunk.h>
#include <Processors/Merges/Algorithms/MergedData.h>
@ -23,8 +25,48 @@ public:
void consume(Input & input, size_t source_num) override;
Status merge() override;
struct SimpleAggregateDescription;
struct AggregateDescription;
/// Stores information for aggregation of SimpleAggregateFunction columns
struct SimpleAggregateDescription
{
/// An aggregate function 'anyLast', 'sum'...
AggregateFunctionPtr function;
IAggregateFunction::AddFunc add_function = nullptr;
size_t column_number = 0;
IColumn * column = nullptr;
/// For LowCardinality, the column is converted to its nested type. nested_type is nullptr if no conversion is needed.
const DataTypePtr nested_type; /// Nested type for LowCardinality, if any.
const DataTypePtr real_type; /// Type in header.
AlignedBuffer state;
bool created = false;
SimpleAggregateDescription(
AggregateFunctionPtr function_, const size_t column_number_,
DataTypePtr nested_type_, DataTypePtr real_type_);
void createState();
void destroyState();
/// Explicitly destroy aggregation state if the stream is terminated
~SimpleAggregateDescription();
SimpleAggregateDescription() = default;
SimpleAggregateDescription(SimpleAggregateDescription &&) = default;
SimpleAggregateDescription(const SimpleAggregateDescription &) = delete;
};
/// Stores information for aggregation of AggregateFunction columns
struct AggregateDescription
{
ColumnAggregateFunction * column = nullptr;
const size_t column_number = 0; /// Position in header.
AggregateDescription() = default;
explicit AggregateDescription(size_t col_number) : column_number(col_number) {}
};
/// This structure divides columns into one of three types:
/// * columns which are not aggregate functions and not needed to be aggregated

View File

@ -23,10 +23,6 @@ namespace ErrorCodes
extern const int CORRUPTED_DATA;
}
SummingSortedAlgorithm::ColumnsDefinition::ColumnsDefinition() = default;
SummingSortedAlgorithm::ColumnsDefinition::ColumnsDefinition(ColumnsDefinition &&) noexcept = default;
SummingSortedAlgorithm::ColumnsDefinition::~ColumnsDefinition() = default;
/// Stores numbers of key-columns and value-columns.
struct SummingSortedAlgorithm::MapDescription
{
@ -777,4 +773,8 @@ IMergingAlgorithm::Status SummingSortedAlgorithm::merge()
return Status(merged_data.pull(), true);
}
SummingSortedAlgorithm::ColumnsDefinition::ColumnsDefinition() = default;
SummingSortedAlgorithm::ColumnsDefinition::ColumnsDefinition(ColumnsDefinition &&) noexcept = default;
SummingSortedAlgorithm::ColumnsDefinition::~ColumnsDefinition() = default;
}

View File

@ -607,7 +607,7 @@ Block MergeTreeBaseSelectProcessor::transformHeader(
if (!row_level_column.type->canBeUsedInBooleanContext())
{
throw Exception("Invalid type for filter in PREWHERE: " + row_level_column.type->getName(),
ErrorCodes::LOGICAL_ERROR);
ErrorCodes::ILLEGAL_TYPE_OF_COLUMN_FOR_FILTER);
}
block.erase(prewhere_info->row_level_column_name);
@ -620,7 +620,7 @@ Block MergeTreeBaseSelectProcessor::transformHeader(
if (!prewhere_column.type->canBeUsedInBooleanContext())
{
throw Exception("Invalid type for filter in PREWHERE: " + prewhere_column.type->getName(),
ErrorCodes::LOGICAL_ERROR);
ErrorCodes::ILLEGAL_TYPE_OF_COLUMN_FOR_FILTER);
}
if (prewhere_info->remove_prewhere_column)
@ -628,13 +628,13 @@ Block MergeTreeBaseSelectProcessor::transformHeader(
else
{
WhichDataType which(removeNullable(recursiveRemoveLowCardinality(prewhere_column.type)));
if (which.isInt() || which.isUInt())
if (which.isNativeInt() || which.isNativeUInt())
prewhere_column.column = prewhere_column.type->createColumnConst(block.rows(), 1u)->convertToFullColumnIfConst();
else if (which.isFloat())
prewhere_column.column = prewhere_column.type->createColumnConst(block.rows(), 1.0f)->convertToFullColumnIfConst();
else
throw Exception("Illegal type " + prewhere_column.type->getName() + " of column for filter.",
ErrorCodes::ILLEGAL_TYPE_OF_COLUMN_FOR_FILTER);
throw Exception(
ErrorCodes::ILLEGAL_TYPE_OF_COLUMN_FOR_FILTER, "Illegal type {} of column for filter", prewhere_column.type->getName());
}
}

View File

@ -0,0 +1,4 @@
<clickhouse>
<disable_internal_dns_cache>1</disable_internal_dns_cache>
<max_concurrent_queries>250</max_concurrent_queries>
</clickhouse>

View File

@ -0,0 +1,11 @@
<yandex>
<users>
<test_dns>
<password/>
<networks>
<host_regexp>test1\.example\.com$</host_regexp>
</networks>
<profile>default</profile>
</test_dns>
</users>
</yandex>

View File

@ -0,0 +1,5 @@
<yandex>
<listen_host>::</listen_host>
<listen_host>0.0.0.0</listen_host>
<listen_try>1</listen_try>
</yandex>

View File

@ -0,0 +1,8 @@
. {
hosts /example.com {
reload "200ms"
fallthrough
}
forward . 127.0.0.11
log
}

View File

@ -0,0 +1 @@
filled in at runtime, but needs to exist in order to be volume-mapped in Docker

View File

@ -0,0 +1,63 @@
import pycurl
import threading
from io import BytesIO
import sys
client_ip = sys.argv[1]
server_ip = sys.argv[2]
mutex = threading.Lock()
success_counter = 0
number_of_threads = 100
number_of_iterations = 100
def perform_request():
buffer = BytesIO()
crl = pycurl.Curl()
crl.setopt(pycurl.INTERFACE, client_ip)
crl.setopt(crl.WRITEDATA, buffer)
crl.setopt(crl.URL, f"http://{server_ip}:8123/?query=select+1&user=test_dns")
crl.perform()
# End curl session
crl.close()
str_response = buffer.getvalue().decode("iso-8859-1")
expected_response = "1\n"
mutex.acquire()
global success_counter
if str_response == expected_response:
success_counter += 1
mutex.release()
def perform_multiple_requests(n):
for request_number in range(n):
perform_request()
threads = []
for i in range(number_of_threads):
thread = threading.Thread(
target=perform_multiple_requests, args=(number_of_iterations,)
)
thread.start()
threads.append(thread)
for thread in threads:
thread.join()
if success_counter == number_of_threads * number_of_iterations:
exit(0)
exit(1)

View File

@ -0,0 +1,71 @@
import pytest
from helpers.cluster import ClickHouseCluster, get_docker_compose_path, run_and_check
from time import sleep
import os
DOCKER_COMPOSE_PATH = get_docker_compose_path()
SCRIPT_DIR = os.path.dirname(os.path.realpath(__file__))
cluster = ClickHouseCluster(__file__)
ch_server = cluster.add_instance(
"clickhouse-server",
with_coredns=True,
main_configs=["configs/config.xml", "configs/listen_host.xml"],
user_configs=["configs/host_regexp.xml"],
)
client = cluster.add_instance(
"clickhouse-client",
)
@pytest.fixture(scope="module")
def started_cluster():
global cluster
try:
cluster.start()
yield cluster
finally:
cluster.shutdown()
def setup_dns_server(ip):
domains_string = "test3.example.com test2.example.com test1.example.com"
example_file_path = f'{ch_server.env_variables["COREDNS_CONFIG_DIR"]}/example.com'
run_and_check(f"echo '{ip} {domains_string}' > {example_file_path}", shell=True)
def setup_ch_server(dns_server_ip):
ch_server.exec_in_container(
(["bash", "-c", f"echo 'nameserver {dns_server_ip}' > /etc/resolv.conf"])
)
ch_server.exec_in_container(
(["bash", "-c", "echo 'options ndots:0' >> /etc/resolv.conf"])
)
ch_server.query("SYSTEM DROP DNS CACHE")
def build_endpoint_v4(ip):
return f"'http://{ip}:8123/?query=SELECT+1&user=test_dns'"
def build_endpoint_v6(ip):
return build_endpoint_v4(f"[{ip}]")
def test_host_regexp_multiple_ptr_v4(started_cluster):
server_ip = cluster.get_instance_ip("clickhouse-server")
client_ip = cluster.get_instance_ip("clickhouse-client")
dns_server_ip = cluster.get_instance_ip(cluster.coredns_host)
setup_dns_server(client_ip)
setup_ch_server(dns_server_ip)
current_dir = os.path.dirname(__file__)
client.copy_file_to_container(
os.path.join(current_dir, "scripts", "stress_test.py"), "stress_test.py"
)
client.exec_in_container(["python3", f"stress_test.py", client_ip, server_ip])

View File

@ -13,10 +13,14 @@
<values>
<value>protocol</value>
<value>domain</value>
<value>domainRFC</value>
<value>domainWithoutWWW</value>
<value>domainWithoutWWWRFC</value>
<value>topLevelDomain</value>
<value>firstSignificantSubdomain</value>
<value>firstSignificantSubdomainRFC</value>
<value>cutToFirstSignificantSubdomain</value>
<value>cutToFirstSignificantSubdomainRFC</value>
<value>path</value>
<value>pathFull</value>
<value>queryString</value>

View File

@ -124,8 +124,25 @@ example.com
example.com
com
example.com
example.com
example.com
example.com
example.com
example.com
example.com
example.com
example.com
com
====CUT TO FIRST SIGNIFICANT SUBDOMAIN WITH WWW====
www.com
example.com
example.com
example.com
example.com
www.com
example.com
example.com

View File

@ -7,42 +7,28 @@ SELECT protocol('http://127.0.0.1:443/') AS Scheme;
SELECT protocol('//127.0.0.1:443/') AS Scheme;
SELECT '====HOST====';
SELECT domain('http://paul@www.example.com:80/') AS Host;
SELECT domain('user:password@example.com:8080') AS Host;
SELECT domain('http://user:password@example.com:8080') AS Host;
SELECT domain('http://user:password@example.com:8080/path?query=value#fragment') AS Host;
SELECT domain('newuser:@example.com') AS Host;
SELECT domain('http://:pass@example.com') AS Host;
SELECT domain(':newpass@example.com') AS Host;
SELECT domain('http://user:pass@example@.com') AS Host;
SELECT domain('http://user:pass:example.com') AS Host;
SELECT domain('http:/paul/example/com') AS Host;
SELECT domain('http://www.example.com?q=4') AS Host;
SELECT domain('http://127.0.0.1:443/') AS Host;
SELECT domain('//www.example.com') AS Host;
SELECT domain('//paul@www.example.com') AS Host;
SELECT domain('www.example.com') as Host;
SELECT domain('example.com') as Host;
SELECT domainWithoutWWW('//paul@www.example.com') AS Host;
SELECT domainWithoutWWW('http://paul@www.example.com:80/') AS Host;
SELECT domainRFC('http://paul@www.example.com:80/') AS Host;
SELECT domainRFC('user:password@example.com:8080') AS Host;
SELECT domainRFC('http://user:password@example.com:8080') AS Host;
SELECT domainRFC('http://user:password@example.com:8080/path?query=value#fragment') AS Host;
SELECT domainRFC('newuser:@example.com') AS Host;
SELECT domainRFC('http://:pass@example.com') AS Host;
SELECT domainRFC(':newpass@example.com') AS Host;
SELECT domainRFC('http://user:pass@example@.com') AS Host;
SELECT domainRFC('http://user:pass:example.com') AS Host;
SELECT domainRFC('http:/paul/example/com') AS Host;
SELECT domainRFC('http://www.example.com?q=4') AS Host;
SELECT domainRFC('http://127.0.0.1:443/') AS Host;
SELECT domainRFC('//www.example.com') AS Host;
SELECT domainRFC('//paul@www.example.com') AS Host;
SELECT domainRFC('www.example.com') as Host;
SELECT domainRFC('example.com') as Host;
SELECT domainWithoutWWWRFC('//paul@www.example.com') AS Host;
SELECT domainWithoutWWWRFC('http://paul@www.example.com:80/') AS Host;
{% for suffix in ['', 'RFC'] -%}
SELECT domain{{ suffix }}('http://paul@www.example.com:80/') AS Host;
SELECT domain{{ suffix }}('user:password@example.com:8080') AS Host;
SELECT domain{{ suffix }}('http://user:password@example.com:8080') AS Host;
SELECT domain{{ suffix }}('http://user:password@example.com:8080/path?query=value#fragment') AS Host;
SELECT domain{{ suffix }}('newuser:@example.com') AS Host;
SELECT domain{{ suffix }}('http://:pass@example.com') AS Host;
SELECT domain{{ suffix }}(':newpass@example.com') AS Host;
SELECT domain{{ suffix }}('http://user:pass@example@.com') AS Host;
SELECT domain{{ suffix }}('http://user:pass:example.com') AS Host;
SELECT domain{{ suffix }}('http:/paul/example/com') AS Host;
SELECT domain{{ suffix }}('http://www.example.com?q=4') AS Host;
SELECT domain{{ suffix }}('http://127.0.0.1:443/') AS Host;
SELECT domain{{ suffix }}('//www.example.com') AS Host;
SELECT domain{{ suffix }}('//paul@www.example.com') AS Host;
SELECT domain{{ suffix }}('www.example.com') as Host;
SELECT domain{{ suffix }}('example.com') as Host;
SELECT domainWithoutWWW{{ suffix }}('//paul@www.example.com') AS Host;
SELECT domainWithoutWWW{{ suffix }}('http://paul@www.example.com:80/') AS Host;
{% endfor %}
SELECT '====NETLOC====';
SELECT netloc('http://paul@www.example.com:80/') AS Netloc;
@ -121,25 +107,31 @@ SELECT decodeURLComponent(encodeURLComponent('http://paul@127.0.0.1/?query=hello
SELECT decodeURLFormComponent(encodeURLFormComponent('http://paul@127.0.0.1/?query=hello world foo+bar#a=b'));
SELECT '====CUT TO FIRST SIGNIFICANT SUBDOMAIN====';
SELECT cutToFirstSignificantSubdomain('http://www.example.com');
SELECT cutToFirstSignificantSubdomain('http://www.example.com:1234');
SELECT cutToFirstSignificantSubdomain('http://www.example.com/a/b/c');
SELECT cutToFirstSignificantSubdomain('http://www.example.com/a/b/c?a=b');
SELECT cutToFirstSignificantSubdomain('http://www.example.com/a/b/c?a=b#d=f');
SELECT cutToFirstSignificantSubdomain('http://paul@www.example.com/a/b/c?a=b#d=f');
SELECT cutToFirstSignificantSubdomain('//paul@www.example.com/a/b/c?a=b#d=f');
SELECT cutToFirstSignificantSubdomain('www.example.com');
SELECT cutToFirstSignificantSubdomain('example.com');
SELECT cutToFirstSignificantSubdomain('www.com');
SELECT cutToFirstSignificantSubdomain('com');
{% for suffix in ['', 'RFC'] -%}
SELECT cutToFirstSignificantSubdomain{{ suffix }}('http://www.example.com');
SELECT cutToFirstSignificantSubdomain{{ suffix }}('http://www.example.com:1234');
SELECT cutToFirstSignificantSubdomain{{ suffix }}('http://www.example.com/a/b/c');
SELECT cutToFirstSignificantSubdomain{{ suffix }}('http://www.example.com/a/b/c?a=b');
SELECT cutToFirstSignificantSubdomain{{ suffix }}('http://www.example.com/a/b/c?a=b#d=f');
SELECT cutToFirstSignificantSubdomain{{ suffix }}('http://paul@www.example.com/a/b/c?a=b#d=f');
SELECT cutToFirstSignificantSubdomain{{ suffix }}('//paul@www.example.com/a/b/c?a=b#d=f');
SELECT cutToFirstSignificantSubdomain{{ suffix }}('www.example.com');
SELECT cutToFirstSignificantSubdomain{{ suffix }}('example.com');
SELECT cutToFirstSignificantSubdomain{{ suffix }}('www.com');
SELECT cutToFirstSignificantSubdomain{{ suffix }}('com');
{% endfor %}
SELECT '====CUT TO FIRST SIGNIFICANT SUBDOMAIN WITH WWW====';
SELECT cutToFirstSignificantSubdomainWithWWW('http://com');
SELECT cutToFirstSignificantSubdomainWithWWW('http://www.com');
SELECT cutToFirstSignificantSubdomainWithWWW('http://www.example.com');
SELECT cutToFirstSignificantSubdomainWithWWW('http://www.foo.example.com');
SELECT cutToFirstSignificantSubdomainWithWWW('http://www.example.com:1');
SELECT cutToFirstSignificantSubdomainWithWWW('http://www.example.com/');
{% for suffix in ['', 'RFC'] -%}
SELECT cutToFirstSignificantSubdomainWithWWW{{ suffix }}('http://com');
SELECT cutToFirstSignificantSubdomainWithWWW{{ suffix }}('http://www.com');
SELECT cutToFirstSignificantSubdomainWithWWW{{ suffix }}('http://www.example.com');
SELECT cutToFirstSignificantSubdomainWithWWW{{ suffix }}('http://www.foo.example.com');
SELECT cutToFirstSignificantSubdomainWithWWW{{ suffix }}('http://www.example.com:1');
SELECT cutToFirstSignificantSubdomainWithWWW{{ suffix }}('http://www.example.com/');
{% endfor %}
SELECT '====CUT WWW====';
SELECT cutWWW('http://www.example.com');

View File

@ -22,3 +22,27 @@ ipv6
0
host-no-dot
0
ipv4
0
80
80
80
80
hostname
0
80
80
80
80
default-port
80
80
ipv6
0
0
0
0
0
0
host-no-dot
0

View File

@ -1,34 +0,0 @@
select 'ipv4';
select port('http://127.0.0.1/');
select port('http://127.0.0.1:80');
select port('http://127.0.0.1:80/');
select port('//127.0.0.1:80/');
select port('127.0.0.1:80');
select 'hostname';
select port('http://foobar.com/');
select port('http://foobar.com:80');
select port('http://foobar.com:80/');
select port('//foobar.com:80/');
select port('foobar.com:80');
select 'default-port';
select port('http://127.0.0.1/', toUInt16(80));
select port('http://foobar.com/', toUInt16(80));
-- unsupported
/* ILLEGAL_TYPE_OF_ARGUMENT */ select port(toFixedString('', 1)); -- { serverError 43; }
/* ILLEGAL_TYPE_OF_ARGUMENT */ select port('', 1); -- { serverError 43; }
/* NUMBER_OF_ARGUMENTS_DOESNT_MATCH */ select port('', 1, 1); -- { serverError 42; }
--
-- Known limitations of domain() (getURLHost())
--
select 'ipv6';
select port('http://[2001:db8::8a2e:370:7334]/');
select port('http://[2001:db8::8a2e:370:7334]:80');
select port('http://[2001:db8::8a2e:370:7334]:80/');
select port('//[2001:db8::8a2e:370:7334]:80/');
select port('[2001:db8::8a2e:370:7334]:80');
select port('2001:db8::8a2e:370:7334:80');
select 'host-no-dot';
select port('//foobar:80/');

View File

@ -0,0 +1,39 @@
{% for suffix in ['', 'RFC'] -%}
select 'ipv4';
select port{{ suffix }}('http://127.0.0.1/');
select port{{ suffix }}('http://127.0.0.1:80');
select port{{ suffix }}('http://127.0.0.1:80/');
select port{{ suffix }}('//127.0.0.1:80/');
select port{{ suffix }}('127.0.0.1:80');
select 'hostname';
select port{{ suffix }}('http://foobar.com/');
select port{{ suffix }}('http://foobar.com:80');
select port{{ suffix }}('http://foobar.com:80/');
select port{{ suffix }}('//foobar.com:80/');
select port{{ suffix }}('foobar.com:80');
select 'default-port';
select port{{ suffix }}('http://127.0.0.1/', toUInt16(80));
select port{{ suffix }}('http://foobar.com/', toUInt16(80));
-- unsupported
/* ILLEGAL_TYPE_OF_ARGUMENT */ select port(toFixedString('', 1)); -- { serverError 43; }
/* ILLEGAL_TYPE_OF_ARGUMENT */ select port{{ suffix }}('', 1); -- { serverError 43; }
/* NUMBER_OF_ARGUMENTS_DOESNT_MATCH */ select port{{ suffix }}('', 1, 1); -- { serverError 42; }
--
-- Known limitations of domain() (getURLHost())
--
select 'ipv6';
select port{{ suffix }}('http://[2001:db8::8a2e:370:7334]/');
select port{{ suffix }}('http://[2001:db8::8a2e:370:7334]:80');
select port{{ suffix }}('http://[2001:db8::8a2e:370:7334]:80/');
select port{{ suffix }}('//[2001:db8::8a2e:370:7334]:80/');
select port{{ suffix }}('[2001:db8::8a2e:370:7334]:80');
select port{{ suffix }}('2001:db8::8a2e:370:7334:80');
select 'host-no-dot';
select port{{ suffix }}('//foobar:80/');
{%- endfor %}

View File

@ -89,3 +89,92 @@ select cutToFirstSignificantSubdomainCustom('city.kawasaki.jp', 'public_suffix_l
city.kawasaki.jp
select cutToFirstSignificantSubdomainCustom('some.city.kawasaki.jp', 'public_suffix_list');
city.kawasaki.jp
select '-- no-tld';
-- no-tld
-- even if there is no TLD, 2-nd level by default anyway
-- FIXME: make this behavior optional (so that TLD for host never changed, either empty or something real)
select cutToFirstSignificantSubdomainRFC('there-is-no-such-domain');
select cutToFirstSignificantSubdomainRFC('foo.there-is-no-such-domain');
foo.there-is-no-such-domain
select cutToFirstSignificantSubdomainRFC('bar.foo.there-is-no-such-domain');
foo.there-is-no-such-domain
select cutToFirstSignificantSubdomainCustomRFC('there-is-no-such-domain', 'public_suffix_list');
select cutToFirstSignificantSubdomainCustomRFC('foo.there-is-no-such-domain', 'public_suffix_list');
foo.there-is-no-such-domain
select cutToFirstSignificantSubdomainCustomRFC('bar.foo.there-is-no-such-domain', 'public_suffix_list');
foo.there-is-no-such-domain
select firstSignificantSubdomainCustomRFC('bar.foo.there-is-no-such-domain', 'public_suffix_list');
foo
select '-- generic';
-- generic
select firstSignificantSubdomainCustomRFC('foo.kernel.biz.ss', 'public_suffix_list'); -- kernel
kernel
select cutToFirstSignificantSubdomainCustomRFC('foo.kernel.biz.ss', 'public_suffix_list'); -- kernel.biz.ss
kernel.biz.ss
select '-- difference';
-- difference
-- biz.ss is not in the default TLD list, hence:
select cutToFirstSignificantSubdomainRFC('foo.kernel.biz.ss'); -- biz.ss
biz.ss
select cutToFirstSignificantSubdomainCustomRFC('foo.kernel.biz.ss', 'public_suffix_list'); -- kernel.biz.ss
kernel.biz.ss
select '-- 3+level';
-- 3+level
select cutToFirstSignificantSubdomainCustomRFC('xx.blogspot.co.at', 'public_suffix_list'); -- xx.blogspot.co.at
xx.blogspot.co.at
select firstSignificantSubdomainCustomRFC('xx.blogspot.co.at', 'public_suffix_list'); -- blogspot
blogspot
select cutToFirstSignificantSubdomainCustomRFC('foo.bar.xx.blogspot.co.at', 'public_suffix_list'); -- xx.blogspot.co.at
xx.blogspot.co.at
select firstSignificantSubdomainCustomRFC('foo.bar.xx.blogspot.co.at', 'public_suffix_list'); -- blogspot
blogspot
select '-- url';
-- url
select cutToFirstSignificantSubdomainCustomRFC('http://foobar.com', 'public_suffix_list');
foobar.com
select cutToFirstSignificantSubdomainCustomRFC('http://foobar.com/foo', 'public_suffix_list');
foobar.com
select cutToFirstSignificantSubdomainCustomRFC('http://bar.foobar.com/foo', 'public_suffix_list');
foobar.com
select cutToFirstSignificantSubdomainCustomRFC('http://xx.blogspot.co.at', 'public_suffix_list');
xx.blogspot.co.at
select '-- www';
-- www
select cutToFirstSignificantSubdomainCustomWithWWWRFC('http://www.foo', 'public_suffix_list');
www.foo
select cutToFirstSignificantSubdomainCustomRFC('http://www.foo', 'public_suffix_list');
foo
select '-- vector';
-- vector
select cutToFirstSignificantSubdomainCustomRFC('http://xx.blogspot.co.at/' || toString(number), 'public_suffix_list') from numbers(1);
xx.blogspot.co.at
select cutToFirstSignificantSubdomainCustomRFC('there-is-no-such-domain' || toString(number), 'public_suffix_list') from numbers(1);
select '-- no new line';
-- no new line
select cutToFirstSignificantSubdomainCustomRFC('foo.bar', 'no_new_line_list');
foo.bar
select cutToFirstSignificantSubdomainCustomRFC('a.foo.bar', 'no_new_line_list');
a.foo.bar
select cutToFirstSignificantSubdomainCustomRFC('a.foo.baz', 'no_new_line_list');
foo.baz
select '-- asterisk';
-- asterisk
select cutToFirstSignificantSubdomainCustomRFC('foo.something.sheffield.sch.uk', 'public_suffix_list');
something.sheffield.sch.uk
select cutToFirstSignificantSubdomainCustomRFC('something.sheffield.sch.uk', 'public_suffix_list');
something.sheffield.sch.uk
select cutToFirstSignificantSubdomainCustomRFC('sheffield.sch.uk', 'public_suffix_list');
sheffield.sch.uk
select '-- exclamation mark';
-- exclamation mark
select cutToFirstSignificantSubdomainCustomRFC('foo.kawasaki.jp', 'public_suffix_list');
foo.kawasaki.jp
select cutToFirstSignificantSubdomainCustomRFC('foo.foo.kawasaki.jp', 'public_suffix_list');
foo.foo.kawasaki.jp
select cutToFirstSignificantSubdomainCustomRFC('city.kawasaki.jp', 'public_suffix_list');
city.kawasaki.jp
select cutToFirstSignificantSubdomainCustomRFC('some.city.kawasaki.jp', 'public_suffix_list');
city.kawasaki.jp

View File

@ -1,57 +0,0 @@
-- { echo }
select '-- no-tld';
-- even if there is no TLD, 2-nd level by default anyway
-- FIXME: make this behavior optional (so that TLD for host never changed, either empty or something real)
select cutToFirstSignificantSubdomain('there-is-no-such-domain');
select cutToFirstSignificantSubdomain('foo.there-is-no-such-domain');
select cutToFirstSignificantSubdomain('bar.foo.there-is-no-such-domain');
select cutToFirstSignificantSubdomainCustom('there-is-no-such-domain', 'public_suffix_list');
select cutToFirstSignificantSubdomainCustom('foo.there-is-no-such-domain', 'public_suffix_list');
select cutToFirstSignificantSubdomainCustom('bar.foo.there-is-no-such-domain', 'public_suffix_list');
select firstSignificantSubdomainCustom('bar.foo.there-is-no-such-domain', 'public_suffix_list');
select '-- generic';
select firstSignificantSubdomainCustom('foo.kernel.biz.ss', 'public_suffix_list'); -- kernel
select cutToFirstSignificantSubdomainCustom('foo.kernel.biz.ss', 'public_suffix_list'); -- kernel.biz.ss
select '-- difference';
-- biz.ss is not in the default TLD list, hence:
select cutToFirstSignificantSubdomain('foo.kernel.biz.ss'); -- biz.ss
select cutToFirstSignificantSubdomainCustom('foo.kernel.biz.ss', 'public_suffix_list'); -- kernel.biz.ss
select '-- 3+level';
select cutToFirstSignificantSubdomainCustom('xx.blogspot.co.at', 'public_suffix_list'); -- xx.blogspot.co.at
select firstSignificantSubdomainCustom('xx.blogspot.co.at', 'public_suffix_list'); -- blogspot
select cutToFirstSignificantSubdomainCustom('foo.bar.xx.blogspot.co.at', 'public_suffix_list'); -- xx.blogspot.co.at
select firstSignificantSubdomainCustom('foo.bar.xx.blogspot.co.at', 'public_suffix_list'); -- blogspot
select '-- url';
select cutToFirstSignificantSubdomainCustom('http://foobar.com', 'public_suffix_list');
select cutToFirstSignificantSubdomainCustom('http://foobar.com/foo', 'public_suffix_list');
select cutToFirstSignificantSubdomainCustom('http://bar.foobar.com/foo', 'public_suffix_list');
select cutToFirstSignificantSubdomainCustom('http://xx.blogspot.co.at', 'public_suffix_list');
select '-- www';
select cutToFirstSignificantSubdomainCustomWithWWW('http://www.foo', 'public_suffix_list');
select cutToFirstSignificantSubdomainCustom('http://www.foo', 'public_suffix_list');
select '-- vector';
select cutToFirstSignificantSubdomainCustom('http://xx.blogspot.co.at/' || toString(number), 'public_suffix_list') from numbers(1);
select cutToFirstSignificantSubdomainCustom('there-is-no-such-domain' || toString(number), 'public_suffix_list') from numbers(1);
select '-- no new line';
select cutToFirstSignificantSubdomainCustom('foo.bar', 'no_new_line_list');
select cutToFirstSignificantSubdomainCustom('a.foo.bar', 'no_new_line_list');
select cutToFirstSignificantSubdomainCustom('a.foo.baz', 'no_new_line_list');
select '-- asterisk';
select cutToFirstSignificantSubdomainCustom('foo.something.sheffield.sch.uk', 'public_suffix_list');
select cutToFirstSignificantSubdomainCustom('something.sheffield.sch.uk', 'public_suffix_list');
select cutToFirstSignificantSubdomainCustom('sheffield.sch.uk', 'public_suffix_list');
select '-- exclamation mark';
select cutToFirstSignificantSubdomainCustom('foo.kawasaki.jp', 'public_suffix_list');
select cutToFirstSignificantSubdomainCustom('foo.foo.kawasaki.jp', 'public_suffix_list');
select cutToFirstSignificantSubdomainCustom('city.kawasaki.jp', 'public_suffix_list');
select cutToFirstSignificantSubdomainCustom('some.city.kawasaki.jp', 'public_suffix_list');

View File

@ -0,0 +1,61 @@
-- { echo }
{% for suffix in ['', 'RFC'] -%}
select '-- no-tld';
-- even if there is no TLD, 2-nd level by default anyway
-- FIXME: make this behavior optional (so that TLD for host never changed, either empty or something real)
select cutToFirstSignificantSubdomain{{ suffix }}('there-is-no-such-domain');
select cutToFirstSignificantSubdomain{{ suffix }}('foo.there-is-no-such-domain');
select cutToFirstSignificantSubdomain{{ suffix }}('bar.foo.there-is-no-such-domain');
select cutToFirstSignificantSubdomainCustom{{ suffix }}('there-is-no-such-domain', 'public_suffix_list');
select cutToFirstSignificantSubdomainCustom{{ suffix }}('foo.there-is-no-such-domain', 'public_suffix_list');
select cutToFirstSignificantSubdomainCustom{{ suffix }}('bar.foo.there-is-no-such-domain', 'public_suffix_list');
select firstSignificantSubdomainCustom{{ suffix }}('bar.foo.there-is-no-such-domain', 'public_suffix_list');
select '-- generic';
select firstSignificantSubdomainCustom{{ suffix }}('foo.kernel.biz.ss', 'public_suffix_list'); -- kernel
select cutToFirstSignificantSubdomainCustom{{ suffix }}('foo.kernel.biz.ss', 'public_suffix_list'); -- kernel.biz.ss
select '-- difference';
-- biz.ss is not in the default TLD list, hence:
select cutToFirstSignificantSubdomain{{ suffix }}('foo.kernel.biz.ss'); -- biz.ss
select cutToFirstSignificantSubdomainCustom{{ suffix }}('foo.kernel.biz.ss', 'public_suffix_list'); -- kernel.biz.ss
select '-- 3+level';
select cutToFirstSignificantSubdomainCustom{{ suffix }}('xx.blogspot.co.at', 'public_suffix_list'); -- xx.blogspot.co.at
select firstSignificantSubdomainCustom{{ suffix }}('xx.blogspot.co.at', 'public_suffix_list'); -- blogspot
select cutToFirstSignificantSubdomainCustom{{ suffix }}('foo.bar.xx.blogspot.co.at', 'public_suffix_list'); -- xx.blogspot.co.at
select firstSignificantSubdomainCustom{{ suffix }}('foo.bar.xx.blogspot.co.at', 'public_suffix_list'); -- blogspot
select '-- url';
select cutToFirstSignificantSubdomainCustom{{ suffix }}('http://foobar.com', 'public_suffix_list');
select cutToFirstSignificantSubdomainCustom{{ suffix }}('http://foobar.com/foo', 'public_suffix_list');
select cutToFirstSignificantSubdomainCustom{{ suffix }}('http://bar.foobar.com/foo', 'public_suffix_list');
select cutToFirstSignificantSubdomainCustom{{ suffix }}('http://xx.blogspot.co.at', 'public_suffix_list');
select '-- www';
select cutToFirstSignificantSubdomainCustomWithWWW{{ suffix }}('http://www.foo', 'public_suffix_list');
select cutToFirstSignificantSubdomainCustom{{ suffix }}('http://www.foo', 'public_suffix_list');
select '-- vector';
select cutToFirstSignificantSubdomainCustom{{ suffix }}('http://xx.blogspot.co.at/' || toString(number), 'public_suffix_list') from numbers(1);
select cutToFirstSignificantSubdomainCustom{{ suffix }}('there-is-no-such-domain' || toString(number), 'public_suffix_list') from numbers(1);
select '-- no new line';
select cutToFirstSignificantSubdomainCustom{{ suffix }}('foo.bar', 'no_new_line_list');
select cutToFirstSignificantSubdomainCustom{{ suffix }}('a.foo.bar', 'no_new_line_list');
select cutToFirstSignificantSubdomainCustom{{ suffix }}('a.foo.baz', 'no_new_line_list');
select '-- asterisk';
select cutToFirstSignificantSubdomainCustom{{ suffix }}('foo.something.sheffield.sch.uk', 'public_suffix_list');
select cutToFirstSignificantSubdomainCustom{{ suffix }}('something.sheffield.sch.uk', 'public_suffix_list');
select cutToFirstSignificantSubdomainCustom{{ suffix }}('sheffield.sch.uk', 'public_suffix_list');
select '-- exclamation mark';
select cutToFirstSignificantSubdomainCustom{{ suffix }}('foo.kawasaki.jp', 'public_suffix_list');
select cutToFirstSignificantSubdomainCustom{{ suffix }}('foo.foo.kawasaki.jp', 'public_suffix_list');
select cutToFirstSignificantSubdomainCustom{{ suffix }}('city.kawasaki.jp', 'public_suffix_list');
select cutToFirstSignificantSubdomainCustom{{ suffix }}('some.city.kawasaki.jp', 'public_suffix_list');
{% endfor %}

View File

@ -62,6 +62,11 @@ SHOW CREATE TABLE 01902_db.t_merge_1;
SELECT 'SELECT _database, _table, n FROM merge(currentDatabase(), ^t) ORDER BY _database, _table, n';
SELECT _database, _table, n FROM merge(currentDatabase(), '^t') ORDER BY _database, _table, n;
--fuzzed LOGICAL_ERROR
CREATE TABLE 01902_db.t4 (n Date) ENGINE=MergeTree ORDER BY n;
INSERT INTO 01902_db.t4 SELECT * FROM numbers(10);
SELECT NULL FROM 01902_db.t_merge WHERE n ORDER BY _table DESC; -- {serverError ILLEGAL_TYPE_OF_COLUMN_FOR_FILTER}
DROP DATABASE 01902_db;
DROP DATABASE 01902_db1;
DROP DATABASE 01902_db2;

View File

@ -219,14 +219,6 @@ cutFragment
cutIPv6
cutQueryString
cutQueryStringAndFragment
cutToFirstSignificantSubdomain
cutToFirstSignificantSubdomainCustom
cutToFirstSignificantSubdomainCustomRFC
cutToFirstSignificantSubdomainCustomWithWWW
cutToFirstSignificantSubdomainCustomWithWWWRFC
cutToFirstSignificantSubdomainRFC
cutToFirstSignificantSubdomainWithWWW
cutToFirstSignificantSubdomainWithWWWRFC
cutURLParameter
cutWWW
dateDiff
@ -284,10 +276,6 @@ dictGetUUIDOrDefault
dictHas
dictIsIn
divide
domain
domainRFC
domainWithoutWWW
domainWithoutWWWRFC
dotProduct
dumpColumnStructure
e
@ -336,10 +324,8 @@ filesystemAvailable
filesystemCapacity
filesystemFree
finalizeAggregation
firstSignificantSubdomain
firstSignificantSubdomainCustom
firstSignificantSubdomainCustomRFC
firstSignificantSubdomainRFC
flattenTuple
floor
format
@ -600,8 +586,6 @@ polygonsUnionCartesian
polygonsUnionSpherical
polygonsWithinCartesian
polygonsWithinSpherical
port
portRFC
position
positionCaseInsensitive
positionCaseInsensitiveUTF8
@ -906,8 +890,6 @@ toYear
toYearWeek
today
tokens
topLevelDomain
topLevelDomainRFC
transactionID
transactionLatestSnapshot
transactionOldestSnapshot

View File

@ -0,0 +1,24 @@
DROP TABLE IF EXISTS prewhere_int128;
DROP TABLE IF EXISTS prewhere_int256;
DROP TABLE IF EXISTS prewhere_uint128;
DROP TABLE IF EXISTS prewhere_uint256;
CREATE TABLE prewhere_int128 (a Int128) ENGINE=MergeTree ORDER BY a;
INSERT INTO prewhere_int128 VALUES (1);
SELECT a FROM prewhere_int128 PREWHERE a; -- { serverError 59 }
DROP TABLE prewhere_int128;
CREATE TABLE prewhere_int256 (a Int256) ENGINE=MergeTree ORDER BY a;
INSERT INTO prewhere_int256 VALUES (1);
SELECT a FROM prewhere_int256 PREWHERE a; -- { serverError 59 }
DROP TABLE prewhere_int256;
CREATE TABLE prewhere_uint128 (a UInt128) ENGINE=MergeTree ORDER BY a;
INSERT INTO prewhere_uint128 VALUES (1);
SELECT a FROM prewhere_uint128 PREWHERE a; -- { serverError 59 }
DROP TABLE prewhere_uint128;
CREATE TABLE prewhere_uint256 (a UInt256) ENGINE=MergeTree ORDER BY a;
INSERT INTO prewhere_uint256 VALUES (1);
SELECT a FROM prewhere_uint256 PREWHERE a; -- { serverError 59 }
DROP TABLE prewhere_uint256;

View File

@ -3,6 +3,7 @@ v22.9.4.32-stable 2022-10-26
v22.9.3.18-stable 2022-09-30
v22.9.2.7-stable 2022-09-23
v22.9.1.2603-stable 2022-09-22
v22.8.8.3-lts 2022-10-27
v22.8.7.34-lts 2022-10-26
v22.8.6.71-lts 2022-09-30
v22.8.5.29-lts 2022-09-13
