Merge branch 'master' into update-arrow-2

Kruglov Pavel 2023-04-04 15:11:30 +02:00 committed by GitHub
commit 0fc76b00be
91 changed files with 1503 additions and 159 deletions

View File

@ -0,0 +1,29 @@
---
sidebar_position: 1
sidebar_label: 2023
---
# 2023 Changelog
### ClickHouse release v22.8.16.32-lts (7c4be737bd0) FIXME as compared to v22.8.15.23-lts (d36fa168bbf)
#### Build/Testing/Packaging Improvement
* Backported in [#48344](https://github.com/ClickHouse/ClickHouse/issues/48344): Use sccache as a replacement for ccache and use S3 as the cache backend. [#46240](https://github.com/ClickHouse/ClickHouse/pull/46240) ([Mikhail f. Shiryaev](https://github.com/Felixoid)).
* Backported in [#48250](https://github.com/ClickHouse/ClickHouse/issues/48250): The `clickhouse/clickhouse-keeper` image used to be pushed only with the `-alpine` tag suffix, e.g. `latest-alpine`. As suggested in https://github.com/ClickHouse/examples/pull/2, it is now also pushed without the suffix. [#48236](https://github.com/ClickHouse/ClickHouse/pull/48236) ([Mikhail f. Shiryaev](https://github.com/Felixoid)).
#### Bug Fix (user-visible misbehavior in an official stable release)
* Fix bug in zero-copy replication disk choice during fetch [#47010](https://github.com/ClickHouse/ClickHouse/pull/47010) ([alesapin](https://github.com/alesapin)).
* Fix query parameters [#47488](https://github.com/ClickHouse/ClickHouse/pull/47488) ([Alexey Milovidov](https://github.com/alexey-milovidov)).
* Fix wait for zero copy lock during move [#47631](https://github.com/ClickHouse/ClickHouse/pull/47631) ([alesapin](https://github.com/alesapin)).
* Fix crash in polygonsSymDifferenceCartesian [#47702](https://github.com/ClickHouse/ClickHouse/pull/47702) ([pufit](https://github.com/pufit)).
* Backport to 22.8: Fix moving broken parts to the detached for the object storage disk on startup [#48273](https://github.com/ClickHouse/ClickHouse/pull/48273) ([Aleksei Filatov](https://github.com/aalexfvk)).
#### NOT FOR CHANGELOG / INSIGNIFICANT
* Add a fuse for backport branches w/o a created PR [#47760](https://github.com/ClickHouse/ClickHouse/pull/47760) ([Mikhail f. Shiryaev](https://github.com/Felixoid)).
* Only valid Reviews.STATES overwrite existing reviews [#47789](https://github.com/ClickHouse/ClickHouse/pull/47789) ([Mikhail f. Shiryaev](https://github.com/Felixoid)).
* Place short return before big block, improve logging [#47822](https://github.com/ClickHouse/ClickHouse/pull/47822) ([Mikhail f. Shiryaev](https://github.com/Felixoid)).
* Artifacts s3 prefix [#47945](https://github.com/ClickHouse/ClickHouse/pull/47945) ([Mikhail f. Shiryaev](https://github.com/Felixoid)).
* Fix tsan error lock-order-inversion [#47953](https://github.com/ClickHouse/ClickHouse/pull/47953) ([Kruglov Pavel](https://github.com/Avogar)).

View File

@ -78,7 +78,8 @@ Of course, it's possible to manually run `CREATE TABLE` with same path on nonrel
### Inserts
When new rows are inserted into `KeeperMap`, if the key already exists, the value will be updated; otherwise, a new key is created.
When new rows are inserted into `KeeperMap`, if the key does not exist, a new entry for the key is created.
If the key exists and the setting `keeper_map_strict_mode` is set to `true`, an exception is thrown; otherwise, the value for the key is overwritten.
Example:
@ -89,6 +90,7 @@ INSERT INTO keeper_map_table VALUES ('some key', 1, 'value', 3.2);
### Deletes
Rows can be deleted using the `DELETE` query or `TRUNCATE`.
If the key exists and the setting `keeper_map_strict_mode` is set to `true`, fetching and deleting data will succeed only if it can be executed atomically.
```sql
DELETE FROM keeper_map_table WHERE key LIKE 'some%' AND v1 > 1;
@ -105,6 +107,7 @@ TRUNCATE TABLE keeper_map_table;
### Updates
Values can be updated using the `ALTER TABLE` query. The primary key cannot be updated.
If the setting `keeper_map_strict_mode` is set to `true`, fetching and updating data will succeed only if it is executed atomically.
```sql
ALTER TABLE keeper_map_table UPDATE v1 = v1 * 10 + 2 WHERE key LIKE 'some%' AND v3 > 3.1;
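-- A minimal sketch of the strict mode described above (assumes a `keeper_map_table`
-- with the structure from the earlier examples; 'another key' is an illustrative value):
SET keeper_map_strict_mode = 1;
INSERT INTO keeper_map_table VALUES ('another key', 1, 'value', 3.2);
INSERT INTO keeper_map_table VALUES ('another key', 2, 'value', 3.2); -- throws: the key already exists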

View File

@ -74,7 +74,7 @@ Never set the block size too small or too large.
You can use RAID-0 on SSD.
Regardless of RAID use, always use replication for data security.
Enable NCQ with a long queue. For HDD, choose the CFQ scheduler, and for SSD, choose noop. Don't reduce the readahead setting.
Enable NCQ with a long queue. For HDD, choose the mq-deadline or CFQ scheduler, and for SSD, choose noop. Don't reduce the readahead setting.
For HDD, enable the write cache.
Make sure that [`fstrim`](https://en.wikipedia.org/wiki/Trim_(computing)) is enabled for NVME and SSD disks in your OS (usually it's implemented using a cronjob or systemd service).

View File

@ -8,10 +8,150 @@ sidebar_label: clickhouse-local
The `clickhouse-local` program enables you to perform fast processing on local files, without having to deploy and configure the ClickHouse server. It accepts data that represent tables and queries them using [ClickHouse SQL dialect](../../sql-reference/index.md). `clickhouse-local` uses the same core as ClickHouse server, so it supports most of the features and the same set of formats and table engines.
By default `clickhouse-local` has access to data on the same host, and it does not depend on the server's configuration. It also supports loading server configuration using `--config-file` argument. For temporary data, a unique temporary data directory is created by default.
## Download clickhouse-local
`clickhouse-local` is executed using the same `clickhouse` binary that runs the ClickHouse server and `clickhouse-client`. The easiest way to download the latest version is with the following command:
```bash
curl https://clickhouse.com/ | sh
```
:::note
The binary you just downloaded can run all sorts of ClickHouse tools and utilities. If you want to run ClickHouse as a database server, check out the [Quick Start](../../quick-start.mdx).
:::
## Query data in a CSV file using SQL
A common use of `clickhouse-local` is to run ad-hoc queries on files without having to insert the data into a table. `clickhouse-local` can stream the data from a file into a temporary table and execute your SQL.
If the file is sitting on the same machine as `clickhouse-local`, use the `file` table function. The following `reviews.tsv` file contains a sampling of Amazon product reviews:
```bash
./clickhouse local -q "SELECT * FROM file('reviews.tsv')"
```
ClickHouse knows the file uses a tab-separated format from the filename extension. If you need to specify the format explicitly, add one of the [many ClickHouse input formats](../../interfaces/formats.md):
```bash
./clickhouse local -q "SELECT * FROM file('reviews.tsv', 'TabSeparated')"
```
The `file` table function creates a table, and you can use `DESCRIBE` to see the inferred schema:
```bash
./clickhouse local -q "DESCRIBE file('reviews.tsv')"
```
```response
marketplace Nullable(String)
customer_id Nullable(Int64)
review_id Nullable(String)
product_id Nullable(String)
product_parent Nullable(Int64)
product_title Nullable(String)
product_category Nullable(String)
star_rating Nullable(Int64)
helpful_votes Nullable(Int64)
total_votes Nullable(Int64)
vine Nullable(String)
verified_purchase Nullable(String)
review_headline Nullable(String)
review_body Nullable(String)
review_date Nullable(Date)
```
Let's find a product with the highest rating:
```bash
./clickhouse local -q "SELECT
argMax(product_title,star_rating),
max(star_rating)
FROM file('reviews.tsv')"
```
```response
Monopoly Junior Board Game 5
```
## Query data in a Parquet file in AWS S3
If you have a file in S3, use `clickhouse-local` and the `s3` table function to query the file in place (without inserting the data into a ClickHouse table). We have a file named `house_0.parquet` in a public bucket that contains prices of properties sold in the United Kingdom. Let's see how many rows it has:
```bash
./clickhouse local -q "
SELECT count()
FROM s3('https://datasets-documentation.s3.eu-west-3.amazonaws.com/house_parquet/house_0.parquet')"
```
The file has 2.7M rows:
```response
2772030
```
It's always useful to see the schema that ClickHouse infers from the file:
```bash
./clickhouse local -q "DESCRIBE s3('https://datasets-documentation.s3.eu-west-3.amazonaws.com/house_parquet/house_0.parquet')"
```
```response
price Nullable(Int64)
date Nullable(UInt16)
postcode1 Nullable(String)
postcode2 Nullable(String)
type Nullable(String)
is_new Nullable(UInt8)
duration Nullable(String)
addr1 Nullable(String)
addr2 Nullable(String)
street Nullable(String)
locality Nullable(String)
town Nullable(String)
district Nullable(String)
county Nullable(String)
```
Let's see what the most expensive neighborhoods are:
```bash
./clickhouse local -q "
SELECT
town,
district,
count() AS c,
round(avg(price)) AS price,
bar(price, 0, 5000000, 100)
FROM s3('https://datasets-documentation.s3.eu-west-3.amazonaws.com/house_parquet/house_0.parquet')
GROUP BY
town,
district
HAVING c >= 100
ORDER BY price DESC
LIMIT 10"
```
```response
LONDON CITY OF LONDON 886 2271305 █████████████████████████████████████████████▍
LEATHERHEAD ELMBRIDGE 206 1176680 ███████████████████████▌
LONDON CITY OF WESTMINSTER 12577 1108221 ██████████████████████▏
LONDON KENSINGTON AND CHELSEA 8728 1094496 █████████████████████▉
HYTHE FOLKESTONE AND HYTHE 130 1023980 ████████████████████▍
CHALFONT ST GILES CHILTERN 113 835754 ████████████████▋
AMERSHAM BUCKINGHAMSHIRE 113 799596 ███████████████▉
VIRGINIA WATER RUNNYMEDE 356 789301 ███████████████▊
BARNET ENFIELD 282 740514 ██████████████▊
NORTHWOOD THREE RIVERS 184 731609 ██████████████▋
```
:::tip
When you are ready to insert your files into ClickHouse, start up a ClickHouse server and insert the results of your `file` and `s3` table functions into a `MergeTree` table, as sketched below. View the [Quick Start](../../quick-start.mdx) for more details.
:::
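For example, a minimal sketch of that workflow, run against a running ClickHouse server (the table name and the empty sorting key are illustrative assumptions, not part of the dataset):
```sql
-- Persist the S3 query results from above into a MergeTree table.
-- Replace tuple() with a real sorting key once you know your query patterns.
CREATE TABLE uk_house_prices
ENGINE = MergeTree
ORDER BY tuple()
AS SELECT *
FROM s3('https://datasets-documentation.s3.eu-west-3.amazonaws.com/house_parquet/house_0.parquet');
```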
## Usage {#usage}
By default `clickhouse-local` has access to data of a ClickHouse server on the same host, and it does not depend on the server's configuration. It also supports loading server configuration using `--config-file` argument. For temporary data, a unique temporary data directory is created by default.
Basic usage (Linux):
``` bash
@ -24,7 +164,9 @@ Basic usage (Mac):
$ ./clickhouse local --structure "table_structure" --input-format "format_of_incoming_data" --query "query"
```
Also supported on Windows through WSL2.
:::note
`clickhouse-local` is also supported on Windows through WSL2.
:::
Arguments:

View File

@ -68,9 +68,9 @@ Result:
## mapFromArrays
Merges an [Array](../../sql-reference/data-types/array.md) of keys and an [Array](../../sql-reference/data-types/array.md) of values into a [Map(key, value)](../../sql-reference/data-types/map.md).
Merges an [Array](../../sql-reference/data-types/array.md) of keys and an [Array](../../sql-reference/data-types/array.md) of values into a [Map(key, value)](../../sql-reference/data-types/map.md). Note that the second argument can also be a [Map](../../sql-reference/data-types/map.md); in that case it is cast to an Array during execution.
The function is a more convenient alternative to `CAST((key_array, value_array), 'Map(key_type, value_type)')`. For example, instead of writing `CAST((['aa', 'bb'], [4, 5]), 'Map(String, UInt32)')`, you can write `mapFromArrays(['aa', 'bb'], [4, 5])`.
The function is a more convenient alternative to `CAST((key_array, value_array_or_map), 'Map(key_type, value_type)')`. For example, instead of writing `CAST((['aa', 'bb'], [4, 5]), 'Map(String, UInt32)')`, you can write `mapFromArrays(['aa', 'bb'], [4, 5])`.
**Syntax**
@ -82,11 +82,11 @@ Alias: `MAP_FROM_ARRAYS(keys, values)`
**Arguments**
- `keys` — Given key array to create a map from. The nested type of array must be: [String](../../sql-reference/data-types/string.md), [Integer](../../sql-reference/data-types/int-uint.md), [LowCardinality](../../sql-reference/data-types/lowcardinality.md), [FixedString](../../sql-reference/data-types/fixedstring.md), [UUID](../../sql-reference/data-types/uuid.md), [Date](../../sql-reference/data-types/date.md), [DateTime](../../sql-reference/data-types/datetime.md), [Date32](../../sql-reference/data-types/date32.md), [Enum](../../sql-reference/data-types/enum.md)
- `values` - Given value array to create a map from.
- `values` - Given value array or map to create a map from.
**Returned value**
- A map whose keys and values are constructed from the key and value arrays
- A map whose keys and values are constructed from the key array and value array/map.
**Example**
@ -94,13 +94,17 @@ Query:
```sql
select mapFromArrays(['a', 'b', 'c'], [1, 2, 3])
```
```text
┌─mapFromArrays(['a', 'b', 'c'], [1, 2, 3])─┐
│ {'a':1,'b':2,'c':3} │
└───────────────────────────────────────────┘
```
```sql
SELECT mapFromArrays([1, 2, 3], map('a', 1, 'b', 2, 'c', 3))
```
```text
┌─mapFromArrays([1, 2, 3], map('a', 1, 'b', 2, 'c', 3))─┐
│ {1:('a',1),2:('b',2),3:('c',3)} │
└───────────────────────────────────────────────────────┘
```
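As noted above, `mapFromArrays` is a shorthand for the `CAST` form; a minimal sketch of the equivalence (the column aliases are illustrative):
```sql
-- Both expressions produce the same {'aa':4,'bb':5} map.
SELECT
    CAST((['aa', 'bb'], [4, 5]), 'Map(String, UInt32)') AS via_cast,
    mapFromArrays(['aa', 'bb'], [4, 5]) AS via_function;
```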
## mapAdd

View File

@ -1,4 +1,5 @@
#include "ClusterCopierApp.h"
#include <Common/ZooKeeper/ZooKeeper.h>
#include <Common/StatusFile.h>
#include <Common/TerminalSize.h>
#include <IO/ConnectionTimeouts.h>
@ -192,6 +193,8 @@ void ClusterCopierApp::mainImpl()
if (!task_file.empty())
copier->uploadTaskDescription(task_path, task_file, config().getBool("task-upload-force", false));
zkutil::validateZooKeeperConfig(config());
copier->init();
copier->process(ConnectionTimeouts::getTCPTimeoutsWithoutFailover(context->getSettingsRef()));

View File

@ -89,8 +89,12 @@ static std::vector<std::string> extractFromConfig(
if (has_zk_includes && process_zk_includes)
{
DB::ConfigurationPtr bootstrap_configuration(new Poco::Util::XMLConfiguration(config_xml));
zkutil::validateZooKeeperConfig(*bootstrap_configuration);
zkutil::ZooKeeperPtr zookeeper = std::make_shared<zkutil::ZooKeeper>(
*bootstrap_configuration, "zookeeper", nullptr);
*bootstrap_configuration, bootstrap_configuration->has("zookeeper") ? "zookeeper" : "keeper", nullptr);
zkutil::ZooKeeperNodeCache zk_node_cache([&] { return zookeeper; });
config_xml = processor.processConfig(&has_zk_includes, &zk_node_cache);
}

View File

@ -815,7 +815,8 @@ try
}
);
bool has_zookeeper = config().has("zookeeper");
zkutil::validateZooKeeperConfig(config());
bool has_zookeeper = zkutil::hasZooKeeperConfig(config());
zkutil::ZooKeeperNodeCache main_config_zk_node_cache([&] { return global_context->getZooKeeper(); });
zkutil::EventPtr main_config_zk_changed_event = std::make_shared<Poco::Event>();
@ -1307,7 +1308,7 @@ try
{
/// We do not load ZooKeeper configuration on the first config loading
/// because TestKeeper server is not started yet.
if (config->has("zookeeper"))
if (zkutil::hasZooKeeperConfig(*config))
global_context->reloadZooKeeperIfChanged(config);
global_context->reloadAuxiliaryZooKeepersConfigIfChanged(config);

View File

@ -8,7 +8,8 @@
namespace DB
{
BackupCoordinationLocal::BackupCoordinationLocal(bool plain_backup_) : file_infos(plain_backup_)
BackupCoordinationLocal::BackupCoordinationLocal(bool plain_backup_)
: log(&Poco::Logger::get("BackupCoordinationLocal")), file_infos(plain_backup_)
{
}
@ -125,7 +126,12 @@ bool BackupCoordinationLocal::startWritingFile(size_t data_file_index)
bool BackupCoordinationLocal::hasConcurrentBackups(const std::atomic<size_t> & num_active_backups) const
{
return (num_active_backups > 1);
if (num_active_backups > 1)
{
LOG_WARNING(log, "Found concurrent backups: num_active_backups={}", num_active_backups);
return true;
}
return false;
}
}

View File

@ -52,6 +52,8 @@ public:
bool hasConcurrentBackups(const std::atomic<size_t> & num_active_backups) const override;
private:
Poco::Logger * const log;
BackupCoordinationReplicatedTables TSA_GUARDED_BY(replicated_tables_mutex) replicated_tables;
BackupCoordinationReplicatedAccess TSA_GUARDED_BY(replicated_access_mutex) replicated_access;
BackupCoordinationReplicatedSQLObjects TSA_GUARDED_BY(replicated_sql_objects_mutex) replicated_sql_objects;

View File

@ -164,17 +164,18 @@ BackupCoordinationRemote::BackupCoordinationRemote(
, current_host_index(findCurrentHostIndex(all_hosts, current_host))
, plain_backup(plain_backup_)
, is_internal(is_internal_)
, log(&Poco::Logger::get("BackupCoordinationRemote"))
{
zookeeper_retries_info = ZooKeeperRetriesInfo(
"BackupCoordinationRemote",
&Poco::Logger::get("BackupCoordinationRemote"),
log,
keeper_settings.keeper_max_retries,
keeper_settings.keeper_retry_initial_backoff_ms,
keeper_settings.keeper_retry_max_backoff_ms);
createRootNodes();
stage_sync.emplace(
zookeeper_path + "/stage", [this] { return getZooKeeper(); }, &Poco::Logger::get("BackupCoordination"));
zookeeper_path + "/stage", [this] { return getZooKeeper(); }, log);
}
BackupCoordinationRemote::~BackupCoordinationRemote()
@ -664,7 +665,10 @@ bool BackupCoordinationRemote::hasConcurrentBackups(const std::atomic<size_t> &)
const auto status = zk->get(root_zookeeper_path + "/" + existing_backup_path + "/stage");
if (status != Stage::COMPLETED)
{
LOG_WARNING(log, "Found a concurrent backup: {}, current backup: {}", existing_backup_uuid, toString(backup_uuid));
return true;
}
}
zk->createIfNotExists(backup_stage_path, "");

View File

@ -104,6 +104,7 @@ private:
const size_t current_host_index;
const bool plain_backup;
const bool is_internal;
Poco::Logger * const log;
mutable ZooKeeperRetriesInfo zookeeper_retries_info;
std::optional<BackupCoordinationStageSync> stage_sync;

View File

@ -69,7 +69,7 @@ namespace
S3::CredentialsConfiguration
{
settings.auth_settings.use_environment_credentials.value_or(
context->getConfigRef().getBool("s3.use_environment_credentials", false)),
context->getConfigRef().getBool("s3.use_environment_credentials", true)),
settings.auth_settings.use_insecure_imds_request.value_or(
context->getConfigRef().getBool("s3.use_insecure_imds_request", false)),
settings.auth_settings.expiration_window_seconds.value_or(

View File

@ -1,10 +1,14 @@
#include <Backups/RestoreCoordinationLocal.h>
#include <Common/logger_useful.h>
namespace DB
{
RestoreCoordinationLocal::RestoreCoordinationLocal() = default;
RestoreCoordinationLocal::RestoreCoordinationLocal() : log(&Poco::Logger::get("RestoreCoordinationLocal"))
{
}
RestoreCoordinationLocal::~RestoreCoordinationLocal() = default;
void RestoreCoordinationLocal::setStage(const String &, const String &)
@ -49,7 +53,12 @@ bool RestoreCoordinationLocal::acquireReplicatedSQLObjects(const String &, UserD
bool RestoreCoordinationLocal::hasConcurrentRestores(const std::atomic<size_t> & num_active_restores) const
{
return (num_active_restores > 1);
if (num_active_restores > 1)
{
LOG_WARNING(log, "Found concurrent backups: num_active_restores={}", num_active_restores);
return true;
}
return false;
}
}

View File

@ -42,6 +42,8 @@ public:
bool hasConcurrentRestores(const std::atomic<size_t> & num_active_restores) const override;
private:
Poco::Logger * const log;
std::set<std::pair<String /* database_zk_path */, String /* table_name */>> acquired_tables_in_replicated_databases;
std::unordered_set<String /* table_zk_path */> acquired_data_in_replicated_tables;
mutable std::mutex mutex;

View File

@ -25,11 +25,12 @@ RestoreCoordinationRemote::RestoreCoordinationRemote(
, current_host(current_host_)
, current_host_index(BackupCoordinationRemote::findCurrentHostIndex(all_hosts, current_host))
, is_internal(is_internal_)
, log(&Poco::Logger::get("RestoreCoordinationRemote"))
{
createRootNodes();
stage_sync.emplace(
zookeeper_path + "/stage", [this] { return getZooKeeper(); }, &Poco::Logger::get("RestoreCoordination"));
zookeeper_path + "/stage", [this] { return getZooKeeper(); }, log);
}
RestoreCoordinationRemote::~RestoreCoordinationRemote()
@ -197,7 +198,10 @@ bool RestoreCoordinationRemote::hasConcurrentRestores(const std::atomic<size_t>
const auto status = zk->get(root_zookeeper_path + "/" + existing_restore_path + "/stage");
if (status != Stage::COMPLETED)
{
LOG_WARNING(log, "Found a concurrent restore: {}, current restore: {}", existing_restore_uuid, toString(restore_uuid));
return true;
}
}
zk->createIfNotExists(path, "");

View File

@ -59,6 +59,7 @@ private:
const String current_host;
const size_t current_host_index;
const bool is_internal;
Poco::Logger * const log;
std::optional<BackupCoordinationStageSync> stage_sync;

View File

@ -29,6 +29,8 @@ namespace ErrorCodes
extern const int LOGICAL_ERROR;
extern const int NOT_IMPLEMENTED;
extern const int BAD_ARGUMENTS;
extern const int NO_ELEMENTS_IN_CONFIG;
extern const int EXCESSIVE_ELEMENT_IN_CONFIG;
}
}
@ -1340,4 +1342,29 @@ String getSequentialNodeName(const String & prefix, UInt64 number)
return name;
}
void validateZooKeeperConfig(const Poco::Util::AbstractConfiguration & config)
{
if (config.has("zookeeper") && config.has("keeper"))
throw DB::Exception(DB::ErrorCodes::EXCESSIVE_ELEMENT_IN_CONFIG, "Both ZooKeeper and Keeper are specified");
}
bool hasZooKeeperConfig(const Poco::Util::AbstractConfiguration & config)
{
return config.has("zookeeper") || config.has("keeper") || (config.has("keeper_server") && config.getBool("keeper_server.use_cluster", true));
}
String getZooKeeperConfigName(const Poco::Util::AbstractConfiguration & config)
{
if (config.has("zookeeper"))
return "zookeeper";
if (config.has("keeper"))
return "keeper";
if (config.has("keeper_server") && config.getBool("keeper_server.use_cluster", true))
return "keeper_server";
throw DB::Exception(DB::ErrorCodes::NO_ELEMENTS_IN_CONFIG, "There is no Zookeeper configuration in server config");
}
}

View File

@ -669,4 +669,10 @@ String extractZooKeeperPath(const String & path, bool check_starts_with_slash, P
String getSequentialNodeName(const String & prefix, UInt64 number);
void validateZooKeeperConfig(const Poco::Util::AbstractConfiguration & config);
bool hasZooKeeperConfig(const Poco::Util::AbstractConfiguration & config);
String getZooKeeperConfigName(const Poco::Util::AbstractConfiguration & config);
}

View File

@ -18,6 +18,116 @@ namespace zkutil
{
ZooKeeperArgs::ZooKeeperArgs(const Poco::Util::AbstractConfiguration & config, const String & config_name)
{
if (config_name == "keeper_server")
initFromKeeperServerSection(config);
else
initFromKeeperSection(config, config_name);
if (!chroot.empty())
{
if (chroot.front() != '/')
throw KeeperException(
Coordination::Error::ZBADARGUMENTS,
"Root path in config file should start with '/', but got {}", chroot);
if (chroot.back() == '/')
chroot.pop_back();
}
if (session_timeout_ms < 0 || operation_timeout_ms < 0 || connection_timeout_ms < 0)
throw KeeperException("Timeout cannot be negative", Coordination::Error::ZBADARGUMENTS);
/// init get_priority_load_balancing
get_priority_load_balancing.hostname_differences.resize(hosts.size());
const String & local_hostname = getFQDNOrHostName();
for (size_t i = 0; i < hosts.size(); ++i)
{
const String & node_host = hosts[i].substr(0, hosts[i].find_last_of(':'));
get_priority_load_balancing.hostname_differences[i] = DB::getHostNameDifference(local_hostname, node_host);
}
}
ZooKeeperArgs::ZooKeeperArgs(const String & hosts_string)
{
splitInto<','>(hosts, hosts_string);
}
void ZooKeeperArgs::initFromKeeperServerSection(const Poco::Util::AbstractConfiguration & config)
{
static constexpr std::string_view config_name = "keeper_server";
if (auto key = std::string{config_name} + ".tcp_port_secure";
config.has(key))
{
auto tcp_port_secure = config.getString(key);
if (tcp_port_secure.empty())
throw KeeperException("Empty tcp_port_secure in config file", Coordination::Error::ZBADARGUMENTS);
}
bool secure{false};
std::string tcp_port;
if (auto tcp_port_secure_key = std::string{config_name} + ".tcp_port_secure";
config.has(tcp_port_secure_key))
{
secure = true;
tcp_port = config.getString(tcp_port_secure_key);
}
else if (auto tcp_port_key = std::string{config_name} + ".tcp_port";
config.has(tcp_port_key))
{
tcp_port = config.getString(tcp_port_key);
}
if (tcp_port.empty())
throw KeeperException("No tcp_port or tcp_port_secure in config file", Coordination::Error::ZBADARGUMENTS);
if (auto coordination_key = std::string{config_name} + ".coordination_settings";
config.has(coordination_key))
{
if (auto operation_timeout_key = coordination_key + ".operation_timeout_ms";
config.has(operation_timeout_key))
operation_timeout_ms = config.getInt(operation_timeout_key);
if (auto session_timeout_key = coordination_key + ".session_timeout_ms";
config.has(session_timeout_key))
session_timeout_ms = config.getInt(session_timeout_key);
}
Poco::Util::AbstractConfiguration::Keys keys;
std::string raft_configuration_key = std::string{config_name} + ".raft_configuration";
config.keys(raft_configuration_key, keys);
for (const auto & key : keys)
{
if (startsWith(key, "server"))
hosts.push_back(
(secure ? "secure://" : "") + config.getString(raft_configuration_key + "." + key + ".hostname") + ":" + tcp_port);
}
static constexpr std::array load_balancing_keys
{
".zookeeper_load_balancing",
".keeper_load_balancing"
};
for (const auto * load_balancing_key : load_balancing_keys)
{
if (auto load_balancing_config = std::string{config_name} + load_balancing_key;
config.has(load_balancing_config))
{
String load_balancing_str = config.getString(load_balancing_config);
/// Use magic_enum to avoid dependency from dbms (`SettingFieldLoadBalancingTraits::fromString(...)`)
auto load_balancing = magic_enum::enum_cast<DB::LoadBalancing>(Poco::toUpper(load_balancing_str));
if (!load_balancing)
throw DB::Exception(DB::ErrorCodes::BAD_ARGUMENTS, "Unknown load balancing: {}", load_balancing_str);
get_priority_load_balancing.load_balancing = *load_balancing;
break;
}
}
}
void ZooKeeperArgs::initFromKeeperSection(const Poco::Util::AbstractConfiguration & config, const std::string & config_name)
{
Poco::Util::AbstractConfiguration::Keys keys;
config.keys(config_name, keys);
@ -84,7 +194,7 @@ ZooKeeperArgs::ZooKeeperArgs(const Poco::Util::AbstractConfiguration & config, c
{
implementation = config.getString(config_name + "." + key);
}
else if (key == "zookeeper_load_balancing")
else if (key == "zookeeper_load_balancing" || key == "keeper_load_balancing")
{
String load_balancing_str = config.getString(config_name + "." + key);
/// Use magic_enum to avoid dependency from dbms (`SettingFieldLoadBalancingTraits::fromString(...)`)
@ -96,33 +206,6 @@ ZooKeeperArgs::ZooKeeperArgs(const Poco::Util::AbstractConfiguration & config, c
else
throw KeeperException(std::string("Unknown key ") + key + " in config file", Coordination::Error::ZBADARGUMENTS);
}
if (!chroot.empty())
{
if (chroot.front() != '/')
throw KeeperException(
Coordination::Error::ZBADARGUMENTS,
"Root path in config file should start with '/', but got {}", chroot);
if (chroot.back() == '/')
chroot.pop_back();
}
if (session_timeout_ms < 0 || operation_timeout_ms < 0 || connection_timeout_ms < 0)
throw KeeperException("Timeout cannot be negative", Coordination::Error::ZBADARGUMENTS);
/// init get_priority_load_balancing
get_priority_load_balancing.hostname_differences.resize(hosts.size());
const String & local_hostname = getFQDNOrHostName();
for (size_t i = 0; i < hosts.size(); ++i)
{
const String & node_host = hosts[i].substr(0, hosts[i].find_last_of(':'));
get_priority_load_balancing.hostname_differences[i] = DB::getHostNameDifference(local_hostname, node_host);
}
}
ZooKeeperArgs::ZooKeeperArgs(const String & hosts_string)
{
splitInto<','>(hosts, hosts_string);
}
}

View File

@ -37,6 +37,10 @@ struct ZooKeeperArgs
UInt64 recv_sleep_ms = 0;
DB::GetPriorityForLoadBalancing get_priority_load_balancing;
private:
void initFromKeeperServerSection(const Poco::Util::AbstractConfiguration & config);
void initFromKeeperSection(const Poco::Util::AbstractConfiguration & config, const std::string & config_name);
};
}

View File

@ -105,7 +105,7 @@ void KeeperSnapshotManagerS3::updateS3Configuration(const Poco::Util::AbstractCo
std::move(headers),
S3::CredentialsConfiguration
{
auth_settings.use_environment_credentials.value_or(false),
auth_settings.use_environment_credentials.value_or(true),
auth_settings.use_insecure_imds_request.value_or(false),
auth_settings.expiration_window_seconds.value_or(S3::DEFAULT_EXPIRATION_WINDOW_SECONDS),
auth_settings.no_sign_request.value_or(false),

View File

@ -724,6 +724,7 @@ class IColumn;
M(Bool, force_aggregation_in_order, false, "Force use of aggregation in order on remote nodes during distributed aggregation. PLEASE, NEVER CHANGE THIS SETTING VALUE MANUALLY!", IMPORTANT) \
M(UInt64, http_max_request_param_data_size, 10_MiB, "Limit on size of request data used as a query parameter in predefined HTTP requests.", 0) \
M(Bool, allow_experimental_undrop_table_query, false, "Allow to use undrop query to restore dropped table in a limited time", 0) \
M(Bool, keeper_map_strict_mode, false, "Enforce additional checks during operations on KeeperMap. E.g. throw an exception on an insert for already existing key", 0) \
M(Bool, function_json_value_return_type_allow_nullable, false, "Allow function to return nullable type.", 0) \
M(Bool, function_json_value_return_type_allow_complex, false, "Allow function to return complex type, such as: struct, array, map.", 0) \
// End of COMMON_SETTINGS

View File

@ -34,6 +34,7 @@
#include <Parsers/parseQuery.h>
#include <Parsers/ParserCreateQuery.h>
#include <Parsers/queryToString.h>
#include <Storages/StorageKeeperMap.h>
namespace DB
{
@ -1390,6 +1391,13 @@ bool DatabaseReplicated::shouldReplicateQuery(const ContextPtr & query_context,
/// Some ALTERs are not replicated on database level
if (const auto * alter = query_ptr->as<const ASTAlterQuery>())
{
auto table_id = query_context->resolveStorageID(*alter, Context::ResolveOrdinary);
StoragePtr table = DatabaseCatalog::instance().getTable(table_id, query_context);
/// we never replicate KeeperMap operations because it doesn't make sense
if (auto * keeper_map = table->as<StorageKeeperMap>())
return false;
return !alter->isAttachAlter() && !alter->isFetchAlter() && !alter->isDropPartitionAlter();
}

View File

@ -154,7 +154,7 @@ std::unique_ptr<S3::Client> getClient(
{},
S3::CredentialsConfiguration
{
config.getBool(config_prefix + ".use_environment_credentials", config.getBool("s3.use_environment_credentials", false)),
config.getBool(config_prefix + ".use_environment_credentials", config.getBool("s3.use_environment_credentials", true)),
config.getBool(config_prefix + ".use_insecure_imds_request", config.getBool("s3.use_insecure_imds_request", false)),
config.getUInt64(config_prefix + ".expiration_window_seconds", config.getUInt64("s3.expiration_window_seconds", S3::DEFAULT_EXPIRATION_WINDOW_SECONDS)),
config.getBool(config_prefix + ".no_sign_request", config.getBool("s3.no_sign_request", false))

View File

@ -20,6 +20,7 @@ namespace DB
namespace ErrorCodes
{
extern const int UNSUPPORTED_METHOD;
extern const int FUNCTION_CANNOT_HAVE_PARAMETERS;
}
void UserDefinedSQLFunctionVisitor::visit(ASTPtr & ast)
@ -132,6 +133,12 @@ ASTPtr UserDefinedSQLFunctionVisitor::tryToReplaceFunction(const ASTFunction & f
if (!user_defined_function)
return nullptr;
/// UDFs are not parametric for now.
if (function.parameters)
{
throw Exception(ErrorCodes::FUNCTION_CANNOT_HAVE_PARAMETERS, "Function {} is not parametric", function.name);
}
const auto & function_arguments_list = function.children.at(0)->as<ASTExpressionList>();
auto & function_arguments = function_arguments_list->children;

View File

@ -174,23 +174,31 @@ public:
getName(),
arguments.size());
const auto * keys_type = checkAndGetDataType<DataTypeArray>(arguments[0].get());
if (!keys_type)
/// The first argument should always be Array.
/// Because key type can not be nested type of Map, which is Tuple
DataTypePtr key_type;
if (const auto * keys_type = checkAndGetDataType<DataTypeArray>(arguments[0].get()))
key_type = keys_type->getNestedType();
else
throw Exception(ErrorCodes::ILLEGAL_TYPE_OF_ARGUMENT, "First argument for function {} must be an Array", getName());
const auto * values_type = checkAndGetDataType<DataTypeArray>(arguments[1].get());
if (!values_type)
throw Exception(ErrorCodes::ILLEGAL_TYPE_OF_ARGUMENT, "Second argument for function {} must be an Array", getName());
DataTypePtr value_type;
if (const auto * value_array_type = checkAndGetDataType<DataTypeArray>(arguments[1].get()))
value_type = value_array_type->getNestedType();
else if (const auto * value_map_type = checkAndGetDataType<DataTypeMap>(arguments[1].get()))
value_type = std::make_shared<DataTypeTuple>(value_map_type->getKeyValueTypes());
else
throw Exception(ErrorCodes::ILLEGAL_TYPE_OF_ARGUMENT, "Second argument for function {} must be Array or Map", getName());
DataTypes key_value_types{keys_type->getNestedType(), values_type->getNestedType()};
DataTypes key_value_types{key_type, value_type};
return std::make_shared<DataTypeMap>(key_value_types);
}
ColumnPtr executeImpl(
const ColumnsWithTypeAndName & arguments, const DataTypePtr & /* result_type */, size_t /* input_rows_count */) const override
{
ColumnPtr holder_keys;
bool is_keys_const = isColumnConst(*arguments[0].column);
ColumnPtr holder_keys;
const ColumnArray * col_keys;
if (is_keys_const)
{
@ -202,24 +210,26 @@ public:
col_keys = checkAndGetColumn<ColumnArray>(arguments[0].column.get());
}
ColumnPtr holder_values;
bool is_values_const = isColumnConst(*arguments[1].column);
const ColumnArray * col_values;
if (is_values_const)
{
holder_values = arguments[1].column->convertToFullColumnIfConst();
col_values = checkAndGetColumn<ColumnArray>(holder_values.get());
}
else
{
col_values = checkAndGetColumn<ColumnArray>(arguments[1].column.get());
}
if (!col_keys)
throw Exception(ErrorCodes::ILLEGAL_COLUMN, "The first argument of function {} must be Array", getName());
if (!col_keys || !col_values)
throw Exception(ErrorCodes::ILLEGAL_COLUMN, "Arguments of function {} must be array", getName());
bool is_values_const = isColumnConst(*arguments[1].column);
ColumnPtr holder_values;
if (is_values_const)
holder_values = arguments[1].column->convertToFullColumnIfConst();
else
holder_values = arguments[1].column;
const ColumnArray * col_values;
if (const auto * col_values_array = checkAndGetColumn<ColumnArray>(holder_values.get()))
col_values = col_values_array;
else if (const auto * col_values_map = checkAndGetColumn<ColumnMap>(holder_values.get()))
col_values = &col_values_map->getNestedColumn();
else
throw Exception(ErrorCodes::ILLEGAL_COLUMN, "The second arguments of function {} must be Array or Map", getName());
if (!col_keys->hasEqualOffsets(*col_values))
throw Exception(ErrorCodes::SIZES_OF_ARRAYS_DONT_MATCH, "Array arguments for function {} must have equal sizes", getName());
throw Exception(ErrorCodes::SIZES_OF_ARRAYS_DONT_MATCH, "Two arguments for function {} must have equal sizes", getName());
const auto & data_keys = col_keys->getDataPtr();
const auto & data_values = col_values->getDataPtr();

View File

@ -1,4 +1,5 @@
#include <IO/CascadeWriteBuffer.h>
#include <IO/MemoryReadWriteBuffer.h>
#include <Common/Exception.h>
namespace DB
@ -35,9 +36,9 @@ void CascadeWriteBuffer::nextImpl()
curr_buffer->position() = position();
curr_buffer->next();
}
catch (const Exception & e)
catch (const MemoryWriteBuffer::CurrentBufferExhausted &)
{
if (curr_buffer_num < num_sources && e.code() == ErrorCodes::CURRENT_WRITE_BUFFER_IS_EXHAUSTED)
if (curr_buffer_num < num_sources)
{
/// TODO: protocol should require set(position(), 0) before Exception
@ -46,7 +47,7 @@ void CascadeWriteBuffer::nextImpl()
curr_buffer = setNextBuffer();
}
else
throw;
throw Exception(ErrorCodes::CURRENT_WRITE_BUFFER_IS_EXHAUSTED, "MemoryWriteBuffer limit is exhausted");
}
set(curr_buffer->position(), curr_buffer->buffer().end() - curr_buffer->position());

View File

@ -16,7 +16,7 @@ namespace ErrorCodes
* (lazy_sources contains not the pointers themselves, but their delayed constructors)
*
* Firstly, CascadeWriteBuffer redirects data to the first buffer of the sequence
* If current WriteBuffer cannot receive data anymore, it throws special exception CURRENT_WRITE_BUFFER_IS_EXHAUSTED in nextImpl() body,
* If current WriteBuffer cannot receive data anymore, it throws special exception MemoryWriteBuffer::CurrentBufferExhausted in nextImpl() body,
* CascadeWriteBuffer prepares the next buffer and continues redirecting data to it.
* If there are no buffers anymore, CascadeWriteBuffer throws an exception.
*

View File

@ -5,12 +5,6 @@
namespace DB
{
namespace ErrorCodes
{
extern const int CURRENT_WRITE_BUFFER_IS_EXHAUSTED;
}
class ReadBufferFromMemoryWriteBuffer : public ReadBuffer, boost::noncopyable, private Allocator<false>
{
public:
@ -118,7 +112,7 @@ void MemoryWriteBuffer::addChunk()
if (0 == next_chunk_size)
{
set(position(), 0);
throw Exception(ErrorCodes::CURRENT_WRITE_BUFFER_IS_EXHAUSTED, "MemoryWriteBuffer limit is exhausted");
throw MemoryWriteBuffer::CurrentBufferExhausted();
}
}

View File

@ -16,6 +16,12 @@ namespace DB
class MemoryWriteBuffer : public WriteBuffer, public IReadableWriteBuffer, boost::noncopyable, private Allocator<false>
{
public:
/// Special exception to throw when the current WriteBuffer cannot receive data
class CurrentBufferExhausted : public std::exception
{
public:
const char * what() const noexcept override { return "MemoryWriteBuffer limit is exhausted"; }
};
/// Use max_total_size_ = 0 for unlimited storage
explicit MemoryWriteBuffer(

View File

@ -198,7 +198,7 @@ TEST(MemoryWriteBuffer, WriteAndReread)
if (s > 1)
{
MemoryWriteBuffer buf(s - 1);
EXPECT_THROW(buf.write(data.data(), data.size()), DB::Exception);
EXPECT_THROW(buf.write(data.data(), data.size()), MemoryWriteBuffer::CurrentBufferExhausted);
}
}

View File

@ -75,6 +75,7 @@ namespace ErrorCodes
extern const int LOGICAL_ERROR;
extern const int TOO_FEW_ARGUMENTS_FOR_FUNCTION;
extern const int TOO_MANY_ARGUMENTS_FOR_FUNCTION;
extern const int FUNCTION_CANNOT_HAVE_PARAMETERS;
}
static NamesAndTypesList::iterator findColumn(const String & name, NamesAndTypesList & cols)
@ -1109,6 +1110,12 @@ void ActionsMatcher::visit(const ASTFunction & node, const ASTPtr & ast, Data &
}
}
/// Normal functions are not parametric for now.
if (node.parameters)
{
throw Exception(ErrorCodes::FUNCTION_CANNOT_HAVE_PARAMETERS, "Function {} is not parametric", node.name);
}
Names argument_names;
DataTypes argument_types;
bool arguments_present = true;

View File

@ -2362,7 +2362,7 @@ zkutil::ZooKeeperPtr Context::getZooKeeper() const
const auto & config = shared->zookeeper_config ? *shared->zookeeper_config : getConfigRef();
if (!shared->zookeeper)
shared->zookeeper = std::make_shared<zkutil::ZooKeeper>(config, "zookeeper", getZooKeeperLog());
shared->zookeeper = std::make_shared<zkutil::ZooKeeper>(config, zkutil::getZooKeeperConfigName(config), getZooKeeperLog());
else if (shared->zookeeper->expired())
{
Stopwatch watch;
@ -2401,8 +2401,9 @@ bool Context::tryCheckClientConnectionToMyKeeperCluster() const
{
try
{
const auto config_name = zkutil::getZooKeeperConfigName(getConfigRef());
/// If our server is part of main Keeper cluster
if (checkZooKeeperConfigIsLocal(getConfigRef(), "zookeeper"))
if (config_name == "keeper_server" || checkZooKeeperConfigIsLocal(getConfigRef(), config_name))
{
LOG_DEBUG(shared->log, "Keeper server is participant of the main zookeeper cluster, will try to connect to it");
getZooKeeper();
@ -2608,7 +2609,7 @@ void Context::reloadZooKeeperIfChanged(const ConfigurationPtr & config) const
bool server_started = isServerCompletelyStarted();
std::lock_guard lock(shared->zookeeper_mutex);
shared->zookeeper_config = config;
reloadZooKeeperIfChangedImpl(config, "zookeeper", shared->zookeeper, getZooKeeperLog(), server_started);
reloadZooKeeperIfChangedImpl(config, zkutil::getZooKeeperConfigName(*config), shared->zookeeper, getZooKeeperLog(), server_started);
}
void Context::reloadAuxiliaryZooKeepersConfigIfChanged(const ConfigurationPtr & config)
@ -2633,7 +2634,7 @@ void Context::reloadAuxiliaryZooKeepersConfigIfChanged(const ConfigurationPtr &
bool Context::hasZooKeeper() const
{
return getConfigRef().has("zookeeper");
return zkutil::hasZooKeeperConfig(getConfigRef());
}
bool Context::hasAuxiliaryZooKeeper(const String & name) const

View File

@ -20,6 +20,7 @@
#include <Storages/LiveView/StorageLiveView.h>
#include <Storages/MutationCommands.h>
#include <Storages/PartitionCommands.h>
#include <Storages/StorageKeeperMap.h>
#include <Common/typeid_cast.h>
#include <Functions/UserDefined/UserDefinedSQLFunctionFactory.h>
@ -39,6 +40,8 @@ namespace ErrorCodes
extern const int INCORRECT_QUERY;
extern const int NOT_IMPLEMENTED;
extern const int TABLE_IS_READ_ONLY;
extern const int BAD_ARGUMENTS;
extern const int UNKNOWN_TABLE;
}
@ -72,16 +75,21 @@ BlockIO InterpreterAlterQuery::executeToTable(const ASTAlterQuery & alter)
if (!UserDefinedSQLFunctionFactory::instance().empty())
UserDefinedSQLFunctionVisitor::visit(query_ptr);
auto table_id = getContext()->resolveStorageID(alter, Context::ResolveOrdinary);
query_ptr->as<ASTAlterQuery &>().setDatabase(table_id.database_name);
StoragePtr table = DatabaseCatalog::instance().tryGetTable(table_id, getContext());
if (!alter.cluster.empty() && !maybeRemoveOnCluster(query_ptr, getContext()))
{
if (table && table->as<StorageKeeperMap>())
throw Exception(ErrorCodes::BAD_ARGUMENTS, "Mutations with ON CLUSTER are not allowed for KeeperMap tables");
DDLQueryOnClusterParams params;
params.access_to_check = getRequiredAccess();
return executeDDLQueryOnCluster(query_ptr, getContext(), params);
}
getContext()->checkAccess(getRequiredAccess());
auto table_id = getContext()->resolveStorageID(alter, Context::ResolveOrdinary);
query_ptr->as<ASTAlterQuery &>().setDatabase(table_id.database_name);
DatabasePtr database = DatabaseCatalog::instance().getDatabase(table_id.database_name);
if (database->shouldReplicateQuery(getContext(), query_ptr))
@ -91,7 +99,9 @@ BlockIO InterpreterAlterQuery::executeToTable(const ASTAlterQuery & alter)
return database->tryEnqueueReplicatedDDL(query_ptr, getContext());
}
StoragePtr table = DatabaseCatalog::instance().getTable(table_id, getContext());
if (!table)
throw Exception(ErrorCodes::UNKNOWN_TABLE, "Could not find table: {}", table_id.table_name);
checkStorageSupportsTransactionsIfNeeded(table, getContext());
if (table->isStaticStorage())
throw Exception(ErrorCodes::TABLE_IS_READ_ONLY, "Table is read-only");

View File

@ -550,6 +550,12 @@ void MutationsInterpreter::prepare(bool dry_run)
if (source.hasLightweightDeleteMask())
all_columns.push_back({LightweightDeleteDescription::FILTER_COLUMN});
if (return_all_columns)
{
for (const auto & column : source.getStorage()->getVirtuals())
all_columns.push_back(column);
}
NameSet updated_columns;
bool materialize_ttl_recalculate_only = source.materializeTTLRecalculateOnly();
@ -906,6 +912,8 @@ void MutationsInterpreter::prepareMutationStages(std::vector<Stage> & prepared_s
{
auto storage_snapshot = source.getStorageSnapshot(metadata_snapshot, context);
auto options = GetColumnsOptions(GetColumnsOptions::AllPhysical).withExtendedObjects();
if (return_all_columns)
options.withVirtuals();
auto all_columns = storage_snapshot->getColumns(options);
/// Add _row_exists column if it is present in the part
@ -1256,6 +1264,7 @@ void MutationsInterpreter::validate()
}
QueryPlan plan;
initQueryPlan(stages.front(), plan);
auto pipeline = addStreamsForLaterStages(stages, plan);
}

View File

@ -103,7 +103,7 @@ namespace
});
}
bool parseElement(IParser::Pos & pos, Expected & expected, bool allow_all, Element & element)
bool parseElement(IParser::Pos & pos, Expected & expected, Element & element)
{
return IParserBase::wrapParseImpl(pos, [&]
{
@ -169,7 +169,7 @@ namespace
return true;
}
if (allow_all && ParserKeyword{"ALL"}.ignore(pos, expected))
if (ParserKeyword{"ALL"}.ignore(pos, expected))
{
element.type = ElementType::ALL;
parseExceptDatabases(pos, expected, element.except_databases);
@ -181,7 +181,7 @@ namespace
});
}
bool parseElements(IParser::Pos & pos, Expected & expected, bool allow_all, std::vector<Element> & elements)
bool parseElements(IParser::Pos & pos, Expected & expected, std::vector<Element> & elements)
{
return IParserBase::wrapParseImpl(pos, [&]
{
@ -190,7 +190,7 @@ namespace
auto parse_element = [&]
{
Element element;
if (parseElement(pos, expected, allow_all, element))
if (parseElement(pos, expected, element))
{
result.emplace_back(std::move(element));
return true;
@ -334,11 +334,8 @@ bool ParserBackupQuery::parseImpl(Pos & pos, ASTPtr & node, Expected & expected)
else
return false;
/// Disable "ALL" if this is a RESTORE command.
bool allow_all = (kind == Kind::RESTORE);
std::vector<Element> elements;
if (!parseElements(pos, expected, allow_all, elements))
if (!parseElements(pos, expected, elements))
return false;
String cluster;

View File

@ -2940,7 +2940,8 @@ void MergeTreeData::checkAlterIsPossible(const AlterCommands & commands, Context
old_types.emplace(column.name, column.type.get());
NamesAndTypesList columns_to_check_conversion;
auto name_deps = getDependentViewsByColumn(local_context);
std::optional<NameDependencies> name_deps{};
for (const AlterCommand & command : commands)
{
/// Just validate partition expression
@ -3022,7 +3023,9 @@ void MergeTreeData::checkAlterIsPossible(const AlterCommands & commands, Context
if (!command.clear)
{
const auto & deps_mv = name_deps[command.column_name];
if (!name_deps)
name_deps = getDependentViewsByColumn(local_context);
const auto & deps_mv = name_deps.value()[command.column_name];
if (!deps_mv.empty())
{
throw Exception(ErrorCodes::ALTER_OF_COLUMN_IS_FORBIDDEN,

View File

@ -1016,7 +1016,7 @@ void StorageBuffer::reschedule()
void StorageBuffer::checkAlterIsPossible(const AlterCommands & commands, ContextPtr local_context) const
{
auto name_deps = getDependentViewsByColumn(local_context);
std::optional<NameDependencies> name_deps{};
for (const auto & command : commands)
{
if (command.type != AlterCommand::Type::ADD_COLUMN && command.type != AlterCommand::Type::MODIFY_COLUMN
@ -1027,7 +1027,9 @@ void StorageBuffer::checkAlterIsPossible(const AlterCommands & commands, Context
if (command.type == AlterCommand::Type::DROP_COLUMN && !command.clear)
{
const auto & deps_mv = name_deps[command.column_name];
if (!name_deps)
name_deps = getDependentViewsByColumn(local_context);
const auto & deps_mv = name_deps.value()[command.column_name];
if (!deps_mv.empty())
{
throw Exception(ErrorCodes::ALTER_OF_COLUMN_IS_FORBIDDEN,

View File

@ -1401,7 +1401,7 @@ std::optional<QueryPipeline> StorageDistributed::distributedWrite(const ASTInser
void StorageDistributed::checkAlterIsPossible(const AlterCommands & commands, ContextPtr local_context) const
{
auto name_deps = getDependentViewsByColumn(local_context);
std::optional<NameDependencies> name_deps{};
for (const auto & command : commands)
{
if (command.type != AlterCommand::Type::ADD_COLUMN && command.type != AlterCommand::Type::MODIFY_COLUMN
@ -1413,7 +1413,9 @@ void StorageDistributed::checkAlterIsPossible(const AlterCommands & commands, Co
if (command.type == AlterCommand::DROP_COLUMN && !command.clear)
{
const auto & deps_mv = name_deps[command.column_name];
if (!name_deps)
name_deps = getDependentViewsByColumn(local_context);
const auto & deps_mv = name_deps.value()[command.column_name];
if (!deps_mv.empty())
{
throw Exception(ErrorCodes::ALTER_OF_COLUMN_IS_FORBIDDEN,

View File

@ -59,6 +59,8 @@ namespace ErrorCodes
namespace
{
constexpr std::string_view version_column_name = "_version";
std::string formattedAST(const ASTPtr & ast)
{
if (!ast)
@ -77,7 +79,6 @@ void verifyTableId(const StorageID & table_id)
table_id.getDatabaseName(),
database->getEngineName());
}
}
}
@ -86,11 +87,13 @@ class StorageKeeperMapSink : public SinkToStorage
{
StorageKeeperMap & storage;
std::unordered_map<std::string, std::string> new_values;
std::unordered_map<std::string, int32_t> versions;
size_t primary_key_pos;
ContextPtr context;
public:
StorageKeeperMapSink(StorageKeeperMap & storage_, const StorageMetadataPtr & metadata_snapshot)
: SinkToStorage(metadata_snapshot->getSampleBlock()), storage(storage_)
StorageKeeperMapSink(StorageKeeperMap & storage_, Block header, ContextPtr context_)
: SinkToStorage(header), storage(storage_), context(std::move(context_))
{
auto primary_key = storage.getPrimaryKey();
assert(primary_key.size() == 1);
@ -113,18 +116,36 @@ public:
wb_value.restart();
size_t idx = 0;
int32_t version = -1;
for (const auto & elem : block)
{
if (elem.name == version_column_name)
{
version = assert_cast<const ColumnVector<Int32> &>(*elem.column).getData()[i];
continue;
}
elem.type->getDefaultSerialization()->serializeBinary(*elem.column, i, idx == primary_key_pos ? wb_key : wb_value, {});
++idx;
}
auto key = base64Encode(wb_key.str(), /* url_encoding */ true);
if (version != -1)
versions[key] = version;
new_values[std::move(key)] = std::move(wb_value.str());
}
}
void onFinish() override
{
finalize<false>(/*strict*/ context->getSettingsRef().keeper_map_strict_mode);
}
template <bool for_update>
void finalize(bool strict)
{
auto zookeeper = storage.getClient();
@ -147,21 +168,39 @@ public:
for (const auto & [key, _] : new_values)
key_paths.push_back(storage.fullPathForKey(key));
auto results = zookeeper->exists(key_paths);
zkutil::ZooKeeper::MultiExistsResponse results;
if constexpr (!for_update)
{
if (!strict)
results = zookeeper->exists(key_paths);
}
Coordination::Requests requests;
requests.reserve(key_paths.size());
for (size_t i = 0; i < key_paths.size(); ++i)
{
auto key = fs::path(key_paths[i]).filename();
if (results[i].error == Coordination::Error::ZOK)
if constexpr (for_update)
{
requests.push_back(zkutil::makeSetRequest(key_paths[i], new_values[key], -1));
int32_t version = -1;
if (strict)
version = versions.at(key);
requests.push_back(zkutil::makeSetRequest(key_paths[i], new_values[key], version));
}
else
{
requests.push_back(zkutil::makeCreateRequest(key_paths[i], new_values[key], zkutil::CreateMode::Persistent));
++new_keys_num;
if (!strict && results[i].error == Coordination::Error::ZOK)
{
requests.push_back(zkutil::makeSetRequest(key_paths[i], new_values[key], -1));
}
else
{
requests.push_back(zkutil::makeCreateRequest(key_paths[i], new_values[key], zkutil::CreateMode::Persistent));
++new_keys_num;
}
}
}
@ -193,6 +232,18 @@ class StorageKeeperMapSource : public ISource
KeyContainerIter it;
KeyContainerIter end;
bool with_version_column = false;
static Block getHeader(Block header, bool with_version_column)
{
if (with_version_column)
header.insert(
{DataTypeInt32{}.createColumn(),
std::make_shared<DataTypeInt32>(), std::string{version_column_name}});
return header;
}
public:
StorageKeeperMapSource(
const StorageKeeperMap & storage_,
@ -200,8 +251,10 @@ public:
size_t max_block_size_,
KeyContainerPtr container_,
KeyContainerIter begin_,
KeyContainerIter end_)
: ISource(header), storage(storage_), max_block_size(max_block_size_), container(std::move(container_)), it(begin_), end(end_)
KeyContainerIter end_,
bool with_version_column_)
: ISource(getHeader(header, with_version_column_)), storage(storage_), max_block_size(max_block_size_), container(std::move(container_)), it(begin_), end(end_)
, with_version_column(with_version_column_)
{
}
@ -225,12 +278,12 @@ public:
for (auto & raw_key : raw_keys)
raw_key = base64Encode(raw_key, /* url_encoding */ true);
return storage.getBySerializedKeys(raw_keys, nullptr);
return storage.getBySerializedKeys(raw_keys, nullptr, with_version_column);
}
else
{
size_t elem_num = std::min(max_block_size, static_cast<size_t>(end - it));
auto chunk = storage.getBySerializedKeys(std::span{it, it + elem_num}, nullptr);
auto chunk = storage.getBySerializedKeys(std::span{it, it + elem_num}, nullptr, with_version_column);
it += elem_num;
return chunk;
}
@ -426,6 +479,16 @@ Pipe StorageKeeperMap::read(
auto primary_key_type = sample_block.getByName(primary_key).type;
std::tie(filtered_keys, all_scan) = getFilterKeys(primary_key, primary_key_type, query_info, context_);
bool with_version_column = false;
for (const auto & column : column_names)
{
if (column == version_column_name)
{
with_version_column = true;
break;
}
}
const auto process_keys = [&]<typename KeyContainerPtr>(KeyContainerPtr keys) -> Pipe
{
if (keys->empty())
@ -449,7 +512,7 @@ Pipe StorageKeeperMap::read(
using KeyContainer = typename KeyContainerPtr::element_type;
pipes.emplace_back(std::make_shared<StorageKeeperMapSource<KeyContainer>>(
*this, sample_block, max_block_size, keys, keys->begin() + begin, keys->begin() + end));
*this, sample_block, max_block_size, keys, keys->begin() + begin, keys->begin() + end, with_version_column));
}
return Pipe::unitePipes(std::move(pipes));
};
@ -461,10 +524,10 @@ Pipe StorageKeeperMap::read(
return process_keys(std::move(filtered_keys));
}
SinkToStoragePtr StorageKeeperMap::write(const ASTPtr & /*query*/, const StorageMetadataPtr & metadata_snapshot, ContextPtr /*context*/)
SinkToStoragePtr StorageKeeperMap::write(const ASTPtr & /*query*/, const StorageMetadataPtr & metadata_snapshot, ContextPtr local_context)
{
checkTable<true>();
return std::make_shared<StorageKeeperMapSink>(*this, metadata_snapshot);
return std::make_shared<StorageKeeperMapSink>(*this, metadata_snapshot->getSampleBlock(), local_context);
}
void StorageKeeperMap::truncate(const ASTPtr &, const StorageMetadataPtr &, ContextPtr, TableExclusiveLockHolder &)
@ -554,6 +617,12 @@ void StorageKeeperMap::drop()
dropTable(client, metadata_drop_lock);
}
NamesAndTypesList StorageKeeperMap::getVirtuals() const
{
return NamesAndTypesList{
{std::string{version_column_name}, std::make_shared<DataTypeInt32>()}};
}
zkutil::ZooKeeperPtr StorageKeeperMap::getClient() const
{
std::lock_guard lock{zookeeper_mutex};
@ -670,13 +739,18 @@ Chunk StorageKeeperMap::getByKeys(const ColumnsWithTypeAndName & keys, PaddedPOD
if (raw_keys.size() != keys[0].column->size())
throw Exception(ErrorCodes::LOGICAL_ERROR, "Assertion failed: {} != {}", raw_keys.size(), keys[0].column->size());
return getBySerializedKeys(raw_keys, &null_map);
return getBySerializedKeys(raw_keys, &null_map, /* version_column */ false);
}
Chunk StorageKeeperMap::getBySerializedKeys(const std::span<const std::string> keys, PaddedPODArray<UInt8> * null_map) const
Chunk StorageKeeperMap::getBySerializedKeys(const std::span<const std::string> keys, PaddedPODArray<UInt8> * null_map, bool with_version) const
{
Block sample_block = getInMemoryMetadataPtr()->getSampleBlock();
MutableColumns columns = sample_block.cloneEmptyColumns();
MutableColumnPtr version_column = nullptr;
if (with_version)
version_column = ColumnVector<Int32>::create();
size_t primary_key_pos = getPrimaryKeyPos(sample_block, getPrimaryKey());
if (null_map)
@ -706,6 +780,9 @@ Chunk StorageKeeperMap::getBySerializedKeys(const std::span<const std::string> k
if (code == Coordination::Error::ZOK)
{
fillColumns(base64Decode(keys[i], true), response.data, primary_key_pos, sample_block, columns);
if (version_column)
version_column->insert(response.stat.version);
}
else if (code == Coordination::Error::ZNONODE)
{
@ -714,6 +791,9 @@ Chunk StorageKeeperMap::getBySerializedKeys(const std::span<const std::string> k
(*null_map)[i] = 0;
for (size_t col_idx = 0; col_idx < sample_block.columns(); ++col_idx)
columns[col_idx]->insert(sample_block.getByPosition(col_idx).type->getDefault());
if (version_column)
version_column->insert(-1);
}
}
else
@ -723,6 +803,10 @@ Chunk StorageKeeperMap::getBySerializedKeys(const std::span<const std::string> k
}
size_t num_rows = columns.at(0)->size();
if (version_column)
columns.push_back(std::move(version_column));
return Chunk(std::move(columns), num_rows);
}
@ -763,6 +847,8 @@ void StorageKeeperMap::mutate(const MutationCommands & commands, ContextPtr loca
if (commands.empty())
return;
bool strict = local_context->getSettingsRef().keeper_map_strict_mode;
assert(commands.size() == 1);
auto metadata_snapshot = getInMemoryMetadataPtr();
@ -784,8 +870,10 @@ void StorageKeeperMap::mutate(const MutationCommands & commands, ContextPtr loca
auto header = interpreter->getUpdatedHeader();
auto primary_key_pos = header.getPositionByName(primary_key);
auto version_position = header.getPositionByName(std::string{version_column_name});
auto client = getClient();
Block block;
while (executor.pull(block))
{
@ -793,14 +881,23 @@ void StorageKeeperMap::mutate(const MutationCommands & commands, ContextPtr loca
auto column = column_type_name.column;
auto size = column->size();
WriteBufferFromOwnString wb_key;
Coordination::Requests delete_requests;
for (size_t i = 0; i < size; ++i)
{
int32_t version = -1;
if (strict)
{
const auto & version_column = block.getByPosition(version_position).column;
version = assert_cast<const ColumnVector<Int32> &>(*version_column).getData()[i];
}
wb_key.restart();
column_type_name.type->getDefaultSerialization()->serializeBinary(*column, i, wb_key, {});
delete_requests.emplace_back(zkutil::makeRemoveRequest(fullPathForKey(base64Encode(wb_key.str(), true)), -1));
delete_requests.emplace_back(zkutil::makeRemoveRequest(fullPathForKey(base64Encode(wb_key.str(), true)), version));
}
Coordination::Responses responses;
@ -834,12 +931,13 @@ void StorageKeeperMap::mutate(const MutationCommands & commands, ContextPtr loca
auto pipeline = QueryPipelineBuilder::getPipeline(interpreter->execute());
PullingPipelineExecutor executor(pipeline);
auto sink = std::make_shared<StorageKeeperMapSink>(*this, metadata_snapshot);
auto sink = std::make_shared<StorageKeeperMapSink>(*this, executor.getHeader(), local_context);
Block block;
while (executor.pull(block))
sink->consume(Chunk{block.getColumns(), block.rows()});
sink->onFinish();
sink->finalize<true>(strict);
}
namespace

View File

@ -46,11 +46,13 @@ public:
void truncate(const ASTPtr &, const StorageMetadataPtr &, ContextPtr, TableExclusiveLockHolder &) override;
void drop() override;
NamesAndTypesList getVirtuals() const override;
std::string getName() const override { return "KeeperMap"; }
Names getPrimaryKey() const override { return {primary_key}; }
Chunk getByKeys(const ColumnsWithTypeAndName & keys, PaddedPODArray<UInt8> & null_map, const Names &) const override;
Chunk getBySerializedKeys(std::span<const std::string> keys, PaddedPODArray<UInt8> * null_map) const;
Chunk getBySerializedKeys(std::span<const std::string> keys, PaddedPODArray<UInt8> * null_map, bool with_version) const;
Block getSampleBlock(const Names &) const override;
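For illustration, a minimal SQL sketch of what the changes above enable; it assumes the virtual column declared through version_column_name is exposed as _version, and the table kv is hypothetical (not part of this commit):

    -- read keys together with the ZooKeeper version of each entry (_version name assumed)
    SELECT key, value, _version FROM kv;

    -- with strict mode enabled, inserting an existing key throws, and updates/deletes are
    -- issued with the stored version, so a concurrent change makes the mutation fail
    -- instead of being silently overwritten
    SET keeper_map_strict_mode = 1;
    ALTER TABLE kv UPDATE value = 'new value' WHERE key = 1;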

View File

@ -923,7 +923,7 @@ StorageMerge::DatabaseTablesIterators StorageMerge::getDatabaseIterators(Context
void StorageMerge::checkAlterIsPossible(const AlterCommands & commands, ContextPtr local_context) const
{
auto name_deps = getDependentViewsByColumn(local_context);
std::optional<NameDependencies> name_deps{};
for (const auto & command : commands)
{
if (command.type != AlterCommand::Type::ADD_COLUMN && command.type != AlterCommand::Type::MODIFY_COLUMN
@ -934,7 +934,9 @@ void StorageMerge::checkAlterIsPossible(const AlterCommands & commands, ContextP
if (command.type == AlterCommand::Type::DROP_COLUMN && !command.clear)
{
const auto & deps_mv = name_deps[command.column_name];
if (!name_deps)
name_deps = getDependentViewsByColumn(local_context);
const auto & deps_mv = name_deps.value()[command.column_name];
if (!deps_mv.empty())
{
throw Exception(ErrorCodes::ALTER_OF_COLUMN_IS_FORBIDDEN,

View File

@ -37,7 +37,7 @@ void registerStorageNull(StorageFactory & factory)
void StorageNull::checkAlterIsPossible(const AlterCommands & commands, ContextPtr context) const
{
auto name_deps = getDependentViewsByColumn(context);
std::optional<NameDependencies> name_deps{};
for (const auto & command : commands)
{
if (command.type != AlterCommand::Type::ADD_COLUMN
@ -50,7 +50,9 @@ void StorageNull::checkAlterIsPossible(const AlterCommands & commands, ContextPt
if (command.type == AlterCommand::DROP_COLUMN && !command.clear)
{
const auto & deps_mv = name_deps[command.column_name];
if (!name_deps)
name_deps = getDependentViewsByColumn(context);
const auto & deps_mv = name_deps.value()[command.column_name];
if (!deps_mv.empty())
{
throw Exception(ErrorCodes::ALTER_OF_COLUMN_IS_FORBIDDEN,

View File

@ -1252,7 +1252,7 @@ void StorageS3::updateConfiguration(ContextPtr ctx, StorageS3::Configuration & u
upd.auth_settings.server_side_encryption_customer_key_base64,
std::move(headers),
S3::CredentialsConfiguration{
upd.auth_settings.use_environment_credentials.value_or(ctx->getConfigRef().getBool("s3.use_environment_credentials", false)),
upd.auth_settings.use_environment_credentials.value_or(ctx->getConfigRef().getBool("s3.use_environment_credentials", true)),
upd.auth_settings.use_insecure_imds_request.value_or(ctx->getConfigRef().getBool("s3.use_insecure_imds_request", false)),
upd.auth_settings.expiration_window_seconds.value_or(
ctx->getConfigRef().getUInt64("s3.expiration_window_seconds", S3::DEFAULT_EXPIRATION_WINDOW_SECONDS)),
@ -1272,7 +1272,7 @@ void StorageS3::processNamedCollectionResult(StorageS3::Configuration & configur
configuration.auth_settings.access_key_id = collection.getOrDefault<String>("access_key_id", "");
configuration.auth_settings.secret_access_key = collection.getOrDefault<String>("secret_access_key", "");
configuration.auth_settings.use_environment_credentials = collection.getOrDefault<UInt64>("use_environment_credentials", 0);
configuration.auth_settings.use_environment_credentials = collection.getOrDefault<UInt64>("use_environment_credentials", 1);
configuration.auth_settings.no_sign_request = collection.getOrDefault<bool>("no_sign_request", false);
configuration.auth_settings.expiration_window_seconds = collection.getOrDefault<UInt64>("expiration_window_seconds", S3::DEFAULT_EXPIRATION_WINDOW_SECONDS);
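Because use_environment_credentials now defaults to true both for the server-level fallback and for named collections, deployments that should not pick up credentials from the environment have to opt out explicitly. The server-wide override is the new disable_s3_env_credentials.xml added further below; per named collection it could look roughly like this sketch (the collection name and key values are hypothetical, only the setting names come from the code above):

    CREATE NAMED COLLECTION s3_creds AS
        access_key_id = 'AKIA...',
        secret_access_key = '...',
        use_environment_credentials = 0;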

View File

@ -26,7 +26,7 @@ try
auto config = processor.loadConfig().configuration;
String root_path = argv[2];
zkutil::ZooKeeper zk(*config, "zookeeper", nullptr);
zkutil::ZooKeeper zk(*config, zkutil::getZooKeeperConfigName(*config), nullptr);
String temp_path = root_path + "/temp";
String blocks_path = root_path + "/block_numbers";

View File

@ -29,7 +29,7 @@ try
auto config = processor.loadConfig().configuration;
String zookeeper_path = argv[2];
auto zookeeper = std::make_shared<zkutil::ZooKeeper>(*config, "zookeeper", nullptr);
auto zookeeper = std::make_shared<zkutil::ZooKeeper>(*config, zkutil::getZooKeeperConfigName(*config), nullptr);
std::unordered_map<String, std::set<Int64>> current_inserts;

View File

@ -144,7 +144,7 @@ def clickhouse_execute_http(
except Exception as ex:
if i == max_http_retries - 1:
raise ex
client.close()
sleep(i + 1)
if res.status != 200:

View File

@ -0,0 +1,5 @@
<clickhouse>
<s3>
<use_environment_credentials>0</use_environment_credentials>
</s3>
</clickhouse>

View File

@ -55,6 +55,7 @@ ln -sf $SRC_PATH/config.d/custom_disks_base_path.xml $DEST_SERVER_PATH/config.d/
ln -sf $SRC_PATH/config.d/display_name.xml $DEST_SERVER_PATH/config.d/
ln -sf $SRC_PATH/config.d/reverse_dns_query_function.xml $DEST_SERVER_PATH/config.d/
ln -sf $SRC_PATH/config.d/compressed_marks_and_index.xml $DEST_SERVER_PATH/config.d/
ln -sf $SRC_PATH/config.d/disable_s3_env_credentials.xml $DEST_SERVER_PATH/config.d/
# Not supported with fasttest.
if [ "${DEST_SERVER_PATH}" = "/etc/clickhouse-server" ]

View File

@ -0,0 +1,41 @@
<clickhouse>
<keeper_server>
<tcp_port>9181</tcp_port>
<server_id>1</server_id>
<log_storage_path>/var/lib/clickhouse/coordination/log</log_storage_path>
<snapshot_storage_path>/var/lib/clickhouse/coordination/snapshots</snapshot_storage_path>
<coordination_settings>
<operation_timeout_ms>5000</operation_timeout_ms>
<session_timeout_ms>10000</session_timeout_ms>
<snapshot_distance>75</snapshot_distance>
<raft_logs_level>trace</raft_logs_level>
</coordination_settings>
<raft_configuration>
<server>
<id>1</id>
<hostname>node1</hostname>
<port>9234</port>
<can_become_leader>true</can_become_leader>
<priority>3</priority>
</server>
<server>
<id>2</id>
<hostname>node2</hostname>
<port>9234</port>
<can_become_leader>true</can_become_leader>
<start_as_follower>true</start_as_follower>
<priority>2</priority>
</server>
<server>
<id>3</id>
<hostname>node3</hostname>
<port>9234</port>
<can_become_leader>true</can_become_leader>
<start_as_follower>true</start_as_follower>
<priority>3</priority>
</server>
</raft_configuration>
</keeper_server>
</clickhouse>

View File

@ -0,0 +1,41 @@
<clickhouse>
<keeper_server>
<tcp_port>9181</tcp_port>
<server_id>2</server_id>
<log_storage_path>/var/lib/clickhouse/coordination/log</log_storage_path>
<snapshot_storage_path>/var/lib/clickhouse/coordination/snapshots</snapshot_storage_path>
<coordination_settings>
<operation_timeout_ms>5000</operation_timeout_ms>
<session_timeout_ms>10000</session_timeout_ms>
<snapshot_distance>75</snapshot_distance>
<raft_logs_level>trace</raft_logs_level>
</coordination_settings>
<raft_configuration>
<server>
<id>1</id>
<hostname>node1</hostname>
<port>9234</port>
<can_become_leader>true</can_become_leader>
<priority>3</priority>
</server>
<server>
<id>2</id>
<hostname>node2</hostname>
<port>9234</port>
<can_become_leader>true</can_become_leader>
<start_as_follower>true</start_as_follower>
<priority>2</priority>
</server>
<server>
<id>3</id>
<hostname>node3</hostname>
<port>9234</port>
<can_become_leader>true</can_become_leader>
<start_as_follower>true</start_as_follower>
<priority>3</priority>
</server>
</raft_configuration>
</keeper_server>
</clickhouse>

View File

@ -0,0 +1,41 @@
<clickhouse>
<keeper_server>
<tcp_port>9181</tcp_port>
<server_id>3</server_id>
<log_storage_path>/var/lib/clickhouse/coordination/log</log_storage_path>
<snapshot_storage_path>/var/lib/clickhouse/coordination/snapshots</snapshot_storage_path>
<coordination_settings>
<operation_timeout_ms>5000</operation_timeout_ms>
<session_timeout_ms>10000</session_timeout_ms>
<snapshot_distance>75</snapshot_distance>
<raft_logs_level>trace</raft_logs_level>
</coordination_settings>
<raft_configuration>
<server>
<id>1</id>
<hostname>node1</hostname>
<port>9234</port>
<can_become_leader>true</can_become_leader>
<priority>3</priority>
</server>
<server>
<id>2</id>
<hostname>node2</hostname>
<port>9234</port>
<can_become_leader>true</can_become_leader>
<start_as_follower>true</start_as_follower>
<priority>2</priority>
</server>
<server>
<id>3</id>
<hostname>node3</hostname>
<port>9234</port>
<can_become_leader>true</can_become_leader>
<start_as_follower>true</start_as_follower>
<priority>3</priority>
</server>
</raft_configuration>
</keeper_server>
</clickhouse>

View File

@ -0,0 +1,17 @@
<clickhouse>
<keeper>
<node index="1">
<host>node1</host>
<port>9181</port>
</node>
<node index="2">
<host>node2</host>
<port>9181</port>
</node>
<node index="3">
<host>node3</host>
<port>9181</port>
</node>
<session_timeout_ms>3000</session_timeout_ms>
</keeper>
</clickhouse>

View File

@ -0,0 +1,20 @@
<clickhouse>
<remote_servers>
<test_cluster>
<shard>
<replica>
<host>node1</host>
<port>9000</port>
</replica>
<replica>
<host>node2</host>
<port>9000</port>
</replica>
<replica>
<host>node3</host>
<port>9000</port>
</replica>
</shard>
</test_cluster>
</remote_servers>
</clickhouse>

View File

@ -0,0 +1,17 @@
<clickhouse>
<zookeeper>
<node index="1">
<host>node1</host>
<port>9181</port>
</node>
<node index="2">
<host>node2</host>
<port>9181</port>
</node>
<node index="3">
<host>node3</host>
<port>9181</port>
</node>
<session_timeout_ms>3000</session_timeout_ms>
</zookeeper>
</clickhouse>

View File

@ -0,0 +1,65 @@
#!/usr/bin/env python3
import pytest
from helpers.cluster import ClickHouseCluster
from helpers.test_tools import TSV
cluster = ClickHouseCluster(__file__)
node1 = cluster.add_instance(
"node1",
main_configs=[
"configs/remote_servers.xml",
"configs/keeper_config.xml",
"configs/enable_keeper1.xml",
],
macros={"replica": "node1"},
)
node2 = cluster.add_instance(
"node2",
main_configs=[
"configs/remote_servers.xml",
"configs/zookeeper_config.xml",
"configs/enable_keeper2.xml",
],
macros={"replica": "node2"},
)
node3 = cluster.add_instance(
"node3",
main_configs=["configs/remote_servers.xml", "configs/enable_keeper3.xml"],
macros={"replica": "node3"},
)
@pytest.fixture(scope="module", autouse=True)
def started_cluster():
try:
cluster.start()
yield cluster
finally:
cluster.shutdown()
def test_create_insert(started_cluster):
node1.query("DROP TABLE IF EXISTS tbl ON CLUSTER 'test_cluster' NO DELAY")
node1.query(
"""
CREATE TABLE tbl ON CLUSTER 'test_cluster' (
id Int64,
str String
) ENGINE=ReplicatedMergeTree('/clickhouse/tables/tbl/', '{replica}')
ORDER BY id
"""
)
node1.query("INSERT INTO tbl VALUES (1, 'str1')")
node2.query("INSERT INTO tbl VALUES (1, 'str1')") # Test deduplication
node3.query("INSERT INTO tbl VALUES (2, 'str2')")
for node in [node1, node2, node3]:
expected = [[1, "str1"], [2, "str2"]]
assert node.query("SELECT * FROM tbl ORDER BY id") == TSV(expected)
assert node.query("CHECK TABLE tbl") == "1\n"

View File

@ -1184,6 +1184,72 @@ def test_restore_partition():
)
@pytest.mark.parametrize("exclude_system_log_tables", [False, True])
def test_backup_all(exclude_system_log_tables):
create_and_fill_table()
session_id = new_session_id()
instance.http_query(
"CREATE TEMPORARY TABLE temp_tbl(s String)", params={"session_id": session_id}
)
instance.http_query(
"INSERT INTO temp_tbl VALUES ('q'), ('w'), ('e')",
params={"session_id": session_id},
)
instance.query("CREATE FUNCTION two_and_half AS (x) -> x * 2.5")
instance.query("CREATE USER u1 IDENTIFIED BY 'qwe123' SETTINGS custom_a = 1")
backup_name = new_backup_name()
exclude_from_backup = []
if exclude_system_log_tables:
system_log_tables = (
instance.query(
"SELECT concat('system.', table) FROM system.tables WHERE (database = 'system') AND (table LIKE '%_log')"
)
.rstrip("\n")
.split("\n")
)
exclude_from_backup += system_log_tables
backup_command = f"BACKUP ALL {'EXCEPT TABLES ' + ','.join(exclude_from_backup) if exclude_from_backup else ''} TO {backup_name}"
instance.http_query(backup_command, params={"session_id": session_id})
instance.query("DROP TABLE test.table")
instance.query("DROP FUNCTION two_and_half")
instance.query("DROP USER u1")
restore_settings = []
if not exclude_system_log_tables:
restore_settings.append("allow_non_empty_tables=true")
restore_command = f"RESTORE ALL FROM {backup_name} {'SETTINGS '+ ', '.join(restore_settings) if restore_settings else ''}"
session_id = new_session_id()
instance.http_query(
restore_command, params={"session_id": session_id}, method="POST"
)
assert instance.query("SELECT count(), sum(x) FROM test.table") == "100\t4950\n"
assert instance.http_query(
"SELECT * FROM temp_tbl ORDER BY s", params={"session_id": session_id}
) == TSV([["e"], ["q"], ["w"]])
assert instance.query("SELECT two_and_half(6)") == "15\n"
assert (
instance.query("SHOW CREATE USER u1")
== "CREATE USER u1 IDENTIFIED WITH sha256_password SETTINGS custom_a = 1\n"
)
instance.query("DROP TABLE test.table")
instance.query("DROP FUNCTION two_and_half")
instance.query("DROP USER u1")
def test_operation_id():
create_and_fill_table(n=30)

View File

@ -1,5 +1,7 @@
<yandex>
<clickhouse>
<keeper_server>
<use_cluster>false</use_cluster>
<tcp_port>9181</tcp_port>
<server_id>1</server_id>
<log_storage_path>/var/lib/clickhouse/coordination/log</log_storage_path>
@ -39,4 +41,4 @@
</server>
</raft_configuration>
</keeper_server>
</yandex>
</clickhouse>

View File

@ -1,5 +1,7 @@
<yandex>
<clickhouse>
<keeper_server>
<use_cluster>false</use_cluster>
<tcp_port>9181</tcp_port>
<server_id>2</server_id>
<log_storage_path>/var/lib/clickhouse/coordination/log</log_storage_path>
@ -39,4 +41,4 @@
</server>
</raft_configuration>
</keeper_server>
</yandex>
</clickhouse>

View File

@ -1,5 +1,7 @@
<yandex>
<clickhouse>
<keeper_server>
<use_cluster>false</use_cluster>
<tcp_port>9181</tcp_port>
<server_id>3</server_id>
<log_storage_path>/var/lib/clickhouse/coordination/log</log_storage_path>
@ -39,4 +41,4 @@
</server>
</raft_configuration>
</keeper_server>
</yandex>
</clickhouse>

View File

@ -1,4 +1,4 @@
<yandex>
<clickhouse>
<keeper_server>
<tcp_port>9181</tcp_port>
<server_id>1</server_id>
@ -21,4 +21,4 @@
</server>
</raft_configuration>
</keeper_server>
</yandex>
</clickhouse>

View File

@ -1,4 +1,4 @@
<yandex>
<clickhouse>
<keeper_server>
<tcp_port>9181</tcp_port>
<server_id>3</server_id>
@ -21,4 +21,4 @@
</server>
</raft_configuration>
</keeper_server>
</yandex>
</clickhouse>

View File

@ -1,4 +1,4 @@
<yandex>
<clickhouse>
<keeper_server>
<tcp_port>9181</tcp_port>
<server_id>2</server_id>
@ -20,4 +20,4 @@
</server>
</raft_configuration>
</keeper_server>
</yandex>
</clickhouse>

View File

@ -1,14 +1,7 @@
import socket
import pytest
from helpers.cluster import ClickHouseCluster
import helpers.keeper_utils as keeper_utils
import random
import string
import os
import time
from multiprocessing.dummy import Pool
from helpers.test_tools import assert_eq_with_retry
from io import StringIO
import csv
import re
@ -23,7 +16,7 @@ node3 = cluster.add_instance(
"node3", main_configs=["configs/enable_keeper3.xml"], stay_alive=True
)
from kazoo.client import KazooClient, KazooState
from kazoo.client import KazooClient
def wait_nodes():

View File

@ -1,5 +1,7 @@
<clickhouse>
<keeper_server>
<use_cluster>false</use_cluster>
<tcp_port>9181</tcp_port>
<server_id>1</server_id>
<log_storage_path>/var/lib/clickhouse/coordination/log</log_storage_path>

View File

@ -1,5 +1,7 @@
<clickhouse>
<keeper_server>
<use_cluster>false</use_cluster>
<tcp_port>9181</tcp_port>
<server_id>1</server_id>
<log_storage_path>/var/lib/clickhouse/coordination/log</log_storage_path>

View File

@ -1,5 +1,7 @@
<clickhouse>
<keeper_server>
<use_cluster>false</use_cluster>
<tcp_port>9181</tcp_port>
<server_id>2</server_id>
<log_storage_path>/var/lib/clickhouse/coordination/log</log_storage_path>

View File

@ -1,5 +1,7 @@
<clickhouse>
<keeper_server>
<use_cluster>false</use_cluster>
<tcp_port>9181</tcp_port>
<server_id>3</server_id>
<log_storage_path>/var/lib/clickhouse/coordination/log</log_storage_path>

View File

@ -1,5 +1,7 @@
<clickhouse>
<keeper_server>
<use_cluster>false</use_cluster>
<tcp_port>9181</tcp_port>
<server_id>1</server_id>
<log_storage_path>/var/lib/clickhouse/coordination/log</log_storage_path>

View File

@ -1,5 +1,7 @@
<clickhouse>
<keeper_server>
<use_cluster>false</use_cluster>
<tcp_port>9181</tcp_port>
<server_id>2</server_id>
<log_storage_path>/var/lib/clickhouse/coordination/log</log_storage_path>

View File

@ -1,5 +1,7 @@
<clickhouse>
<keeper_server>
<use_cluster>false</use_cluster>
<tcp_port>9181</tcp_port>
<server_id>1</server_id>
<log_storage_path>/var/lib/clickhouse/coordination/logs</log_storage_path>

View File

@ -33,3 +33,6 @@
{'aa':4,'bb':5}
{'aa':4,'bb':5}
{'aa':4,'bb':5}
{'aa':('a',4),'bb':('b',5)}
{'aa':('a',4),'bb':('b',5)}
{'aa':('a',4),'bb':('b',5)}

View File

@ -39,3 +39,8 @@ select mapFromArrays(['aa', 'bb'], 5); -- { serverError ILLEGAL_TYPE_OF_ARGUMENT
select mapFromArrays(['aa', 'bb'], [4, 5], [6, 7]); -- { serverError NUMBER_OF_ARGUMENTS_DOESNT_MATCH }
select mapFromArrays(['aa', 'bb'], [4, 5, 6]); -- { serverError SIZES_OF_ARRAYS_DONT_MATCH }
select mapFromArrays([[1,2], [3,4]], [4, 5, 6]); -- { serverError BAD_ARGUMENTS }
select mapFromArrays(['aa', 'bb'], map('a', 4, 'b', 5));
select mapFromArrays(['aa', 'bb'], materialize(map('a', 4, 'b', 5))) from numbers(2);
select mapFromArrays(map('a', 4, 'b', 4), ['aa', 'bb']) from numbers(2); -- { serverError ILLEGAL_TYPE_OF_ARGUMENT }
select mapFromArrays(['aa', 'bb'], map('a', 4)); -- { serverError SIZES_OF_ARRAYS_DONT_MATCH }

View File

@ -3,6 +3,9 @@
SET allow_experimental_query_cache = true;
-- (it's silly to use what will be tested below but we have to assume other tests cluttered the query cache)
SYSTEM DROP QUERY CACHE;
-- Cache query result in query cache
SELECT 1 SETTINGS use_query_cache = true;
SELECT count(*) FROM system.query_cache;

View File

@ -31,7 +31,7 @@ ALTER TABLE 02661_keepermap_delete_update UPDATE value = 'Another' WHERE key > 2
SELECT * FROM 02661_keepermap_delete_update ORDER BY key;
SELECT '-----------';
ALTER TABLE 02661_keepermap_delete_update UPDATE key = key * 10 WHERE 1 = 1; -- { serverError 36 }
ALTER TABLE 02661_keepermap_delete_update UPDATE key = key * 10 WHERE 1 = 1; -- { serverError BAD_ARGUMENTS }
SELECT * FROM 02661_keepermap_delete_update ORDER BY key;
SELECT '-----------';
@ -39,4 +39,6 @@ ALTER TABLE 02661_keepermap_delete_update UPDATE value2 = value2 * 10 + 2 WHERE
SELECT * FROM 02661_keepermap_delete_update ORDER BY key;
SELECT '-----------';
ALTER TABLE 02661_keepermap_delete_update ON CLUSTER test_shard_localhost UPDATE value2 = value2 * 10 + 2 WHERE value2 < 100; -- { serverError BAD_ARGUMENTS }
DROP TABLE IF EXISTS 02661_keepermap_delete_update;

View File

@ -0,0 +1,8 @@
SELECT
3 0 0
3 0 0
INSERT
CHECK
1
2
6 0 2

View File

@ -0,0 +1,67 @@
#!/usr/bin/env bash
# Tags: no-fasttest
# Tag no-fasttest: interserver mode requires SSL
#
# Test that checks that some ClientInfo fields are correctly passed in inter-server mode.
# NOTE: we need a .sh test (.sql is not enough) because queries on remote nodes do not have current_database = currentDatabase()
#
# Check-style suppression: select * from system.query_log where current_database = currentDatabase();
CUR_DIR=$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)
# shellcheck source=../shell_config.sh
. "$CUR_DIR"/../shell_config.sh
function get_query_id() { random_str 10; }
$CLICKHOUSE_CLIENT -nm -q "
drop table if exists buf;
drop table if exists dist;
drop table if exists data;
create table data (key Int) engine=Memory();
create table dist as data engine=Distributed(test_cluster_interserver_secret, currentDatabase(), data, key);
create table dist_dist as data engine=Distributed(test_cluster_interserver_secret, currentDatabase(), dist, key);
system stop distributed sends dist;
"
echo "SELECT"
query_id="$(get_query_id)"
# Initialize the connection. If other tables already use this cluster, the
# connection may have been created long ago, but that is fine for this test,
# since we only care that the difference from NOW() is not significant.
$CLICKHOUSE_CLIENT --prefer_localhost_replica=0 --query_id "$query_id" -q "select * from dist"
$CLICKHOUSE_CLIENT -nm --param_query_id "$query_id" -q "
system flush logs;
select count(), countIf(initial_query_start_time_microseconds != query_start_time_microseconds), countIf(event_time - initial_query_start_time > 3) from system.query_log where type = 'QueryFinish' and initial_query_id = {query_id:String};
"
sleep 6
query_id="$(get_query_id)"
# this query (and all subsequent) should reuse the previous connection (at least most of the time)
$CLICKHOUSE_CLIENT --prefer_localhost_replica=0 --query_id "$query_id" -q "select * from dist"
$CLICKHOUSE_CLIENT -nm --param_query_id "$query_id" -q "
system flush logs;
select count(), countIf(initial_query_start_time_microseconds != query_start_time_microseconds), countIf(event_time - initial_query_start_time > 3) from system.query_log where type = 'QueryFinish' and initial_query_id = {query_id:String};
"
echo "INSERT"
query_id="$(get_query_id)"
$CLICKHOUSE_CLIENT --prefer_localhost_replica=0 --query_id "$query_id" -nm -q "
insert into dist_dist values (1),(2);
select * from data;
"
sleep 3
$CLICKHOUSE_CLIENT -nm --param_query_id "$query_id" -q "system flush distributed dist_dist"
sleep 1
$CLICKHOUSE_CLIENT -nm --param_query_id "$query_id" -q "system flush distributed dist"
echo "CHECK"
$CLICKHOUSE_CLIENT -nm --param_query_id "$query_id" -q "
select * from data order by key;
system flush logs;
select count(), countIf(initial_query_start_time_microseconds != query_start_time_microseconds), countIf(event_time - initial_query_start_time > 3) from system.query_log where type = 'QueryFinish' and initial_query_id = {query_id:String};
"

View File

@ -0,0 +1 @@
0 0

View File

@ -0,0 +1,16 @@
CREATE TABLE mv_source (a Int64, insert_time DateTime) ENGINE = MergeTree() ORDER BY insert_time;
CREATE TABLE mv_target (a Int64, insert_time DateTime) ENGINE = MergeTree() ORDER BY insert_time;
CREATE MATERIALIZED VIEW source_to_target to mv_target as Select * from mv_source where a not in (Select sleepEachRow(0.1) from numbers(50));
ALTER TABLE mv_source MODIFY TTL insert_time + toIntervalDay(1);
SYSTEM FLUSH LOGS;
-- This is a fancy way to check that the MV hasn't been called (no functions executed by ALTER)
SELECT
ProfileEvents['FunctionExecute'],
ProfileEvents['TableFunctionExecute']
FROM system.query_log
WHERE
type = 'QueryFinish' AND
query like '%ALTER TABLE mv_source%' AND
current_database = currentDatabase() AND
event_time > now() - INTERVAL 10 minute;

View File

@ -0,0 +1,9 @@
-- Tags: no-parallel
SELECT * FROM system.numbers WHERE number > toUInt64(10)(number) LIMIT 10; -- { serverError 309 }
CREATE FUNCTION IF NOT EXISTS sum_udf as (x, y) -> (x + y);
SELECT sum_udf(1)(1, 2); -- { serverError 309 }
DROP FUNCTION IF EXISTS sum_udf;

View File

@ -0,0 +1,3 @@
1 1.1
1 2.1
1 2.1

View File

@ -0,0 +1,20 @@
-- Tags: no-ordinary-database, no-fasttest
DROP TABLE IF EXISTS 02706_keeper_map_insert_strict SYNC;
CREATE TABLE 02706_keeper_map_insert_strict (key UInt64, value Float64) Engine=KeeperMap('/' || currentDatabase() || '/test_02706_keeper_map_insert_strict') PRIMARY KEY(key);
INSERT INTO 02706_keeper_map_insert_strict VALUES (1, 1.1), (2, 2.2);
SELECT * FROM 02706_keeper_map_insert_strict WHERE key = 1;
SET keeper_map_strict_mode = false;
INSERT INTO 02706_keeper_map_insert_strict VALUES (1, 2.1);
SELECT * FROM 02706_keeper_map_insert_strict WHERE key = 1;
SET keeper_map_strict_mode = true;
INSERT INTO 02706_keeper_map_insert_strict VALUES (1, 2.1); -- { serverError KEEPER_EXCEPTION }
SELECT * FROM 02706_keeper_map_insert_strict WHERE key = 1;
DROP TABLE 02706_keeper_map_insert_strict;

View File

@ -0,0 +1,10 @@
1 1 -59.952
1 2 59.952
1 3 -100
2 1 -93.7611
2 2 93.7611
3 1 0
3 2 0
---------
0
0

View File

@ -0,0 +1,117 @@
DROP TABLE IF EXISTS srv_account_parts;
DROP TABLE IF EXISTS etl_batch;
CREATE TABLE srv_account_parts(
shard_num UInt16,
account_ids Array(Int64)
)ENGINE = ReplacingMergeTree
ORDER BY shard_num
as select * from values ((0,[]),(1,[1,2,3]),(2,[1,2,3]),(3,[1]));
CREATE TABLE etl_batch(
batch_id UInt64,
batch_start DateTime,
batch_start_day Date DEFAULT toDate(batch_start),
batch_load DateTime,
total_num_records UInt32,
etl_server_id Int32,
account_id UInt64,
shard_num UInt16
)ENGINE = ReplacingMergeTree
PARTITION BY toYYYYMM(batch_start_day)
ORDER BY (batch_id, etl_server_id, account_id);
insert into etl_batch(batch_id, batch_start, batch_load, total_num_records, etl_server_id, account_id, shard_num)
select number batch_id,
toDateTime('2022-01-01') + INTERVAL 23 HOUR batch_start,
batch_start batch_load,
333 total_num_records,
1 etl_server_id,
number%3+1 account_id,
1 shard_num
from numbers(1000);
insert into etl_batch(batch_id, batch_start, batch_load, total_num_records, etl_server_id, account_id, shard_num)
select number+2000 batch_id,
toDateTime('2022-01-01') + INTERVAL 23 HOUR batch_start,
batch_start batch_load,
333 total_num_records,
1 etl_server_id,
number%3+1 account_id,
2 shard_num
from numbers(1000);
insert into etl_batch(batch_id, batch_start, batch_load, total_num_records, etl_server_id, account_id, shard_num)
select number+4000 batch_id,
toDateTime('2022-01-01') + INTERVAL 3 HOUR batch_start,
batch_start batch_load,
3333 total_num_records,
1 etl_server_id,
2 account_id,
2 shard_num
from numbers(1000);
insert into etl_batch(batch_id, batch_start, batch_load, total_num_records, etl_server_id, account_id, shard_num)
select number+6000 batch_id,
toDateTime('2022-01-01') + INTERVAL 23 HOUR batch_start,
batch_start batch_load,
333 total_num_records,
1 etl_server_id,
1 account_id,
2 shard_num
from numbers(1000);
insert into etl_batch(batch_id, batch_start, batch_load, total_num_records, etl_server_id, account_id, shard_num)
select number+8000 batch_id,
toDateTime('2022-01-01') + INTERVAL 23 HOUR batch_start,
batch_start batch_load,
1000 total_num_records,
1 etl_server_id,
3 account_id,
3 shard_num
from numbers(1000);
CREATE OR REPLACE VIEW v_num_records_by_node_bias_acc as
SELECT shard_num,
arrayJoin(account_ids) AS account_id,
records_24h,
records_12h,
IF (b = '',-100,xbias) AS bias,
IF (bias > 10,0,IF (bias > 0,1,IF (bias < -10,301,300))) AS sbias
FROM srv_account_parts
LEFT JOIN (SELECT account_id,
shard_num,
records_24h,
records_12h,
xbias,
'b' AS b
FROM (SELECT account_id,
groupArray((shard_num,records_24h,records_12h)) AS ga,
arraySum(ga.2) AS tot24,
arraySum(ga.3) AS tot12,
arrayMap(i ->(((((i.2)*LENGTH(ga))*100) / tot24) - 100),ga) AS bias24,
arrayMap(i ->(((((i.3)*LENGTH(ga))*100) / tot12) - 100),ga) AS bias12,
arrayMap((i,j,k) ->(i,IF (tot12 = 0,0,IF (ABS(j) > ABS(k),j,k))),ga,bias24,bias12) AS a_bias
FROM (SELECT shard_num,
toInt64(account_id) AS account_id,
SUM(total_num_records) AS records_24h,
sumIf(total_num_records,batch_load >(toDateTime('2022-01-02') -(3600*12))) AS records_12h
FROM etl_batch FINAL PREWHERE (batch_start_day >= (toDate('2022-01-02') - 2)) AND (batch_load > (toDateTime('2022-01-02') - (3600*24)))
where (shard_num, account_id) in (select shard_num, arrayJoin(account_ids) from srv_account_parts)
GROUP BY shard_num,account_id)
GROUP BY account_id)
ARRAY JOIN (a_bias.1).1 AS shard_num,a_bias.2 AS xbias, (a_bias.1).2 AS records_24h, (a_bias.1).3 AS records_12h
) s USING (shard_num,account_id);
select account_id, shard_num, round(bias,4)
from v_num_records_by_node_bias_acc
order by account_id, shard_num, bias;
select '---------';
SELECT a AS b FROM (SELECT 0 a) s LEFT JOIN (SELECT 0 b) t USING (b);
SELECT arrayJoin(a) AS b FROM (SELECT [0] a) s LEFT JOIN (SELECT 0 b) t USING (b);
DROP TABLE srv_account_parts;
DROP TABLE etl_batch;

View File

@ -0,0 +1,32 @@
1 Some string 0
2 Some other string 0
3 random 0
4 random2 0
-----------
3 random 0
4 random2 0
-----------
3 random 0
-----------
0
-----------
1 String 10
2 String 20
3 String 30
4 String 40
-----------
1 String 10
2 String 20
3 Another 30
4 Another 40
-----------
1 String 10
2 String 20
3 Another 30
4 Another 40
-----------
1 String 102
2 String 202
3 Another 302
4 Another 402
-----------

View File

@ -0,0 +1,44 @@
-- Tags: no-ordinary-database, no-fasttest
DROP TABLE IF EXISTS 02707_keepermap_delete_update;
SET keeper_map_strict_mode = 1;
CREATE TABLE 02707_keepermap_delete_update (key UInt64, value String, value2 UInt64) ENGINE=KeeperMap('/' || currentDatabase() || '/test02707_keepermap_delete_update') PRIMARY KEY(key);
INSERT INTO 02707_keepermap_delete_update VALUES (1, 'Some string', 0), (2, 'Some other string', 0), (3, 'random', 0), (4, 'random2', 0);
SELECT * FROM 02707_keepermap_delete_update ORDER BY key;
SELECT '-----------';
DELETE FROM 02707_keepermap_delete_update WHERE value LIKE 'Some%string';
SELECT * FROM 02707_keepermap_delete_update ORDER BY key;
SELECT '-----------';
ALTER TABLE 02707_keepermap_delete_update DELETE WHERE key >= 4;
SELECT * FROM 02707_keepermap_delete_update ORDER BY key;
SELECT '-----------';
DELETE FROM 02707_keepermap_delete_update WHERE 1 = 1;
SELECT count() FROM 02707_keepermap_delete_update;
SELECT '-----------';
INSERT INTO 02707_keepermap_delete_update VALUES (1, 'String', 10), (2, 'String', 20), (3, 'String', 30), (4, 'String', 40);
SELECT * FROM 02707_keepermap_delete_update ORDER BY key;
SELECT '-----------';
ALTER TABLE 02707_keepermap_delete_update UPDATE value = 'Another' WHERE key > 2;
SELECT * FROM 02707_keepermap_delete_update ORDER BY key;
SELECT '-----------';
ALTER TABLE 02707_keepermap_delete_update UPDATE key = key * 10 WHERE 1 = 1; -- { serverError 36 }
SELECT * FROM 02707_keepermap_delete_update ORDER BY key;
SELECT '-----------';
ALTER TABLE 02707_keepermap_delete_update UPDATE value2 = value2 * 10 + 2 WHERE value2 < 100;
SELECT * FROM 02707_keepermap_delete_update ORDER BY key;
SELECT '-----------';
DROP TABLE IF EXISTS 02707_keepermap_delete_update;

View File

@ -0,0 +1,179 @@
#!/usr/bin/env python3
# A trivial stateless slack bot that notifies about new broken tests in ClickHouse CI.
# It checks what happened to our CI during the last check_period hours (1 hour) and notifies us in Slack if necessary.
# This script should be executed once every check_period hours (currently 1 hour).
# It will post duplicate messages if you run it more often and miss some messages if you run it less often.
#
# You can run it locally with no arguments, it will work in a dry-run mode. Or you can set your own SLACK_URL_DEFAULT.
# Feel free to add more checks, more details to messages, or better heuristics.
# NOTE There's no deployment automation for now,
# an AWS Lambda (slack-ci-bot-test lambda in CI-CD) has to be updated manually after changing this script.
#
# See also: https://aretestsgreenyet.com/
import os
import json
import base64
if os.environ.get("AWS_LAMBDA_ENV", "0") == "1":
# For AWS lambda (python 3.7)
from botocore.vendored import requests
else:
# For running locally
import requests
DRY_RUN_MARK = "<no url, dry run>"
MAX_FAILURES_DEFAULT = 50
SLACK_URL_DEFAULT = DRY_RUN_MARK
# Find tests that failed in master during the last check_period hours,
# but did not fail during the previous 2 weeks, so they were presumably broken recently.
# NOTE: It may report flaky tests that fail too rarely.
NEW_BROKEN_TESTS_QUERY = """
WITH
1 AS check_period,
now() as now
SELECT test_name, any(report_url)
FROM checks
WHERE 1
AND check_start_time >= now - INTERVAL 1 WEEK
AND (check_start_time + check_duration_ms / 1000) >= now - INTERVAL check_period HOUR
AND pull_request_number = 0
AND test_status LIKE 'F%'
AND check_status != 'success'
AND test_name NOT IN (
SELECT test_name FROM checks WHERE 1
AND check_start_time >= now - INTERVAL 1 MONTH
AND (check_start_time + check_duration_ms / 1000) BETWEEN now - INTERVAL 2 WEEK AND now - INTERVAL check_period HOUR
AND pull_request_number = 0
AND check_status != 'success'
AND test_status LIKE 'F%')
AND test_context_raw NOT LIKE '%CannotSendRequest%' and test_context_raw NOT LIKE '%Server does not respond to health check%'
GROUP BY test_name
"""
# Returns total number of failed checks during the last 24 hours
# and previous value of that metric (check_period hours ago)
COUNT_FAILURES_QUERY = """
WITH
1 AS check_period,
'%' AS check_name_pattern,
now() as now
SELECT
countIf((check_start_time + check_duration_ms / 1000) >= now - INTERVAL 24 HOUR) AS new_val,
countIf((check_start_time + check_duration_ms / 1000) <= now - INTERVAL check_period HOUR) AS prev_val
FROM checks
WHERE 1
AND check_start_time >= now - INTERVAL 1 WEEK
AND (check_start_time + check_duration_ms / 1000) >= now - INTERVAL 24 + check_period HOUR
AND pull_request_number = 0
AND test_status LIKE 'F%'
AND check_status != 'success'
AND check_name ILIKE check_name_pattern
"""
SLACK_MESSAGE_JSON = {"type": "mrkdwn", "text": None}
def get_play_url(query):
return (
"https://play.clickhouse.com/play?user=play#"
+ base64.b64encode(query.encode()).decode()
)
def run_clickhouse_query(query):
url = "https://play.clickhouse.com/?user=play&query=" + requests.utils.quote(query)
res = requests.get(url)
if res.status_code != 200:
print("Failed to execute query: ", res.status_code, res.content)
raise Exception(
"Failed to execute query: {}: {}".format(res.status_code, res.content)
)
lines = res.text.strip().splitlines()
return [x.split("\t") for x in lines]
def get_new_broken_tests_message(broken_tests):
if not broken_tests:
return None
msg = "There are {} new broken tests in master:\n".format(len(broken_tests))
for name, report in broken_tests:
msg += " - *{}* - <{}|Report>\n".format(name, report)
return msg
def get_too_many_failures_message(failures_count):
MAX_FAILURES = int(os.environ.get("MAX_FAILURES", MAX_FAILURES_DEFAULT))
curr_failures = int(failures_count[0][0])
prev_failures = int(failures_count[0][1])
if curr_failures == 0:
return (
"Looks like CI is completely broken: there are *no failures* at all... 0_o"
)
if curr_failures < MAX_FAILURES:
return None
if prev_failures < MAX_FAILURES:
return "*CI is broken: there are {} failures during the last 24 hours*".format(
curr_failures
)
if curr_failures < prev_failures:
return None
if (curr_failures - prev_failures) / prev_failures < 0.2:
return None
return "CI is broken and it's getting worse: there are {} failures during the last 24 hours".format(
curr_failures
)
def send_to_slack(message):
SLACK_URL = os.environ.get("SLACK_URL", SLACK_URL_DEFAULT)
if SLACK_URL == DRY_RUN_MARK:
return
payload = SLACK_MESSAGE_JSON.copy()
payload["text"] = message
res = requests.post(SLACK_URL, json.dumps(payload))
if res.status_code != 200:
print("Failed to send a message to Slack: ", res.status_code, res.content)
raise Exception(
"Failed to send a message to Slack: {}: {}".format(
res.status_code, res.content
)
)
def query_and_alert_if_needed(query, get_message_func):
query_res = run_clickhouse_query(query)
print("Got result {} for query {}", query_res, query)
msg = get_message_func(query_res)
if msg is None:
return
msg += "\nCI DB query: <{}|link>".format(get_play_url(query))
print("Sending message to slack:", msg)
send_to_slack(msg)
def check_and_alert():
query_and_alert_if_needed(NEW_BROKEN_TESTS_QUERY, get_new_broken_tests_message)
query_and_alert_if_needed(COUNT_FAILURES_QUERY, get_too_many_failures_message)
def lambda_handler(event, context):
try:
check_and_alert()
return {"statusCode": 200, "body": "OK"}
except Exception as e:
send_to_slack(
"I failed, please help me (see ClickHouse/utils/ci-slack-bot/ci-slack-bot.py): "
+ str(e)
)
return {"statusCode": 200, "body": "FAIL"}
if __name__ == "__main__":
check_and_alert()

View File

@ -36,6 +36,7 @@ v22.9.4.32-stable 2022-10-26
v22.9.3.18-stable 2022-09-30
v22.9.2.7-stable 2022-09-23
v22.9.1.2603-stable 2022-09-22
v22.8.16.32-lts 2023-04-04
v22.8.15.23-lts 2023-03-10
v22.8.14.53-lts 2023-02-27
v22.8.13.20-lts 2023-01-29
