mirror of
https://github.com/ClickHouse/ClickHouse.git
synced 2024-11-21 15:12:02 +00:00
Merge branch 'master' into test_multiple_nodes
This commit is contained in:
commit
24fc120076
@ -1,45 +1,28 @@
|
||||
// https://stackoverflow.com/questions/1413445/reading-a-password-from-stdcin
|
||||
|
||||
#include <common/setTerminalEcho.h>
|
||||
#include <common/errnoToString.h>
|
||||
#include <stdexcept>
|
||||
#include <cstring>
|
||||
#include <string>
|
||||
|
||||
#ifdef WIN32
|
||||
#include <windows.h>
|
||||
#else
|
||||
#include <termios.h>
|
||||
#include <unistd.h>
|
||||
#include <errno.h>
|
||||
#endif
|
||||
|
||||
|
||||
void setTerminalEcho(bool enable)
|
||||
{
|
||||
#ifdef WIN32
|
||||
auto handle = GetStdHandle(STD_INPUT_HANDLE);
|
||||
DWORD mode;
|
||||
if (!GetConsoleMode(handle, &mode))
|
||||
throw std::runtime_error(std::string("setTerminalEcho failed get: ") + std::to_string(GetLastError()));
|
||||
/// Obtain terminal attributes,
|
||||
/// toggle the ECHO flag
|
||||
/// and set them back.
|
||||
|
||||
if (!enable)
|
||||
mode &= ~ENABLE_ECHO_INPUT;
|
||||
else
|
||||
mode |= ENABLE_ECHO_INPUT;
|
||||
struct termios tty{};
|
||||
|
||||
if (!SetConsoleMode(handle, mode))
|
||||
throw std::runtime_error(std::string("setTerminalEcho failed set: ") + std::to_string(GetLastError()));
|
||||
#else
|
||||
struct termios tty;
|
||||
if (tcgetattr(STDIN_FILENO, &tty))
|
||||
if (0 != tcgetattr(STDIN_FILENO, &tty))
|
||||
throw std::runtime_error(std::string("setTerminalEcho failed get: ") + errnoToString(errno));
|
||||
if (!enable)
|
||||
tty.c_lflag &= ~ECHO;
|
||||
else
|
||||
tty.c_lflag |= ECHO;
|
||||
|
||||
auto ret = tcsetattr(STDIN_FILENO, TCSANOW, &tty);
|
||||
if (ret)
|
||||
if (enable)
|
||||
tty.c_lflag |= ECHO;
|
||||
else
|
||||
tty.c_lflag &= ~ECHO;
|
||||
|
||||
if (0 != tcsetattr(STDIN_FILENO, TCSANOW, &tty))
|
||||
throw std::runtime_error(std::string("setTerminalEcho failed set: ") + errnoToString(errno));
|
||||
#endif
|
||||
}
|
||||
|
File diff suppressed because it is too large
Load Diff
@ -243,7 +243,7 @@ The function works according to the algorithm:
|
||||
**Syntax**
|
||||
|
||||
``` sql
|
||||
windowFunnel(window, [mode])(timestamp, cond1, cond2, ..., condN)
|
||||
windowFunnel(window, [mode, [mode, ... ]])(timestamp, cond1, cond2, ..., condN)
|
||||
```
|
||||
|
||||
**Arguments**
|
||||
@ -254,8 +254,10 @@ windowFunnel(window, [mode])(timestamp, cond1, cond2, ..., condN)
|
||||
**Parameters**
|
||||
|
||||
- `window` — Length of the sliding window. The unit of `window` depends on the `timestamp` itself and varies. Determined using the expression `timestamp of cond2 <= timestamp of cond1 + window`.
|
||||
- `mode` — It is an optional argument.
|
||||
- `'strict'` — When the `'strict'` is set, the windowFunnel() applies conditions only for the unique values.
|
||||
- `mode` — It is an optional argument. One or more modes can be set.
|
||||
- `'strict'` — If same condition holds for sequence of events then such non-unique events would be skipped.
|
||||
- `'strict_order'` — Don't allow interventions of other events. E.g. in the case of `A->B->D->C`, it stops finding `A->B->C` at the `D` and the max event level is 2.
|
||||
- `'strict_increase'` — Apply conditions only to events with strictly increasing timestamps.
|
||||
|
||||
**Returned value**
|
||||
|
||||
|
@ -12,7 +12,9 @@ The search is case-sensitive by default in all these functions. There are separa
|
||||
|
||||
## position(haystack, needle), locate(haystack, needle) {#position}
|
||||
|
||||
Returns the position (in bytes) of the found substring in the string, starting from 1.
|
||||
Searches for the substring `needle` in the string `haystack`.
|
||||
|
||||
Returns the position (in bytes) of the found substring in the string, starting from 1.
|
||||
|
||||
For a case-insensitive search, use the function [positionCaseInsensitive](#positioncaseinsensitive).
|
||||
|
||||
@ -20,15 +22,22 @@ For a case-insensitive search, use the function [positionCaseInsensitive](#posit
|
||||
|
||||
``` sql
|
||||
position(haystack, needle[, start_pos])
|
||||
```
|
||||
```
|
||||
|
||||
``` sql
|
||||
position(needle IN haystack)
|
||||
```
|
||||
|
||||
Alias: `locate(haystack, needle[, start_pos])`.
|
||||
|
||||
!!! note "Note"
|
||||
Syntax of `position(needle IN haystack)` provides SQL-compatibility, the function works the same way as to `position(haystack, needle)`.
|
||||
|
||||
**Arguments**
|
||||
|
||||
- `haystack` — String, in which substring will to be searched. [String](../../sql-reference/syntax.md#syntax-string-literal).
|
||||
- `needle` — Substring to be searched. [String](../../sql-reference/syntax.md#syntax-string-literal).
|
||||
- `start_pos` — Optional parameter, position of the first character in the string to start search. [UInt](../../sql-reference/data-types/int-uint.md).
|
||||
- `start_pos` – Position of the first character in the string to start search. [UInt](../../sql-reference/data-types/int-uint.md). Optional.
|
||||
|
||||
**Returned values**
|
||||
|
||||
@ -83,6 +92,36 @@ Result:
|
||||
└───────────────────────────────┘
|
||||
```
|
||||
|
||||
**Examples for POSITION(needle IN haystack) syntax**
|
||||
|
||||
Query:
|
||||
|
||||
```sql
|
||||
SELECT 3 = position('c' IN 'abc');
|
||||
```
|
||||
|
||||
Result:
|
||||
|
||||
```text
|
||||
┌─equals(3, position('abc', 'c'))─┐
|
||||
│ 1 │
|
||||
└─────────────────────────────────┘
|
||||
```
|
||||
|
||||
Query:
|
||||
|
||||
```sql
|
||||
SELECT 6 = position('/' IN s) FROM (SELECT 'Hello/World' AS s);
|
||||
```
|
||||
|
||||
Result:
|
||||
|
||||
```text
|
||||
┌─equals(6, position(s, '/'))─┐
|
||||
│ 1 │
|
||||
└─────────────────────────────┘
|
||||
```
|
||||
|
||||
## positionCaseInsensitive {#positioncaseinsensitive}
|
||||
|
||||
The same as [position](#position) returns the position (in bytes) of the found substring in the string, starting from 1. Use the function for a case-insensitive search.
|
||||
@ -772,4 +811,3 @@ Result:
|
||||
│ 2 │
|
||||
└───────────────────────────────┘
|
||||
```
|
||||
|
||||
|
@ -243,7 +243,7 @@ SELECT sequenceCount('(?1).*(?2)')(time, number = 1, number = 2) FROM t
|
||||
**Синтаксис**
|
||||
|
||||
``` sql
|
||||
windowFunnel(window, [mode])(timestamp, cond1, cond2, ..., condN)
|
||||
windowFunnel(window, [mode, [mode, ... ]])(timestamp, cond1, cond2, ..., condN)
|
||||
```
|
||||
|
||||
**Аргументы**
|
||||
@ -254,7 +254,10 @@ windowFunnel(window, [mode])(timestamp, cond1, cond2, ..., condN)
|
||||
**Параметры**
|
||||
|
||||
- `window` — ширина скользящего окна по времени. Единица измерения зависит от `timestamp` и может варьироваться. Должно соблюдаться условие `timestamp события cond2 <= timestamp события cond1 + window`.
|
||||
- `mode` — необязательный параметр. Если установлено значение `'strict'`, то функция `windowFunnel()` применяет условия только для уникальных значений.
|
||||
- `mode` — необязательный параметр. Может быть установленно несколько значений одновременно.
|
||||
- `'strict'` — не учитывать подряд идущие повторяющиеся события.
|
||||
- `'strict_order'` — запрещает посторонние события в искомой последовательности. Например, при поиске цепочки `A->B->C` в `A->B->D->C` поиск будет остановлен на `D` и функция вернет 2.
|
||||
- `'strict_increase'` — условия прменяются только для событий со строго возрастающими временными метками.
|
||||
|
||||
**Возвращаемое значение**
|
||||
|
||||
|
@ -7,7 +7,7 @@ toc_title: "Функции поиска в строках"
|
||||
|
||||
Во всех функциях, поиск регистрозависимый по умолчанию. Существуют варианты функций для регистронезависимого поиска.
|
||||
|
||||
## position(haystack, needle) {#position}
|
||||
## position(haystack, needle), locate(haystack, needle) {#position}
|
||||
|
||||
Поиск подстроки `needle` в строке `haystack`.
|
||||
|
||||
@ -21,8 +21,15 @@ toc_title: "Функции поиска в строках"
|
||||
position(haystack, needle[, start_pos])
|
||||
```
|
||||
|
||||
``` sql
|
||||
position(needle IN haystack)
|
||||
```
|
||||
|
||||
Алиас: `locate(haystack, needle[, start_pos])`.
|
||||
|
||||
!!! note "Примечание"
|
||||
Синтаксис `position(needle IN haystack)` обеспечивает совместимость с SQL, функция работает так же, как `position(haystack, needle)`.
|
||||
|
||||
**Аргументы**
|
||||
|
||||
- `haystack` — строка, по которой выполняется поиск. [Строка](../syntax.md#syntax-string-literal).
|
||||
@ -70,6 +77,36 @@ SELECT position('Привет, мир!', '!');
|
||||
└───────────────────────────────┘
|
||||
```
|
||||
|
||||
**Примеры работы функции с синтаксисом POSITION(needle IN haystack)**
|
||||
|
||||
Запрос:
|
||||
|
||||
```sql
|
||||
SELECT 1 = position('абв' IN 'абв');
|
||||
```
|
||||
|
||||
Результат:
|
||||
|
||||
```text
|
||||
┌─equals(1, position('абв', 'абв'))─┐
|
||||
│ 1 │
|
||||
└───────────────────────────────────┘
|
||||
```
|
||||
|
||||
Запрос:
|
||||
|
||||
```sql
|
||||
SELECT 0 = position('абв' IN '');
|
||||
```
|
||||
|
||||
Результат:
|
||||
|
||||
```text
|
||||
┌─equals(0, position('', 'абв'))─┐
|
||||
│ 1 │
|
||||
└────────────────────────────────┘
|
||||
```
|
||||
|
||||
## positionCaseInsensitive {#positioncaseinsensitive}
|
||||
|
||||
Такая же, как и [position](#position), но работает без учета регистра. Возвращает позицию в байтах найденной подстроки в строке, начиная с 1.
|
||||
@ -758,4 +795,3 @@ SELECT countSubstringsCaseInsensitiveUTF8('аБв__АбВ__абв', 'Абв');
|
||||
│ 3 │
|
||||
└────────────────────────────────────────────────────────────┘
|
||||
```
|
||||
|
||||
|
@ -23,7 +23,6 @@ nltk==3.5
|
||||
nose==1.3.7
|
||||
protobuf==3.14.0
|
||||
numpy==1.19.2
|
||||
Pygments==2.5.2
|
||||
pymdown-extensions==8.0
|
||||
python-slugify==4.0.1
|
||||
PyYAML==5.4.1
|
||||
@ -36,3 +35,4 @@ termcolor==1.1.0
|
||||
tornado==6.1
|
||||
Unidecode==1.1.1
|
||||
urllib3==1.25.10
|
||||
Pygments>=2.7.4
|
||||
|
@ -6,7 +6,6 @@
|
||||
#include <DataTypes/DataTypeDateTime.h>
|
||||
|
||||
#include <ext/range.h>
|
||||
#include "registerAggregateFunctions.h"
|
||||
|
||||
|
||||
namespace DB
|
||||
|
@ -29,6 +29,7 @@ struct ComparePair final
|
||||
};
|
||||
|
||||
static constexpr auto max_events = 32;
|
||||
|
||||
template <typename T>
|
||||
struct AggregateFunctionWindowFunnelData
|
||||
{
|
||||
@ -46,7 +47,7 @@ struct AggregateFunctionWindowFunnelData
|
||||
|
||||
void add(T timestamp, UInt8 event)
|
||||
{
|
||||
// Since most events should have already been sorted by timestamp.
|
||||
/// Since most events should have already been sorted by timestamp.
|
||||
if (sorted && events_list.size() > 0)
|
||||
{
|
||||
if (events_list.back().first == timestamp)
|
||||
@ -145,14 +146,20 @@ class AggregateFunctionWindowFunnel final
|
||||
private:
|
||||
UInt64 window;
|
||||
UInt8 events_size;
|
||||
UInt8 strict; // When the 'strict' is set, it applies conditions only for the not repeating values.
|
||||
UInt8 strict_order; // When the 'strict_order' is set, it doesn't allow interventions of other events.
|
||||
// In the case of 'A->B->D->C', it stops finding 'A->B->C' at the 'D' and the max event level is 2.
|
||||
/// When the 'strict' is set, it applies conditions only for the not repeating values.
|
||||
bool strict;
|
||||
|
||||
// Loop through the entire events_list, update the event timestamp value
|
||||
// The level path must be 1---2---3---...---check_events_size, find the max event level that satisfied the path in the sliding window.
|
||||
// If found, returns the max event level, else return 0.
|
||||
// The Algorithm complexity is O(n).
|
||||
/// When the 'strict_order' is set, it doesn't allow interventions of other events.
|
||||
/// In the case of 'A->B->D->C', it stops finding 'A->B->C' at the 'D' and the max event level is 2.
|
||||
bool strict_order;
|
||||
|
||||
/// Applies conditions only to events with strictly increasing timestamps
|
||||
bool strict_increase;
|
||||
|
||||
/// Loop through the entire events_list, update the event timestamp value
|
||||
/// The level path must be 1---2---3---...---check_events_size, find the max event level that satisfied the path in the sliding window.
|
||||
/// If found, returns the max event level, else return 0.
|
||||
/// The Algorithm complexity is O(n).
|
||||
UInt8 getEventLevel(Data & data) const
|
||||
{
|
||||
if (data.size() == 0)
|
||||
@ -162,16 +169,13 @@ private:
|
||||
|
||||
data.sort();
|
||||
|
||||
/// events_timestamp stores the timestamp that latest i-th level event happen within time window after previous level event.
|
||||
/// timestamp defaults to -1, which unsigned timestamp value never meet
|
||||
/// there may be some bugs when UInt64 type timstamp overflows Int64, but it works on most cases.
|
||||
std::vector<Int64> events_timestamp(events_size, -1);
|
||||
/// events_timestamp stores the timestamp of the first and previous i-th level event happen within time window
|
||||
std::vector<std::optional<std::pair<UInt64, UInt64>>> events_timestamp(events_size);
|
||||
bool first_event = false;
|
||||
for (const auto & pair : data.events_list)
|
||||
{
|
||||
const T & timestamp = pair.first;
|
||||
const auto & event_idx = pair.second - 1;
|
||||
|
||||
if (strict_order && event_idx == -1)
|
||||
{
|
||||
if (first_event)
|
||||
@ -181,31 +185,39 @@ private:
|
||||
}
|
||||
else if (event_idx == 0)
|
||||
{
|
||||
events_timestamp[0] = timestamp;
|
||||
events_timestamp[0] = std::make_pair(timestamp, timestamp);
|
||||
first_event = true;
|
||||
}
|
||||
else if (strict && events_timestamp[event_idx] >= 0)
|
||||
else if (strict && events_timestamp[event_idx].has_value())
|
||||
{
|
||||
return event_idx + 1;
|
||||
}
|
||||
else if (strict_order && first_event && events_timestamp[event_idx - 1] == -1)
|
||||
else if (strict_order && first_event && !events_timestamp[event_idx - 1].has_value())
|
||||
{
|
||||
for (size_t event = 0; event < events_timestamp.size(); ++event)
|
||||
{
|
||||
if (events_timestamp[event] == -1)
|
||||
if (!events_timestamp[event].has_value())
|
||||
return event;
|
||||
}
|
||||
}
|
||||
else if (events_timestamp[event_idx - 1] >= 0 && timestamp <= events_timestamp[event_idx - 1] + window)
|
||||
else if (events_timestamp[event_idx - 1].has_value())
|
||||
{
|
||||
events_timestamp[event_idx] = events_timestamp[event_idx - 1];
|
||||
if (event_idx + 1 == events_size)
|
||||
return events_size;
|
||||
auto first_timestamp = events_timestamp[event_idx - 1]->first;
|
||||
bool time_matched = timestamp <= first_timestamp + window;
|
||||
if (strict_increase)
|
||||
time_matched = time_matched && events_timestamp[event_idx - 1]->second < timestamp;
|
||||
if (time_matched)
|
||||
{
|
||||
events_timestamp[event_idx] = std::make_pair(first_timestamp, timestamp);
|
||||
if (event_idx + 1 == events_size)
|
||||
return events_size;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
for (size_t event = events_timestamp.size(); event > 0; --event)
|
||||
{
|
||||
if (events_timestamp[event - 1] >= 0)
|
||||
if (events_timestamp[event - 1].has_value())
|
||||
return event;
|
||||
}
|
||||
return 0;
|
||||
@ -223,15 +235,18 @@ public:
|
||||
events_size = arguments.size() - 1;
|
||||
window = params.at(0).safeGet<UInt64>();
|
||||
|
||||
strict = 0;
|
||||
strict_order = 0;
|
||||
strict = false;
|
||||
strict_order = false;
|
||||
strict_increase = false;
|
||||
for (size_t i = 1; i < params.size(); ++i)
|
||||
{
|
||||
String option = params.at(i).safeGet<String>();
|
||||
if (option.compare("strict") == 0)
|
||||
strict = 1;
|
||||
else if (option.compare("strict_order") == 0)
|
||||
strict_order = 1;
|
||||
if (option == "strict")
|
||||
strict = true;
|
||||
else if (option == "strict_order")
|
||||
strict_order = true;
|
||||
else if (option == "strict_increase")
|
||||
strict_increase = true;
|
||||
else
|
||||
throw Exception{"Aggregate function " + getName() + " doesn't support a parameter: " + option, ErrorCodes::BAD_ARGUMENTS};
|
||||
}
|
||||
@ -253,7 +268,7 @@ public:
|
||||
{
|
||||
bool has_event = false;
|
||||
const auto timestamp = assert_cast<const ColumnVector<T> *>(columns[0])->getData()[row_num];
|
||||
// reverse iteration and stable sorting are needed for events that are qualified by more than one condition.
|
||||
/// reverse iteration and stable sorting are needed for events that are qualified by more than one condition.
|
||||
for (auto i = events_size; i > 0; --i)
|
||||
{
|
||||
auto event = assert_cast<const ColumnVector<UInt8> *>(columns[i])->getData()[row_num];
|
||||
|
@ -13,6 +13,7 @@
|
||||
#include <Common/NaNUtils.h>
|
||||
#include <Poco/Exception.h>
|
||||
|
||||
|
||||
namespace DB
|
||||
{
|
||||
namespace ErrorCodes
|
||||
@ -162,6 +163,11 @@ public:
|
||||
sorted = false;
|
||||
}
|
||||
|
||||
#if !__clang__
|
||||
#pragma GCC diagnostic push
|
||||
#pragma GCC diagnostic ignored "-Wclass-memaccess"
|
||||
#endif
|
||||
|
||||
void write(DB::WriteBuffer & buf) const
|
||||
{
|
||||
size_t size = samples.size();
|
||||
@ -169,9 +175,26 @@ public:
|
||||
DB::writeIntBinary<size_t>(total_values, buf);
|
||||
|
||||
for (size_t i = 0; i < size; ++i)
|
||||
DB::writePODBinary(samples[i], buf);
|
||||
{
|
||||
/// There was a mistake in this function.
|
||||
/// Instead of correctly serializing the elements,
|
||||
/// it was writing them with uninitialized padding.
|
||||
/// Here we ensure that padding is zero without changing the protocol.
|
||||
/// TODO: After implementation of "versioning aggregate function state",
|
||||
/// change the serialization format.
|
||||
|
||||
Element elem;
|
||||
memset(&elem, 0, sizeof(elem));
|
||||
elem = samples[i];
|
||||
|
||||
DB::writePODBinary(elem, buf);
|
||||
}
|
||||
}
|
||||
|
||||
#if !__clang__
|
||||
#pragma GCC diagnostic pop
|
||||
#endif
|
||||
|
||||
private:
|
||||
/// We allocate some memory on the stack to avoid allocations when there are many objects with a small number of elements.
|
||||
using Element = std::pair<T, UInt32>;
|
||||
|
@ -88,9 +88,9 @@ public:
|
||||
const String & user_, const String & password_,
|
||||
const String & cluster_,
|
||||
const String & cluster_secret_,
|
||||
const String & client_name_ = "client",
|
||||
Protocol::Compression compression_ = Protocol::Compression::Enable,
|
||||
Protocol::Secure secure_ = Protocol::Secure::Disable,
|
||||
const String & client_name_,
|
||||
Protocol::Compression compression_,
|
||||
Protocol::Secure secure_,
|
||||
Poco::Timespan sync_request_timeout_ = Poco::Timespan(DBMS_DEFAULT_SYNC_REQUEST_TIMEOUT_SEC, 0))
|
||||
:
|
||||
host(host_), port(port_), default_database(default_database_),
|
||||
|
@ -56,9 +56,9 @@ public:
|
||||
const String & password_,
|
||||
const String & cluster_,
|
||||
const String & cluster_secret_,
|
||||
const String & client_name_ = "client",
|
||||
Protocol::Compression compression_ = Protocol::Compression::Enable,
|
||||
Protocol::Secure secure_ = Protocol::Secure::Disable,
|
||||
const String & client_name_,
|
||||
Protocol::Compression compression_,
|
||||
Protocol::Secure secure_,
|
||||
Int64 priority_ = 1)
|
||||
: Base(max_connections_,
|
||||
&Poco::Logger::get("ConnectionPool (" + host_ + ":" + toString(port_) + ")")),
|
||||
|
@ -13,6 +13,7 @@
|
||||
|
||||
#include <IO/ConnectionTimeouts.h>
|
||||
|
||||
|
||||
namespace DB
|
||||
{
|
||||
|
||||
|
@ -116,7 +116,7 @@ public:
|
||||
/** Get a list of column names separated by commas. */
|
||||
std::string dumpNames() const;
|
||||
|
||||
/** List of names, types and lengths of columns. Designed for debugging. */
|
||||
/** List of names, types and lengths of columns. Designed for debugging. */
|
||||
std::string dumpStructure() const;
|
||||
|
||||
/** List of column names and positions from index */
|
||||
|
@ -593,6 +593,7 @@ Cluster::Cluster(Cluster::ReplicasAsShardsTag, const Cluster & from, const Setti
|
||||
if (from.addresses_with_failover.empty())
|
||||
throw Exception("Cluster is empty", ErrorCodes::LOGICAL_ERROR);
|
||||
|
||||
UInt32 shard_num = 0;
|
||||
std::set<std::pair<String, int>> unique_hosts;
|
||||
for (size_t shard_index : ext::range(0, from.shards_info.size()))
|
||||
{
|
||||
@ -603,6 +604,8 @@ Cluster::Cluster(Cluster::ReplicasAsShardsTag, const Cluster & from, const Setti
|
||||
continue; /// Duplicate host, skip.
|
||||
|
||||
ShardInfo info;
|
||||
info.shard_num = ++shard_num;
|
||||
|
||||
if (address.is_local)
|
||||
info.local_addresses.push_back(address);
|
||||
|
||||
|
@ -97,6 +97,7 @@ public:
|
||||
Int64 priority = 1;
|
||||
|
||||
Address() = default;
|
||||
|
||||
Address(
|
||||
const Poco::Util::AbstractConfiguration & config,
|
||||
const String & config_prefix,
|
||||
@ -104,6 +105,7 @@ public:
|
||||
const String & cluster_secret_,
|
||||
UInt32 shard_index_ = 0,
|
||||
UInt32 replica_index_ = 0);
|
||||
|
||||
Address(
|
||||
const String & host_port_,
|
||||
const String & user_,
|
||||
|
@ -83,6 +83,9 @@ struct Settings;
|
||||
M(UInt64, replicated_max_parallel_fetches_for_host, DEFAULT_COUNT_OF_HTTP_CONNECTIONS_PER_ENDPOINT, "Limit parallel fetches from endpoint (actually pool size).", 0) \
|
||||
M(UInt64, replicated_max_parallel_sends, 0, "Limit parallel sends.", 0) \
|
||||
M(UInt64, replicated_max_parallel_sends_for_table, 0, "Limit parallel sends for one table.", 0) \
|
||||
M(Seconds, replicated_fetches_http_connection_timeout, 0, "HTTP connection timeout for part fetch requests. Inherited from default profile `http_connection_timeout` if not set explicitly.", 0) \
|
||||
M(Seconds, replicated_fetches_http_send_timeout, 0, "HTTP send timeout for part fetch requests. Inherited from default profile `http_send_timeout` if not set explicitly.", 0) \
|
||||
M(Seconds, replicated_fetches_http_receive_timeout, 0, "HTTP receive timeout for fetch part requests. Inherited from default profile `http_receive_timeout` if not set explicitly.", 0) \
|
||||
M(Bool, replicated_can_become_leader, true, "If true, Replicated tables replicas on this node will try to acquire leadership.", 0) \
|
||||
M(Seconds, zookeeper_session_expiration_check_period, 60, "ZooKeeper session expiration check period, in seconds.", 0) \
|
||||
M(Bool, detach_old_local_parts_when_cloning_replica, 1, "Do not remove old local parts when repairing lost replica.", 0) \
|
||||
|
@ -39,7 +39,7 @@ MergeTreeWhereOptimizer::MergeTreeWhereOptimizer(
|
||||
, queried_columns{queried_columns_}
|
||||
, sorting_key_names{NameSet(
|
||||
metadata_snapshot->getSortingKey().column_names.begin(), metadata_snapshot->getSortingKey().column_names.end())}
|
||||
, block_with_constants{KeyCondition::getBlockWithConstants(query_info.query, query_info.syntax_analyzer_result, context)}
|
||||
, block_with_constants{KeyCondition::getBlockWithConstants(query_info.query->clone(), query_info.syntax_analyzer_result, context)}
|
||||
, log{log_}
|
||||
, column_sizes{std::move(column_sizes_)}
|
||||
{
|
||||
|
@ -2313,7 +2313,8 @@ bool StorageReplicatedMergeTree::executeReplaceRange(const LogEntry & entry)
|
||||
{
|
||||
String source_replica_path = zookeeper_path + "/replicas/" + part_desc->replica;
|
||||
ReplicatedMergeTreeAddress address(getZooKeeper()->get(source_replica_path + "/host"));
|
||||
auto timeouts = ConnectionTimeouts::getHTTPTimeouts(global_context);
|
||||
auto timeouts = getFetchPartHTTPTimeouts(global_context);
|
||||
|
||||
auto [user, password] = global_context.getInterserverCredentials();
|
||||
String interserver_scheme = global_context.getInterserverScheme();
|
||||
|
||||
@ -3246,6 +3247,23 @@ void StorageReplicatedMergeTree::exitLeaderElection()
|
||||
leader_election = nullptr;
|
||||
}
|
||||
|
||||
ConnectionTimeouts StorageReplicatedMergeTree::getFetchPartHTTPTimeouts(const Context & context)
|
||||
{
|
||||
auto timeouts = ConnectionTimeouts::getHTTPTimeouts(context);
|
||||
auto settings = getSettings();
|
||||
|
||||
if (settings->replicated_fetches_http_connection_timeout.changed)
|
||||
timeouts.connection_timeout = settings->replicated_fetches_http_connection_timeout;
|
||||
|
||||
if (settings->replicated_fetches_http_send_timeout.changed)
|
||||
timeouts.send_timeout = settings->replicated_fetches_http_send_timeout;
|
||||
|
||||
if (settings->replicated_fetches_http_receive_timeout.changed)
|
||||
timeouts.receive_timeout = settings->replicated_fetches_http_receive_timeout;
|
||||
|
||||
return timeouts;
|
||||
}
|
||||
|
||||
bool StorageReplicatedMergeTree::checkReplicaHavePart(const String & replica, const String & part_name)
|
||||
{
|
||||
auto zookeeper = getZooKeeper();
|
||||
@ -3661,7 +3679,8 @@ bool StorageReplicatedMergeTree::fetchPart(const String & part_name, const Stora
|
||||
else
|
||||
{
|
||||
address.fromString(zookeeper->get(source_replica_path + "/host"));
|
||||
timeouts = ConnectionTimeouts::getHTTPTimeouts(global_context);
|
||||
timeouts = getFetchPartHTTPTimeouts(global_context);
|
||||
|
||||
user_password = global_context.getInterserverCredentials();
|
||||
interserver_scheme = global_context.getInterserverScheme();
|
||||
|
||||
|
@ -507,6 +507,8 @@ private:
|
||||
|
||||
/// Exchange parts.
|
||||
|
||||
ConnectionTimeouts getFetchPartHTTPTimeouts(const Context & context);
|
||||
|
||||
/** Returns an empty string if no one has a part.
|
||||
*/
|
||||
String findReplicaHavingPart(const String & part_name, bool active);
|
||||
|
@ -16,5 +16,17 @@
|
||||
</replica>
|
||||
</shard>
|
||||
</test_cluster_two_shards_different_databases>
|
||||
<test_cluster_one_shard_two_replicas>
|
||||
<shard>
|
||||
<replica>
|
||||
<host>127.0.0.1</host>
|
||||
<port>9000</port>
|
||||
</replica>
|
||||
<replica>
|
||||
<host>127.0.0.2</host>
|
||||
<port>9000</port>
|
||||
</replica>
|
||||
</shard>
|
||||
</test_cluster_one_shard_two_replicas>
|
||||
</remote_servers>
|
||||
</yandex>
|
||||
|
@ -0,0 +1,3 @@
|
||||
<yandex>
|
||||
<background_processing_pool_task_sleep_seconds_when_no_work_max>0.1</background_processing_pool_task_sleep_seconds_when_no_work_max>
|
||||
</yandex>
|
95
tests/integration/test_replicated_fetches_timeouts/test.py
Normal file
95
tests/integration/test_replicated_fetches_timeouts/test.py
Normal file
@ -0,0 +1,95 @@
|
||||
#!/usr/bin/env python3
|
||||
|
||||
import random
|
||||
import string
|
||||
import time
|
||||
|
||||
import pytest
|
||||
from helpers.cluster import ClickHouseCluster
|
||||
from helpers.network import PartitionManager
|
||||
|
||||
cluster = ClickHouseCluster(__file__)
|
||||
node1 = cluster.add_instance(
|
||||
'node1', with_zookeeper=True,
|
||||
main_configs=['configs/server.xml'])
|
||||
|
||||
node2 = cluster.add_instance(
|
||||
'node2', with_zookeeper=True,
|
||||
main_configs=['configs/server.xml'])
|
||||
|
||||
|
||||
@pytest.fixture(scope="module")
|
||||
def started_cluster():
|
||||
try:
|
||||
cluster.start()
|
||||
|
||||
yield cluster
|
||||
|
||||
finally:
|
||||
cluster.shutdown()
|
||||
|
||||
|
||||
def get_random_string(length):
|
||||
return ''.join(random.choice(string.ascii_uppercase + string.digits) for _ in range(length))
|
||||
|
||||
|
||||
def test_no_stall(started_cluster):
|
||||
for instance in started_cluster.instances.values():
|
||||
instance.query("""
|
||||
CREATE TABLE t (key UInt64, data String)
|
||||
ENGINE = ReplicatedMergeTree('/clickhouse/test/t', '{instance}')
|
||||
ORDER BY tuple()
|
||||
PARTITION BY key""")
|
||||
|
||||
# Pause node3 until the test setup is prepared
|
||||
node2.query("SYSTEM STOP FETCHES t")
|
||||
|
||||
node1.query("INSERT INTO t SELECT 1, '{}' FROM numbers(500)".format(get_random_string(104857)))
|
||||
node1.query("INSERT INTO t SELECT 2, '{}' FROM numbers(500)".format(get_random_string(104857)))
|
||||
|
||||
with PartitionManager() as pm:
|
||||
pm.add_network_delay(node1, 2000)
|
||||
node2.query("SYSTEM START FETCHES t")
|
||||
|
||||
# Wait for timeout exceptions to confirm that timeout is triggered.
|
||||
while True:
|
||||
conn_timeout_exceptions = int(node2.query(
|
||||
"""
|
||||
SELECT count()
|
||||
FROM system.replication_queue
|
||||
WHERE last_exception LIKE '%connect timed out%'
|
||||
"""))
|
||||
|
||||
if conn_timeout_exceptions >= 2:
|
||||
break
|
||||
|
||||
time.sleep(0.1)
|
||||
|
||||
print("Connection timeouts tested!")
|
||||
|
||||
# Increase connection timeout and wait for receive timeouts.
|
||||
node2.query("""
|
||||
ALTER TABLE t
|
||||
MODIFY SETTING replicated_fetches_http_connection_timeout = 30,
|
||||
replicated_fetches_http_receive_timeout = 1""")
|
||||
|
||||
while True:
|
||||
timeout_exceptions = int(node2.query(
|
||||
"""
|
||||
SELECT count()
|
||||
FROM system.replication_queue
|
||||
WHERE last_exception LIKE '%e.displayText() = Timeout%'
|
||||
AND last_exception NOT LIKE '%connect timed out%'
|
||||
""").strip())
|
||||
|
||||
if timeout_exceptions >= 2:
|
||||
break
|
||||
|
||||
time.sleep(0.1)
|
||||
|
||||
for instance in started_cluster.instances.values():
|
||||
# Workaround for DROP TABLE not finishing if it is started while table is readonly.
|
||||
instance.query("SYSTEM RESTART REPLICA t")
|
||||
|
||||
# Cleanup data directory from test results archive.
|
||||
instance.query("DROP TABLE t SYNC")
|
@ -1,6 +1,6 @@
|
||||
(ns jepsen.nukeeper-test
|
||||
(ns jepsen.keeper-test
|
||||
(:require [clojure.test :refer :all]
|
||||
[jepsen.nukeeper.utils :refer :all]
|
||||
[jepsen.clickhouse-keeper.utils :refer :all]
|
||||
[zookeeper :as zk]
|
||||
[zookeeper.data :as data])
|
||||
(:import (ch.qos.logback.classic Level)
|
@ -57,3 +57,7 @@
|
||||
[2, 0]
|
||||
[3, 1]
|
||||
[4, 1]
|
||||
1
|
||||
1
|
||||
1
|
||||
1
|
||||
|
@ -79,3 +79,11 @@ select u, windowFunnel(86400)(dt, a is null and b is null) as s from funnel_test
|
||||
select u, windowFunnel(86400)(dt, a is null, b = 'b3') as s from funnel_test_non_null group by u order by u format JSONCompactEachRow;
|
||||
select u, windowFunnel(86400, 'strict_order')(dt, a is null, b = 'b3') as s from funnel_test_non_null group by u order by u format JSONCompactEachRow;
|
||||
drop table funnel_test_non_null;
|
||||
|
||||
create table funnel_test_strict_increase (timestamp UInt32, event UInt32) engine=Memory;
|
||||
insert into funnel_test_strict_increase values (0,1000),(1,1001),(1,1002),(1,1003),(2,1004);
|
||||
|
||||
select 5 = windowFunnel(10000)(timestamp, event = 1000, event = 1001, event = 1002, event = 1003, event = 1004) from funnel_test_strict_increase;
|
||||
select 2 = windowFunnel(10000, 'strict_increase')(timestamp, event = 1000, event = 1001, event = 1002, event = 1003, event = 1004) from funnel_test_strict_increase;
|
||||
select 3 = windowFunnel(10000)(timestamp, event = 1004, event = 1004, event = 1004) from funnel_test_strict_increase;
|
||||
select 1 = windowFunnel(10000, 'strict_increase')(timestamp, event = 1004, event = 1004, event = 1004) from funnel_test_strict_increase;
|
||||
|
@ -0,0 +1,9 @@
|
||||
1
|
||||
1
|
||||
1
|
||||
2
|
||||
1
|
||||
2
|
||||
1
|
||||
1
|
||||
2
|
@ -0,0 +1,8 @@
|
||||
SELECT _shard_num FROM cluster('test_shard_localhost', system.one);
|
||||
SELECT _shard_num FROM clusterAllReplicas('test_shard_localhost', system.one);
|
||||
|
||||
SELECT _shard_num FROM cluster('test_cluster_two_shards', system.one) ORDER BY _shard_num;
|
||||
SELECT _shard_num FROM clusterAllReplicas('test_cluster_two_shards', system.one) ORDER BY _shard_num;
|
||||
|
||||
SELECT _shard_num FROM cluster('test_cluster_one_shard_two_replicas', system.one) ORDER BY _shard_num;
|
||||
SELECT _shard_num FROM clusterAllReplicas('test_cluster_one_shard_two_replicas', system.one) ORDER BY _shard_num;
|
@ -0,0 +1,5 @@
|
||||
DROP TABLE IF EXISTS ttt01778;
|
||||
CREATE TABLE ttt01778 (`1` String, `2` INT) ENGINE = MergeTree() ORDER BY tuple();
|
||||
INSERT INTO ttt01778 values('1',1),('2',2),('3',3);
|
||||
select * from ttt01778 where 1=2; -- no server error
|
||||
DROP TABLE ttt01778;
|
@ -0,0 +1 @@
|
||||
11447494982455782708
|
@ -0,0 +1 @@
|
||||
SELECT cityHash64(toString(quantileDeterministicState(number, sipHash64(number)))) FROM numbers(8193);
|
@ -1,99 +0,0 @@
|
||||
#!/usr/bin/env python3
|
||||
|
||||
import sys
|
||||
import os
|
||||
import time
|
||||
import subprocess
|
||||
import threading
|
||||
from io import StringIO, SEEK_END
|
||||
from http.server import BaseHTTPRequestHandler, HTTPServer
|
||||
|
||||
CLICKHOUSE_HOST = os.environ.get('CLICKHOUSE_HOST', '127.0.0.1')
|
||||
CLICKHOUSE_PORT_HTTP = os.environ.get('CLICKHOUSE_PORT_HTTP', '8123')
|
||||
|
||||
# IP-address of this host accessible from outside world.
|
||||
HTTP_SERVER_HOST = os.environ.get('HTTP_SERVER_HOST', subprocess.check_output(['hostname', '-i']).decode('utf-8').strip())
|
||||
HTTP_SERVER_PORT = int(os.environ.get('CLICKHOUSE_TEST_HOST_EXPOSED_PORT', 51234))
|
||||
|
||||
# IP address and port of the HTTP server started from this script.
|
||||
HTTP_SERVER_ADDRESS = (HTTP_SERVER_HOST, HTTP_SERVER_PORT)
|
||||
HTTP_SERVER_URL_STR = 'http://' + ':'.join(str(s) for s in HTTP_SERVER_ADDRESS) + "/"
|
||||
|
||||
ostream = StringIO()
|
||||
istream = sys.stdout
|
||||
|
||||
class EchoCSVHTTPServer(BaseHTTPRequestHandler):
|
||||
def _set_headers(self):
|
||||
self.send_response(200)
|
||||
self.send_header('Content-type', 'text/plain')
|
||||
self.end_headers()
|
||||
|
||||
def do_GET(self):
|
||||
self._set_headers()
|
||||
with open(CSV_DATA, 'r') as fl:
|
||||
ostream.seek(0)
|
||||
for row in ostream:
|
||||
self.wfile.write(row + '\n')
|
||||
return
|
||||
|
||||
def read_chunk(self):
|
||||
msg = ''
|
||||
while True:
|
||||
sym = self.rfile.read(1)
|
||||
if sym == '':
|
||||
break
|
||||
msg += sym.decode('utf-8')
|
||||
if msg.endswith('\r\n'):
|
||||
break
|
||||
length = int(msg[:-2], 16)
|
||||
if length == 0:
|
||||
return ''
|
||||
content = self.rfile.read(length)
|
||||
self.rfile.read(2) # read sep \r\n
|
||||
return content.decode('utf-8')
|
||||
|
||||
def do_POST(self):
|
||||
while True:
|
||||
chunk = self.read_chunk()
|
||||
if not chunk:
|
||||
break
|
||||
istream.write(chunk)
|
||||
istream.flush()
|
||||
text = ""
|
||||
self._set_headers()
|
||||
self.wfile.write("ok")
|
||||
|
||||
def log_message(self, format, *args):
|
||||
return
|
||||
|
||||
def start_server(requests_amount, test_data="Hello,2,-2,7.7\nWorld,2,-5,8.8"):
|
||||
ostream = StringIO(test_data.decode("utf-8"))
|
||||
|
||||
httpd = HTTPServer(HTTP_SERVER_ADDRESS, EchoCSVHTTPServer)
|
||||
|
||||
def real_func():
|
||||
for i in range(requests_amount):
|
||||
httpd.handle_request()
|
||||
|
||||
t = threading.Thread(target=real_func)
|
||||
return t
|
||||
|
||||
def run(requests_amount=1):
|
||||
t = start_server(requests_amount)
|
||||
t.start()
|
||||
t.join()
|
||||
|
||||
if __name__ == "__main__":
|
||||
exception_text = ''
|
||||
for i in range(1, 5):
|
||||
try:
|
||||
run(int(sys.argv[1]) if len(sys.argv) > 1 else 1)
|
||||
break
|
||||
except Exception as ex:
|
||||
exception_text = str(ex)
|
||||
time.sleep(1)
|
||||
|
||||
if exception_text:
|
||||
print("Exception: {}".format(exception_text), file=sys.stderr)
|
||||
os._exit(1)
|
||||
|
@ -140,4 +140,4 @@
|
||||
<replace>[hidden]</replace>
|
||||
</rule>
|
||||
</query_masking_rules>
|
||||
</yandex>
|
||||
</yandex>
|
||||
|
@ -21,7 +21,7 @@ if (NOT DEFINED ENABLE_UTILS OR ENABLE_UTILS)
|
||||
add_subdirectory (corrector_utf8)
|
||||
add_subdirectory (zookeeper-cli)
|
||||
add_subdirectory (zookeeper-test)
|
||||
add_subdirectory (nukeeper-data-dumper)
|
||||
add_subdirectory (keeper-data-dumper)
|
||||
add_subdirectory (zookeeper-dump-tree)
|
||||
add_subdirectory (zookeeper-remove-by-list)
|
||||
add_subdirectory (zookeeper-create-entry-to-download-part)
|
||||
|
2
utils/keeper-data-dumper/CMakeLists.txt
Normal file
2
utils/keeper-data-dumper/CMakeLists.txt
Normal file
@ -0,0 +1,2 @@
|
||||
add_executable(keeper-data-dumper main.cpp)
|
||||
target_link_libraries(keeper-data-dumper PRIVATE dbms)
|
@ -1,2 +0,0 @@
|
||||
add_executable(nukeeper-data-dumper main.cpp)
|
||||
target_link_libraries(nukeeper-data-dumper PRIVATE dbms)
|
Loading…
Reference in New Issue
Block a user