mirror of
https://github.com/ClickHouse/ClickHouse.git
synced 2024-11-23 08:02:02 +00:00
Merge branch 'master' into gyuton-DOCSUP-11552-Fix-and-update-documentation
This commit is contained in:
commit
9dd9b7d430
@ -42,6 +42,7 @@ namespace
|
||||
} while (false)
|
||||
|
||||
|
||||
#define LOG_TEST(logger, ...) LOG_IMPL(logger, DB::LogsLevel::test, Poco::Message::PRIO_TEST, __VA_ARGS__)
|
||||
#define LOG_TRACE(logger, ...) LOG_IMPL(logger, DB::LogsLevel::trace, Poco::Message::PRIO_TRACE, __VA_ARGS__)
|
||||
#define LOG_DEBUG(logger, ...) LOG_IMPL(logger, DB::LogsLevel::debug, Poco::Message::PRIO_DEBUG, __VA_ARGS__)
|
||||
#define LOG_INFO(logger, ...) LOG_IMPL(logger, DB::LogsLevel::information, Poco::Message::PRIO_INFORMATION, __VA_ARGS__)
|
||||
|
@ -7,10 +7,22 @@
|
||||
#endif
|
||||
|
||||
#include <mysqlxx/Pool.h>
|
||||
|
||||
#include <common/sleep.h>
|
||||
|
||||
#include <Poco/Util/LayeredConfiguration.h>
|
||||
#include <ctime>
|
||||
|
||||
|
||||
namespace
|
||||
{
|
||||
|
||||
inline uint64_t clock_gettime_ns(clockid_t clock_type = CLOCK_MONOTONIC)
|
||||
{
|
||||
struct timespec ts;
|
||||
clock_gettime(clock_type, &ts);
|
||||
return uint64_t(ts.tv_sec * 1000000000LL + ts.tv_nsec);
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
|
||||
namespace mysqlxx
|
||||
@ -124,10 +136,15 @@ Pool::~Pool()
|
||||
}
|
||||
|
||||
|
||||
Pool::Entry Pool::get()
|
||||
Pool::Entry Pool::get(uint64_t wait_timeout)
|
||||
{
|
||||
std::unique_lock<std::mutex> lock(mutex);
|
||||
|
||||
uint64_t deadline = 0;
|
||||
/// UINT64_MAX -- wait indefinitely
|
||||
if (wait_timeout && wait_timeout != UINT64_MAX)
|
||||
deadline = clock_gettime_ns() + wait_timeout * 1'000'000'000;
|
||||
|
||||
initialize();
|
||||
for (;;)
|
||||
{
|
||||
@ -153,6 +170,12 @@ Pool::Entry Pool::get()
|
||||
logger.trace("(%s): Unable to create a new connection: Max number of connections has been reached.", getDescription());
|
||||
}
|
||||
|
||||
if (!wait_timeout)
|
||||
throw Poco::Exception("mysqlxx::Pool is full (wait is disabled, see connection_wait_timeout setting)");
|
||||
|
||||
if (deadline && clock_gettime_ns() >= deadline)
|
||||
throw Poco::Exception("mysqlxx::Pool is full (connection_wait_timeout is exceeded)");
|
||||
|
||||
lock.unlock();
|
||||
logger.trace("(%s): Sleeping for %d seconds.", getDescription(), MYSQLXX_POOL_SLEEP_ON_CONNECT_FAIL);
|
||||
sleepForSeconds(MYSQLXX_POOL_SLEEP_ON_CONNECT_FAIL);
|
||||
|
@ -189,7 +189,7 @@ public:
|
||||
~Pool();
|
||||
|
||||
/// Allocates connection.
|
||||
Entry get();
|
||||
Entry get(uint64_t wait_timeout);
|
||||
|
||||
/// Allocates connection.
|
||||
/// If database is not accessible, returns empty Entry object.
|
||||
|
@ -21,8 +21,9 @@ PoolWithFailover::PoolWithFailover(
|
||||
const unsigned max_connections_,
|
||||
const size_t max_tries_)
|
||||
: max_tries(max_tries_)
|
||||
, shareable(config_.getBool(config_name_ + ".share_connection", false))
|
||||
, wait_timeout(UINT64_MAX)
|
||||
{
|
||||
shareable = config_.getBool(config_name_ + ".share_connection", false);
|
||||
if (config_.has(config_name_ + ".replica"))
|
||||
{
|
||||
Poco::Util::AbstractConfiguration::Keys replica_keys;
|
||||
@ -80,9 +81,11 @@ PoolWithFailover::PoolWithFailover(
|
||||
const std::string & password,
|
||||
unsigned default_connections_,
|
||||
unsigned max_connections_,
|
||||
size_t max_tries_)
|
||||
size_t max_tries_,
|
||||
uint64_t wait_timeout_)
|
||||
: max_tries(max_tries_)
|
||||
, shareable(false)
|
||||
, wait_timeout(wait_timeout_)
|
||||
{
|
||||
/// Replicas have the same priority, but traversed replicas are moved to the end of the queue.
|
||||
for (const auto & [host, port] : addresses)
|
||||
@ -101,6 +104,7 @@ PoolWithFailover::PoolWithFailover(
|
||||
PoolWithFailover::PoolWithFailover(const PoolWithFailover & other)
|
||||
: max_tries{other.max_tries}
|
||||
, shareable{other.shareable}
|
||||
, wait_timeout(other.wait_timeout)
|
||||
{
|
||||
if (shareable)
|
||||
{
|
||||
@ -140,7 +144,7 @@ PoolWithFailover::Entry PoolWithFailover::get()
|
||||
|
||||
try
|
||||
{
|
||||
Entry entry = shareable ? pool->get() : pool->tryGet();
|
||||
Entry entry = shareable ? pool->get(wait_timeout) : pool->tryGet();
|
||||
|
||||
if (!entry.isNull())
|
||||
{
|
||||
@ -172,7 +176,7 @@ PoolWithFailover::Entry PoolWithFailover::get()
|
||||
if (full_pool)
|
||||
{
|
||||
app.logger().error("All connections failed, trying to wait on a full pool " + (*full_pool)->getDescription());
|
||||
return (*full_pool)->get();
|
||||
return (*full_pool)->get(wait_timeout);
|
||||
}
|
||||
|
||||
std::stringstream message;
|
||||
|
@ -80,6 +80,8 @@ namespace mysqlxx
|
||||
std::mutex mutex;
|
||||
/// Can the Pool be shared
|
||||
bool shareable;
|
||||
/// Timeout for waiting for a free connection.
|
||||
uint64_t wait_timeout = 0;
|
||||
|
||||
public:
|
||||
using Entry = Pool::Entry;
|
||||
@ -96,6 +98,7 @@ namespace mysqlxx
|
||||
* default_connections Number of connections in the pool to each replica at start.
|
||||
* max_connections Maximum number of connections in pool to each replica.
|
||||
* max_tries_ Max number of connection tries.
|
||||
* wait_timeout_ Timeout for waiting for a free connection.
|
||||
*/
|
||||
PoolWithFailover(
|
||||
const std::string & config_name_,
|
||||
@ -117,7 +120,8 @@ namespace mysqlxx
|
||||
const std::string & password,
|
||||
unsigned default_connections_ = MYSQLXX_POOL_WITH_FAILOVER_DEFAULT_START_CONNECTIONS,
|
||||
unsigned max_connections_ = MYSQLXX_POOL_WITH_FAILOVER_DEFAULT_MAX_CONNECTIONS,
|
||||
size_t max_tries_ = MYSQLXX_POOL_WITH_FAILOVER_DEFAULT_MAX_TRIES);
|
||||
size_t max_tries_ = MYSQLXX_POOL_WITH_FAILOVER_DEFAULT_MAX_TRIES,
|
||||
uint64_t wait_timeout_ = UINT64_MAX);
|
||||
|
||||
PoolWithFailover(const PoolWithFailover & other);
|
||||
|
||||
|
2
contrib/poco
vendored
2
contrib/poco
vendored
@ -1 +1 @@
|
||||
Subproject commit b687c17bc2be36b6333a1d7cfffbf9eab65509a9
|
||||
Subproject commit 46c80daf1b015aa10474ce82e3d24b578c6ae422
|
@ -31,6 +31,10 @@ ENGINE = MaterializedPostgreSQL('host:port', ['database' | database], 'user', 'p
|
||||
|
||||
- [materialized_postgresql_allow_automatic_update](../../operations/settings/settings.md#materialized-postgresql-allow-automatic-update)
|
||||
|
||||
- [materialized_postgresql_replication_slot](../../operations/settings/settings.md#materialized-postgresql-replication-slot)
|
||||
|
||||
- [materialized_postgresql_snapshot](../../operations/settings/settings.md#materialized-postgresql-snapshot)
|
||||
|
||||
``` sql
|
||||
CREATE DATABASE database1
|
||||
ENGINE = MaterializedPostgreSQL('postgres1:5432', 'postgres_database', 'postgres_user', 'postgres_password')
|
||||
@ -73,7 +77,7 @@ WHERE oid = 'postgres_table'::regclass;
|
||||
|
||||
!!! warning "Warning"
|
||||
Replication of [**TOAST**](https://www.postgresql.org/docs/9.5/storage-toast.html) values is not supported. The default value for the data type will be used.
|
||||
|
||||
|
||||
## Example of Use {#example-of-use}
|
||||
|
||||
``` sql
|
||||
@ -82,3 +86,11 @@ ENGINE = MaterializedPostgreSQL('postgres1:5432', 'postgres_database', 'postgres
|
||||
|
||||
SELECT * FROM postgresql_db.postgres_table;
|
||||
```
|
||||
|
||||
## Notes {#notes}

- Failover of the logical replication slot.

    Logical Replication Slots which exist on the primary are not available on standby replicas.
    So if there is a failover, the new primary (the old physical standby) won't be aware of any slots that existed on the old primary. This will lead to broken replication from PostgreSQL.
    A solution is to manage replication slots yourself and define a permanent replication slot (some information can be found [here](https://patroni.readthedocs.io/en/latest/SETTINGS.html)). You'll need to pass the slot name via the `materialized_postgresql_replication_slot` setting, and it has to be exported with the `EXPORT SNAPSHOT` option. The snapshot identifier needs to be passed via the `materialized_postgresql_snapshot` setting, as sketched below.
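For illustration only, a hedged sketch of how these two settings might be passed when creating the database. The connection parameters, the slot name `clickhouse_slot`, and the snapshot identifier are hypothetical, and this assumes the engine accepts these settings in the `SETTINGS` clause of `CREATE DATABASE`:

``` sql
CREATE DATABASE postgres_database
ENGINE = MaterializedPostgreSQL('postgres1:5432', 'postgres_database', 'postgres_user', 'postgres_password')
SETTINGS materialized_postgresql_replication_slot = 'clickhouse_slot',
         materialized_postgresql_snapshot = '00000003-0000001B-1';
```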
|
||||
|
@ -19,6 +19,7 @@ CREATE TABLE [IF NOT EXISTS] [db.]table_name [ON CLUSTER cluster]
SETTINGS
[connection_pool_size=16, ]
[connection_max_tries=3, ]
[connection_wait_timeout=5, ] /* 0 -- do not wait */
[connection_auto_close=true ]
;
```
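As a hedged illustration of these settings in use (the host, database, table name, and credentials below are hypothetical):

``` sql
CREATE TABLE mysql_table
(
    `id` UInt32,
    `name` String
)
ENGINE = MySQL('mysql-host:3306', 'db', 'table', 'mysql_user', 'mysql_password')
SETTINGS connection_pool_size = 16, connection_wait_timeout = 10, connection_auto_close = true;
```

With `connection_wait_timeout = 10`, a query waits up to 10 seconds for a free connection when all `connection_pool_size` connections are busy; setting it to `0` disables waiting.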
|
||||
|
@ -1253,7 +1253,7 @@ If this section is specified, the path from [users_config](../../operations/serv
|
||||
|
||||
The `user_directories` section can contain any number of items; the order of the items determines their precedence (the higher the item, the higher the precedence).
|
||||
|
||||
**Example**
|
||||
**Examples**
|
||||
|
||||
``` xml
|
||||
<user_directories>
|
||||
@ -1263,13 +1263,23 @@ The `user_directories` section can contain any number of items, the order of the
|
||||
<local_directory>
|
||||
<path>/var/lib/clickhouse/access/</path>
|
||||
</local_directory>
|
||||
</user_directories>
|
||||
```
|
||||
|
||||
Users, roles, row policies, quotas, and profiles can be also stored in ZooKeeper:
|
||||
|
||||
``` xml
|
||||
<user_directories>
|
||||
<users_xml>
|
||||
<path>/etc/clickhouse-server/users.xml</path>
|
||||
</users_xml>
|
||||
<replicated>
|
||||
<zookeeper_path>/clickhouse/access/</zookeeper_path>
|
||||
</replicated>
|
||||
</user_directories>
|
||||
```
|
||||
|
||||
You can also specify settings `memory` — means storing information only in memory, without writing to disk, and `ldap` — means storing information on an LDAP server.
|
||||
You can also define the sections `memory` — which means storing information only in memory, without writing to disk, and `ldap` — which means storing information on an LDAP server.

To add an LDAP server as a remote user directory for users that are not defined locally, define a single `ldap` section with the following parameters (see the sketch after this list):
- `server` — one of the LDAP server names defined in the `ldap_servers` config section. This parameter is mandatory and cannot be empty.
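For illustration only, a minimal sketch of such an `ldap` user directory entry; the server name `my_ldap_server` and the role name `my_local_role` are hypothetical, and the exact set of sub-elements should be checked against the LDAP external authenticators documentation:

``` xml
<user_directories>
    <ldap>
        <server>my_ldap_server</server>
        <roles>
            <my_local_role />
        </roles>
    </ldap>
</user_directories>
```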
|
||||
|
@ -3436,6 +3436,14 @@ Possible values:
|
||||
|
||||
Default value: `0`.
|
||||
|
||||
## materialized_postgresql_replication_slot {#materialized-postgresql-replication-slot}

Allows having user-managed replication slots. Must be used together with `materialized_postgresql_snapshot`.

## materialized_postgresql_snapshot {#materialized-postgresql-snapshot}

A text string identifying a snapshot, from which the initial dump of tables will be performed. Must be used together with `materialized_postgresql_replication_slot`.
|
||||
|
||||
## allow_experimental_projection_optimization {#allow-experimental-projection-optimization}
|
||||
|
||||
Enables or disables [projection](../../engines/table-engines/mergetree-family/mergetree.md#projections) optimization when processing `SELECT` queries.
|
||||
@ -3449,7 +3457,7 @@ Default value: `0`.
|
||||
|
||||
## force_optimize_projection {#force-optimize-projection}
|
||||
|
||||
Enables or disables the obligatory use of [projections](../../engines/table-engines/mergetree-family/mergetree.md#projections) in `SELECT` queries, when projection optimization is enabled (see [allow_experimental_projection_optimization](#allow-experimental-projection-optimization) setting).
|
||||
Enables or disables the obligatory use of [projections](../../engines/table-engines/mergetree-family/mergetree.md#projections) in `SELECT` queries, when projection optimization is enabled (see [allow_experimental_projection_optimization](#allow-experimental-projection-optimization) setting).
|
||||
|
||||
Possible values:
|
||||
|
||||
|
@ -1,44 +0,0 @@
|
||||
# system.views {#system-views}
|
||||
|
||||
Contains the dependencies of all views and the type to which the view belongs. The metadata of the view comes from [system.tables](tables.md).
|
||||
|
||||
Columns:
|
||||
|
||||
- `database` ([String](../../sql-reference/data-types/string.md)) — The name of the database the view is in.
|
||||
|
||||
- `name` ([String](../../sql-reference/data-types/string.md)) — Name of the view.
|
||||
|
||||
- `main_dependency_database` ([String](../../sql-reference/data-types/string.md)) — The name of the database on which the view depends.
|
||||
|
||||
- `main_dependency_table` ([String](../../sql-reference/data-types/string.md)) - The name of the table on which the view depends.
|
||||
|
||||
- `view_type` ([Enum8](../../sql-reference/data-types/enum.md)) — Type of the view. Values:
|
||||
- `'Default' = 1` — [Default views](../../sql-reference/statements/create/view.md#normal). Should not appear in this log.
|
||||
- `'Materialized' = 2` — [Materialized views](../../sql-reference/statements/create/view.md#materialized).
|
||||
- `'Live' = 3` — [Live views](../../sql-reference/statements/create/view.md#live-view).
|
||||
|
||||
**Example**
|
||||
|
||||
```sql
|
||||
SELECT * FROM system.views LIMIT 2 FORMAT Vertical;
|
||||
```
|
||||
|
||||
```text
|
||||
Row 1:
|
||||
──────
|
||||
database: default
|
||||
name: live_view
|
||||
main_dependency_database: default
|
||||
main_dependency_table: view_source_tb
|
||||
view_type: Live
|
||||
|
||||
Row 2:
|
||||
──────
|
||||
database: default
|
||||
name: materialized_view
|
||||
main_dependency_database: default
|
||||
main_dependency_table: view_source_tb
|
||||
view_type: Materialized
|
||||
```
|
||||
|
||||
[Original article](https://clickhouse.tech/docs/en/operations/system-tables/views) <!--hide-->
|
@ -1438,9 +1438,9 @@ Result:
|
||||
└───────────────────────────────────────────┘
|
||||
```
|
||||
|
||||
## snowflakeToDateTime {#snowflakeToDateTime}
|
||||
## snowflakeToDateTime {#snowflaketodatetime}
|
||||
|
||||
Extract time from snowflake id as DateTime format.
|
||||
Extracts time from [Snowflake ID](https://en.wikipedia.org/wiki/Snowflake_ID) as [DateTime](../data-types/datetime.md) format.
|
||||
|
||||
**Syntax**
|
||||
|
||||
@ -1450,12 +1450,12 @@ snowflakeToDateTime(value [, time_zone])
|
||||
|
||||
**Parameters**
|
||||
|
||||
- `value` — `snowflake id`, Int64 value.
|
||||
- `value` — Snowflake ID. [Int64](../data-types/int-uint.md).
|
||||
- `time_zone` — [Timezone](../../operations/server-configuration-parameters/settings.md#server_configuration_parameters-timezone). The function parses `time_string` according to the timezone. Optional. [String](../../sql-reference/data-types/string.md).
|
||||
|
||||
**Returned value**
|
||||
|
||||
- value converted to the `DateTime` data type.
|
||||
- Input value converted to the [DateTime](../data-types/datetime.md) data type.
|
||||
|
||||
**Example**
|
||||
|
||||
@ -1474,9 +1474,9 @@ Result:
|
||||
└──────────────────────────────────────────────────────────────────┘
|
||||
```
|
||||
|
||||
## snowflakeToDateTime64 {#snowflakeToDateTime64}
|
||||
## snowflakeToDateTime64 {#snowflaketodatetime64}
|
||||
|
||||
Extract time from snowflake id as DateTime64 format.
|
||||
Extracts time from [Snowflake ID](https://en.wikipedia.org/wiki/Snowflake_ID) as [DateTime64](../data-types/datetime64.md) format.
|
||||
|
||||
**Syntax**
|
||||
|
||||
@ -1486,12 +1486,12 @@ snowflakeToDateTime64(value [, time_zone])
|
||||
|
||||
**Parameters**
|
||||
|
||||
- `value` — `snowflake id`, Int64 value.
|
||||
- `value` — Snowflake ID. [Int64](../data-types/int-uint.md).
|
||||
- `time_zone` — [Timezone](../../operations/server-configuration-parameters/settings.md#server_configuration_parameters-timezone). The function parses `time_string` according to the timezone. Optional. [String](../../sql-reference/data-types/string.md).
|
||||
|
||||
**Returned value**
|
||||
|
||||
- value converted to the `DateTime64` data type.
|
||||
- Input value converted to the [DateTime64](../data-types/datetime64.md) data type.
|
||||
|
||||
**Example**
|
||||
|
||||
@ -1510,9 +1510,9 @@ Result:
|
||||
└────────────────────────────────────────────────────────────────────┘
|
||||
```
|
||||
|
||||
## dateTimeToSnowflake {#dateTimeToSnowflake}
|
||||
## dateTimeToSnowflake {#datetimetosnowflake}
|
||||
|
||||
Convert DateTime to the first snowflake id at the giving time.
|
||||
Converts a [DateTime](../data-types/datetime.md) value to the first [Snowflake ID](https://en.wikipedia.org/wiki/Snowflake_ID) at the given time.
|
||||
|
||||
**Syntax**
|
||||
|
||||
@ -1524,33 +1524,29 @@ dateTimeToSnowflake(value)
|
||||
|
||||
- `value` — Date and time. [DateTime](../../sql-reference/data-types/datetime.md).
|
||||
|
||||
|
||||
**Returned value**
|
||||
|
||||
- `value` converted to the `Int64` data type as the first snowflake id at that time.
|
||||
- Input value converted to the [Int64](../data-types/int-uint.md) data type as the first Snowflake ID at that time.
|
||||
|
||||
**Example**
|
||||
|
||||
Query:
|
||||
|
||||
``` sql
|
||||
WITH toDateTime('2021-08-15 18:57:56', 'Asia/Shanghai') AS dt
|
||||
SELECT dateTimeToSnowflake(dt);
|
||||
WITH toDateTime('2021-08-15 18:57:56', 'Asia/Shanghai') AS dt SELECT dateTimeToSnowflake(dt);
|
||||
```
|
||||
|
||||
Result:
|
||||
|
||||
``` text
|
||||
|
||||
┌─dateTimeToSnowflake(dt)─┐
|
||||
│ 1426860702823350272 │
|
||||
└─────────────────────────┘
|
||||
```
|
||||
|
||||
## dateTime64ToSnowflake {#datetime64tosnowflake}
|
||||
|
||||
## dateTime64ToSnowflake {#dateTime64ToSnowflake}
|
||||
|
||||
Convert DateTime64 to the first snowflake id at the giving time.
|
||||
Converts a [DateTime64](../data-types/datetime64.md) value to the first [Snowflake ID](https://en.wikipedia.org/wiki/Snowflake_ID) at the given time.
|
||||
|
||||
**Syntax**
|
||||
|
||||
@ -1562,18 +1558,16 @@ dateTime64ToSnowflake(value)
|
||||
|
||||
- `value` — Date and time. [DateTime64](../../sql-reference/data-types/datetime64.md).
|
||||
|
||||
|
||||
**Returned value**
|
||||
|
||||
- `value` converted to the `Int64` data type as the first snowflake id at that time.
|
||||
- Input value converted to the [Int64](../data-types/int-uint.md) data type as the first Snowflake ID at that time.
|
||||
|
||||
**Example**
|
||||
|
||||
Query:
|
||||
|
||||
``` sql
|
||||
WITH toDateTime64('2021-08-15 18:57:56.492', 3, 'Asia/Shanghai') AS dt64
|
||||
SELECT dateTime64ToSnowflake(dt64);
|
||||
WITH toDateTime64('2021-08-15 18:57:56.492', 3, 'Asia/Shanghai') AS dt64 SELECT dateTime64ToSnowflake(dt64);
|
||||
```
|
||||
|
||||
Result:
|
||||
@ -1582,4 +1576,4 @@ Result:
|
||||
┌─dateTime64ToSnowflake(dt64)─┐
|
||||
│ 1426860704886947840 │
|
||||
└─────────────────────────────┘
|
||||
```
|
||||
```
|
||||
|
@ -1200,12 +1200,13 @@ ClickHouse использует ZooKeeper для хранения метадан
|
||||
Секция конфигурационного файла, которая содержит настройки:
|
||||
- Путь к конфигурационному файлу с предустановленными пользователями.
|
||||
- Путь к файлу, в котором содержатся пользователи, созданные при помощи SQL команд.
|
||||
- Путь к узлу ZooKeeper, где хранятся и реплицируются пользователи, созданные с помощью команд SQL (экспериментальная функциональность).
|
||||
|
||||
Если эта секция определена, путь из [users_config](../../operations/server-configuration-parameters/settings.md#users-config) и [access_control_path](../../operations/server-configuration-parameters/settings.md#access_control_path) не используется.
|
||||
|
||||
Секция `user_directories` может содержать любое количество элементов, порядок расположения элементов обозначает их приоритет (чем выше элемент, тем выше приоритет).
|
||||
|
||||
**Пример**
|
||||
**Примеры**
|
||||
|
||||
``` xml
|
||||
<user_directories>
|
||||
@ -1218,7 +1219,20 @@ ClickHouse использует ZooKeeper для хранения метадан
|
||||
</user_directories>
|
||||
```
|
||||
|
||||
Также вы можете указать настройку `memory` — означает хранение информации только в памяти, без записи на диск, и `ldap` — означает хранения информации на [LDAP-сервере](https://en.wikipedia.org/wiki/Lightweight_Directory_Access_Protocol).
|
||||
Пользователи, роли, политики доступа к строкам, квоты и профили могут храниться в ZooKeeper:
|
||||
|
||||
``` xml
|
||||
<user_directories>
|
||||
<users_xml>
|
||||
<path>/etc/clickhouse-server/users.xml</path>
|
||||
</users_xml>
|
||||
<replicated>
|
||||
<zookeeper_path>/clickhouse/access/</zookeeper_path>
|
||||
</replicated>
|
||||
</user_directories>
|
||||
```
|
||||
|
||||
Также вы можете добавить секции `memory` — означает хранение информации только в памяти, без записи на диск, и `ldap` — означает хранение информации на [LDAP-сервере](https://en.wikipedia.org/wiki/Lightweight_Directory_Access_Protocol).
|
||||
|
||||
Чтобы добавить LDAP-сервер в качестве удаленного каталога пользователей, которые не определены локально, определите один раздел `ldap` со следующими параметрами:
|
||||
- `server` — имя одного из LDAP-серверов, определенных в секции `ldap_servers` конфигурационного файла. Этот параметр является обязательным и не может быть пустым.
|
||||
|
@ -1436,3 +1436,144 @@ FROM numbers(3);
|
||||
│ 2,"good" │
|
||||
└───────────────────────────────────────────┘
|
||||
```
|
||||
|
||||
## snowflakeToDateTime {#snowflaketodatetime}
|
||||
|
||||
Извлекает время из [Snowflake ID](https://en.wikipedia.org/wiki/Snowflake_ID) в формате [DateTime](../data-types/datetime.md).
|
||||
|
||||
**Синтаксис**
|
||||
|
||||
``` sql
|
||||
snowflakeToDateTime(value [, time_zone])
|
||||
```
|
||||
|
||||
**Аргументы**
|
||||
|
||||
- `value` — Snowflake ID. [Int64](../data-types/int-uint.md).
|
||||
- `time_zone` — [временная зона сервера](../../operations/server-configuration-parameters/settings.md#server_configuration_parameters-timezone). Функция распознает `time_string` в соответствии с часовым поясом. Необязательный. [String](../../sql-reference/data-types/string.md).
|
||||
|
||||
**Возвращаемое значение**
|
||||
|
||||
- Значение, преобразованное в формат [DateTime](../data-types/datetime.md).
|
||||
|
||||
**Пример**
|
||||
|
||||
Запрос:
|
||||
|
||||
``` sql
|
||||
SELECT snowflakeToDateTime(CAST('1426860702823350272', 'Int64'), 'UTC');
|
||||
```
|
||||
|
||||
Результат:
|
||||
|
||||
``` text
|
||||
|
||||
┌─snowflakeToDateTime(CAST('1426860702823350272', 'Int64'), 'UTC')─┐
|
||||
│ 2021-08-15 10:57:56 │
|
||||
└──────────────────────────────────────────────────────────────────┘
|
||||
```
|
||||
|
||||
## snowflakeToDateTime64 {#snowflaketodatetime64}
|
||||
|
||||
Извлекает время из [Snowflake ID](https://en.wikipedia.org/wiki/Snowflake_ID) в формате [DateTime64](../data-types/datetime64.md).
|
||||
|
||||
**Синтаксис**
|
||||
|
||||
``` sql
|
||||
snowflakeToDateTime64(value [, time_zone])
|
||||
```
|
||||
|
||||
**Аргументы**
|
||||
|
||||
- `value` — Snowflake ID. [Int64](../data-types/int-uint.md).
|
||||
- `time_zone` — [временная зона сервера](../../operations/server-configuration-parameters/settings.md#server_configuration_parameters-timezone). Функция распознает `time_string` в соответствии с часовым поясом. Необязательный. [String](../../sql-reference/data-types/string.md).
|
||||
|
||||
**Возвращаемое значение**
|
||||
|
||||
- Значение, преобразованное в формат [DateTime64](../data-types/datetime64.md).
|
||||
|
||||
**Пример**
|
||||
|
||||
Запрос:
|
||||
|
||||
``` sql
|
||||
SELECT snowflakeToDateTime64(CAST('1426860802823350272', 'Int64'), 'UTC');
|
||||
```
|
||||
|
||||
Результат:
|
||||
|
||||
``` text
|
||||
|
||||
┌─snowflakeToDateTime64(CAST('1426860802823350272', 'Int64'), 'UTC')─┐
|
||||
│ 2021-08-15 10:58:19.841 │
|
||||
└────────────────────────────────────────────────────────────────────┘
|
||||
```
|
||||
|
||||
## dateTimeToSnowflake {#datetimetosnowflake}
|
||||
|
||||
Преобразует значение [DateTime](../data-types/datetime.md) в первый идентификатор [Snowflake ID](https://en.wikipedia.org/wiki/Snowflake_ID) для заданного момента времени.
|
||||
|
||||
**Синтаксис**
|
||||
|
||||
``` sql
|
||||
dateTimeToSnowflake(value)
|
||||
```
|
||||
|
||||
**Аргументы**
|
||||
|
||||
- `value` — дата и время. [DateTime](../../sql-reference/data-types/datetime.md).
|
||||
|
||||
**Возвращаемое значение**
|
||||
|
||||
- Значение, преобразованное в [Int64](../data-types/int-uint.md), как первый идентификатор Snowflake ID для заданного момента времени.
|
||||
|
||||
**Пример**
|
||||
|
||||
Запрос:
|
||||
|
||||
``` sql
|
||||
WITH toDateTime('2021-08-15 18:57:56', 'Asia/Shanghai') AS dt SELECT dateTimeToSnowflake(dt);
|
||||
```
|
||||
|
||||
Результат:
|
||||
|
||||
``` text
|
||||
┌─dateTimeToSnowflake(dt)─┐
|
||||
│ 1426860702823350272 │
|
||||
└─────────────────────────┘
|
||||
```
|
||||
|
||||
## dateTime64ToSnowflake {#datetime64tosnowflake}
|
||||
|
||||
Преобразует значение [DateTime64](../data-types/datetime64.md) в первый идентификатор [Snowflake ID](https://en.wikipedia.org/wiki/Snowflake_ID) для заданного момента времени.
|
||||
|
||||
**Синтаксис**
|
||||
|
||||
``` sql
|
||||
dateTime64ToSnowflake(value)
|
||||
```
|
||||
|
||||
**Аргументы**
|
||||
|
||||
- `value` — дата и время. [DateTime64](../data-types/datetime64.md).
|
||||
|
||||
**Возвращаемое значение**
|
||||
|
||||
- Значение, преобразованное в [Int64](../data-types/int-uint.md), как первый идентификатор Snowflake ID для заданного момента времени.
|
||||
|
||||
|
||||
**Пример**
|
||||
|
||||
Запрос:
|
||||
|
||||
``` sql
|
||||
WITH toDateTime64('2021-08-15 18:57:56.492', 3, 'Asia/Shanghai') AS dt64 SELECT dateTime64ToSnowflake(dt64);
|
||||
```
|
||||
|
||||
Результат:
|
||||
|
||||
``` text
|
||||
┌─dateTime64ToSnowflake(dt64)─┐
|
||||
│ 1426860704886947840 │
|
||||
└─────────────────────────────┘
|
||||
```
|
||||
|
@ -247,6 +247,7 @@ CREATE TABLE codec_example
|
||||
)
|
||||
ENGINE = MergeTree()
|
||||
```
|
||||
|
||||
## Временные таблицы {#temporary-tables}
|
||||
|
||||
ClickHouse поддерживает временные таблицы со следующими характеристиками:
|
||||
|
@ -1131,6 +1131,10 @@ if (ThreadFuzzer::instance().isEffective())
|
||||
global_context->setSystemZooKeeperLogAfterInitializationIfNeeded();
|
||||
/// After the system database is created, attach virtual system tables (in addition to query_log and part_log)
|
||||
attachSystemTablesServer(*database_catalog.getSystemDatabase(), has_zookeeper);
|
||||
/// Firstly remove partially dropped databases, to avoid race with MaterializedMySQLSyncThread,
|
||||
/// that may execute DROP before loadMarkedAsDroppedTables() in background,
|
||||
/// and so loadMarkedAsDroppedTables() will find it and try to add, and UUID will overlap.
|
||||
database_catalog.loadMarkedAsDroppedTables();
|
||||
/// Then, load remaining databases
|
||||
loadMetadata(global_context, default_database);
|
||||
database_catalog.loadDatabases();
|
||||
|
@ -18,6 +18,7 @@
|
||||
- information
|
||||
- debug
|
||||
- trace
|
||||
- test (not for production usage)
|
||||
|
||||
[1]: https://github.com/pocoproject/poco/blob/poco-1.9.4-release/Foundation/include/Poco/Logger.h#L105-L114
|
||||
-->
|
||||
|
@ -6,6 +6,8 @@
|
||||
#include <Poco/Logger.h>
|
||||
#include <Poco/AutoPtr.h>
|
||||
#include <Poco/NullChannel.h>
|
||||
#include <Poco/StreamChannel.h>
|
||||
#include <sstream>
|
||||
|
||||
|
||||
TEST(Logger, Log)
|
||||
@ -17,3 +19,34 @@ TEST(Logger, Log)
|
||||
/// This test checks that we don't pass this string to fmtlib, because it is the only argument.
|
||||
EXPECT_NO_THROW(LOG_INFO(log, "Hello {} World"));
|
||||
}
|
||||
|
||||
TEST(Logger, TestLog)
|
||||
{
|
||||
{ /// Test logs visible for test level
|
||||
|
||||
std::ostringstream oss; // STYLE_CHECK_ALLOW_STD_STRING_STREAM
|
||||
auto my_channel = Poco::AutoPtr<Poco::StreamChannel>(new Poco::StreamChannel(oss));
|
||||
auto * log = &Poco::Logger::create("TestLogger", my_channel.get());
|
||||
log->setLevel("test");
|
||||
LOG_TEST(log, "Hello World");
|
||||
|
||||
EXPECT_EQ(oss.str(), "Hello World\n");
|
||||
Poco::Logger::destroy("TestLogger");
|
||||
}
|
||||
|
||||
{ /// Test logs invisible for other levels
|
||||
for (const auto & level : {"trace", "debug", "information", "warning", "error", "fatal"})
|
||||
{
|
||||
std::ostringstream oss; // STYLE_CHECK_ALLOW_STD_STRING_STREAM
|
||||
auto my_channel = Poco::AutoPtr<Poco::StreamChannel>(new Poco::StreamChannel(oss));
|
||||
auto * log = &Poco::Logger::create(std::string{level} + "_Logger", my_channel.get());
|
||||
log->setLevel(level);
|
||||
LOG_TEST(log, "Hello World");
|
||||
|
||||
EXPECT_EQ(oss.str(), "");
|
||||
|
||||
Poco::Logger::destroy(std::string{level} + "_Logger");
|
||||
}
|
||||
}
|
||||
|
||||
}
|
||||
|
@ -122,6 +122,10 @@ nuraft::ptr<nuraft::buffer> KeeperStateMachine::commit(const uint64_t log_idx, n
|
||||
}
|
||||
else
|
||||
{
|
||||
LOG_TEST(log, "Commit request for session {} with type {}, log id {}{}",
|
||||
request_for_session.session_id, toString(request_for_session.request->getOpNum()), log_idx,
|
||||
request_for_session.request->getPath().empty() ? "" : ", path " + request_for_session.request->getPath());
|
||||
|
||||
std::lock_guard lock(storage_and_responses_lock);
|
||||
KeeperStorage::ResponsesForSessions responses_for_sessions = storage->processRequest(request_for_session.request, request_for_session.session_id, log_idx);
|
||||
for (auto & response_for_session : responses_for_sessions)
|
||||
|
@ -79,8 +79,8 @@ IMPLEMENT_SETTING_ENUM(LogsLevel, ErrorCodes::BAD_ARGUMENTS,
|
||||
{"warning", LogsLevel::warning},
|
||||
{"information", LogsLevel::information},
|
||||
{"debug", LogsLevel::debug},
|
||||
{"trace", LogsLevel::trace}})
|
||||
|
||||
{"trace", LogsLevel::trace},
|
||||
{"test", LogsLevel::test}})
|
||||
|
||||
IMPLEMENT_SETTING_ENUM_WITH_RENAME(LogQueriesType, ErrorCodes::BAD_ARGUMENTS,
|
||||
{{"QUERY_START", QUERY_START},
|
||||
|
@ -94,6 +94,7 @@ enum class LogsLevel
|
||||
information,
|
||||
debug,
|
||||
trace,
|
||||
test,
|
||||
};
|
||||
|
||||
DECLARE_SETTING_ENUM(LogsLevel)
|
||||
|
@ -247,7 +247,7 @@ void MaterializedMySQLSyncThread::assertMySQLAvailable()
|
||||
{
|
||||
try
|
||||
{
|
||||
checkMySQLVariables(pool.get(), getContext()->getSettingsRef());
|
||||
checkMySQLVariables(pool.get(/* wait_timeout= */ UINT64_MAX), getContext()->getSettingsRef());
|
||||
}
|
||||
catch (const mysqlxx::ConnectionFailed & e)
|
||||
{
|
||||
@ -729,7 +729,7 @@ void MaterializedMySQLSyncThread::onEvent(Buffers & buffers, const BinlogEventPt
|
||||
{
|
||||
/// Some behaviors (such as changing the value of "binlog_checksum") rotate the binlog file.
|
||||
/// To ensure that the synchronization continues, we need to handle these events
|
||||
metadata.fetchMasterVariablesValue(pool.get());
|
||||
metadata.fetchMasterVariablesValue(pool.get(/* wait_timeout= */ UINT64_MAX));
|
||||
client.setBinlogChecksum(metadata.binlog_checksum);
|
||||
}
|
||||
else if (receive_event->header.type != HEARTBEAT_EVENT)
|
||||
|
@ -61,10 +61,8 @@ void DatabaseMaterializedPostgreSQL::startSynchronization()
|
||||
connection_info,
|
||||
getContext(),
|
||||
is_attach,
|
||||
settings->materialized_postgresql_max_block_size.value,
|
||||
settings->materialized_postgresql_allow_automatic_update,
|
||||
/* is_materialized_postgresql_database = */ true,
|
||||
settings->materialized_postgresql_tables_list.value);
|
||||
*settings,
|
||||
/* is_materialized_postgresql_database = */ true);
|
||||
|
||||
postgres::Connection connection(connection_info);
|
||||
NameSet tables_to_replicate;
|
||||
|
@ -41,7 +41,7 @@ public:
|
||||
return name;
|
||||
}
|
||||
|
||||
size_t getNumberOfArguments() const override { return 4; }
|
||||
size_t getNumberOfArguments() const override { return 3; }
|
||||
|
||||
bool useDefaultImplementationForConstants() const override { return true; }
|
||||
|
||||
|
@ -41,7 +41,7 @@ public:
|
||||
return name;
|
||||
}
|
||||
|
||||
size_t getNumberOfArguments() const override { return 4; }
|
||||
size_t getNumberOfArguments() const override { return 3; }
|
||||
|
||||
bool useDefaultImplementationForConstants() const override { return true; }
|
||||
|
||||
|
@ -28,41 +28,49 @@ ZstdInflatingReadBuffer::~ZstdInflatingReadBuffer()
|
||||
|
||||
bool ZstdInflatingReadBuffer::nextImpl()
|
||||
{
|
||||
if (eof)
|
||||
return false;
|
||||
|
||||
if (input.pos >= input.size)
|
||||
do
|
||||
{
|
||||
in->nextIfAtEnd();
|
||||
input.src = reinterpret_cast<unsigned char *>(in->position());
|
||||
input.pos = 0;
|
||||
input.size = in->buffer().end() - in->position();
|
||||
}
|
||||
// If it is known that end of file was reached, return false
|
||||
if (eof)
|
||||
return false;
|
||||
|
||||
output.dst = reinterpret_cast<unsigned char *>(internal_buffer.begin());
|
||||
output.size = internal_buffer.size();
|
||||
output.pos = 0;
|
||||
/// If end was reached, get next part
|
||||
if (input.pos >= input.size)
|
||||
{
|
||||
in->nextIfAtEnd();
|
||||
input.src = reinterpret_cast<unsigned char *>(in->position());
|
||||
input.pos = 0;
|
||||
input.size = in->buffer().end() - in->position();
|
||||
}
|
||||
|
||||
size_t ret = ZSTD_decompressStream(dctx, &output, &input);
|
||||
if (ZSTD_isError(ret))
|
||||
throw Exception(
|
||||
ErrorCodes::ZSTD_DECODER_FAILED, "Zstd stream decoding failed: error code: {}; zstd version: {}", ret, ZSTD_VERSION_STRING);
|
||||
/// fill output
|
||||
output.dst = reinterpret_cast<unsigned char *>(internal_buffer.begin());
|
||||
output.size = internal_buffer.size();
|
||||
output.pos = 0;
|
||||
|
||||
in->position() = in->buffer().begin() + input.pos;
|
||||
working_buffer.resize(output.pos);
|
||||
/// Decompress data and check errors.
|
||||
size_t ret = ZSTD_decompressStream(dctx, &output, &input);
|
||||
if (ZSTD_isError(ret))
|
||||
throw Exception(
|
||||
ErrorCodes::ZSTD_DECODER_FAILED, "Zstd stream decoding failed: error code: {}; zstd version: {}", ret, ZSTD_VERSION_STRING);
|
||||
|
||||
if (in->eof())
|
||||
{
|
||||
eof = true;
|
||||
return !working_buffer.empty();
|
||||
}
|
||||
else if (output.pos == 0)
|
||||
{
|
||||
/// Check that something has changed after decompress (input or output position)
|
||||
assert(output.pos > 0 || in->position() < in->buffer().begin() + input.pos);
|
||||
|
||||
/// move position to the end of read data
|
||||
in->position() = in->buffer().begin() + input.pos;
|
||||
working_buffer.resize(output.pos);
|
||||
|
||||
/// If end of file is reached, fill eof variable and return true if there is some data in buffer, otherwise return false
|
||||
if (in->eof())
|
||||
{
|
||||
eof = true;
|
||||
return !working_buffer.empty();
|
||||
}
|
||||
/// It is possible that the input buffer is not at eof yet, but nothing was decompressed in the current iteration.
|
||||
/// But there are cases, when such behaviour is not allowed - i.e. if input buffer is not eof, then
|
||||
/// it has to be guaranteed that working_buffer is not empty. So if it is empty, continue.
|
||||
return nextImpl();
|
||||
}
|
||||
} while (output.pos == 0);
|
||||
|
||||
return true;
|
||||
}
|
||||
|
@ -5,6 +5,7 @@ LIBRARY()
|
||||
|
||||
ADDINCL(
|
||||
contrib/libs/zstd/include
|
||||
contrib/libs/lz4
|
||||
contrib/restricted/fast_float/include
|
||||
)
|
||||
|
||||
@ -14,10 +15,10 @@ PEERDIR(
|
||||
contrib/libs/brotli/enc
|
||||
contrib/libs/poco/NetSSL_OpenSSL
|
||||
contrib/libs/zstd
|
||||
contrib/libs/lz4
|
||||
contrib/restricted/fast_float
|
||||
)
|
||||
|
||||
|
||||
SRCS(
|
||||
AIO.cpp
|
||||
AsynchronousReadBufferFromFile.cpp
|
||||
|
@ -4,6 +4,7 @@ LIBRARY()
|
||||
|
||||
ADDINCL(
|
||||
contrib/libs/zstd/include
|
||||
contrib/libs/lz4
|
||||
contrib/restricted/fast_float/include
|
||||
)
|
||||
|
||||
@ -13,10 +14,10 @@ PEERDIR(
|
||||
contrib/libs/brotli/enc
|
||||
contrib/libs/poco/NetSSL_OpenSSL
|
||||
contrib/libs/zstd
|
||||
contrib/libs/lz4
|
||||
contrib/restricted/fast_float
|
||||
)
|
||||
|
||||
|
||||
SRCS(
|
||||
<? find . -name '*.cpp' | grep -v -F tests | grep -v -F examples | grep -v -F fuzzers | grep -v -P 'S3|HDFS' | sed 's/^\.\// /' | sort ?>
|
||||
)
|
||||
|
@ -146,7 +146,6 @@ void DatabaseCatalog::initializeAndLoadTemporaryDatabase()
|
||||
|
||||
void DatabaseCatalog::loadDatabases()
|
||||
{
|
||||
loadMarkedAsDroppedTables();
|
||||
auto task_holder = getContext()->getSchedulePool().createTask("DatabaseCatalog", [this](){ this->dropTableDataTask(); });
|
||||
drop_task = std::make_unique<BackgroundSchedulePoolTaskHolder>(std::move(task_holder));
|
||||
(*drop_task)->activate();
|
||||
@ -618,12 +617,6 @@ Dependencies DatabaseCatalog::getDependencies(const StorageID & from) const
|
||||
return Dependencies(iter->second.begin(), iter->second.end());
|
||||
}
|
||||
|
||||
ViewDependencies DatabaseCatalog::getViewDependencies() const
|
||||
{
|
||||
std::lock_guard lock{databases_mutex};
|
||||
return ViewDependencies(view_dependencies.begin(), view_dependencies.end());
|
||||
}
|
||||
|
||||
void
|
||||
DatabaseCatalog::updateDependency(const StorageID & old_from, const StorageID & old_where, const StorageID & new_from,
|
||||
const StorageID & new_where)
|
||||
|
@ -130,6 +130,7 @@ public:
|
||||
|
||||
void initializeAndLoadTemporaryDatabase();
|
||||
void loadDatabases();
|
||||
void loadMarkedAsDroppedTables();
|
||||
|
||||
/// Get an object that protects the table from concurrently executing multiple DDL operations.
|
||||
DDLGuardPtr getDDLGuard(const String & database, const String & table);
|
||||
@ -174,7 +175,6 @@ public:
|
||||
void addDependency(const StorageID & from, const StorageID & where);
|
||||
void removeDependency(const StorageID & from, const StorageID & where);
|
||||
Dependencies getDependencies(const StorageID & from) const;
|
||||
ViewDependencies getViewDependencies() const;
|
||||
|
||||
/// For Materialized and Live View
|
||||
void updateDependency(const StorageID & old_from, const StorageID & old_where,const StorageID & new_from, const StorageID & new_where);
|
||||
@ -241,7 +241,6 @@ private:
|
||||
};
|
||||
using TablesMarkedAsDropped = std::list<TableMarkedAsDropped>;
|
||||
|
||||
void loadMarkedAsDroppedTables();
|
||||
void dropTableDataTask();
|
||||
void dropTableFinally(const TableMarkedAsDropped & table);
|
||||
|
||||
|
@ -402,8 +402,8 @@ void Set::checkTypesEqual(size_t set_type_idx, const DataTypePtr & other_type) c
|
||||
+ data_types[set_type_idx]->getName() + " on the right", ErrorCodes::TYPE_MISMATCH);
|
||||
}
|
||||
|
||||
MergeTreeSetIndex::MergeTreeSetIndex(const Columns & set_elements, std::vector<KeyTuplePositionMapping> && index_mapping_)
|
||||
: indexes_mapping(std::move(index_mapping_))
|
||||
MergeTreeSetIndex::MergeTreeSetIndex(const Columns & set_elements, std::vector<KeyTuplePositionMapping> && indexes_mapping_)
|
||||
: has_all_keys(set_elements.size() == indexes_mapping_.size()), indexes_mapping(std::move(indexes_mapping_))
|
||||
{
|
||||
std::sort(indexes_mapping.begin(), indexes_mapping.end(),
|
||||
[](const KeyTuplePositionMapping & l, const KeyTuplePositionMapping & r)
|
||||
@ -548,11 +548,11 @@ BoolMask MergeTreeSetIndex::checkInRange(const std::vector<Range> & key_ranges,
|
||||
break;
|
||||
}
|
||||
}
|
||||
if (one_element_range)
|
||||
if (one_element_range && has_all_keys)
|
||||
{
|
||||
/// Here we know that there is one element in range.
|
||||
/// The main difference with the normal case is that we can definitely say that
|
||||
/// condition in this range always TRUE (can_be_false = 0) xor always FALSE (can_be_true = 0).
|
||||
/// condition in this range is always TRUE (can_be_false = 0) or always FALSE (can_be_true = 0).
|
||||
|
||||
/// Check if it's an empty range
|
||||
if (!left_included || !right_included)
|
||||
|
@ -208,7 +208,7 @@ public:
|
||||
std::vector<FunctionBasePtr> functions;
|
||||
};
|
||||
|
||||
MergeTreeSetIndex(const Columns & set_elements, std::vector<KeyTuplePositionMapping> && index_mapping_);
|
||||
MergeTreeSetIndex(const Columns & set_elements, std::vector<KeyTuplePositionMapping> && indexes_mapping_);
|
||||
|
||||
size_t size() const { return ordered_set.at(0)->size(); }
|
||||
|
||||
@ -217,6 +217,8 @@ public:
|
||||
BoolMask checkInRange(const std::vector<Range> & key_ranges, const DataTypes & data_types) const;
|
||||
|
||||
private:
|
||||
// If all arguments in tuple are key columns, we can optimize NOT IN when there is only one element.
|
||||
bool has_all_keys;
|
||||
Columns ordered_set;
|
||||
std::vector<KeyTuplePositionMapping> indexes_mapping;
|
||||
|
||||
|
@ -26,7 +26,8 @@ NamesAndTypesList TextLogElement::getNamesAndTypes()
|
||||
{"Notice", static_cast<Int8>(Message::PRIO_NOTICE)},
|
||||
{"Information", static_cast<Int8>(Message::PRIO_INFORMATION)},
|
||||
{"Debug", static_cast<Int8>(Message::PRIO_DEBUG)},
|
||||
{"Trace", static_cast<Int8>(Message::PRIO_TRACE)}
|
||||
{"Trace", static_cast<Int8>(Message::PRIO_TRACE)},
|
||||
{"Test", static_cast<Int8>(Message::PRIO_TEST)},
|
||||
});
|
||||
|
||||
return
|
||||
|
@ -174,7 +174,7 @@ QueryPlanPtr MergeTreeDataSelectExecutor::read(
|
||||
if (query_info.projection->desc->is_minmax_count_projection)
|
||||
{
|
||||
Pipe pipe(std::make_shared<SourceFromSingleChunk>(
|
||||
query_info.minmax_count_projection_block,
|
||||
query_info.minmax_count_projection_block.cloneEmpty(),
|
||||
Chunk(query_info.minmax_count_projection_block.getColumns(), query_info.minmax_count_projection_block.rows())));
|
||||
auto read_from_pipe = std::make_unique<ReadFromPreparedSource>(std::move(pipe));
|
||||
projection_plan->addStep(std::move(read_from_pipe));
|
||||
|
@ -17,6 +17,7 @@ class ASTStorage;
|
||||
#define LIST_OF_MYSQL_SETTINGS(M) \
|
||||
M(UInt64, connection_pool_size, 16, "Size of connection pool (if all connections are in use, the query will wait until some connection will be freed).", 0) \
|
||||
M(UInt64, connection_max_tries, 3, "Number of retries for pool with failover", 0) \
|
||||
M(UInt64, connection_wait_timeout, 5, "Timeout (in seconds) for waiting for a free connection (in case there are already connection_pool_size active connections), 0 - do not wait.", 0) \
|
||||
M(Bool, connection_auto_close, true, "Auto-close connection after query execution, i.e. disable connection reuse.", 0) \
|
||||
|
||||
DECLARE_SETTINGS_TRAITS(MySQLSettingsTraits, LIST_OF_MYSQL_SETTINGS)
|
||||
|
@ -625,9 +625,8 @@ bool MaterializedPostgreSQLConsumer::readFromReplicationSlot()
|
||||
tryLogCurrentException(__PRETTY_FUNCTION__);
|
||||
return false;
|
||||
}
|
||||
catch (const pqxx::broken_connection & e)
|
||||
catch (const pqxx::broken_connection &)
|
||||
{
|
||||
LOG_ERROR(log, "Connection error: {}", e.what());
|
||||
connection->tryUpdateConnection();
|
||||
return false;
|
||||
}
|
||||
@ -641,6 +640,7 @@ bool MaterializedPostgreSQLConsumer::readFromReplicationSlot()
|
||||
if (error_message.find("out of relcache_callback_list slots") == std::string::npos)
|
||||
tryLogCurrentException(__PRETTY_FUNCTION__);
|
||||
|
||||
connection->tryUpdateConnection();
|
||||
return false;
|
||||
}
|
||||
catch (const pqxx::conversion_error & e)
|
||||
|
@ -17,6 +17,8 @@ namespace DB
|
||||
M(UInt64, materialized_postgresql_max_block_size, 65536, "Number of rows collected before flushing data into the table.", 0) \
|
||||
M(String, materialized_postgresql_tables_list, "", "List of tables for MaterializedPostgreSQL database engine", 0) \
|
||||
M(Bool, materialized_postgresql_allow_automatic_update, false, "Allow reloading the table in the background when schema changes are detected", 0) \
|
||||
M(String, materialized_postgresql_replication_slot, "", "A user-created replication slot", 0) \
|
||||
M(String, materialized_postgresql_snapshot, "", "User-provided snapshot in case the user manages replication slots themselves", 0) \
|
||||
|
||||
DECLARE_SETTINGS_TRAITS(MaterializedPostgreSQLSettingsTraits, LIST_OF_MATERIALIZED_POSTGRESQL_SETTINGS)
|
||||
|
||||
|
@ -32,24 +32,28 @@ PostgreSQLReplicationHandler::PostgreSQLReplicationHandler(
|
||||
const postgres::ConnectionInfo & connection_info_,
|
||||
ContextPtr context_,
|
||||
bool is_attach_,
|
||||
const size_t max_block_size_,
|
||||
bool allow_automatic_update_,
|
||||
bool is_materialized_postgresql_database_,
|
||||
const String tables_list_)
|
||||
const MaterializedPostgreSQLSettings & replication_settings,
|
||||
bool is_materialized_postgresql_database_)
|
||||
: log(&Poco::Logger::get("PostgreSQLReplicationHandler"))
|
||||
, context(context_)
|
||||
, is_attach(is_attach_)
|
||||
, remote_database_name(remote_database_name_)
|
||||
, current_database_name(current_database_name_)
|
||||
, connection_info(connection_info_)
|
||||
, max_block_size(max_block_size_)
|
||||
, allow_automatic_update(allow_automatic_update_)
|
||||
, max_block_size(replication_settings.materialized_postgresql_max_block_size)
|
||||
, allow_automatic_update(replication_settings.materialized_postgresql_allow_automatic_update)
|
||||
, is_materialized_postgresql_database(is_materialized_postgresql_database_)
|
||||
, tables_list(tables_list_)
|
||||
, tables_list(replication_settings.materialized_postgresql_tables_list)
|
||||
, user_provided_snapshot(replication_settings.materialized_postgresql_snapshot)
|
||||
, connection(std::make_shared<postgres::Connection>(connection_info_))
|
||||
, milliseconds_to_wait(RESCHEDULE_MS)
|
||||
{
|
||||
replication_slot = fmt::format("{}_ch_replication_slot", replication_identifier);
|
||||
replication_slot = replication_settings.materialized_postgresql_replication_slot;
|
||||
if (replication_slot.empty())
|
||||
{
|
||||
user_managed_slot = false;
|
||||
replication_slot = fmt::format("{}_ch_replication_slot", replication_identifier);
|
||||
}
|
||||
publication_name = fmt::format("{}_ch_publication", replication_identifier);
|
||||
|
||||
startup_task = context->getSchedulePool().createTask("PostgreSQLReplicaStartup", [this]{ waitConnectionAndStart(); });
|
||||
@ -121,7 +125,20 @@ void PostgreSQLReplicationHandler::startSynchronization(bool throw_on_error)
|
||||
|
||||
auto initial_sync = [&]()
|
||||
{
|
||||
createReplicationSlot(tx, start_lsn, snapshot_name);
|
||||
LOG_TRACE(log, "Starting tables sync load");
|
||||
|
||||
if (user_managed_slot)
|
||||
{
|
||||
if (user_provided_snapshot.empty())
|
||||
throw Exception(ErrorCodes::BAD_ARGUMENTS,
|
||||
"Using a user-defined replication slot must be provided with a snapshot from EXPORT SNAPSHOT when the slot is created."
|
||||
"Pass it to `materialized_postgresql_snapshot` setting");
|
||||
snapshot_name = user_provided_snapshot;
|
||||
}
|
||||
else
|
||||
{
|
||||
createReplicationSlot(tx, start_lsn, snapshot_name);
|
||||
}
|
||||
|
||||
for (const auto & [table_name, storage] : materialized_storages)
|
||||
{
|
||||
@ -147,12 +164,17 @@ void PostgreSQLReplicationHandler::startSynchronization(bool throw_on_error)
|
||||
/// Recreation of a replication slot imposes reloading of all tables.
|
||||
if (!isReplicationSlotExist(tx, start_lsn, /* temporary */false))
|
||||
{
|
||||
if (user_managed_slot)
|
||||
throw Exception(ErrorCodes::BAD_ARGUMENTS, "Having replication slot `{}` from settings, but it does not exist", replication_slot);
|
||||
|
||||
initial_sync();
|
||||
}
|
||||
/// Always drop replication slot if it is CREATE query and not ATTACH.
|
||||
else if (!is_attach || new_publication)
|
||||
{
|
||||
dropReplicationSlot(tx);
|
||||
if (!user_managed_slot)
|
||||
dropReplicationSlot(tx);
|
||||
|
||||
initial_sync();
|
||||
}
|
||||
/// Synchronization and initial load already took place - do not create any new tables, just fetch StoragePtr's
|
||||
@ -376,6 +398,8 @@ bool PostgreSQLReplicationHandler::isReplicationSlotExist(pqxx::nontransaction &
|
||||
void PostgreSQLReplicationHandler::createReplicationSlot(
|
||||
pqxx::nontransaction & tx, String & start_lsn, String & snapshot_name, bool temporary)
|
||||
{
|
||||
assert(temporary || !user_managed_slot);
|
||||
|
||||
String query_str, slot_name;
|
||||
if (temporary)
|
||||
slot_name = replication_slot + "_tmp";
|
||||
@ -401,6 +425,8 @@ void PostgreSQLReplicationHandler::createReplicationSlot(
|
||||
|
||||
void PostgreSQLReplicationHandler::dropReplicationSlot(pqxx::nontransaction & tx, bool temporary)
|
||||
{
|
||||
assert(temporary || !user_managed_slot);
|
||||
|
||||
std::string slot_name;
|
||||
if (temporary)
|
||||
slot_name = replication_slot + "_tmp";
|
||||
@ -433,14 +459,17 @@ void PostgreSQLReplicationHandler::shutdownFinal()
|
||||
|
||||
connection->execWithRetry([&](pqxx::nontransaction & tx)
|
||||
{
|
||||
if (isReplicationSlotExist(tx, last_committed_lsn, /* temporary */false))
|
||||
dropReplicationSlot(tx, /* temporary */false);
|
||||
if (isReplicationSlotExist(tx, last_committed_lsn, /* temporary */true))
|
||||
dropReplicationSlot(tx, /* temporary */true);
|
||||
});
|
||||
|
||||
if (user_managed_slot)
|
||||
return;
|
||||
|
||||
connection->execWithRetry([&](pqxx::nontransaction & tx)
|
||||
{
|
||||
if (isReplicationSlotExist(tx, last_committed_lsn, /* temporary */true))
|
||||
dropReplicationSlot(tx, /* temporary */true);
|
||||
if (isReplicationSlotExist(tx, last_committed_lsn, /* temporary */false))
|
||||
dropReplicationSlot(tx, /* temporary */false);
|
||||
});
|
||||
}
|
||||
catch (Exception & e)
|
||||
|
@ -1,6 +1,7 @@
|
||||
#pragma once
|
||||
|
||||
#include "MaterializedPostgreSQLConsumer.h"
|
||||
#include "MaterializedPostgreSQLSettings.h"
|
||||
#include <Databases/PostgreSQL/fetchPostgreSQLTableStructure.h>
|
||||
#include <Core/PostgreSQL/Utils.h>
|
||||
|
||||
@ -25,10 +26,8 @@ public:
|
||||
const postgres::ConnectionInfo & connection_info_,
|
||||
ContextPtr context_,
|
||||
bool is_attach_,
|
||||
const size_t max_block_size_,
|
||||
bool allow_automatic_update_,
|
||||
bool is_materialized_postgresql_database_,
|
||||
const String tables_list = "");
|
||||
const MaterializedPostgreSQLSettings & replication_settings,
|
||||
bool is_materialized_postgresql_database_);
|
||||
|
||||
/// Activate task to be run from a separate thread: wait until connection is available and call startReplication().
|
||||
void startup();
|
||||
@ -108,6 +107,9 @@ private:
|
||||
/// A comma-separated list of tables which are going to be replicated for the database engine. By default, the whole database is replicated.
|
||||
String tables_list;
|
||||
|
||||
bool user_managed_slot = true;
|
||||
String user_provided_snapshot;
|
||||
|
||||
String replication_slot, publication_name;
|
||||
|
||||
/// Shared between replication_consumer and replication_handler, but never accessed concurrently.
|
||||
|
@ -64,6 +64,8 @@ StorageMaterializedPostgreSQL::StorageMaterializedPostgreSQL(
|
||||
setInMemoryMetadata(storage_metadata);
|
||||
|
||||
String replication_identifier = remote_database_name + "_" + remote_table_name_;
|
||||
replication_settings->materialized_postgresql_tables_list = remote_table_name_;
|
||||
|
||||
replication_handler = std::make_unique<PostgreSQLReplicationHandler>(
|
||||
replication_identifier,
|
||||
remote_database_name,
|
||||
@ -71,8 +73,8 @@ StorageMaterializedPostgreSQL::StorageMaterializedPostgreSQL(
|
||||
connection_info,
|
||||
getContext(),
|
||||
is_attach,
|
||||
replication_settings->materialized_postgresql_max_block_size.value,
|
||||
/* allow_automatic_update */ false, /* is_materialized_postgresql_database */false);
|
||||
*replication_settings,
|
||||
/* is_materialized_postgresql_database */false);
|
||||
|
||||
if (!is_attach)
|
||||
{
|
||||
|
@ -267,11 +267,15 @@ void registerStorageMySQL(StorageFactory & factory)
|
||||
throw Exception("connection_pool_size cannot be zero.", ErrorCodes::BAD_ARGUMENTS);
|
||||
|
||||
auto addresses = parseRemoteDescriptionForExternalDatabase(host_port, max_addresses, 3306);
|
||||
mysqlxx::PoolWithFailover pool(remote_database, addresses,
|
||||
username, password,
|
||||
mysqlxx::PoolWithFailover pool(
|
||||
remote_database,
|
||||
addresses,
|
||||
username,
|
||||
password,
|
||||
MYSQLXX_POOL_WITH_FAILOVER_DEFAULT_START_CONNECTIONS,
|
||||
mysql_settings.connection_pool_size,
|
||||
mysql_settings.connection_max_tries);
|
||||
mysql_settings.connection_max_tries,
|
||||
mysql_settings.connection_wait_timeout);
|
||||
|
||||
bool replace_query = false;
|
||||
std::string on_duplicate_clause;
|
||||
|
@ -1,68 +0,0 @@
|
||||
#include <Storages/System/StorageSystemViews.h>
|
||||
#include <DataTypes/DataTypeString.h>
|
||||
#include <Access/ContextAccess.h>
|
||||
#include <Interpreters/Context.h>
|
||||
#include <Interpreters/QueryViewsLog.h>
|
||||
#include <DataTypes/DataTypeEnum.h>
|
||||
#include <Storages/StorageMaterializedView.h>
|
||||
#include <Storages/LiveView/StorageLiveView.h>
|
||||
|
||||
namespace DB
|
||||
{
|
||||
|
||||
class Context;
|
||||
|
||||
NamesAndTypesList StorageSystemViews::getNamesAndTypes()
|
||||
{
|
||||
auto view_type_datatype = std::make_shared<DataTypeEnum8>(DataTypeEnum8::Values{
|
||||
{"Default", static_cast<Int8>(QueryViewsLogElement::ViewType::DEFAULT)},
|
||||
{"Materialized", static_cast<Int8>(QueryViewsLogElement::ViewType::MATERIALIZED)},
|
||||
{"Live", static_cast<Int8>(QueryViewsLogElement::ViewType::LIVE)}});
|
||||
|
||||
return {
|
||||
{"database", std::make_shared<DataTypeString>()},
|
||||
{"name", std::make_shared<DataTypeString>()},
|
||||
{"main_dependency_database", std::make_shared<DataTypeString>()},
|
||||
{"main_dependency_table", std::make_shared<DataTypeString>()},
|
||||
{"view_type", std::move(view_type_datatype)},
|
||||
};
|
||||
}
|
||||
|
||||
void StorageSystemViews::fillData(MutableColumns & res_columns, ContextPtr context, const SelectQueryInfo &) const
|
||||
{
|
||||
const auto access = context->getAccess();
|
||||
const bool check_access_for_databases = !access->isGranted(AccessType::SHOW_TABLES);
|
||||
|
||||
for (const auto & [table_id, view_ids] : DatabaseCatalog::instance().getViewDependencies())
|
||||
{
|
||||
const bool check_access_for_tables = check_access_for_databases && !access->isGranted(AccessType::SHOW_TABLES, table_id.database_name);
|
||||
|
||||
if (check_access_for_tables && !access->isGranted(AccessType::SHOW_TABLES, table_id.database_name, table_id.table_name))
|
||||
continue;
|
||||
|
||||
size_t col_num;
|
||||
for (const auto & view_id : view_ids)
|
||||
{
|
||||
auto view_ptr = DatabaseCatalog::instance().getTable(view_id, context);
|
||||
QueryViewsLogElement::ViewType type = QueryViewsLogElement::ViewType::DEFAULT;
|
||||
|
||||
if (typeid_cast<const StorageMaterializedView *>(view_ptr.get()))
|
||||
{
|
||||
type = QueryViewsLogElement::ViewType::MATERIALIZED;
|
||||
}
|
||||
else if (typeid_cast<const StorageLiveView *>(view_ptr.get()))
|
||||
{
|
||||
type = QueryViewsLogElement::ViewType::LIVE;
|
||||
}
|
||||
|
||||
col_num = 0;
|
||||
res_columns[col_num++]->insert(view_id.database_name);
|
||||
res_columns[col_num++]->insert(view_id.table_name);
|
||||
res_columns[col_num++]->insert(table_id.database_name);
|
||||
res_columns[col_num++]->insert(table_id.table_name);
|
||||
res_columns[col_num++]->insert(type);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
}
|
@ -1,24 +0,0 @@
|
||||
#pragma once
|
||||
|
||||
#include <common/shared_ptr_helper.h>
|
||||
#include <Storages/System/IStorageSystemOneBlock.h>
|
||||
|
||||
namespace DB
|
||||
{
|
||||
|
||||
class StorageSystemViews final : public shared_ptr_helper<StorageSystemViews>, public IStorageSystemOneBlock<StorageSystemViews>
|
||||
{
|
||||
friend struct shared_ptr_helper<StorageSystemViews>;
|
||||
protected:
|
||||
using IStorageSystemOneBlock::IStorageSystemOneBlock;
|
||||
|
||||
void fillData(MutableColumns & res_columns, ContextPtr context, const SelectQueryInfo &) const override;
|
||||
|
||||
public:
|
||||
std::string getName() const override { return "SystemViews"; }
|
||||
|
||||
static NamesAndTypesList getNamesAndTypes();
|
||||
|
||||
};
|
||||
|
||||
}
|
@ -44,7 +44,6 @@
|
||||
#include <Storages/System/StorageSystemTableEngines.h>
|
||||
#include <Storages/System/StorageSystemTableFunctions.h>
|
||||
#include <Storages/System/StorageSystemTables.h>
|
||||
#include <Storages/System/StorageSystemViews.h>
|
||||
#include <Storages/System/StorageSystemZooKeeper.h>
|
||||
#include <Storages/System/StorageSystemContributors.h>
|
||||
#include <Storages/System/StorageSystemErrors.h>
|
||||
@ -96,7 +95,6 @@ void attachSystemTablesLocal(IDatabase & system_database)
|
||||
attach<StorageSystemZeros>(system_database, "zeros_mt", true);
|
||||
attach<StorageSystemDatabases>(system_database, "databases");
|
||||
attach<StorageSystemTables>(system_database, "tables");
|
||||
attach<StorageSystemViews>(system_database, "views");
|
||||
attach<StorageSystemColumns>(system_database, "columns");
|
||||
attach<StorageSystemFunctions>(system_database, "functions");
|
||||
attach<StorageSystemEvents>(system_database, "events");
|
||||
|
@ -18,6 +18,7 @@ SRCS(
|
||||
Distributed/DirectoryMonitor.cpp
|
||||
Distributed/DistributedSettings.cpp
|
||||
Distributed/DistributedSink.cpp
|
||||
ExecutablePoolSettings.cpp
|
||||
IStorage.cpp
|
||||
IndicesDescription.cpp
|
||||
JoinSettings.cpp
|
||||
|
5
tests/config/config.d/logger.xml
Normal file
5
tests/config/config.d/logger.xml
Normal file
@ -0,0 +1,5 @@
<yandex>
    <logger>
        <level>test</level>
    </logger>
</yandex>
@ -38,6 +38,7 @@ ln -sf $SRC_PATH/config.d/top_level_domains_lists.xml $DEST_SERVER_PATH/config.d
ln -sf $SRC_PATH/config.d/top_level_domains_path.xml $DEST_SERVER_PATH/config.d/
ln -sf $SRC_PATH/config.d/encryption.xml $DEST_SERVER_PATH/config.d/
ln -sf $SRC_PATH/config.d/zookeeper_log.xml $DEST_SERVER_PATH/config.d/
ln -sf $SRC_PATH/config.d/logger.xml $DEST_SERVER_PATH/config.d/
ln -sf $SRC_PATH/users.d/log_queries.xml $DEST_SERVER_PATH/users.d/
ln -sf $SRC_PATH/users.d/readonly.xml $DEST_SERVER_PATH/users.d/
ln -sf $SRC_PATH/users.d/access_management.xml $DEST_SERVER_PATH/users.d/

@ -0,0 +1 @@
#!/usr/bin/env python3
@ -0,0 +1,33 @@
<yandex>
    <keeper_server>
        <tcp_port>9181</tcp_port>
        <server_id>1</server_id>
        <log_storage_path>/var/lib/clickhouse/coordination/log</log_storage_path>
        <snapshot_storage_path>/var/lib/clickhouse/coordination/snapshots</snapshot_storage_path>

        <coordination_settings>
            <operation_timeout_ms>5000</operation_timeout_ms>
            <session_timeout_ms>10000</session_timeout_ms>
            <snapshot_distance>75</snapshot_distance>
            <raft_logs_level>trace</raft_logs_level>
        </coordination_settings>

        <raft_configuration>
            <server>
                <id>1</id>
                <hostname>node1</hostname>
                <port>44444</port>
                <can_become_leader>true</can_become_leader>
                <priority>3</priority>
            </server>
            <server>
                <id>2</id>
                <hostname>node2</hostname>
                <port>44444</port>
                <can_become_leader>true</can_become_leader>
                <start_as_follower>true</start_as_follower>
                <priority>2</priority>
            </server>
        </raft_configuration>
    </keeper_server>
</yandex>
@ -0,0 +1,33 @@
<yandex>
    <keeper_server>
        <tcp_port>9181</tcp_port>
        <server_id>2</server_id>
        <log_storage_path>/var/lib/clickhouse/coordination/log</log_storage_path>
        <snapshot_storage_path>/var/lib/clickhouse/coordination/snapshots</snapshot_storage_path>

        <coordination_settings>
            <operation_timeout_ms>5000</operation_timeout_ms>
            <session_timeout_ms>10000</session_timeout_ms>
            <snapshot_distance>75</snapshot_distance>
            <raft_logs_level>trace</raft_logs_level>
        </coordination_settings>

        <raft_configuration>
            <server>
                <id>1</id>
                <hostname>node1</hostname>
                <port>44444</port>
                <can_become_leader>true</can_become_leader>
                <priority>3</priority>
            </server>
            <server>
                <id>2</id>
                <hostname>node2</hostname>
                <port>44444</port>
                <can_become_leader>true</can_become_leader>
                <start_as_follower>true</start_as_follower>
                <priority>2</priority>
            </server>
        </raft_configuration>
    </keeper_server>
</yandex>
@ -0,0 +1,12 @@
<yandex>
    <zookeeper>
        <node index="1">
            <host>node1</host>
            <port>9181</port>
        </node>
        <node index="2">
            <host>node2</host>
            <port>9181</port>
        </node>
    </zookeeper>
</yandex>
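Together with the keeper_server sections above, this zookeeper block points ClickHouse at a two-member Keeper ensemble listening on TCP port 9181. As an illustrative, hedged sketch (not part of the commit), any stock ZooKeeper client can talk to the same endpoints; the snippet below uses kazoo, just like the integration test that follows, and assumes node1/node2 resolve to the two configured instances:

#!/usr/bin/env python3
# Hedged sketch: smoke-test one Keeper instance with a plain ZooKeeper client.
# The hostname and port are assumptions taken from the configs above.
from kazoo.client import KazooClient

zk = KazooClient(hosts="node1:9181", timeout=30.0)
zk.start()
zk.create("/smoke_test", b"hello")   # the write is replicated through Raft
print(zk.get("/smoke_test")[0])      # read it back
zk.stop()
zk.close()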
tests/integration/test_keeper_two_nodes_cluster/test.py
@ -0,0 +1,163 @@
#!/usr/bin/env python3

import pytest
from helpers.cluster import ClickHouseCluster
import random
import string
import os
import time
from multiprocessing.dummy import Pool
from helpers.network import PartitionManager
from helpers.test_tools import assert_eq_with_retry

cluster = ClickHouseCluster(__file__)
node1 = cluster.add_instance('node1', main_configs=['configs/enable_keeper1.xml', 'configs/use_keeper.xml'], stay_alive=True)
node2 = cluster.add_instance('node2', main_configs=['configs/enable_keeper2.xml', 'configs/use_keeper.xml'], stay_alive=True)

from kazoo.client import KazooClient, KazooState

@pytest.fixture(scope="module")
def started_cluster():
    try:
        cluster.start()

        yield cluster

    finally:
        cluster.shutdown()

def smaller_exception(ex):
    return '\n'.join(str(ex).split('\n')[0:2])

def wait_node(node):
    for _ in range(100):
        zk = None
        try:
            node.query("SELECT * FROM system.zookeeper WHERE path = '/'")
            zk = get_fake_zk(node.name, timeout=30.0)
            zk.create("/test", sequence=True)
            print("node", node.name, "ready")
            break
        except Exception as ex:
            time.sleep(0.2)
            print("Waiting until", node.name, "is ready, exception:", ex)
        finally:
            if zk:
                zk.stop()
                zk.close()
    else:
        raise Exception("Can't wait for node {} to become ready".format(node.name))

def wait_nodes():
    for node in [node1, node2]:
        wait_node(node)


def get_fake_zk(nodename, timeout=30.0):
    _fake_zk_instance = KazooClient(hosts=cluster.get_instance_ip(nodename) + ":9181", timeout=timeout)
    _fake_zk_instance.start()
    return _fake_zk_instance

def test_read_write_two_nodes(started_cluster):
    try:
        wait_nodes()
        node1_zk = get_fake_zk("node1")
        node2_zk = get_fake_zk("node2")

        node1_zk.create("/test_read_write_multinode_node1", b"somedata1")
        node2_zk.create("/test_read_write_multinode_node2", b"somedata2")

        # stale reads are allowed
        while node1_zk.exists("/test_read_write_multinode_node2") is None:
            time.sleep(0.1)

        # stale reads are allowed
        while node2_zk.exists("/test_read_write_multinode_node1") is None:
            time.sleep(0.1)

        assert node2_zk.get("/test_read_write_multinode_node1")[0] == b"somedata1"
        assert node1_zk.get("/test_read_write_multinode_node1")[0] == b"somedata1"

        assert node2_zk.get("/test_read_write_multinode_node2")[0] == b"somedata2"
        assert node1_zk.get("/test_read_write_multinode_node2")[0] == b"somedata2"

    finally:
        try:
            # only two nodes in this cluster; close whatever connections were opened
            for zk_conn in [node1_zk, node2_zk]:
                zk_conn.stop()
                zk_conn.close()
        except:
            pass

def test_read_write_two_nodes_with_blocade(started_cluster):
    try:
        wait_nodes()
        node1_zk = get_fake_zk("node1", timeout=5.0)
        node2_zk = get_fake_zk("node2", timeout=5.0)

        print("Blocking nodes")
        with PartitionManager() as pm:
            pm.partition_instances(node2, node1)

            # We will get a connection-loss response, but the query will be processed
            # after the blockade is removed
            with pytest.raises(Exception):
                node1_zk.create("/test_read_write_blocked_node1", b"somedata1")

            # This node is not the leader and will not process anything
            with pytest.raises(Exception):
                node2_zk.create("/test_read_write_blocked_node2", b"somedata2")


        print("Nodes unblocked")
        for i in range(10):
            try:
                node1_zk = get_fake_zk("node1")
                node2_zk = get_fake_zk("node2")
                break
            except:
                time.sleep(0.5)


        for i in range(100):
            try:
                node1_zk.create("/test_after_block1", b"somedata12")
                break
            except:
                time.sleep(0.1)
        else:
            raise Exception("node1 cannot recover after blockade")

        print("Node1 created its value")

        for i in range(100):
            try:
                node2_zk.create("/test_after_block2", b"somedata12")
                break
            except:
                time.sleep(0.1)
        else:
            raise Exception("node2 cannot recover after blockade")

        print("Node2 created its value")

        # stale reads are allowed
        while node1_zk.exists("/test_after_block2") is None:
            time.sleep(0.1)

        # stale reads are allowed
        while node2_zk.exists("/test_after_block1") is None:
            time.sleep(0.1)

        assert node1_zk.exists("/test_after_block1") is not None
        assert node1_zk.exists("/test_after_block2") is not None
        assert node2_zk.exists("/test_after_block1") is not None
        assert node2_zk.exists("/test_after_block2") is not None

    finally:
        try:
            # only two nodes in this cluster; close whatever connections were opened
            for zk_conn in [node1_zk, node2_zk]:
                zk_conn.stop()
                zk_conn.close()
        except:
            pass
@ -31,18 +31,33 @@ postgres_table_template_3 = """
key1 Integer NOT NULL, value1 Integer, key2 Integer NOT NULL, value2 Integer NOT NULL)
"""

def get_postgres_conn(ip, port, database=False, auto_commit=True, database_name='postgres_database'):
def get_postgres_conn(ip, port, database=False, auto_commit=True, database_name='postgres_database', replication=False):
    if database == True:
        conn_string = "host={} port={} dbname='{}' user='postgres' password='mysecretpassword'".format(ip, port, database_name)
    else:
        conn_string = "host={} port={} user='postgres' password='mysecretpassword'".format(ip, port)

    if replication:
        conn_string += " replication='database'"

    conn = psycopg2.connect(conn_string)
    if auto_commit:
        conn.set_isolation_level(ISOLATION_LEVEL_AUTOCOMMIT)
        conn.autocommit = True
    return conn

def create_replication_slot(conn, slot_name='user_slot'):
    cursor = conn.cursor()
    cursor.execute('CREATE_REPLICATION_SLOT {} LOGICAL pgoutput EXPORT_SNAPSHOT'.format(slot_name))
    result = cursor.fetchall()
    print(result[0][0])  # slot name
    print(result[0][1])  # start lsn
    print(result[0][2])  # snapshot
    return result[0][2]

def drop_replication_slot(conn, slot_name='user_slot'):
    cursor = conn.cursor()
    cursor.execute("select pg_drop_replication_slot('{}')".format(slot_name))

def create_postgres_db(cursor, name='postgres_database'):
    cursor.execute("CREATE DATABASE {}".format(name))
@ -941,6 +956,34 @@ def test_quoting(started_cluster):
drop_materialized_db()


def test_user_managed_slots(started_cluster):
    conn = get_postgres_conn(ip=started_cluster.postgres_ip,
                             port=started_cluster.postgres_port,
                             database=True)
    cursor = conn.cursor()
    table_name = 'test_table'
    create_postgres_table(cursor, table_name);
    instance.query("INSERT INTO postgres_database.{} SELECT number, number from numbers(10000)".format(table_name))

    slot_name = 'user_slot'
    replication_connection = get_postgres_conn(ip=started_cluster.postgres_ip, port=started_cluster.postgres_port,
                                               database=True, replication=True, auto_commit=True)
    snapshot = create_replication_slot(replication_connection, slot_name=slot_name)
    create_materialized_db(ip=started_cluster.postgres_ip,
                           port=started_cluster.postgres_port,
                           settings=["materialized_postgresql_replication_slot = '{}'".format(slot_name),
                                     "materialized_postgresql_snapshot = '{}'".format(snapshot)])
    check_tables_are_synchronized(table_name);
    instance.query("INSERT INTO postgres_database.{} SELECT number, number from numbers(10000, 10000)".format(table_name))
    check_tables_are_synchronized(table_name);
    instance.restart_clickhouse()
    instance.query("INSERT INTO postgres_database.{} SELECT number, number from numbers(20000, 10000)".format(table_name))
    check_tables_are_synchronized(table_name);
    drop_postgres_table(cursor, table_name)
    drop_materialized_db()
    drop_replication_slot(replication_connection, slot_name)


if __name__ == '__main__':
    cluster.start()
    input("Cluster created, press any key to destroy...")

@ -3,7 +3,10 @@ from contextlib import contextmanager
## sudo -H pip install PyMySQL
import pymysql.cursors
import pytest
import time
import threading
from helpers.cluster import ClickHouseCluster
from helpers.client import QueryRuntimeException

cluster = ClickHouseCluster(__file__)

@ -319,6 +322,51 @@ CREATE TABLE {}(id UInt32, name String, age UInt32, money UInt32) ENGINE = MySQL
conn.close()


# Check that a limited connection_wait_timeout (with connection_pool_size=1) makes a waiting query throw.
def test_settings_connection_wait_timeout(started_cluster):
    table_name = 'test_settings_connection_wait_timeout'
    node1.query(f'DROP TABLE IF EXISTS {table_name}')
    wait_timeout = 2

    conn = get_mysql_conn(started_cluster, cluster.mysql_ip)
    drop_mysql_table(conn, table_name)
    create_mysql_table(conn, table_name)

    node1.query('''
        CREATE TABLE {}
        (
            id UInt32,
            name String,
            age UInt32,
            money UInt32
        )
        ENGINE = MySQL('mysql57:3306', 'clickhouse', '{}', 'root', 'clickhouse')
        SETTINGS connection_wait_timeout={}, connection_pool_size=1
        '''.format(table_name, table_name, wait_timeout)
    )

    node1.query("INSERT INTO {} (id, name) SELECT number, concat('name_', toString(number)) from numbers(10) ".format(table_name))

    def worker():
        node1.query("SELECT sleepEachRow(1) FROM {}".format(table_name))

    worker_thread = threading.Thread(target=worker)
    worker_thread.start()

    # ensure that the first query has started in worker_thread
    time.sleep(1)

    started = time.time()
    with pytest.raises(QueryRuntimeException, match=r"Exception: mysqlxx::Pool is full \(connection_wait_timeout is exceeded\)"):
        node1.query("SELECT sleepEachRow(1) FROM {}".format(table_name))
    ended = time.time()
    assert (ended - started) >= wait_timeout

    worker_thread.join()

    drop_mysql_table(conn, table_name)
    conn.close()

if __name__ == '__main__':
    with contextmanager(started_cluster)() as cluster:
        for name, instance in list(cluster.instances.items()):

@ -2,3 +2,4 @@
0 9998 5000
1 9999 5000
0 9998 5000
1

@ -11,4 +11,7 @@ select min(i), max(i), count() from d group by _partition_id order by _partition
select min(i), max(i), count() from d where _partition_value.1 = 0 group by _partition_id order by _partition_id;
select min(i), max(i), count() from d where _partition_value.1 = 10 group by _partition_id order by _partition_id;

-- fuzz crash
select min(i) from d where 1 = _partition_value.1;

drop table d;

@ -4,3 +4,5 @@
7 107
8 108
9 109
1970-01-01 1 one
1970-01-01 3 three

@ -8,3 +8,18 @@ set max_rows_to_read = 5;
select * from test1 where i not in (1,2,3,4,5) order by i;

drop table test1;

drop table if exists t1;
drop table if exists t2;

create table t1 (date Date, a Float64, b String) Engine=MergeTree ORDER BY date;
create table t2 (date Date, a Float64, b String) Engine=MergeTree ORDER BY date;

insert into t1(a, b) values (1, 'one'), (2, 'two');
insert into t2(a, b) values (2, 'two'), (3, 'three');

select date, a, b from t1 where (date, a, b) NOT IN (select date,a,b from t2);
select date, a, b from t2 where (date, a, b) NOT IN (select date,a,b from t1);

drop table t1;
drop table t2;

@ -1 +0,0 @@
02015_db materialized_view 02015_db view_source_tb Materialized
@ -1,14 +0,0 @@
DROP DATABASE IF EXISTS 02015_db;
CREATE DATABASE IF NOT EXISTS 02015_db;

DROP TABLE IF EXISTS 02015_db.view_source_tb;
CREATE TABLE IF NOT EXISTS 02015_db.view_source_tb (a UInt8, s String) ENGINE = MergeTree() ORDER BY a;

DROP TABLE IF EXISTS 02015_db.materialized_view;
CREATE MATERIALIZED VIEW IF NOT EXISTS 02015_db.materialized_view ENGINE = ReplacingMergeTree() ORDER BY a AS SELECT * FROM 02015_db.view_source_tb;

SELECT * FROM system.views WHERE database='02015_db' and name = 'materialized_view';

DROP TABLE IF EXISTS 02015_db.materialized_view;
DROP TABLE IF EXISTS 02015_db.view_source_tb;
DROP DATABASE IF EXISTS 02015_db;
@ -512,7 +512,6 @@
        "01532_execute_merges_on_single_replica", /// static zk path
        "01530_drop_database_atomic_sync", /// creates database
        "02001_add_default_database_to_system_users", ///create user
        "02002_row_level_filter_bug", ///create user
        "02015_system_views"
        "02002_row_level_filter_bug" ///create user
    ]
}
