Merge branch 'master' into client-allow-yaml

This commit is contained in:
Alexey Milovidov 2023-08-03 20:22:41 +03:00 committed by GitHub
commit dae2e9d92d
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
115 changed files with 1196 additions and 699 deletions

View File

@ -32,7 +32,7 @@ RUN arch=${TARGETARCH:-amd64} \
esac
ARG REPOSITORY="https://s3.amazonaws.com/clickhouse-builds/22.4/31c367d3cd3aefd316778601ff6565119fe36682/package_release"
ARG VERSION="23.7.1.2470"
ARG VERSION="23.7.2.25"
ARG PACKAGES="clickhouse-keeper"
# user/group precreated explicitly with fixed uid/gid on purpose.

View File

@ -33,7 +33,7 @@ RUN arch=${TARGETARCH:-amd64} \
# lts / testing / prestable / etc
ARG REPO_CHANNEL="stable"
ARG REPOSITORY="https://packages.clickhouse.com/tgz/${REPO_CHANNEL}"
ARG VERSION="23.7.1.2470"
ARG VERSION="23.7.2.25"
ARG PACKAGES="clickhouse-client clickhouse-server clickhouse-common-static"
# user/group precreated explicitly with fixed uid/gid on purpose.

View File

@ -23,7 +23,7 @@ RUN sed -i "s|http://archive.ubuntu.com|${apt_archive}|g" /etc/apt/sources.list
ARG REPO_CHANNEL="stable"
ARG REPOSITORY="deb [signed-by=/usr/share/keyrings/clickhouse-keyring.gpg] https://packages.clickhouse.com/deb ${REPO_CHANNEL} main"
ARG VERSION="23.7.1.2470"
ARG VERSION="23.7.2.25"
ARG PACKAGES="clickhouse-client clickhouse-server clickhouse-common-static"
# set non-empty deb_location_url url to create a docker image

View File

@ -200,8 +200,8 @@ Templates:
- [Server Setting](_description_templates/template-server-setting.md)
- [Database or Table engine](_description_templates/template-engine.md)
- [System table](_description_templates/template-system-table.md)
- [Data type](_description_templates/data-type.md)
- [Statement](_description_templates/statement.md)
- [Data type](_description_templates/template-data-type.md)
- [Statement](_description_templates/template-statement.md)
<a name="how-to-build-docs"/>

View File

@ -0,0 +1,31 @@
---
sidebar_position: 1
sidebar_label: 2023
---
# 2023 Changelog
### ClickHouse release v23.7.2.25-stable (8dd1107b032) FIXME as compared to v23.7.1.2470-stable (a70127baecc)
#### Backward Incompatible Change
* Backported in [#52850](https://github.com/ClickHouse/ClickHouse/issues/52850): If a dynamic disk contains a name, it should be specified as `disk = disk(name = 'disk_name'`, ...) in disk function arguments. In previous version it could be specified as `disk = disk_<disk_name>(...)`, which is no longer supported. [#52820](https://github.com/ClickHouse/ClickHouse/pull/52820) ([Kseniia Sumarokova](https://github.com/kssenii)).
#### Build/Testing/Packaging Improvement
* Backported in [#52913](https://github.com/ClickHouse/ClickHouse/issues/52913): Add `clickhouse-keeper-client` symlink to the clickhouse-server package. [#51882](https://github.com/ClickHouse/ClickHouse/pull/51882) ([Mikhail f. Shiryaev](https://github.com/Felixoid)).
#### Bug Fix (user-visible misbehavior in an official stable release)
* Fix binary arithmetic for Nullable(IPv4) [#51642](https://github.com/ClickHouse/ClickHouse/pull/51642) ([Yakov Olkhovskiy](https://github.com/yakov-olkhovskiy)).
* Support IPv4 and IPv6 as dictionary attributes [#51756](https://github.com/ClickHouse/ClickHouse/pull/51756) ([Yakov Olkhovskiy](https://github.com/yakov-olkhovskiy)).
* init and destroy ares channel on demand.. [#52634](https://github.com/ClickHouse/ClickHouse/pull/52634) ([Arthur Passos](https://github.com/arthurpassos)).
* Fix crash in function `tuple` with one sparse column argument [#52659](https://github.com/ClickHouse/ClickHouse/pull/52659) ([Anton Popov](https://github.com/CurtizJ)).
* Fix data race in Keeper reconfiguration [#52804](https://github.com/ClickHouse/ClickHouse/pull/52804) ([Antonio Andelic](https://github.com/antonio2368)).
* clickhouse-keeper: fix implementation of server with poll() [#52833](https://github.com/ClickHouse/ClickHouse/pull/52833) ([Andy Fiddaman](https://github.com/citrus-it)).
#### NOT FOR CHANGELOG / INSIGNIFICANT
* Rename setting disable_url_encoding to enable_url_encoding and add a test [#52656](https://github.com/ClickHouse/ClickHouse/pull/52656) ([Kruglov Pavel](https://github.com/Avogar)).
* Fix bugs and better test for SYSTEM STOP LISTEN [#52680](https://github.com/ClickHouse/ClickHouse/pull/52680) ([Nikolay Degterinsky](https://github.com/evillique)).
* Increase min protocol version for sparse serialization [#52835](https://github.com/ClickHouse/ClickHouse/pull/52835) ([Anton Popov](https://github.com/CurtizJ)).
* Docker improvements [#52869](https://github.com/ClickHouse/ClickHouse/pull/52869) ([Mikhail f. Shiryaev](https://github.com/Felixoid)).

View File

@ -141,6 +141,10 @@ Runs [stateful functional tests](tests.md#functional-tests). Treat them in the s
Runs [integration tests](tests.md#integration-tests).
## Bugfix validate check
Checks that either a new test (functional or integration) or there some changed tests that fail with the binary built on master branch. This check is triggered when pull request has "pr-bugfix" label.
## Stress Test
Runs stateless functional tests concurrently from several clients to detect
concurrency-related errors. If it fails:

View File

@ -22,7 +22,7 @@ CREATE TABLE deltalake
- `url` — Bucket url with path to the existing Delta Lake table.
- `aws_access_key_id`, `aws_secret_access_key` - Long-term credentials for the [AWS](https://aws.amazon.com/) account user. You can use these to authenticate your requests. Parameter is optional. If credentials are not specified, they are used from the configuration file.
Engine parameters can be specified using [Named Collections](../../../operations/named-collections.md)
Engine parameters can be specified using [Named Collections](/docs/en/operations/named-collections.md).
**Example**

View File

@ -22,7 +22,7 @@ CREATE TABLE hudi_table
- `url` — Bucket url with the path to an existing Hudi table.
- `aws_access_key_id`, `aws_secret_access_key` - Long-term credentials for the [AWS](https://aws.amazon.com/) account user. You can use these to authenticate your requests. Parameter is optional. If credentials are not specified, they are used from the configuration file.
Engine parameters can be specified using [Named Collections](../../../operations/named-collections.md)
Engine parameters can be specified using [Named Collections](/docs/en/operations/named-collections.md).
**Example**

View File

@ -51,7 +51,3 @@ keeper foo bar
- `rmr <path>` -- Recursively deletes path. Confirmation required
- `flwc <command>` -- Executes four-letter-word command
- `help` -- Prints this message
- `get_stat [path]` -- Returns the node's stat (default `.`)
- `find_super_nodes <threshold> [path]` -- Finds nodes with number of children larger than some threshold for the given path (default `.`)
- `delete_stable_backups` -- Deletes ClickHouse nodes used for backups that are now inactive
- `find_big_family [path] [n]` -- Returns the top n nodes with the biggest family in the subtree (default path = `.` and n = 10)

View File

@ -36,6 +36,8 @@ These `ALTER` statements modify entities related to role-based access control:
[ALTER TABLE ... MODIFY COMMENT](/docs/en/sql-reference/statements/alter/comment.md) statement adds, modifies, or removes comments to the table, regardless if it was set before or not.
[ALTER NAMED COLLECTION](/docs/en/sql-reference/statements/alter/named-collection.md) statement modifies [Named Collections](/docs/en/operations/named-collections.md).
## Mutations
`ALTER` queries that are intended to manipulate table data are implemented with a mechanism called “mutations”, most notably [ALTER TABLE … DELETE](/docs/en/sql-reference/statements/alter/delete.md) and [ALTER TABLE … UPDATE](/docs/en/sql-reference/statements/alter/update.md). They are asynchronous background processes similar to merges in [MergeTree](/docs/en/engines/table-engines/mergetree-family/index.md) tables that to produce new “mutated” versions of parts.

View File

@ -0,0 +1,30 @@
---
slug: /en/sql-reference/statements/alter/named-collection
sidebar_label: NAMED COLLECTION
---
# ALTER NAMED COLLECTION
This query intends to modify already existing named collections.
**Syntax**
```sql
ALTER NAMED COLLECTION [IF EXISTS] name [ON CLUSTER cluster]
[ SET
key_name1 = 'some value',
key_name2 = 'some value',
key_name3 = 'some value',
... ] |
[ DELETE key_name4, key_name5, ... ]
```
**Example**
```sql
CREATE NAMED COLLECTION foobar AS a = '1', b = '2';
ALTER NAMED COLLECTION foobar SET a = '2', c = '3';
ALTER NAMED COLLECTION foobar DELETE b;
```

View File

@ -8,13 +8,14 @@ sidebar_label: CREATE
Create queries make a new entity of one of the following kinds:
- [DATABASE](../../../sql-reference/statements/create/database.md)
- [TABLE](../../../sql-reference/statements/create/table.md)
- [VIEW](../../../sql-reference/statements/create/view.md)
- [DICTIONARY](../../../sql-reference/statements/create/dictionary.md)
- [FUNCTION](../../../sql-reference/statements/create/function.md)
- [USER](../../../sql-reference/statements/create/user.md)
- [ROLE](../../../sql-reference/statements/create/role.md)
- [ROW POLICY](../../../sql-reference/statements/create/row-policy.md)
- [QUOTA](../../../sql-reference/statements/create/quota.md)
- [SETTINGS PROFILE](../../../sql-reference/statements/create/settings-profile.md)
- [DATABASE](/docs/en/sql-reference/statements/create/database.md)
- [TABLE](/docs/en/sql-reference/statements/create/table.md)
- [VIEW](/docs/en/sql-reference/statements/create/view.md)
- [DICTIONARY](/docs/en/sql-reference/statements/create/dictionary.md)
- [FUNCTION](/docs/en/sql-reference/statements/create/function.md)
- [USER](/docs/en/sql-reference/statements/create/user.md)
- [ROLE](/docs/en/sql-reference/statements/create/role.md)
- [ROW POLICY](/docs/en/sql-reference/statements/create/row-policy.md)
- [QUOTA](/docs/en/sql-reference/statements/create/quota.md)
- [SETTINGS PROFILE](/docs/en/sql-reference/statements/create/settings-profile.md)
- [NAMED COLLECTION](/docs/en/sql-reference/statements/create/named-collection.md)

View File

@ -0,0 +1,34 @@
---
slug: /en/sql-reference/statements/create/named-collection
sidebar_label: NAMED COLLECTION
---
# CREATE NAMED COLLECTION
Creates a new named collection.
**Syntax**
```sql
CREATE NAMED COLLECTION [IF NOT EXISTS] name [ON CLUSTER cluster] AS
key_name1 = 'some value',
key_name2 = 'some value',
key_name3 = 'some value',
...
```
**Example**
```sql
CREATE NAMED COLLECTION foobar AS a = '1', b = '2';
```
**Related statements**
- [CREATE NAMED COLLECTION](https://clickhouse.com/docs/en/sql-reference/statements/alter/named-collection)
- [DROP NAMED COLLECTION](https://clickhouse.com/docs/en/sql-reference/statements/drop#drop-function)
**See Also**
- [Named collections guide](/docs/en/operations/named-collections.md)

View File

@ -119,3 +119,20 @@ DROP FUNCTION [IF EXISTS] function_name [on CLUSTER cluster]
CREATE FUNCTION linear_equation AS (x, k, b) -> k*x + b;
DROP FUNCTION linear_equation;
```
## DROP NAMED COLLECTION
Deletes a named collection.
**Syntax**
``` sql
DROP NAMED COLLECTION [IF EXISTS] name [on CLUSTER cluster]
```
**Example**
``` sql
CREATE NAMED COLLECTION foobar AS a = '1', b = '2';
DROP NAMED COLLECTION foobar;
```

View File

@ -314,6 +314,22 @@ Provides possibility to start background fetch tasks from replication queues whi
SYSTEM START REPLICATION QUEUES [ON CLUSTER cluster_name] [[db.]replicated_merge_tree_family_table_name]
```
### STOP PULLING REPLICATION LOG
Stops loading new entries from replication log to replication queue in a `ReplicatedMergeTree` table.
``` sql
SYSTEM STOP PULLING REPLICATION LOG [ON CLUSTER cluster_name] [[db.]replicated_merge_tree_family_table_name]
```
### START PULLING REPLICATION LOG
Cancels `SYSTEM STOP PULLING REPLICATION LOG`.
``` sql
SYSTEM START PULLING REPLICATION LOG [ON CLUSTER cluster_name] [[db.]replicated_merge_tree_family_table_name]
```
### SYNC REPLICA
Wait until a `ReplicatedMergeTree` table will be synced with other replicas in a cluster, but no more than `receive_timeout` seconds.

View File

@ -21,7 +21,7 @@ iceberg(url [,aws_access_key_id, aws_secret_access_key] [,format] [,structure])
- `format` — The [format](/docs/en/interfaces/formats.md/#formats) of the file. By default `Parquet` is used.
- `structure` — Structure of the table. Format `'column1_name column1_type, column2_name column2_type, ...'`.
Engine parameters can be specified using [Named Collections](../../operations/named-collections.md)
Engine parameters can be specified using [Named Collections](/docs/en/operations/named-collections.md).
**Returned value**

View File

@ -0,0 +1 @@
../../../en/operations/optimizing-performance/profile-guided-optimization.md

View File

@ -0,0 +1 @@
../../../en/operations/optimizing-performance/profile-guided-optimization.md

View File

@ -1,6 +1,5 @@
#include "Commands.h"
#include <queue>
#include "KeeperClient.h"
@ -25,18 +24,8 @@ void LSCommand::execute(const ASTKeeperQuery * query, KeeperClient * client) con
else
path = client->cwd;
auto children = client->zookeeper->getChildren(path);
std::sort(children.begin(), children.end());
bool need_space = false;
for (const auto & child : children)
{
if (std::exchange(need_space, true))
std::cout << " ";
std::cout << child;
}
for (const auto & child : client->zookeeper->getChildren(path))
std::cout << child << " ";
std::cout << "\n";
}
@ -141,173 +130,6 @@ void GetCommand::execute(const ASTKeeperQuery * query, KeeperClient * client) co
std::cout << client->zookeeper->get(client->getAbsolutePath(query->args[0].safeGet<String>())) << "\n";
}
bool GetStatCommand::parse(IParser::Pos & pos, std::shared_ptr<ASTKeeperQuery> & node, Expected & expected) const
{
String arg;
if (!parseKeeperPath(pos, expected, arg))
return true;
node->args.push_back(std::move(arg));
return true;
}
void GetStatCommand::execute(const ASTKeeperQuery * query, KeeperClient * client) const
{
Coordination::Stat stat;
String path;
if (!query->args.empty())
path = client->getAbsolutePath(query->args[0].safeGet<String>());
else
path = client->cwd;
client->zookeeper->get(path, &stat);
std::cout << "cZxid = " << stat.czxid << "\n";
std::cout << "mZxid = " << stat.mzxid << "\n";
std::cout << "pZxid = " << stat.pzxid << "\n";
std::cout << "ctime = " << stat.ctime << "\n";
std::cout << "mtime = " << stat.mtime << "\n";
std::cout << "version = " << stat.version << "\n";
std::cout << "cversion = " << stat.cversion << "\n";
std::cout << "aversion = " << stat.aversion << "\n";
std::cout << "ephemeralOwner = " << stat.ephemeralOwner << "\n";
std::cout << "dataLength = " << stat.dataLength << "\n";
std::cout << "numChildren = " << stat.numChildren << "\n";
}
bool FindSuperNodes::parse(IParser::Pos & pos, std::shared_ptr<ASTKeeperQuery> & node, Expected & expected) const
{
ASTPtr threshold;
if (!ParserUnsignedInteger{}.parse(pos, threshold, expected))
return false;
node->args.push_back(threshold->as<ASTLiteral &>().value);
String path;
if (!parseKeeperPath(pos, expected, path))
path = ".";
node->args.push_back(std::move(path));
return true;
}
void FindSuperNodes::execute(const ASTKeeperQuery * query, KeeperClient * client) const
{
auto threshold = query->args[0].safeGet<UInt64>();
auto path = client->getAbsolutePath(query->args[1].safeGet<String>());
Coordination::Stat stat;
client->zookeeper->get(path, &stat);
if (stat.numChildren >= static_cast<Int32>(threshold))
{
std::cout << static_cast<String>(path) << "\t" << stat.numChildren << "\n";
return;
}
auto children = client->zookeeper->getChildren(path);
std::sort(children.begin(), children.end());
for (const auto & child : children)
{
auto next_query = *query;
next_query.args[1] = DB::Field(path / child);
execute(&next_query, client);
}
}
bool DeleteStableBackups::parse(IParser::Pos & /* pos */, std::shared_ptr<ASTKeeperQuery> & /* node */, Expected & /* expected */) const
{
return true;
}
void DeleteStableBackups::execute(const ASTKeeperQuery * /* query */, KeeperClient * client) const
{
client->askConfirmation(
"You are going to delete all inactive backups in /clickhouse/backups.",
[client]
{
fs::path backup_root = "/clickhouse/backups";
auto backups = client->zookeeper->getChildren(backup_root);
std::sort(backups.begin(), backups.end());
for (const auto & child : backups)
{
auto backup_path = backup_root / child;
std::cout << "Found backup " << backup_path << ", checking if it's active\n";
String stage_path = backup_path / "stage";
auto stages = client->zookeeper->getChildren(stage_path);
bool is_active = false;
for (const auto & stage : stages)
{
if (startsWith(stage, "alive"))
{
is_active = true;
break;
}
}
if (is_active)
{
std::cout << "Backup " << backup_path << " is active, not going to delete\n";
continue;
}
std::cout << "Backup " << backup_path << " is not active, deleting it\n";
client->zookeeper->removeRecursive(backup_path);
}
});
}
bool FindBigFamily::parse(IParser::Pos & pos, std::shared_ptr<ASTKeeperQuery> & node, Expected & expected) const
{
String path;
if (!parseKeeperPath(pos, expected, path))
path = ".";
node->args.push_back(std::move(path));
ASTPtr count;
if (ParserUnsignedInteger{}.parse(pos, count, expected))
node->args.push_back(count->as<ASTLiteral &>().value);
else
node->args.push_back(UInt64(10));
return true;
}
void FindBigFamily::execute(const ASTKeeperQuery * query, KeeperClient * client) const
{
auto path = client->getAbsolutePath(query->args[0].safeGet<String>());
auto n = query->args[1].safeGet<UInt64>();
std::vector<std::tuple<Int32, String>> result;
std::queue<fs::path> queue;
queue.push(path);
while (!queue.empty())
{
auto next_path = queue.front();
queue.pop();
auto children = client->zookeeper->getChildren(next_path);
std::transform(children.cbegin(), children.cend(), children.begin(), [&](const String & child) { return next_path / child; });
auto response = client->zookeeper->get(children);
for (size_t i = 0; i < response.size(); ++i)
{
result.emplace_back(response[i].stat.numChildren, children[i]);
queue.push(children[i]);
}
}
std::sort(result.begin(), result.end(), std::greater());
for (UInt64 i = 0; i < std::min(result.size(), static_cast<size_t>(n)); ++i)
std::cout << std::get<1>(result[i]) << "\t" << std::get<0>(result[i]) << "\n";
}
bool RMCommand::parse(IParser::Pos & pos, std::shared_ptr<ASTKeeperQuery> & node, Expected & expected) const
{
String arg;
@ -348,7 +170,7 @@ bool HelpCommand::parse(IParser::Pos & /* pos */, std::shared_ptr<ASTKeeperQuery
void HelpCommand::execute(const ASTKeeperQuery * /* query */, KeeperClient * /* client */) const
{
for (const auto & pair : KeeperClient::commands)
std::cout << pair.second->generateHelpString() << "\n";
std::cout << pair.second->getHelpMessage() << "\n";
}
bool FourLetterWordCommand::parse(IParser::Pos & pos, std::shared_ptr<ASTKeeperQuery> & node, Expected & expected) const

View File

@ -21,12 +21,6 @@ public:
virtual String getName() const = 0;
virtual ~IKeeperClientCommand() = default;
String generateHelpString() const
{
return fmt::vformat(getHelpMessage(), fmt::make_format_args(getName()));
}
};
using Command = std::shared_ptr<IKeeperClientCommand>;
@ -40,7 +34,7 @@ class LSCommand : public IKeeperClientCommand
void execute(const ASTKeeperQuery * query, KeeperClient * client) const override;
String getHelpMessage() const override { return "{} [path] -- Lists the nodes for the given path (default: cwd)"; }
String getHelpMessage() const override { return "ls [path] -- Lists the nodes for the given path (default: cwd)"; }
};
class CDCommand : public IKeeperClientCommand
@ -51,7 +45,7 @@ class CDCommand : public IKeeperClientCommand
void execute(const ASTKeeperQuery * query, KeeperClient * client) const override;
String getHelpMessage() const override { return "{} [path] -- Change the working path (default `.`)"; }
String getHelpMessage() const override { return "cd [path] -- Change the working path (default `.`)"; }
};
class SetCommand : public IKeeperClientCommand
@ -64,7 +58,7 @@ class SetCommand : public IKeeperClientCommand
String getHelpMessage() const override
{
return "{} <path> <value> [version] -- Updates the node's value. Only update if version matches (default: -1)";
return "set <path> <value> [version] -- Updates the node's value. Only update if version matches (default: -1)";
}
};
@ -76,7 +70,7 @@ class CreateCommand : public IKeeperClientCommand
void execute(const ASTKeeperQuery * query, KeeperClient * client) const override;
String getHelpMessage() const override { return "{} <path> <value> -- Creates new node"; }
String getHelpMessage() const override { return "create <path> <value> -- Creates new node"; }
};
class GetCommand : public IKeeperClientCommand
@ -87,63 +81,9 @@ class GetCommand : public IKeeperClientCommand
void execute(const ASTKeeperQuery * query, KeeperClient * client) const override;
String getHelpMessage() const override { return "{} <path> -- Returns the node's value"; }
String getHelpMessage() const override { return "get <path> -- Returns the node's value"; }
};
class GetStatCommand : public IKeeperClientCommand
{
String getName() const override { return "get_stat"; }
bool parse(IParser::Pos & pos, std::shared_ptr<ASTKeeperQuery> & node, Expected & expected) const override;
void execute(const ASTKeeperQuery * query, KeeperClient * client) const override;
String getHelpMessage() const override { return "{} [path] -- Returns the node's stat (default `.`)"; }
};
class FindSuperNodes : public IKeeperClientCommand
{
String getName() const override { return "find_super_nodes"; }
bool parse(IParser::Pos & pos, std::shared_ptr<ASTKeeperQuery> & node, Expected & expected) const override;
void execute(const ASTKeeperQuery * query, KeeperClient * client) const override;
String getHelpMessage() const override
{
return "{} <threshold> [path] -- Finds nodes with number of children larger than some threshold for the given path (default `.`)";
}
};
class DeleteStableBackups : public IKeeperClientCommand
{
String getName() const override { return "delete_stable_backups"; }
bool parse(IParser::Pos & pos, std::shared_ptr<ASTKeeperQuery> & node, Expected & expected) const override;
void execute(const ASTKeeperQuery * query, KeeperClient * client) const override;
String getHelpMessage() const override
{
return "{} -- Deletes ClickHouse nodes used for backups that are now inactive";
}
};
class FindBigFamily : public IKeeperClientCommand
{
String getName() const override { return "find_big_family"; }
bool parse(IParser::Pos & pos, std::shared_ptr<ASTKeeperQuery> & node, Expected & expected) const override;
void execute(const ASTKeeperQuery * query, KeeperClient * client) const override;
String getHelpMessage() const override
{
return "{} [path] [n] -- Returns the top n nodes with the biggest family in the subtree (default path = `.` and n = 10)";
}
};
class RMCommand : public IKeeperClientCommand
{
String getName() const override { return "rm"; }
@ -152,7 +92,7 @@ class RMCommand : public IKeeperClientCommand
void execute(const ASTKeeperQuery * query, KeeperClient * client) const override;
String getHelpMessage() const override { return "{} <path> -- Remove the node"; }
String getHelpMessage() const override { return "remove <path> -- Remove the node"; }
};
class RMRCommand : public IKeeperClientCommand
@ -163,7 +103,7 @@ class RMRCommand : public IKeeperClientCommand
void execute(const ASTKeeperQuery * query, KeeperClient * client) const override;
String getHelpMessage() const override { return "{} <path> -- Recursively deletes path. Confirmation required"; }
String getHelpMessage() const override { return "rmr <path> -- Recursively deletes path. Confirmation required"; }
};
class HelpCommand : public IKeeperClientCommand
@ -174,7 +114,7 @@ class HelpCommand : public IKeeperClientCommand
void execute(const ASTKeeperQuery * query, KeeperClient * client) const override;
String getHelpMessage() const override { return "{} -- Prints this message"; }
String getHelpMessage() const override { return "help -- Prints this message"; }
};
class FourLetterWordCommand : public IKeeperClientCommand
@ -185,7 +125,7 @@ class FourLetterWordCommand : public IKeeperClientCommand
void execute(const ASTKeeperQuery * query, KeeperClient * client) const override;
String getHelpMessage() const override { return "{} <command> -- Executes four-letter-word command"; }
String getHelpMessage() const override { return "flwc <command> -- Executes four-letter-word command"; }
};
}

View File

@ -177,10 +177,6 @@ void KeeperClient::initialize(Poco::Util::Application & /* self */)
std::make_shared<SetCommand>(),
std::make_shared<CreateCommand>(),
std::make_shared<GetCommand>(),
std::make_shared<GetStatCommand>(),
std::make_shared<FindSuperNodes>(),
std::make_shared<DeleteStableBackups>(),
std::make_shared<FindBigFamily>(),
std::make_shared<RMCommand>(),
std::make_shared<RMRCommand>(),
std::make_shared<HelpCommand>(),

View File

@ -58,7 +58,6 @@ bool KeeperParser::parseImpl(Pos & pos, ASTPtr & node, Expected & expected)
return false;
String command_name(pos->begin, pos->end);
std::transform(command_name.begin(), command_name.end(), command_name.begin(), [](unsigned char c) { return std::tolower(c); });
Command command;
auto iter = KeeperClient::commands.find(command_name);

View File

@ -1650,6 +1650,7 @@ try
database_catalog.initializeAndLoadTemporaryDatabase();
loadMetadataSystem(global_context);
maybeConvertSystemDatabase(global_context);
startupSystemTables();
/// After attaching system databases we can initialize system log.
global_context->initializeSystemLogs();
global_context->setSystemZooKeeperLogAfterInitializationIfNeeded();
@ -1668,7 +1669,6 @@ try
/// Then, load remaining databases
loadMetadata(global_context, default_database);
convertDatabasesEnginesIfNeed(global_context);
startupSystemTables();
database_catalog.startupBackgroundCleanup();
/// After loading validate that default database exists
database_catalog.assertDatabaseExists(default_database);

View File

@ -168,6 +168,7 @@ enum class AccessType
M(SYSTEM_TTL_MERGES, "SYSTEM STOP TTL MERGES, SYSTEM START TTL MERGES, STOP TTL MERGES, START TTL MERGES", TABLE, SYSTEM) \
M(SYSTEM_FETCHES, "SYSTEM STOP FETCHES, SYSTEM START FETCHES, STOP FETCHES, START FETCHES", TABLE, SYSTEM) \
M(SYSTEM_MOVES, "SYSTEM STOP MOVES, SYSTEM START MOVES, STOP MOVES, START MOVES", TABLE, SYSTEM) \
M(SYSTEM_PULLING_REPLICATION_LOG, "SYSTEM STOP PULLING REPLICATION LOG, SYSTEM START PULLING REPLICATION LOG", TABLE, SYSTEM) \
M(SYSTEM_DISTRIBUTED_SENDS, "SYSTEM STOP DISTRIBUTED SENDS, SYSTEM START DISTRIBUTED SENDS, STOP DISTRIBUTED SENDS, START DISTRIBUTED SENDS", TABLE, SYSTEM_SENDS) \
M(SYSTEM_REPLICATED_SENDS, "SYSTEM STOP REPLICATED SENDS, SYSTEM START REPLICATED SENDS, STOP REPLICATED SENDS, START REPLICATED SENDS", TABLE, SYSTEM_SENDS) \
M(SYSTEM_SENDS, "SYSTEM STOP SENDS, SYSTEM START SENDS, STOP SENDS, START SENDS", GROUP, SYSTEM) \

View File

@ -51,7 +51,7 @@ TEST(AccessRights, Union)
"CREATE DICTIONARY, DROP DATABASE, DROP TABLE, DROP VIEW, DROP DICTIONARY, UNDROP TABLE, "
"TRUNCATE, OPTIMIZE, BACKUP, CREATE ROW POLICY, ALTER ROW POLICY, DROP ROW POLICY, "
"SHOW ROW POLICIES, SYSTEM MERGES, SYSTEM TTL MERGES, SYSTEM FETCHES, "
"SYSTEM MOVES, SYSTEM SENDS, SYSTEM REPLICATION QUEUES, "
"SYSTEM MOVES, SYSTEM PULLING REPLICATION LOG, SYSTEM SENDS, SYSTEM REPLICATION QUEUES, "
"SYSTEM DROP REPLICA, SYSTEM SYNC REPLICA, SYSTEM RESTART REPLICA, "
"SYSTEM RESTORE REPLICA, SYSTEM WAIT LOADING PARTS, SYSTEM SYNC DATABASE REPLICA, SYSTEM FLUSH DISTRIBUTED, dictGet ON db1.*, GRANT NAMED COLLECTION ADMIN ON db1");
}

View File

@ -8,6 +8,7 @@
#include <Parsers/formatAST.h>
#include <Parsers/ASTCreateNamedCollectionQuery.h>
#include <Parsers/ASTAlterNamedCollectionQuery.h>
#include <Parsers/ASTDropNamedCollectionQuery.h>
#include <Parsers/ASTSetQuery.h>
#include <Parsers/ASTCreateQuery.h>
#include <Parsers/parseQuery.h>
@ -225,24 +226,15 @@ public:
void remove(const std::string & collection_name)
{
if (!removeIfExists(collection_name))
auto collection_path = getMetadataPath(collection_name);
if (!fs::exists(collection_path))
{
throw Exception(
ErrorCodes::NAMED_COLLECTION_DOESNT_EXIST,
"Cannot remove collection `{}`, because it doesn't exist",
collection_name);
}
}
bool removeIfExists(const std::string & collection_name)
{
auto collection_path = getMetadataPath(collection_name);
if (fs::exists(collection_path))
{
fs::remove(collection_path);
return true;
}
return false;
fs::remove(collection_path);
}
private:
@ -393,36 +385,64 @@ void loadIfNot()
return loadIfNotUnlocked(lock);
}
void removeFromSQL(const std::string & collection_name, ContextPtr context)
void removeFromSQL(const ASTDropNamedCollectionQuery & query, ContextPtr context)
{
auto lock = lockNamedCollectionsTransaction();
loadIfNotUnlocked(lock);
LoadFromSQL(context).remove(collection_name);
NamedCollectionFactory::instance().remove(collection_name);
}
void removeIfExistsFromSQL(const std::string & collection_name, ContextPtr context)
{
auto lock = lockNamedCollectionsTransaction();
loadIfNotUnlocked(lock);
LoadFromSQL(context).removeIfExists(collection_name);
NamedCollectionFactory::instance().removeIfExists(collection_name);
auto & instance = NamedCollectionFactory::instance();
if (!instance.exists(query.collection_name))
{
if (!query.if_exists)
{
throw Exception(
ErrorCodes::NAMED_COLLECTION_DOESNT_EXIST,
"Cannot remove collection `{}`, because it doesn't exist",
query.collection_name);
}
return;
}
LoadFromSQL(context).remove(query.collection_name);
instance.remove(query.collection_name);
}
void createFromSQL(const ASTCreateNamedCollectionQuery & query, ContextPtr context)
{
auto lock = lockNamedCollectionsTransaction();
loadIfNotUnlocked(lock);
NamedCollectionFactory::instance().add(query.collection_name, LoadFromSQL(context).create(query));
auto & instance = NamedCollectionFactory::instance();
if (instance.exists(query.collection_name))
{
if (!query.if_not_exists)
{
throw Exception(
ErrorCodes::NAMED_COLLECTION_ALREADY_EXISTS,
"A named collection `{}` already exists",
query.collection_name);
}
return;
}
instance.add(query.collection_name, LoadFromSQL(context).create(query));
}
void updateFromSQL(const ASTAlterNamedCollectionQuery & query, ContextPtr context)
{
auto lock = lockNamedCollectionsTransaction();
loadIfNotUnlocked(lock);
auto & instance = NamedCollectionFactory::instance();
if (!instance.exists(query.collection_name))
{
if (!query.if_exists)
{
throw Exception(
ErrorCodes::NAMED_COLLECTION_DOESNT_EXIST,
"Cannot remove collection `{}`, because it doesn't exist",
query.collection_name);
}
return;
}
LoadFromSQL(context).update(query);
auto collection = NamedCollectionFactory::instance().getMutable(query.collection_name);
auto collection = instance.getMutable(query.collection_name);
auto collection_lock = collection->lock();
for (const auto & [name, value] : query.changes)

View File

@ -8,6 +8,7 @@ namespace DB
class ASTCreateNamedCollectionQuery;
class ASTAlterNamedCollectionQuery;
class ASTDropNamedCollectionQuery;
namespace NamedCollectionUtils
{
@ -26,8 +27,7 @@ void reloadFromConfig(const Poco::Util::AbstractConfiguration & config);
void loadFromSQL(ContextPtr context);
/// Remove collection as well as its metadata from `context->getPath() / named_collections /`.
void removeFromSQL(const std::string & collection_name, ContextPtr context);
void removeIfExistsFromSQL(const std::string & collection_name, ContextPtr context);
void removeFromSQL(const ASTDropNamedCollectionQuery & query, ContextPtr context);
/// Create a new collection from AST and put it to `context->getPath() / named_collections /`.
void createFromSQL(const ASTCreateNamedCollectionQuery & query, ContextPtr context);

View File

@ -101,6 +101,10 @@ void ProgressIndication::writeFinalProgress()
<< formatReadableSizeWithDecimalSuffix(progress.read_bytes * 1000000000.0 / elapsed_ns) << "/s.)";
else
std::cout << ". ";
auto peak_memory_usage = getMemoryUsage().peak;
if (peak_memory_usage >= 0)
std::cout << "\nPeak memory usage: " << formatReadableSizeWithBinarySuffix(peak_memory_usage) << ".";
}
void ProgressIndication::writeProgress(WriteBufferFromFileDescriptor & message)

View File

@ -218,7 +218,7 @@ void KeeperSnapshotManagerS3::uploadSnapshotImpl(const SnapshotFileInfo & snapsh
}
catch (...)
{
LOG_INFO(log, "Failed to delete lock file for {} from S3", snapshot_path);
LOG_INFO(log, "Failed to delete lock file for {} from S3", snapshot_file_info.path);
tryLogCurrentException(__PRETTY_FUNCTION__);
}
});

View File

@ -107,9 +107,6 @@ DatabasePtr DatabaseFactory::get(const ASTCreateQuery & create, const String & m
{
cckMetadataPathForOrdinary(create, metadata_path);
/// Creates store/xxx/ for Atomic
fs::create_directories(fs::path(metadata_path).parent_path());
DatabasePtr impl = getImpl(create, metadata_path, context);
if (impl && context->hasQueryContext() && context->getSettingsRef().log_queries)

View File

@ -77,6 +77,8 @@ DatabaseMySQL::DatabaseMySQL(
throw;
}
fs::create_directories(metadata_path);
thread = ThreadFromGlobalPool{&DatabaseMySQL::cleanOutdatedTables, this};
}

View File

@ -54,6 +54,7 @@ DatabasePostgreSQL::DatabasePostgreSQL(
, cache_tables(cache_tables_)
, log(&Poco::Logger::get("DatabasePostgreSQL(" + dbname_ + ")"))
{
fs::create_directories(metadata_path);
cleaner_task = getContext()->getSchedulePool().createTask("PostgreSQLCleanerTask", [this]{ removeOutdatedTables(); });
cleaner_task->deactivate();
}

View File

@ -74,19 +74,22 @@ CachedOnDiskReadBufferFromFile::CachedOnDiskReadBufferFromFile(
}
void CachedOnDiskReadBufferFromFile::appendFilesystemCacheLog(
const FileSegment::Range & file_segment_range, CachedOnDiskReadBufferFromFile::ReadType type)
const FileSegment & file_segment, CachedOnDiskReadBufferFromFile::ReadType type)
{
if (!cache_log)
return;
const auto range = file_segment.range();
FilesystemCacheLogElement elem
{
.event_time = std::chrono::system_clock::to_time_t(std::chrono::system_clock::now()),
.query_id = query_id,
.source_file_path = source_file_path,
.file_segment_range = { file_segment_range.left, file_segment_range.right },
.file_segment_range = { range.left, range.right },
.requested_range = { first_offset, read_until_position },
.file_segment_size = file_segment_range.size(),
.file_segment_key = file_segment.key().toString(),
.file_segment_offset = file_segment.offset(),
.file_segment_size = range.size(),
.read_from_cache_attempted = true,
.read_buffer_id = current_buffer_id,
.profile_counters = std::make_shared<ProfileEvents::Counters::Snapshot>(
@ -495,7 +498,7 @@ bool CachedOnDiskReadBufferFromFile::completeFileSegmentAndGetNext()
auto completed_range = current_file_segment->range();
if (cache_log)
appendFilesystemCacheLog(completed_range, read_type);
appendFilesystemCacheLog(*current_file_segment, read_type);
chassert(file_offset_of_buffer_end > completed_range.right);
@ -518,7 +521,7 @@ CachedOnDiskReadBufferFromFile::~CachedOnDiskReadBufferFromFile()
{
if (cache_log && file_segments && !file_segments->empty())
{
appendFilesystemCacheLog(file_segments->front().range(), read_type);
appendFilesystemCacheLog(file_segments->front(), read_type);
}
}

View File

@ -90,7 +90,7 @@ private:
bool completeFileSegmentAndGetNext();
void appendFilesystemCacheLog(const FileSegment::Range & file_segment_range, ReadType read_type);
void appendFilesystemCacheLog(const FileSegment & file_segment, ReadType read_type);
bool writeCache(char * data, size_t size, size_t offset, FileSegment & file_segment);

View File

@ -109,6 +109,8 @@ void ReadBufferFromRemoteFSGather::appendUncachedReadInfo()
.source_file_path = current_object.remote_path,
.file_segment_range = { 0, current_object.bytes_size },
.cache_type = FilesystemCacheLogElement::CacheType::READ_FROM_FS_BYPASSING_CACHE,
.file_segment_key = {},
.file_segment_offset = {},
.file_segment_size = current_object.bytes_size,
.read_from_cache_attempted = false,
};

View File

@ -510,11 +510,12 @@ ColumnPtr FunctionArrayIntersect::execute(const UnpackedArrays & arrays, Mutable
map.clear();
bool all_has_nullable = all_nullable;
bool current_has_nullable = false;
for (size_t arg_num = 0; arg_num < args; ++arg_num)
{
const auto & arg = arrays.args[arg_num];
bool current_has_nullable = false;
current_has_nullable = false;
size_t off;
// const array has only one row
@ -549,44 +550,93 @@ ColumnPtr FunctionArrayIntersect::execute(const UnpackedArrays & arrays, Mutable
}
}
prev_off[arg_num] = off;
if (arg.is_const)
prev_off[arg_num] = 0;
// We update offsets for all the arrays except the first one. Offsets for the first array would be updated later.
// It is needed to iterate the first array again so that the elements in the result would have fixed order.
if (arg_num)
{
prev_off[arg_num] = off;
if (arg.is_const)
prev_off[arg_num] = 0;
}
if (!current_has_nullable)
all_has_nullable = false;
}
if (all_has_nullable)
{
++result_offset;
result_data.insertDefault();
null_map.push_back(1);
}
// We have NULL in output only once if it should be there
bool null_added = false;
const auto & arg = arrays.args[0];
size_t off;
// const array has only one row
if (arg.is_const)
off = (*arg.offsets)[0];
else
off = (*arg.offsets)[row];
for (const auto & pair : map)
for (auto i : collections::range(prev_off[0], off))
{
if (pair.getMapped() == args)
all_has_nullable = all_nullable;
typename Map::LookupResult pair = nullptr;
if (arg.null_map && (*arg.null_map)[i])
{
current_has_nullable = true;
if (all_has_nullable && !null_added)
{
++result_offset;
result_data.insertDefault();
null_map.push_back(1);
null_added = true;
}
if (null_added)
continue;
}
else if constexpr (is_numeric_column)
{
pair = map.find(columns[0]->getElement(i));
}
else if constexpr (std::is_same_v<ColumnType, ColumnString> || std::is_same_v<ColumnType, ColumnFixedString>)
pair = map.find(columns[0]->getDataAt(i));
else
{
const char * data = nullptr;
pair = map.find(columns[0]->serializeValueIntoArena(i, arena, data));
}
prev_off[0] = off;
if (arg.is_const)
prev_off[0] = 0;
if (!current_has_nullable)
all_has_nullable = false;
if (pair && pair->getMapped() == args)
{
// We increase pair->getMapped() here to not skip duplicate values from the first array.
++pair->getMapped();
++result_offset;
if constexpr (is_numeric_column)
result_data.insertValue(pair.getKey());
{
result_data.insertValue(pair->getKey());
}
else if constexpr (std::is_same_v<ColumnType, ColumnString> || std::is_same_v<ColumnType, ColumnFixedString>)
result_data.insertData(pair.getKey().data, pair.getKey().size);
{
result_data.insertData(pair->getKey().data, pair->getKey().size);
}
else
result_data.deserializeAndInsertFromArena(pair.getKey().data);
{
result_data.deserializeAndInsertFromArena(pair->getKey().data);
}
if (all_nullable)
null_map.push_back(0);
}
}
result_offsets.getElement(row) = result_offset;
}
}
ColumnPtr result_column = std::move(result_data_ptr);
if (all_nullable)
result_column = ColumnNullable::create(result_column, std::move(null_map_column));
return ColumnArray::create(result_column, std::move(result_offsets_ptr));
}

View File

@ -16,6 +16,7 @@ namespace ActionLocks
extern const StorageActionBlockType DistributedSend = 5;
extern const StorageActionBlockType PartsTTLMerge = 6;
extern const StorageActionBlockType PartsMove = 7;
extern const StorageActionBlockType PullReplicationLog = 8;
}

View File

@ -806,6 +806,13 @@ bool FileCache::tryReserve(FileSegment & file_segment, const size_t size)
return true;
}
void FileCache::removeKey(const Key & key)
{
assertInitialized();
auto locked_key = metadata.lockKeyMetadata(key, CacheMetadata::KeyNotFoundPolicy::THROW);
locked_key->removeAll();
}
void FileCache::removeKeyIfExists(const Key & key)
{
assertInitialized();
@ -818,7 +825,14 @@ void FileCache::removeKeyIfExists(const Key & key)
/// But if we have multiple replicated zero-copy tables on the same server
/// it became possible to start removing something from cache when it is used
/// by other "zero-copy" tables. That is why it's not an error.
locked_key->removeAllReleasable();
locked_key->removeAll(/* if_releasable */true);
}
void FileCache::removeFileSegment(const Key & key, size_t offset)
{
assertInitialized();
auto locked_key = metadata.lockKeyMetadata(key, CacheMetadata::KeyNotFoundPolicy::THROW);
locked_key->removeFileSegment(offset);
}
void FileCache::removePathIfExists(const String & path)
@ -830,22 +844,12 @@ void FileCache::removeAllReleasable()
{
assertInitialized();
auto lock = lockCache();
main_priority->iterate([&](LockedKey & locked_key, const FileSegmentMetadataPtr & segment_metadata)
{
if (segment_metadata->releasable())
{
auto file_segment = segment_metadata->file_segment;
locked_key.removeFileSegment(file_segment->offset(), file_segment->lock());
return PriorityIterationResult::REMOVE_AND_CONTINUE;
}
return PriorityIterationResult::CONTINUE;
}, lock);
metadata.iterate([](LockedKey & locked_key) { locked_key.removeAll(/* if_releasable */true); });
if (stash)
{
/// Remove all access information.
auto lock = lockCache();
stash->records.clear();
stash->queue->removeAll(lock);
}
@ -914,7 +918,7 @@ void FileCache::loadMetadata()
continue;
}
const auto key = Key(unhexUInt<UInt128>(key_directory.filename().string().data()));
const auto key = Key::fromKeyString(key_directory.filename().string());
auto locked_key = metadata.lockKeyMetadata(key, CacheMetadata::KeyNotFoundPolicy::CREATE_EMPTY, /* is_initial_load */true);
for (fs::directory_iterator offset_it{key_directory}; offset_it != fs::directory_iterator(); ++offset_it)
@ -1069,7 +1073,7 @@ FileSegmentsHolderPtr FileCache::getSnapshot()
FileSegmentsHolderPtr FileCache::getSnapshot(const Key & key)
{
FileSegments file_segments;
auto locked_key = metadata.lockKeyMetadata(key, CacheMetadata::KeyNotFoundPolicy::THROW);
auto locked_key = metadata.lockKeyMetadata(key, CacheMetadata::KeyNotFoundPolicy::THROW_LOGICAL);
for (const auto & [_, file_segment_metadata] : *locked_key->getKeyMetadata())
file_segments.push_back(FileSegment::getSnapshot(file_segment_metadata->file_segment));
return std::make_unique<FileSegmentsHolder>(std::move(file_segments));

View File

@ -83,13 +83,19 @@ public:
FileSegmentsHolderPtr set(const Key & key, size_t offset, size_t size, const CreateFileSegmentSettings & settings);
/// Remove files by `key`. Removes files which might be used at the moment.
/// Remove file segment by `key` and `offset`. Throws if file segment does not exist.
void removeFileSegment(const Key & key, size_t offset);
/// Remove files by `key`. Throws if key does not exist.
void removeKey(const Key & key);
/// Remove files by `key`.
void removeKeyIfExists(const Key & key);
/// Removes files by `path`. Removes files which might be used at the moment.
/// Removes files by `path`.
void removePathIfExists(const String & path);
/// Remove files by `key`. Will not remove files which are used at the moment.
/// Remove files by `key`.
void removeAllReleasable();
std::vector<String> tryGetCachePaths(const Key & key);

View File

@ -7,6 +7,10 @@
namespace DB
{
namespace ErrorCodes
{
extern const int BAD_ARGUMENTS;
}
FileCacheKey::FileCacheKey(const std::string & path)
: key(sipHash128(path.data(), path.size()))
@ -28,4 +32,11 @@ FileCacheKey FileCacheKey::random()
return FileCacheKey(UUIDHelpers::generateV4().toUnderType());
}
FileCacheKey FileCacheKey::fromKeyString(const std::string & key_str)
{
if (key_str.size() != 32)
throw Exception(ErrorCodes::BAD_ARGUMENTS, "Invalid cache key hex: {}", key_str);
return FileCacheKey(unhexUInt<UInt128>(key_str.data()));
}
}

View File

@ -21,6 +21,8 @@ struct FileCacheKey
static FileCacheKey random();
bool operator==(const FileCacheKey & other) const { return key == other.key; }
static FileCacheKey fromKeyString(const std::string & key_str);
};
using FileCacheKeyAndOffset = std::pair<FileCacheKey, size_t>;

View File

@ -25,6 +25,7 @@ namespace DB
namespace ErrorCodes
{
extern const int LOGICAL_ERROR;
extern const int BAD_ARGUMENTS;
}
FileSegmentMetadata::FileSegmentMetadata(FileSegmentPtr && file_segment_)
@ -191,6 +192,8 @@ LockedKeyPtr CacheMetadata::lockKeyMetadata(
if (it == end())
{
if (key_not_found_policy == KeyNotFoundPolicy::THROW)
throw Exception(ErrorCodes::BAD_ARGUMENTS, "No such key `{}` in cache", key);
else if (key_not_found_policy == KeyNotFoundPolicy::THROW_LOGICAL)
throw Exception(ErrorCodes::LOGICAL_ERROR, "No such key `{}` in cache", key);
else if (key_not_found_policy == KeyNotFoundPolicy::RETURN_NULL)
return nullptr;
@ -215,6 +218,8 @@ LockedKeyPtr CacheMetadata::lockKeyMetadata(
return locked_metadata;
if (key_not_found_policy == KeyNotFoundPolicy::THROW)
throw Exception(ErrorCodes::BAD_ARGUMENTS, "No such key `{}` in cache", key);
else if (key_not_found_policy == KeyNotFoundPolicy::THROW_LOGICAL)
throw Exception(ErrorCodes::LOGICAL_ERROR, "No such key `{}` in cache", key);
if (key_not_found_policy == KeyNotFoundPolicy::RETURN_NULL)
@ -561,11 +566,11 @@ bool LockedKey::isLastOwnerOfFileSegment(size_t offset) const
return file_segment_metadata->file_segment.use_count() == 2;
}
void LockedKey::removeAllReleasable()
void LockedKey::removeAll(bool if_releasable)
{
for (auto it = key_metadata->begin(); it != key_metadata->end();)
{
if (!it->second->releasable())
if (if_releasable && !it->second->releasable())
{
++it;
continue;
@ -586,17 +591,32 @@ void LockedKey::removeAllReleasable()
}
}
KeyMetadata::iterator LockedKey::removeFileSegment(size_t offset)
{
auto it = key_metadata->find(offset);
if (it == key_metadata->end())
throw Exception(ErrorCodes::BAD_ARGUMENTS, "There is no offset {}", offset);
auto file_segment = it->second->file_segment;
return removeFileSegmentImpl(it, file_segment->lock());
}
KeyMetadata::iterator LockedKey::removeFileSegment(size_t offset, const FileSegmentGuard::Lock & segment_lock)
{
auto it = key_metadata->find(offset);
if (it == key_metadata->end())
throw Exception(ErrorCodes::LOGICAL_ERROR, "There is no offset {}", offset);
return removeFileSegmentImpl(it, segment_lock);
}
KeyMetadata::iterator LockedKey::removeFileSegmentImpl(KeyMetadata::iterator it, const FileSegmentGuard::Lock & segment_lock)
{
auto file_segment = it->second->file_segment;
LOG_DEBUG(
key_metadata->log, "Remove from cache. Key: {}, offset: {}, size: {}",
getKey(), offset, file_segment->reserved_size);
getKey(), file_segment->offset(), file_segment->reserved_size);
chassert(file_segment->assertCorrectnessUnlocked(segment_lock));

View File

@ -87,7 +87,7 @@ struct CacheMetadata : public std::unordered_map<FileCacheKey, KeyMetadataPtr>,
{
public:
using Key = FileCacheKey;
using IterateCacheMetadataFunc = std::function<void(const LockedKey &)>;
using IterateCacheMetadataFunc = std::function<void(LockedKey &)>;
explicit CacheMetadata(const std::string & path_);
@ -106,6 +106,7 @@ public:
enum class KeyNotFoundPolicy
{
THROW,
THROW_LOGICAL,
CREATE_EMPTY,
RETURN_NULL,
};
@ -169,9 +170,10 @@ struct LockedKey : private boost::noncopyable
std::shared_ptr<const KeyMetadata> getKeyMetadata() const { return key_metadata; }
std::shared_ptr<KeyMetadata> getKeyMetadata() { return key_metadata; }
void removeAllReleasable();
void removeAll(bool if_releasable = true);
KeyMetadata::iterator removeFileSegment(size_t offset, const FileSegmentGuard::Lock &);
KeyMetadata::iterator removeFileSegment(size_t offset);
void shrinkFileSegmentToDownloadedSize(size_t offset, const FileSegmentGuard::Lock &);
@ -188,6 +190,8 @@ struct LockedKey : private boost::noncopyable
std::string toString() const;
private:
KeyMetadata::iterator removeFileSegmentImpl(KeyMetadata::iterator it, const FileSegmentGuard::Lock &);
const std::shared_ptr<KeyMetadata> key_metadata;
KeyGuard::Lock lock; /// `lock` must be destructed before `key_metadata`.
};

View File

@ -40,6 +40,8 @@ NamesAndTypesList FilesystemCacheLogElement::getNamesAndTypes()
{"source_file_path", std::make_shared<DataTypeString>()},
{"file_segment_range", std::make_shared<DataTypeTuple>(types)},
{"total_requested_range", std::make_shared<DataTypeTuple>(types)},
{"key", std::make_shared<DataTypeString>()},
{"offset", std::make_shared<DataTypeUInt64>()},
{"size", std::make_shared<DataTypeUInt64>()},
{"read_type", std::make_shared<DataTypeString>()},
{"read_from_cache_attempted", std::make_shared<DataTypeUInt8>()},
@ -60,6 +62,8 @@ void FilesystemCacheLogElement::appendToBlock(MutableColumns & columns) const
columns[i++]->insert(source_file_path);
columns[i++]->insert(Tuple{file_segment_range.first, file_segment_range.second});
columns[i++]->insert(Tuple{requested_range.first, requested_range.second});
columns[i++]->insert(file_segment_key);
columns[i++]->insert(file_segment_offset);
columns[i++]->insert(file_segment_size);
columns[i++]->insert(typeToString(cache_type));
columns[i++]->insert(read_from_cache_attempted);

View File

@ -11,16 +11,7 @@
namespace DB
{
///
/// -------- Column --------- Type ------
/// | event_date | DateTime |
/// | event_time | UInt64 |
/// | query_id | String |
/// | remote_file_path | String |
/// | segment_range | Tuple |
/// | read_type | String |
/// -------------------------------------
///
struct FilesystemCacheLogElement
{
enum class CacheType
@ -39,6 +30,8 @@ struct FilesystemCacheLogElement
std::pair<size_t, size_t> file_segment_range{};
std::pair<size_t, size_t> requested_range{};
CacheType cache_type{};
std::string file_segment_key;
size_t file_segment_offset;
size_t file_segment_size;
bool read_from_cache_attempted;
String read_buffer_id;

View File

@ -1,5 +1,4 @@
#include <Interpreters/InterpreterCreateNamedCollectionQuery.h>
#include <Parsers/ASTCreateNamedCollectionQuery.h>
#include <Access/ContextAccess.h>
#include <Interpreters/Context.h>

View File

@ -22,11 +22,7 @@ BlockIO InterpreterDropNamedCollectionQuery::execute()
return executeDDLQueryOnCluster(query_ptr, current_context, params);
}
if (query.if_exists)
NamedCollectionUtils::removeIfExistsFromSQL(query.collection_name, current_context);
else
NamedCollectionUtils::removeFromSQL(query.collection_name, current_context);
NamedCollectionUtils::removeFromSQL(query, current_context);
return {};
}

View File

@ -89,13 +89,14 @@ namespace ErrorCodes
namespace ActionLocks
{
extern StorageActionBlockType PartsMerge;
extern StorageActionBlockType PartsFetch;
extern StorageActionBlockType PartsSend;
extern StorageActionBlockType ReplicationQueue;
extern StorageActionBlockType DistributedSend;
extern StorageActionBlockType PartsTTLMerge;
extern StorageActionBlockType PartsMove;
extern const StorageActionBlockType PartsMerge;
extern const StorageActionBlockType PartsFetch;
extern const StorageActionBlockType PartsSend;
extern const StorageActionBlockType ReplicationQueue;
extern const StorageActionBlockType DistributedSend;
extern const StorageActionBlockType PartsTTLMerge;
extern const StorageActionBlockType PartsMove;
extern const StorageActionBlockType PullReplicationLog;
}
@ -155,6 +156,8 @@ AccessType getRequiredAccessType(StorageActionBlockType action_type)
return AccessType::SYSTEM_TTL_MERGES;
else if (action_type == ActionLocks::PartsMove)
return AccessType::SYSTEM_MOVES;
else if (action_type == ActionLocks::PullReplicationLog)
return AccessType::SYSTEM_PULLING_REPLICATION_LOG;
else
throw Exception(ErrorCodes::LOGICAL_ERROR, "Unknown action type: {}", std::to_string(action_type));
}
@ -371,7 +374,18 @@ BlockIO InterpreterSystemQuery::execute()
else
{
auto cache = FileCacheFactory::instance().getByName(query.filesystem_cache_name).cache;
cache->removeAllReleasable();
if (query.key_to_drop.empty())
{
cache->removeAllReleasable();
}
else
{
auto key = FileCacheKey::fromKeyString(query.key_to_drop);
if (query.offset_to_drop.has_value())
cache->removeFileSegment(key, query.offset_to_drop.value());
else
cache->removeKey(key);
}
}
break;
}
@ -502,6 +516,12 @@ BlockIO InterpreterSystemQuery::execute()
case Type::START_DISTRIBUTED_SENDS:
startStopAction(ActionLocks::DistributedSend, true);
break;
case Type::STOP_PULLING_REPLICATION_LOG:
startStopAction(ActionLocks::PullReplicationLog, false);
break;
case Type::START_PULLING_REPLICATION_LOG:
startStopAction(ActionLocks::PullReplicationLog, true);
break;
case Type::DROP_REPLICA:
dropReplica(query);
break;
@ -1079,6 +1099,15 @@ AccessRightsElements InterpreterSystemQuery::getRequiredAccessForDDLOnCluster()
required_access.emplace_back(AccessType::SYSTEM_MOVES, query.getDatabase(), query.getTable());
break;
}
case Type::STOP_PULLING_REPLICATION_LOG:
case Type::START_PULLING_REPLICATION_LOG:
{
if (!query.table)
required_access.emplace_back(AccessType::SYSTEM_PULLING_REPLICATION_LOG);
else
required_access.emplace_back(AccessType::SYSTEM_PULLING_REPLICATION_LOG, query.getDatabase(), query.getTable());
break;
}
case Type::STOP_FETCHES:
case Type::START_FETCHES:
{

View File

@ -91,34 +91,30 @@ void WindowFrame::toString(WriteBuffer & buf) const
void WindowFrame::checkValid() const
{
// Check the validity of offsets.
if (type == WindowFrame::FrameType::ROWS
|| type == WindowFrame::FrameType::GROUPS)
if (begin_type == BoundaryType::Offset
&& !((begin_offset.getType() == Field::Types::UInt64
|| begin_offset.getType() == Field::Types::Int64)
&& begin_offset.get<Int64>() >= 0
&& begin_offset.get<Int64>() < INT_MAX))
{
if (begin_type == BoundaryType::Offset
&& !((begin_offset.getType() == Field::Types::UInt64
|| begin_offset.getType() == Field::Types::Int64)
&& begin_offset.get<Int64>() >= 0
&& begin_offset.get<Int64>() < INT_MAX))
{
throw Exception(ErrorCodes::BAD_ARGUMENTS,
"Frame start offset for '{}' frame must be a nonnegative 32-bit integer, '{}' of type '{}' given",
type,
applyVisitor(FieldVisitorToString(), begin_offset),
begin_offset.getType());
}
throw Exception(ErrorCodes::BAD_ARGUMENTS,
"Frame start offset for '{}' frame must be a nonnegative 32-bit integer, '{}' of type '{}' given",
type,
applyVisitor(FieldVisitorToString(), begin_offset),
begin_offset.getType());
}
if (end_type == BoundaryType::Offset
&& !((end_offset.getType() == Field::Types::UInt64
|| end_offset.getType() == Field::Types::Int64)
&& end_offset.get<Int64>() >= 0
&& end_offset.get<Int64>() < INT_MAX))
{
throw Exception(ErrorCodes::BAD_ARGUMENTS,
"Frame end offset for '{}' frame must be a nonnegative 32-bit integer, '{}' of type '{}' given",
type,
applyVisitor(FieldVisitorToString(), end_offset),
end_offset.getType());
}
if (end_type == BoundaryType::Offset
&& !((end_offset.getType() == Field::Types::UInt64
|| end_offset.getType() == Field::Types::Int64)
&& end_offset.get<Int64>() >= 0
&& end_offset.get<Int64>() < INT_MAX))
{
throw Exception(ErrorCodes::BAD_ARGUMENTS,
"Frame end offset for '{}' frame must be a nonnegative 32-bit integer, '{}' of type '{}' given",
type,
applyVisitor(FieldVisitorToString(), end_offset),
end_offset.getType());
}
// Check relative positioning of offsets.

View File

@ -45,10 +45,10 @@ namespace ErrorCodes
namespace ActionLocks
{
extern StorageActionBlockType PartsMerge;
extern StorageActionBlockType PartsFetch;
extern StorageActionBlockType PartsSend;
extern StorageActionBlockType DistributedSend;
extern const StorageActionBlockType PartsMerge;
extern const StorageActionBlockType PartsFetch;
extern const StorageActionBlockType PartsSend;
extern const StorageActionBlockType DistributedSend;
}
static void executeCreateQuery(
@ -250,6 +250,9 @@ static void loadSystemDatabaseImpl(ContextMutablePtr context, const String & dat
{
String path = context->getPath() + "metadata/" + database_name;
String metadata_file = path + ".sql";
if (fs::exists(metadata_file + ".tmp"))
fs::remove(metadata_file + ".tmp");
if (fs::exists(fs::path(metadata_file)))
{
/// 'has_force_restore_data_flag' is true, to not fail on loading query_log table, if it is corrupted.

View File

@ -15,6 +15,8 @@ ASTPtr ASTAlterNamedCollectionQuery::clone() const
void ASTAlterNamedCollectionQuery::formatImpl(const IAST::FormatSettings & settings, IAST::FormatState &, IAST::FormatStateStacked) const
{
settings.ostr << (settings.hilite ? hilite_keyword : "") << "Alter NAMED COLLECTION ";
if (if_exists)
settings.ostr << "IF EXISTS ";
settings.ostr << (settings.hilite ? hilite_identifier : "") << backQuoteIfNeed(collection_name) << (settings.hilite ? hilite_none : "");
formatOnCluster(settings);
if (!changes.empty())

View File

@ -18,6 +18,8 @@ ASTPtr ASTCreateNamedCollectionQuery::clone() const
void ASTCreateNamedCollectionQuery::formatImpl(const IAST::FormatSettings & settings, IAST::FormatState &, IAST::FormatStateStacked) const
{
settings.ostr << (settings.hilite ? hilite_keyword : "") << "CREATE NAMED COLLECTION ";
if (if_not_exists)
settings.ostr << "IF NOT EXISTS ";
settings.ostr << (settings.hilite ? hilite_identifier : "") << backQuoteIfNeed(collection_name) << (settings.hilite ? hilite_none : "");
formatOnCluster(settings);

View File

@ -13,6 +13,7 @@ class ASTCreateNamedCollectionQuery : public IAST, public ASTQueryWithOnCluster
public:
std::string collection_name;
SettingsChanges changes;
bool if_not_exists = false;
String getID(char) const override { return "CreateNamedCollectionQuery"; }

View File

@ -13,6 +13,8 @@ ASTPtr ASTDropNamedCollectionQuery::clone() const
void ASTDropNamedCollectionQuery::formatImpl(const IAST::FormatSettings & settings, IAST::FormatState &, IAST::FormatStateStacked) const
{
settings.ostr << (settings.hilite ? hilite_keyword : "") << "DROP NAMED COLLECTION ";
if (if_exists)
settings.ostr << "IF EXISTS ";
settings.ostr << (settings.hilite ? hilite_identifier : "") << backQuoteIfNeed(collection_name) << (settings.hilite ? hilite_none : "");
formatOnCluster(settings);
}

View File

@ -162,7 +162,9 @@ void ASTSystemQuery::formatImpl(const FormatSettings & settings, FormatState &,
|| type == Type::STOP_REPLICATION_QUEUES
|| type == Type::START_REPLICATION_QUEUES
|| type == Type::STOP_DISTRIBUTED_SENDS
|| type == Type::START_DISTRIBUTED_SENDS)
|| type == Type::START_DISTRIBUTED_SENDS
|| type == Type::STOP_PULLING_REPLICATION_LOG
|| type == Type::START_PULLING_REPLICATION_LOG)
{
if (table)
print_database_table();
@ -210,7 +212,15 @@ void ASTSystemQuery::formatImpl(const FormatSettings & settings, FormatState &,
else if (type == Type::DROP_FILESYSTEM_CACHE)
{
if (!filesystem_cache_name.empty())
{
settings.ostr << (settings.hilite ? hilite_none : "") << " " << filesystem_cache_name;
if (!key_to_drop.empty())
{
settings.ostr << (settings.hilite ? hilite_none : "") << " KEY " << key_to_drop;
if (offset_to_drop.has_value())
settings.ostr << (settings.hilite ? hilite_none : "") << " OFFSET " << offset_to_drop.value();
}
}
}
else if (type == Type::UNFREEZE)
{

View File

@ -80,6 +80,8 @@ public:
UNFREEZE,
ENABLE_FAILPOINT,
DISABLE_FAILPOINT,
STOP_PULLING_REPLICATION_LOG,
START_PULLING_REPLICATION_LOG,
END
};
@ -108,6 +110,8 @@ public:
UInt64 seconds{};
String filesystem_cache_name;
std::string key_to_drop;
std::optional<size_t> offset_to_drop;
String backup_name;

View File

@ -13,8 +13,9 @@ bool ParserAlterNamedCollectionQuery::parseImpl(IParser::Pos & pos, ASTPtr & nod
{
ParserKeyword s_alter("ALTER");
ParserKeyword s_collection("NAMED COLLECTION");
ParserKeyword s_if_exists("IF EXISTS");
ParserKeyword s_on("ON");
ParserKeyword s_delete("DELETE");
ParserIdentifier name_p;
ParserSetQuery set_p;
ParserToken s_comma(TokenType::Comma);
@ -32,10 +33,13 @@ bool ParserAlterNamedCollectionQuery::parseImpl(IParser::Pos & pos, ASTPtr & nod
if (!s_collection.ignore(pos, expected))
return false;
if (s_if_exists.ignore(pos, expected))
if_exists = true;
if (!name_p.parse(pos, collection_name, expected))
return false;
if (ParserKeyword{"ON"}.ignore(pos, expected))
if (s_on.ignore(pos, expected))
{
if (!ASTQueryWithOnCluster::parse(pos, cluster_str, expected))
return false;

View File

@ -1421,15 +1421,17 @@ bool ParserCreateViewQuery::parseImpl(Pos & pos, ASTPtr & node, Expected & expec
bool ParserCreateNamedCollectionQuery::parseImpl(Pos & pos, ASTPtr & node, Expected & expected)
{
ParserKeyword s_create("CREATE");
ParserKeyword s_attach("ATTACH");
ParserKeyword s_named_collection("NAMED COLLECTION");
ParserKeyword s_if_not_exists("IF NOT EXISTS");
ParserKeyword s_on("ON");
ParserKeyword s_as("AS");
ParserToken s_comma(TokenType::Comma);
ParserIdentifier name_p;
ParserToken s_comma(TokenType::Comma);
String cluster_str;
bool if_not_exists = false;
ASTPtr collection_name;
String cluster_str;
if (!s_create.ignore(pos, expected))
return false;
@ -1437,10 +1439,13 @@ bool ParserCreateNamedCollectionQuery::parseImpl(Pos & pos, ASTPtr & node, Expec
if (!s_named_collection.ignore(pos, expected))
return false;
if (s_if_not_exists.ignore(pos, expected))
if_not_exists = true;
if (!name_p.parse(pos, collection_name, expected))
return false;
if (ParserKeyword{"ON"}.ignore(pos, expected))
if (s_on.ignore(pos, expected))
{
if (!ASTQueryWithOnCluster::parse(pos, cluster_str, expected))
return false;
@ -1465,7 +1470,9 @@ bool ParserCreateNamedCollectionQuery::parseImpl(Pos & pos, ASTPtr & node, Expec
auto query = std::make_shared<ASTCreateNamedCollectionQuery>();
tryGetIdentifierNameInto(collection_name, query->collection_name);
query->if_not_exists = if_not_exists;
query->changes = changes;
query->cluster = std::move(cluster_str);
node = query;
return true;

View File

@ -548,6 +548,7 @@ protected:
bool parseImpl(Pos & pos, ASTPtr & node, Expected & expected) override;
};
/// CREATE NAMED COLLECTION name [ON CLUSTER cluster]
class ParserCreateNamedCollectionQuery : public IParserBase
{
protected:

View File

@ -12,6 +12,7 @@ bool ParserDropNamedCollectionQuery::parseImpl(IParser::Pos & pos, ASTPtr & node
ParserKeyword s_drop("DROP");
ParserKeyword s_collection("NAMED COLLECTION");
ParserKeyword s_if_exists("IF EXISTS");
ParserKeyword s_on("ON");
ParserIdentifier name_p;
String cluster_str;
@ -31,7 +32,7 @@ bool ParserDropNamedCollectionQuery::parseImpl(IParser::Pos & pos, ASTPtr & node
if (!name_p.parse(pos, collection_name, expected))
return false;
if (ParserKeyword{"ON"}.ignore(pos, expected))
if (s_on.ignore(pos, expected))
{
if (!ASTQueryWithOnCluster::parse(pos, cluster_str, expected))
return false;

View File

@ -379,6 +379,8 @@ bool ParserSystemQuery::parseImpl(IParser::Pos & pos, ASTPtr & node, Expected &
case Type::START_REPLICATED_SENDS:
case Type::STOP_REPLICATION_QUEUES:
case Type::START_REPLICATION_QUEUES:
case Type::STOP_PULLING_REPLICATION_LOG:
case Type::START_PULLING_REPLICATION_LOG:
if (!parseQueryWithOnCluster(res, pos, expected))
return false;
parseDatabaseAndTableAsAST(pos, expected, res->database, res->table);
@ -405,7 +407,15 @@ bool ParserSystemQuery::parseImpl(IParser::Pos & pos, ASTPtr & node, Expected &
ParserLiteral path_parser;
ASTPtr ast;
if (path_parser.parse(pos, ast, expected))
{
res->filesystem_cache_name = ast->as<ASTLiteral>()->value.safeGet<String>();
if (ParserKeyword{"KEY"}.ignore(pos, expected) && ParserIdentifier().parse(pos, ast, expected))
{
res->key_to_drop = ast->as<ASTIdentifier>()->name();
if (ParserKeyword{"OFFSET"}.ignore(pos, expected) && ParserLiteral().parse(pos, ast, expected))
res->offset_to_drop = ast->as<ASTLiteral>()->value.safeGet<UInt64>();
}
}
if (!parseQueryWithOnCluster(res, pos, expected))
return false;
break;

View File

@ -148,7 +148,7 @@ static int compareValuesWithOffsetFloat(const IColumn * _compared_column,
const auto * reference_column = assert_cast<const ColumnType *>(
_reference_column);
const auto offset = _offset.get<typename ColumnType::ValueType>();
assert(offset >= 0);
chassert(offset >= 0);
const auto compared_value_data = compared_column->getDataAt(compared_row);
assert(compared_value_data.size == sizeof(typename ColumnType::ValueType));

View File

@ -42,8 +42,7 @@ struct ReadBufferFromHDFS::ReadBufferFromHDFSImpl : public BufferWithOwnMemory<S
off_t file_offset = 0;
off_t read_until_position = 0;
std::optional<size_t> file_size;
off_t file_size;
explicit ReadBufferFromHDFSImpl(
const std::string & hdfs_uri_,
@ -59,7 +58,6 @@ struct ReadBufferFromHDFS::ReadBufferFromHDFSImpl : public BufferWithOwnMemory<S
, builder(createHDFSBuilder(hdfs_uri_, config_))
, read_settings(read_settings_)
, read_until_position(read_until_position_)
, file_size(file_size_)
{
fs = createHDFSFS(builder.get());
fin = hdfsOpenFile(fs.get(), hdfs_file_path.c_str(), O_RDONLY, 0, 0, 0);
@ -68,6 +66,22 @@ struct ReadBufferFromHDFS::ReadBufferFromHDFSImpl : public BufferWithOwnMemory<S
throw Exception(ErrorCodes::CANNOT_OPEN_FILE,
"Unable to open HDFS file: {}. Error: {}",
hdfs_uri + hdfs_file_path, std::string(hdfsGetLastError()));
if (file_size_.has_value())
{
file_size = file_size_.value();
}
else
{
auto * file_info = hdfsGetPathInfo(fs.get(), hdfs_file_path.c_str());
if (!file_info)
{
hdfsCloseFile(fs.get(), fin);
throw Exception(ErrorCodes::UNKNOWN_FILE_SIZE, "Cannot find out file size for: {}", hdfs_file_path);
}
file_size = static_cast<size_t>(file_info->mSize);
hdfsFreeFileInfo(file_info, 1);
}
}
~ReadBufferFromHDFSImpl() override
@ -75,16 +89,9 @@ struct ReadBufferFromHDFS::ReadBufferFromHDFSImpl : public BufferWithOwnMemory<S
hdfsCloseFile(fs.get(), fin);
}
size_t getFileSize()
size_t getFileSize() const
{
if (file_size)
return *file_size;
auto * file_info = hdfsGetPathInfo(fs.get(), hdfs_file_path.c_str());
if (!file_info)
throw Exception(ErrorCodes::UNKNOWN_FILE_SIZE, "Cannot find out file size for: {}", hdfs_file_path);
file_size = static_cast<size_t>(file_info->mSize);
return *file_size;
return file_size;
}
bool nextImpl() override
@ -104,6 +111,10 @@ struct ReadBufferFromHDFS::ReadBufferFromHDFSImpl : public BufferWithOwnMemory<S
{
num_bytes_to_read = internal_buffer.size();
}
if (file_size != 0 && file_offset >= file_size)
{
return false;
}
ResourceGuard rlock(read_settings.resource_link, num_bytes_to_read);
int bytes_read;

View File

@ -114,9 +114,9 @@ namespace
{
if (next_slash_after_glob_pos == std::string::npos)
{
result.emplace_back(
result.emplace_back(StorageHDFS::PathWithInfo{
String(ls.file_info[i].mName),
StorageHDFS::PathInfo{ls.file_info[i].mLastMod, static_cast<size_t>(ls.file_info[i].mSize)});
StorageHDFS::PathInfo{ls.file_info[i].mLastMod, static_cast<size_t>(ls.file_info[i].mSize)}});
}
else
{

View File

@ -262,6 +262,9 @@ struct SelectQueryInfo
// If limit is not 0, that means it's a trivial limit query.
UInt64 limit = 0;
/// For IStorageSystemOneBlock
std::vector<UInt8> columns_mask;
InputOrderInfoPtr getInputOrderInfo() const
{
return input_order_info ? input_order_info : (projection ? projection->input_order_info : nullptr);

View File

@ -1738,15 +1738,24 @@ void StorageMergeTree::truncate(const ASTPtr &, const StorageMetadataPtr &, Cont
{
/// Asks to complete merges and does not allow them to start.
/// This protects against "revival" of data for a removed partition after completion of merge.
auto merge_blocker = stopMergesAndWait();
waitForOutdatedPartsToBeLoaded();
auto merge_blocker = stopMergesAndWait();
Stopwatch watch;
ProfileEventsScope profile_events_scope;
auto txn = query_context->getCurrentTransaction();
MergeTreeData::Transaction transaction(*this, txn.get());
if (txn)
{
auto data_parts_lock = lockParts();
auto parts_to_remove = getVisibleDataPartsVectorUnlocked(query_context, data_parts_lock);
removePartsFromWorkingSet(txn.get(), parts_to_remove, true, data_parts_lock);
LOG_INFO(log, "Removed {} parts: [{}]", parts_to_remove.size(), fmt::join(getPartsNames(parts_to_remove), ", "));
}
else
{
MergeTreeData::Transaction transaction(*this, txn.get());
auto operation_data_parts_lock = lockOperationsWithParts();
auto parts = getVisibleDataPartsVector(query_context);
@ -1790,8 +1799,15 @@ void StorageMergeTree::dropPart(const String & part_name, bool detach, ContextPt
/// It's important to create it outside of lock scope because
/// otherwise it can lock parts in destructor and deadlock is possible.
auto txn = query_context->getCurrentTransaction();
MergeTreeData::Transaction transaction(*this, txn.get());
if (txn)
{
if (auto part = outdatePart(txn.get(), part_name, /*force=*/ true))
dropPartsImpl({part}, detach);
}
else
{
MergeTreeData::Transaction transaction(*this, txn.get());
auto operation_data_parts_lock = lockOperationsWithParts();
auto part = getPartIfExists(part_name, {MergeTreeDataPartState::Active});
@ -1848,8 +1864,26 @@ void StorageMergeTree::dropPartition(const ASTPtr & partition, bool detach, Cont
/// It's important to create it outside of lock scope because
/// otherwise it can lock parts in destructor and deadlock is possible.
auto txn = query_context->getCurrentTransaction();
MergeTreeData::Transaction transaction(*this, txn.get());
if (txn)
{
DataPartsVector parts_to_remove;
{
auto data_parts_lock = lockParts();
if (partition_ast && partition_ast->all)
parts_to_remove = getVisibleDataPartsVectorUnlocked(query_context, data_parts_lock);
else
{
String partition_id = getPartitionIDFromQuery(partition, query_context, &data_parts_lock);
parts_to_remove = getVisibleDataPartsVectorInPartition(query_context, partition_id, data_parts_lock);
}
removePartsFromWorkingSet(txn.get(), parts_to_remove, true, data_parts_lock);
}
dropPartsImpl(std::move(parts_to_remove), detach);
}
else
{
MergeTreeData::Transaction transaction(*this, txn.get());
auto operation_data_parts_lock = lockOperationsWithParts();
DataPartsVector parts;
@ -1864,12 +1898,14 @@ void StorageMergeTree::dropPartition(const ASTPtr & partition, bool detach, Cont
}
if (detach)
{
for (const auto & part : parts)
{
auto metadata_snapshot = getInMemoryMetadataPtr();
LOG_INFO(log, "Detaching {}", part->getDataPartStorage().getPartDirectory());
part->makeCloneInDetached("", metadata_snapshot);
}
}
auto future_parts = initCoverageWithNewEmptyParts(parts);
@ -1898,6 +1934,33 @@ void StorageMergeTree::dropPartition(const ASTPtr & partition, bool detach, Cont
clearEmptyParts();
}
void StorageMergeTree::dropPartsImpl(DataPartsVector && parts_to_remove, bool detach)
{
auto metadata_snapshot = getInMemoryMetadataPtr();
if (detach)
{
/// If DETACH clone parts to detached/ directory
/// NOTE: no race with background cleanup until we hold pointers to parts
for (const auto & part : parts_to_remove)
{
LOG_INFO(log, "Detaching {}", part->getDataPartStorage().getPartDirectory());
part->makeCloneInDetached("", metadata_snapshot);
}
}
if (deduplication_log)
{
for (const auto & part : parts_to_remove)
deduplication_log->dropPart(part->info);
}
if (detach)
LOG_INFO(log, "Detached {} parts: [{}]", parts_to_remove.size(), fmt::join(getPartsNames(parts_to_remove), ", "));
else
LOG_INFO(log, "Removed {} parts: [{}]", parts_to_remove.size(), fmt::join(getPartsNames(parts_to_remove), ", "));
}
PartitionCommandsResultInfo StorageMergeTree::attachPartition(
const ASTPtr & partition, const StorageMetadataPtr & /* metadata_snapshot */,
bool attach_part, ContextPtr local_context)

View File

@ -237,6 +237,7 @@ private:
void dropPartNoWaitNoThrow(const String & part_name) override;
void dropPart(const String & part_name, bool detach, ContextPtr context) override;
void dropPartition(const ASTPtr & partition, bool detach, ContextPtr context) override;
void dropPartsImpl(DataPartsVector && parts_to_remove, bool detach);
PartitionCommandsResultInfo attachPartition(const ASTPtr & partition, const StorageMetadataPtr & metadata_snapshot, bool part, ContextPtr context) override;
void replacePartitionFrom(const StoragePtr & source_table, const ASTPtr & partition, bool replace, ContextPtr context) override;

View File

@ -197,6 +197,7 @@ namespace ActionLocks
extern const StorageActionBlockType ReplicationQueue;
extern const StorageActionBlockType PartsTTLMerge;
extern const StorageActionBlockType PartsMove;
extern const StorageActionBlockType PullReplicationLog;
}
@ -4340,7 +4341,7 @@ void StorageReplicatedMergeTree::cleanLastPartNode(const String & partition_id)
{
auto zookeeper = getZooKeeper();
LOG_DEBUG(log, "Cleaning up last parent node for partition {}", partition_id);
LOG_DEBUG(log, "Cleaning up last part node for partition {}", partition_id);
/// The name of the previous part for which the quorum was reached.
const String quorum_last_part_path = fs::path(zookeeper_path) / "quorum" / "last_part";
@ -4361,6 +4362,7 @@ void StorageReplicatedMergeTree::cleanLastPartNode(const String & partition_id)
if (!parts_with_quorum.added_parts.contains(partition_id))
{
/// There is no information about interested part.
LOG_TEST(log, "There is no information about the partition");
break;
}
@ -4378,6 +4380,7 @@ void StorageReplicatedMergeTree::cleanLastPartNode(const String & partition_id)
else if (code == Coordination::Error::ZNONODE)
{
/// Node is deleted. It is impossible, but it is Ok.
LOG_WARNING(log, "The last part node {} was deleted", quorum_last_part_path);
break;
}
else if (code == Coordination::Error::ZBADVERSION)
@ -8169,6 +8172,9 @@ ActionLock StorageReplicatedMergeTree::getActionLock(StorageActionBlockType acti
if (action_type == ActionLocks::PartsMove)
return parts_mover.moves_blocker.cancel();
if (action_type == ActionLocks::PullReplicationLog)
return queue.pull_log_blocker.cancel();
return {};
}

View File

@ -4,6 +4,8 @@
#include <DataTypes/DataTypeString.h>
#include <Storages/ColumnsDescription.h>
#include <Storages/IStorage.h>
#include <Storages/SelectQueryInfo.h>
#include <Storages/System/getQueriedColumnsMaskAndHeader.h>
#include <Processors/Sources/SourceFromSingleChunk.h>
#include <QueryPipeline/Pipe.h>
@ -30,6 +32,8 @@ class IStorageSystemOneBlock : public IStorage
protected:
virtual void fillData(MutableColumns & res_columns, ContextPtr context, const SelectQueryInfo & query_info) const = 0;
virtual bool supportsColumnsMask() const { return false; }
public:
explicit IStorageSystemOneBlock(const StorageID & table_id_) : IStorage(table_id_)
{
@ -48,8 +52,15 @@ public:
size_t /*num_streams*/) override
{
storage_snapshot->check(column_names);
Block sample_block = storage_snapshot->metadata->getSampleBlockWithVirtuals(getVirtuals());
if (supportsColumnsMask())
{
auto [columns_mask, header] = getQueriedColumnsMaskAndHeader(sample_block, column_names);
query_info.columns_mask = std::move(columns_mask);
sample_block = std::move(header);
}
MutableColumns res_columns = sample_block.cloneEmptyColumns();
fillData(res_columns, context, query_info);

View File

@ -10,6 +10,7 @@
#include <DataTypes/DataTypeDateTime64.h>
#include <DataTypes/DataTypeNullable.h>
#include <Storages/VirtualColumnUtils.h>
#include <Storages/System/getQueriedColumnsMaskAndHeader.h>
#include <Parsers/queryToString.h>
#include <Access/ContextAccess.h>
#include <Databases/IDatabase.h>
@ -315,23 +316,9 @@ Pipe StorageSystemColumns::read(
const size_t /*num_streams*/)
{
storage_snapshot->check(column_names);
/// Create a mask of what columns are needed in the result.
NameSet names_set(column_names.begin(), column_names.end());
Block sample_block = storage_snapshot->metadata->getSampleBlock();
Block header;
std::vector<UInt8> columns_mask(sample_block.columns());
for (size_t i = 0, size = columns_mask.size(); i < size; ++i)
{
if (names_set.contains(sample_block.getByPosition(i).name))
{
columns_mask[i] = 1;
header.insert(sample_block.getByPosition(i));
}
}
auto [columns_mask, header] = getQueriedColumnsMaskAndHeader(sample_block, column_names);
Block block_to_filter;
Storages storages;

View File

@ -5,6 +5,7 @@
#include <DataTypes/DataTypesNumber.h>
#include <Databases/IDatabase.h>
#include <Storages/VirtualColumnUtils.h>
#include <Storages/System/getQueriedColumnsMaskAndHeader.h>
#include <Interpreters/Context.h>
#include <Interpreters/DatabaseCatalog.h>
#include <Parsers/ASTIndexDeclaration.h>
@ -185,21 +186,9 @@ Pipe StorageSystemDataSkippingIndices::read(
size_t /* num_streams */)
{
storage_snapshot->check(column_names);
NameSet names_set(column_names.begin(), column_names.end());
Block sample_block = storage_snapshot->metadata->getSampleBlock();
Block header;
std::vector<UInt8> columns_mask(sample_block.columns());
for (size_t i = 0, size = columns_mask.size(); i < size; ++i)
{
if (names_set.contains(sample_block.getByPosition(i).name))
{
columns_mask[i] = 1;
header.insert(sample_block.getByPosition(i));
}
}
auto [columns_mask, header] = getQueriedColumnsMaskAndHeader(sample_block, column_names);
MutableColumnPtr column = ColumnString::create();

View File

@ -117,13 +117,23 @@ void StorageSystemDatabases::fillData(MutableColumns & res_columns, ContextPtr c
const auto & database = databases.at(database_name);
res_columns[0]->insert(database_name);
res_columns[1]->insert(database->getEngineName());
res_columns[2]->insert(context->getPath() + database->getDataPath());
res_columns[3]->insert(database->getMetadataPath());
res_columns[4]->insert(database->getUUID());
res_columns[5]->insert(getEngineFull(context, database));
res_columns[6]->insert(database->getDatabaseComment());
size_t src_index = 0;
size_t res_index = 0;
const auto & columns_mask = query_info.columns_mask;
if (columns_mask[src_index++])
res_columns[res_index++]->insert(database_name);
if (columns_mask[src_index++])
res_columns[res_index++]->insert(database->getEngineName());
if (columns_mask[src_index++])
res_columns[res_index++]->insert(context->getPath() + database->getDataPath());
if (columns_mask[src_index++])
res_columns[res_index++]->insert(database->getMetadataPath());
if (columns_mask[src_index++])
res_columns[res_index++]->insert(database->getUUID());
if (columns_mask[src_index++])
res_columns[res_index++]->insert(getEngineFull(context, database));
if (columns_mask[src_index++])
res_columns[res_index++]->insert(database->getDatabaseComment());
}
}

View File

@ -26,6 +26,8 @@ public:
protected:
using IStorageSystemOneBlock::IStorageSystemOneBlock;
bool supportsColumnsMask() const override { return true; }
void fillData(MutableColumns & res_columns, ContextPtr context, const SelectQueryInfo &) const override;
};

View File

@ -6,6 +6,7 @@
#include <Storages/IStorage.h>
#include <Storages/MergeTree/DataPartStorageOnDiskFull.h>
#include <Storages/System/StorageSystemPartsBase.h>
#include <Storages/System/getQueriedColumnsMaskAndHeader.h>
#include <Processors/Sources/SourceFromSingleChunk.h>
#include <QueryPipeline/Pipe.h>
#include <IO/SharedThreadPools.h>
@ -81,13 +82,11 @@ struct WorkerState
class DetachedPartsSource : public ISource
{
public:
DetachedPartsSource(Block header_, std::shared_ptr<SourceState> state_, std::vector<UInt8> columns_mask_, UInt64 block_size_,
bool has_bytes_on_disk_column_)
DetachedPartsSource(Block header_, std::shared_ptr<SourceState> state_, std::vector<UInt8> columns_mask_, UInt64 block_size_)
: ISource(std::move(header_))
, state(state_)
, columns_mask(std::move(columns_mask_))
, block_size(block_size_)
, has_bytes_on_disk_column(has_bytes_on_disk_column_)
{}
String getName() const override { return "DataPartsSource"; }
@ -127,7 +126,6 @@ private:
std::shared_ptr<SourceState> state;
const std::vector<UInt8> columns_mask;
const UInt64 block_size;
const bool has_bytes_on_disk_column;
const size_t support_threads = 35;
StoragesInfo current_info;
@ -149,9 +147,6 @@ private:
void calculatePartSizeOnDisk(size_t begin, std::vector<std::atomic<size_t>> & parts_sizes)
{
if (!has_bytes_on_disk_column)
return;
WorkerState worker_state;
for (auto p_id = begin; p_id < detached_parts.size(); ++p_id)
@ -211,7 +206,9 @@ private:
auto begin = detached_parts.size() - rows;
std::vector<std::atomic<size_t>> parts_sizes(rows);
calculatePartSizeOnDisk(begin, parts_sizes);
constexpr size_t bytes_on_disk_col_idx = 4;
if (columns_mask[bytes_on_disk_col_idx])
calculatePartSizeOnDisk(begin, parts_sizes);
for (auto p_id = begin; p_id < detached_parts.size(); ++p_id)
{
@ -229,7 +226,7 @@ private:
new_columns[res_index++]->insert(p.dir_name);
if (columns_mask[src_index++])
{
chassert(has_bytes_on_disk_column);
chassert(src_index - 1 == bytes_on_disk_col_idx);
size_t bytes_on_disk = parts_sizes.at(p_id - begin).load();
new_columns[res_index++]->insert(bytes_on_disk);
}
@ -285,21 +282,7 @@ Pipe StorageSystemDetachedParts::read(
storage_snapshot->check(column_names);
Block sample_block = storage_snapshot->metadata->getSampleBlock();
NameSet names_set(column_names.begin(), column_names.end());
Block header;
std::vector<UInt8> columns_mask(sample_block.columns());
for (size_t i = 0; i < columns_mask.size(); ++i)
{
if (names_set.contains(sample_block.getByPosition(i).name))
{
columns_mask[i] = 1;
header.insert(sample_block.getByPosition(i));
}
}
bool has_bytes_on_disk_column = names_set.contains("bytes_on_disk");
auto [columns_mask, header] = getQueriedColumnsMaskAndHeader(sample_block, column_names);
auto state = std::make_shared<SourceState>(StoragesInfoStream(query_info, context));
@ -307,7 +290,7 @@ Pipe StorageSystemDetachedParts::read(
for (size_t i = 0; i < num_streams; ++i)
{
auto source = std::make_shared<DetachedPartsSource>(header.cloneEmpty(), state, columns_mask, max_block_size, has_bytes_on_disk_column);
auto source = std::make_shared<DetachedPartsSource>(header.cloneEmpty(), state, columns_mask, max_block_size);
pipe.addSource(std::move(source));
}

View File

@ -9,6 +9,7 @@
#include <Storages/MergeTree/MergeTreeData.h>
#include <Storages/StorageMaterializedMySQL.h>
#include <Storages/VirtualColumnUtils.h>
#include <Storages/System/getQueriedColumnsMaskAndHeader.h>
#include <Access/ContextAccess.h>
#include <Databases/IDatabase.h>
#include <Parsers/queryToString.h>
@ -254,21 +255,10 @@ Pipe StorageSystemPartsBase::read(
StoragesInfoStream stream(query_info, context);
/// Create the result.
NameSet names_set(column_names.begin(), column_names.end());
Block sample = storage_snapshot->metadata->getSampleBlock();
Block header;
std::vector<UInt8> columns_mask(sample.columns());
for (size_t i = 0; i < sample.columns(); ++i)
{
if (names_set.contains(sample.getByPosition(i).name))
{
columns_mask[i] = 1;
header.insert(sample.getByPosition(i));
}
}
auto [columns_mask, header] = getQueriedColumnsMaskAndHeader(sample, column_names);
MutableColumns res_columns = header.cloneEmptyColumns();
if (has_state_column)
res_columns.push_back(ColumnString::create());

View File

@ -3,6 +3,7 @@
#include <DataTypes/DataTypeDateTime.h>
#include <DataTypes/DataTypeNullable.h>
#include <Storages/System/StorageSystemTables.h>
#include <Storages/System/getQueriedColumnsMaskAndHeader.h>
#include <Storages/MergeTree/MergeTreeData.h>
#include <Storages/SelectQueryInfo.h>
#include <Storages/VirtualColumnUtils.h>
@ -587,23 +588,9 @@ Pipe StorageSystemTables::read(
const size_t /*num_streams*/)
{
storage_snapshot->check(column_names);
/// Create a mask of what columns are needed in the result.
NameSet names_set(column_names.begin(), column_names.end());
Block sample_block = storage_snapshot->metadata->getSampleBlock();
Block res_block;
std::vector<UInt8> columns_mask(sample_block.columns());
for (size_t i = 0, size = columns_mask.size(); i < size; ++i)
{
if (names_set.contains(sample_block.getByPosition(i).name))
{
columns_mask[i] = 1;
res_block.insert(sample_block.getByPosition(i));
}
}
auto [columns_mask, res_block] = getQueriedColumnsMaskAndHeader(sample_block, column_names);
ColumnPtr filtered_databases_column = getFilteredDatabases(query_info, context);
ColumnPtr filtered_tables_column = getFilteredTables(query_info.query, filtered_databases_column, context);

View File

@ -27,7 +27,6 @@ static void createInformationSchemaView(ContextMutablePtr context, IDatabase & d
database.getDatabaseName() == DatabaseCatalog::INFORMATION_SCHEMA_UPPERCASE);
if (database.getEngineName() != "Memory")
return;
bool is_uppercase = database.getDatabaseName() == DatabaseCatalog::INFORMATION_SCHEMA_UPPERCASE;
String metadata_resource_name = view_name + ".sql";
if (query.empty())
@ -42,13 +41,18 @@ static void createInformationSchemaView(ContextMutablePtr context, IDatabase & d
assert(view_name == ast_create.getTable());
ast_create.attach = false;
ast_create.setDatabase(database.getDatabaseName());
if (is_uppercase)
ast_create.setTable(Poco::toUpper(view_name));
StoragePtr view = createTableFromAST(ast_create, database.getDatabaseName(),
database.getTableDataPath(ast_create), context, true).second;
database.createTable(context, ast_create.getTable(), view, ast);
ASTPtr ast_upper = ast_create.clone();
auto & ast_create_upper = ast_upper->as<ASTCreateQuery &>();
ast_create_upper.setTable(Poco::toUpper(view_name));
StoragePtr view_upper = createTableFromAST(ast_create_upper, database.getDatabaseName(),
database.getTableDataPath(ast_create_upper), context, true).second;
database.createTable(context, ast_create_upper.getTable(), view_upper, ast_upper);
}
catch (...)
{

View File

@ -0,0 +1,24 @@
#include <Storages/System/getQueriedColumnsMaskAndHeader.h>
namespace DB
{
std::pair<std::vector<UInt8>, Block> getQueriedColumnsMaskAndHeader(const Block & sample_block, const Names & column_names)
{
std::vector<UInt8> columns_mask(sample_block.columns());
Block header;
NameSet names_set(column_names.begin(), column_names.end());
for (size_t i = 0; i < columns_mask.size(); ++i)
{
if (names_set.contains(sample_block.getByPosition(i).name))
{
columns_mask[i] = 1;
header.insert(sample_block.getByPosition(i));
}
}
return std::make_pair(columns_mask, header);
}
}

View File

@ -0,0 +1,11 @@
#pragma once
#include <base/types.h>
#include <Core/Names.h>
#include <Core/Block.h>
namespace DB
{
std::pair<std::vector<UInt8>, Block> getQueriedColumnsMaskAndHeader(const Block & sample_block, const Names & column_names);
}

View File

@ -188,7 +188,7 @@ def test_grant_all_on_table():
instance.query("SHOW GRANTS FOR B")
== "GRANT SHOW TABLES, SHOW COLUMNS, SHOW DICTIONARIES, SELECT, INSERT, ALTER TABLE, ALTER VIEW, CREATE TABLE, CREATE VIEW, CREATE DICTIONARY, "
"DROP TABLE, DROP VIEW, DROP DICTIONARY, UNDROP TABLE, TRUNCATE, OPTIMIZE, BACKUP, CREATE ROW POLICY, ALTER ROW POLICY, DROP ROW POLICY, SHOW ROW POLICIES, "
"SYSTEM MERGES, SYSTEM TTL MERGES, SYSTEM FETCHES, SYSTEM MOVES, SYSTEM SENDS, SYSTEM REPLICATION QUEUES, SYSTEM DROP REPLICA, SYSTEM SYNC REPLICA, "
"SYSTEM MERGES, SYSTEM TTL MERGES, SYSTEM FETCHES, SYSTEM MOVES, SYSTEM PULLING REPLICATION LOG, SYSTEM SENDS, SYSTEM REPLICATION QUEUES, SYSTEM DROP REPLICA, SYSTEM SYNC REPLICA, "
"SYSTEM RESTART REPLICA, SYSTEM RESTORE REPLICA, SYSTEM WAIT LOADING PARTS, SYSTEM FLUSH DISTRIBUTED, dictGet ON test.table TO B\n"
)
instance.query("REVOKE ALL ON test.table FROM B", user="A")

View File

@ -1,7 +1,6 @@
import pytest
from helpers.client import CommandRequest
from helpers.cluster import ClickHouseCluster
from helpers.test_tools import TSV
cluster = ClickHouseCluster(__file__)
@ -14,7 +13,7 @@ node = cluster.add_instance(
)
@pytest.fixture(scope="module", autouse=True)
@pytest.fixture(scope="module")
def started_cluster():
try:
cluster.start()
@ -24,122 +23,41 @@ def started_cluster():
cluster.shutdown()
def keeper_query(query: str):
return CommandRequest(
def test_base_commands(started_cluster):
_ = started_cluster
command = CommandRequest(
[
cluster.server_bin_path,
started_cluster.server_bin_path,
"keeper-client",
"--host",
str(cluster.get_instance_ip("zoo1")),
"--port",
str(cluster.zookeeper_port),
"-q",
query,
"create test_create_zk_node1 testvalue1;create test_create_zk_node_2 testvalue2;get test_create_zk_node1;",
],
stdin="",
)
def test_big_family():
command = keeper_query(
"create test_big_family foo;"
"create test_big_family/1 foo;"
"create test_big_family/1/1 foo;"
"create test_big_family/1/2 foo;"
"create test_big_family/1/3 foo;"
"create test_big_family/1/4 foo;"
"create test_big_family/1/5 foo;"
"create test_big_family/2 foo;"
"create test_big_family/2/1 foo;"
"create test_big_family/2/2 foo;"
"create test_big_family/2/3 foo;"
"find_big_family test_big_family;"
)
assert command.get_answer() == TSV(
[
["/test_big_family/1", "5"],
["/test_big_family/2", "3"],
["/test_big_family/2/3", "0"],
["/test_big_family/2/2", "0"],
["/test_big_family/2/1", "0"],
["/test_big_family/1/5", "0"],
["/test_big_family/1/4", "0"],
["/test_big_family/1/3", "0"],
["/test_big_family/1/2", "0"],
["/test_big_family/1/1", "0"],
]
)
command = keeper_query("find_big_family test_big_family 1;")
assert command.get_answer() == TSV(
[
["/test_big_family/1", "5"],
]
)
def test_find_super_nodes():
command = keeper_query(
"create test_find_super_nodes foo;"
"create test_find_super_nodes/1 foo;"
"create test_find_super_nodes/1/1 foo;"
"create test_find_super_nodes/1/2 foo;"
"create test_find_super_nodes/1/3 foo;"
"create test_find_super_nodes/1/4 foo;"
"create test_find_super_nodes/1/5 foo;"
"create test_find_super_nodes/2 foo;"
"create test_find_super_nodes/2/1 foo;"
"create test_find_super_nodes/2/2 foo;"
"create test_find_super_nodes/2/3 foo;"
"create test_find_super_nodes/2/4 foo;"
"cd test_find_super_nodes;"
"find_super_nodes 4;"
)
assert command.get_answer() == TSV(
[
["/test_find_super_nodes/1", "5"],
["/test_find_super_nodes/2", "4"],
]
)
def test_delete_stable_backups():
command = keeper_query(
"create /clickhouse/backups foo;"
"create /clickhouse/backups/1 foo;"
"create /clickhouse/backups/1/stage foo;"
"create /clickhouse/backups/1/stage/alive123 foo;"
"create /clickhouse/backups/2 foo;"
"create /clickhouse/backups/2/stage foo;"
"create /clickhouse/backups/2/stage/dead123 foo;"
"delete_stable_backups;"
"y;"
"ls clickhouse/backups;"
)
assert command.get_answer() == (
"You are going to delete all inactive backups in /clickhouse/backups. Continue?\n"
'Found backup "/clickhouse/backups/1", checking if it\'s active\n'
'Backup "/clickhouse/backups/1" is active, not going to delete\n'
'Found backup "/clickhouse/backups/2", checking if it\'s active\n'
'Backup "/clickhouse/backups/2" is not active, deleting it\n'
"1\n"
)
def test_base_commands():
command = keeper_query(
"create test_create_zk_node1 testvalue1;"
"create test_create_zk_node_2 testvalue2;"
"get test_create_zk_node1;"
)
assert command.get_answer() == "testvalue1\n"
def test_four_letter_word_commands():
command = keeper_query("ruok")
def test_four_letter_word_commands(started_cluster):
_ = started_cluster
command = CommandRequest(
[
started_cluster.server_bin_path,
"keeper-client",
"--host",
str(cluster.get_instance_ip("zoo1")),
"--port",
str(cluster.zookeeper_port),
"-q",
"ruok",
],
stdin="",
)
assert command.get_answer() == "imok\n"

View File

@ -14,6 +14,7 @@ a UNSIGNED TINYINT
Result:
tables 1
tables 1
tables 1
Columns:
a
b

View File

@ -679,7 +679,7 @@ def test_php_client(started_cluster, php_container):
demux=True,
)
assert code == 0
assert stdout.decode() == "tables\ntables\n"
assert stdout.decode() == "tables\ntables\ntables\n"
code, (stdout, stderr) = php_container.exec_run(
"php -f test_ssl.php {host} {port} default 123".format(
@ -688,7 +688,7 @@ def test_php_client(started_cluster, php_container):
demux=True,
)
assert code == 0
assert stdout.decode() == "tables\ntables\n"
assert stdout.decode() == "tables\ntables\ntables\n"
code, (stdout, stderr) = php_container.exec_run(
"php -f test.php {host} {port} user_with_double_sha1 abacaba".format(
@ -697,7 +697,7 @@ def test_php_client(started_cluster, php_container):
demux=True,
)
assert code == 0
assert stdout.decode() == "tables\ntables\n"
assert stdout.decode() == "tables\ntables\ntables\n"
code, (stdout, stderr) = php_container.exec_run(
"php -f test_ssl.php {host} {port} user_with_double_sha1 abacaba".format(
@ -706,7 +706,7 @@ def test_php_client(started_cluster, php_container):
demux=True,
)
assert code == 0
assert stdout.decode() == "tables\ntables\n"
assert stdout.decode() == "tables\ntables\ntables\n"
def test_mysqljs_client(started_cluster, nodejs_container):

View File

@ -0,0 +1,22 @@
<clickhouse>
<remote_servers>
<cluster>
<shard>
<internal_replication>true</internal_replication>
<replica>
<host>clickhouse1</host>
<port>9000</port>
</replica>
<replica>
<host>clickhouse2</host>
<port>9000</port>
</replica>
<replica>
<host>clickhouse3</host>
<port>9000</port>
</replica>
</shard>
<allow_distributed_ddl_queries>true</allow_distributed_ddl_queries>
</cluster>
</remote_servers>
</clickhouse>

View File

@ -0,0 +1,12 @@
<clickhouse>
<users>
<default>
<password></password>
<profile>default</profile>
<quota>default</quota>
<named_collection_control>1</named_collection_control>
<show_named_collections>1</show_named_collections>
<show_named_collections_secrets>1</show_named_collections_secrets>
</default>
</users>
</clickhouse>

View File

@ -0,0 +1,148 @@
"""
Test cases:
--- execute on the first node
create named collection foobar as a=1, b=2;
create named collection if not exists foobar on cluster '{cluster}' as a=1, b=2, c=3;
create named collection collection_present_on_first_node as a=1, b=2, s='string', x=0, y=-1;
--- execute on any other node
alter named collection foobar on cluster '{cluster}' set a=2, c=3;
alter named collection foobar on cluster '{cluster}' delete b;
alter named collection foobar on cluster '{cluster}' set a=3 delete c;
alter named collection if exists collection_absent_ewerywhere on cluster '{cluster}' delete b;
alter named collection if exists collection_present_on_first_node on cluster '{cluster}' delete b;
--- execute on every node
select * from system.named_collections;
--- execute on any node
drop named collection foobar on cluster '{cluster}';
drop named collection if exists collection_absent_ewerywhere on cluster '{cluster}';
drop named collection if exists collection_present_on_first_node on cluster '{cluster}';
--- execute on every node
select * from system.named_collections;
"""
import logging
from json import dumps, loads
from functools import partial
import pytest
from helpers.cluster import ClickHouseCluster
dumps = partial(dumps, ensure_ascii=False)
NODE01, NODE02, NODE03 = "clickhouse1", "clickhouse2", "clickhouse3"
CHECK_STRING_VALUE = "Some ~`$tr!ng-_+=123@#%^&&()|?[]{}<🤡>.,\t\n:;"
STMT_CREATE = "CREATE NAMED COLLECTION"
STMT_ALTER = "ALTER NAMED COLLECTION"
STMT_DROP = "DROP NAMED COLLECTION"
SYSTEM_TABLE = "system.named_collections"
@pytest.fixture(scope="module")
def cluster():
try:
cluster = ClickHouseCluster(__file__)
common_kwargs = dict(
main_configs=[
"configs/config.d/cluster.xml",
],
user_configs=[
"configs/users.d/default.xml",
],
with_zookeeper=True,
stay_alive=True,
)
for name in [NODE01, NODE02, NODE03]:
cluster.add_instance(name, **common_kwargs)
logging.info("Starting cluster...")
cluster.start()
logging.info("Cluster started")
yield cluster
finally:
cluster.shutdown()
def test_create_alter_drop_on_cluster(cluster):
"""
Executes the set of queries and checks the final named collections state.
"""
q_count_collections = f"select count() from {SYSTEM_TABLE}"
def check_collections_empty():
for name, node in list(cluster.instances.items()):
assert (
"0" == node.query(q_count_collections).strip()
), f"{SYSTEM_TABLE} is not empty on {name}"
foobar_final_state = {"name": "foobar", "collection": {"a": "3"}}
collection_present_on_first_node_final_state = {
"name": "collection_present_on_first_node",
"collection": {"a": "1", "s": CHECK_STRING_VALUE, "x": "0", "y": "-1"},
}
expected_state = {
NODE01: [foobar_final_state, collection_present_on_first_node_final_state],
NODE02: [foobar_final_state],
NODE03: [foobar_final_state],
}
q_get_collections = f"select * from {SYSTEM_TABLE} order by name desc format JSON"
def check_state():
for name, node in list(cluster.instances.items()):
result = loads(node.query(q_get_collections))["data"]
logging.debug("%s ?= %s", dumps(result), dumps(expected_state[name]))
assert (
expected_state[name] == result
), f"invalid {SYSTEM_TABLE} content on {name}: {result}"
check_collections_empty()
# create executed on the first node
node = cluster.instances[NODE01]
node.query(f"{STMT_CREATE} foobar AS a=1, b=2")
node.query(
f"{STMT_CREATE} IF NOT EXISTS foobar ON CLUSTER 'cluster' AS a=1, b=2, c=3"
)
node.query(
f"{STMT_CREATE} collection_present_on_first_node AS a=1, b=2, s='{CHECK_STRING_VALUE}', x=0, y=-1"
)
# alter executed on the second node
node = cluster.instances[NODE02]
node.query(f"{STMT_ALTER} foobar ON CLUSTER 'cluster' SET a=2, c=3")
node.query(f"{STMT_ALTER} foobar ON CLUSTER 'cluster' DELETE b")
node.query(f"{STMT_ALTER} foobar ON CLUSTER 'cluster' SET a=3 DELETE c")
node.query(
f"{STMT_ALTER} IF EXISTS collection_absent_ewerywhere ON CLUSTER 'cluster' DELETE b"
)
node.query(
f"{STMT_ALTER} IF EXISTS collection_present_on_first_node ON CLUSTER 'cluster' DELETE b"
)
check_state()
for node in list(cluster.instances.values()):
node.restart_clickhouse()
check_state()
# drop executed on the third node
node = cluster.instances[NODE03]
node.query(f"{STMT_DROP} foobar ON CLUSTER 'cluster'")
node.query(
f"{STMT_DROP} IF EXISTS collection_absent_ewerywhere ON CLUSTER 'cluster'"
)
node.query(
f"{STMT_DROP} IF EXISTS collection_present_on_first_node ON CLUSTER 'cluster'"
)
check_collections_empty()
for node in list(cluster.instances.values()):
node.restart_clickhouse()
check_collections_empty()

View File

@ -51,7 +51,12 @@ instance = cluster.add_instance(
"configs/server.key",
],
user_configs=["configs/users.xml"],
env_variables={"UBSAN_OPTIONS": "print_stacktrace=1"},
env_variables={
"UBSAN_OPTIONS": "print_stacktrace=1",
# Bug in TSAN reproduces in this test https://github.com/grpc/grpc/issues/29550#issuecomment-1188085387
"TSAN_OPTIONS": "report_atomic_races=0 "
+ os.getenv("TSAN_OPTIONS", default=""),
},
)

View File

@ -190,3 +190,15 @@ def test_information_schema():
)
== "1\n"
)
assert (
node.query(
"SELECT count() FROM information_schema.TABLES WHERE table_name='TABLES'"
)
== "2\n"
)
assert (
node.query(
"SELECT count() FROM INFORMATION_SCHEMA.tables WHERE table_name='tables'"
)
== "3\n"
)

View File

@ -761,7 +761,7 @@ def test_multiple_tables_streaming_sync(started_cluster, mode):
@pytest.mark.parametrize("mode", AVAILABLE_MODES)
def test_multiple_tables_streaming_sync_distributed(started_cluster, mode):
files_to_generate = 100
poll_size = 10
poll_size = 2
prefix = f"test_multiple_{mode}"
bucket = started_cluster.minio_restricted_bucket
instance = started_cluster.instances["instance"]
@ -785,7 +785,12 @@ def test_multiple_tables_streaming_sync_distributed(started_cluster, mode):
CREATE TABLE test.s3_queue_persistent ({table_format})
ENGINE = MergeTree()
ORDER BY column1;
"""
)
for inst in [instance, instance_2]:
inst.query(
f"""
CREATE MATERIALIZED VIEW test.persistent_s3_queue_mv TO test.s3_queue_persistent AS
SELECT
*
@ -800,7 +805,7 @@ def test_multiple_tables_streaming_sync_distributed(started_cluster, mode):
def get_count(node, table_name):
return int(run_query(node, f"SELECT count() FROM {table_name}"))
for _ in range(100):
for _ in range(150):
if (
get_count(instance, "test.s3_queue_persistent")
+ get_count(instance_2, "test.s3_queue_persistent")
@ -816,11 +821,12 @@ def test_multiple_tables_streaming_sync_distributed(started_cluster, mode):
list(map(int, l.split())) for l in run_query(instance_2, get_query).splitlines()
]
assert len(res1) + len(res2) == files_to_generate
# Checking that all engines have made progress
assert len(res1) > 0
assert len(res2) > 0
assert len(res1) + len(res2) == files_to_generate
assert {tuple(v) for v in res1 + res2} == set([tuple(i) for i in total_values])
# Checking that all files were processed only once

View File

@ -105,8 +105,6 @@ def test_rollback_unfinished_on_restart1(start_cluster):
"0_4_4_0_7\t0\ttid3\tcsn18446744073709551615_\ttid0\tcsn0_\n"
"0_8_8_0\t0\ttid5\tcsn18446744073709551615_\ttid0\tcsn0_\n"
"1_1_1_0\t0\ttid0\tcsn1_\ttid1\tcsn_1\n"
"1_1_1_1\t1\ttid1\tcsn_1\t(0,0,'00000000-0000-0000-0000-000000000000')\tcsn0_\n"
"1_1_1_1_7\t0\ttid3\tcsn18446744073709551615_\ttid0\tcsn0_\n"
"1_3_3_0\t1\ttid2\tcsn_2\t(0,0,'00000000-0000-0000-0000-000000000000')\tcsn0_\n"
"1_3_3_0_7\t0\ttid3\tcsn18446744073709551615_\ttid0\tcsn0_\n"
"1_5_5_0\t1\ttid6\tcsn_6\t(0,0,'00000000-0000-0000-0000-000000000000')\tcsn0_\n"
@ -194,6 +192,5 @@ def test_rollback_unfinished_on_restart2(start_cluster):
"0_4_4_0\t1\ttid2\tcsn_2\t(0,0,'00000000-0000-0000-0000-000000000000')\tcsn0_\n"
"0_5_5_0\t0\ttid5\tcsn18446744073709551615_\ttid0\tcsn0_\n"
"1_1_1_0\t0\ttid0\tcsn1_\ttid1\tcsn_1\n"
"1_1_1_1\t1\ttid1\tcsn_1\t(0,0,'00000000-0000-0000-0000-000000000000')\tcsn0_\n"
"1_3_3_0\t1\ttid2\tcsn_2\t(0,0,'00000000-0000-0000-0000-000000000000')\tcsn0_\n"
)

View File

@ -5,7 +5,7 @@
[1]
[1]
[1]
[NULL,1]
[1,NULL]
[1]
[1]
[[1,1]]

View File

@ -11,7 +11,7 @@ ${CLICKHOUSE_CLIENT} --query "DROP DATABASE IF EXISTS parallel_ddl"
function query()
{
for _ in {1..100}; do
for _ in {1..50}; do
${CLICKHOUSE_CLIENT} --query "CREATE DATABASE IF NOT EXISTS parallel_ddl"
${CLICKHOUSE_CLIENT} --query "DROP DATABASE IF EXISTS parallel_ddl"
done

View File

@ -1,3 +1,4 @@
2
CREATE DATABASE test_01114_1\nENGINE = Atomic
CREATE DATABASE test_01114_2\nENGINE = Atomic
CREATE DATABASE test_01114_3\nENGINE = Ordinary

View File

@ -13,6 +13,8 @@ DROP DATABASE IF EXISTS test_01114_2;
DROP DATABASE IF EXISTS test_01114_3;
"
$CLICKHOUSE_CLIENT -q "CREATE DATABASE test_01114_1 ENGINE=Ordinary" 2>&1| grep -Fac "UNKNOWN_DATABASE_ENGINE"
$CLICKHOUSE_CLIENT -q "CREATE DATABASE test_01114_1 ENGINE=Atomic"
$CLICKHOUSE_CLIENT -q "CREATE DATABASE test_01114_2"
$CLICKHOUSE_CLIENT --allow_deprecated_database_ordinary=1 -q "CREATE DATABASE test_01114_3 ENGINE=Ordinary"

View File

@ -1,3 +1,7 @@
COLUMNS
SCHEMATA
TABLES
VIEWS
columns
schemata
tables
@ -6,6 +10,10 @@ COLUMNS
SCHEMATA
TABLES
VIEWS
columns
schemata
tables
views
INFORMATION_SCHEMA INFORMATION_SCHEMA default \N \N \N \N
information_schema information_schema default \N \N \N \N
default default mv VIEW

View File

@ -21,20 +21,20 @@ tx7 7 20 all_1_1_0_13
tx7 7 40 all_14_14_0
tx7 7 60 all_7_7_0_13
tx7 7 80 all_12_12_0_13
tx7 8 20 all_1_14_2_13
tx7 8 40 all_1_14_2_13
tx7 8 60 all_1_14_2_13
tx7 8 80 all_1_14_2_13
tx7 8 20 all_1_14_1_13
tx7 8 40 all_1_14_1_13
tx7 8 60 all_1_14_1_13
tx7 8 80 all_1_14_1_13
Serialization error
INVALID_TRANSACTION
tx11 9 21 all_1_14_2_17
tx11 9 41 all_1_14_2_17
tx11 9 61 all_1_14_2_17
tx11 9 81 all_1_14_2_17
tx11 9 21 all_1_14_1_17
tx11 9 41 all_1_14_1_17
tx11 9 61 all_1_14_1_17
tx11 9 81 all_1_14_1_17
1 1 RUNNING
tx14 10 22 all_1_14_2_18
tx14 10 42 all_1_14_2_18
tx14 10 62 all_1_14_2_18
tx14 10 82 all_1_14_2_18
tx14 10 22 all_1_14_1_18
tx14 10 42 all_1_14_1_18
tx14 10 62 all_1_14_1_18
tx14 10 82 all_1_14_1_18
11 2 all_2_2_0
11 10 all_1_1_0_3

View File

@ -53,9 +53,6 @@ tx 6 "alter table mt update n=n*10 wh
tx 6 "insert into mt values (40)"
tx 6 "commit"
function accept_both_parts() {
sed 's/all_1_14_1_1/all_1_14_2_1/g'
}
tx 7 "begin transaction"
tx 7 "select 7, n, _part from mt order by n"
@ -64,7 +61,7 @@ tx_async 8 "alter table mt update n = 0 whe
$CLICKHOUSE_CLIENT -q "kill mutation where database=currentDatabase() and mutation_id='mutation_15.txt' format Null" 2>&1| grep -Fv "probably it finished"
tx_sync 8 "rollback"
tx 7 "optimize table mt final"
tx 7 "select 8, n, _part from mt order by n" | accept_both_parts
tx 7 "select 8, n, _part from mt order by n"
tx 10 "begin transaction"
tx 10 "alter table mt update n = 0 where 1" | grep -Eo "Serialization error" | uniq
tx 7 "alter table mt update n=n+1 where 1"
@ -74,7 +71,7 @@ tx 7 "commit"
tx_async 11 "begin transaction"
tx_async 11 "select 9, n, _part from mt order by n" | accept_both_parts
tx_async 11 "select 9, n, _part from mt order by n"
tx_async 12 "begin transaction"
tx_async 11 "alter table mt update n=n+1 where 1" >/dev/null
tx_async 12 "alter table mt update n=n+1 where 1" >/dev/null
@ -91,7 +88,7 @@ $CLICKHOUSE_CLIENT -q "kill transaction where tid=$tid_to_kill format Null"
tx_sync 13 "rollback"
tx 14 "begin transaction"
tx 14 "select 10, n, _part from mt order by n" | accept_both_parts
tx 14 "select 10, n, _part from mt order by n"
$CLICKHOUSE_CLIENT --database_atomic_wait_for_drop_and_detach_synchronously=0 -q "drop table mt"

View File

@ -0,0 +1,8 @@
1 1
2 1
3 1
4 1
1
10 100
1 1 1
2 1 1

View File

@ -0,0 +1,123 @@
#!/usr/bin/env bash
# Tags: long, no-replicated-database, no-ordinary-database
# shellcheck disable=SC2015
CURDIR=$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)
# shellcheck source=../shell_config.sh
. "$CURDIR"/../shell_config.sh
set -e
$CLICKHOUSE_CLIENT --query "DROP TABLE IF EXISTS src";
$CLICKHOUSE_CLIENT --query "DROP TABLE IF EXISTS dst";
$CLICKHOUSE_CLIENT --query "CREATE TABLE src (n UInt64, type UInt8) ENGINE=MergeTree ORDER BY type SETTINGS old_parts_lifetime=0";
$CLICKHOUSE_CLIENT --query "CREATE TABLE dst (n UInt64, type UInt8) ENGINE=MergeTree ORDER BY type SETTINGS old_parts_lifetime=0";
function thread_insert()
{
set -e
val=1
while true; do
$CLICKHOUSE_CLIENT --multiquery --query "
BEGIN TRANSACTION;
INSERT INTO src VALUES /* ($val, 1) */ ($val, 1);
INSERT INTO src VALUES /* ($val, 2) */ ($val, 2);
COMMIT;"
val=$((val+1))
sleep 0.$RANDOM;
done
}
# NOTE
# ALTER PARTITION query stops merges,
# but serialization error is still possible if some merge was assigned (and committed) between BEGIN and ALTER.
function thread_partition_src_to_dst()
{
set -e
count=0
sum=0
for i in {1..20}; do
out=$(
$CLICKHOUSE_CLIENT --multiquery --query "
BEGIN TRANSACTION;
INSERT INTO src VALUES /* ($i, 3) */ ($i, 3);
INSERT INTO dst SELECT * FROM src;
ALTER TABLE src DROP PARTITION ID 'all';
SET throw_on_unsupported_query_inside_transaction=0;
SELECT throwIf((SELECT (count(), sum(n)) FROM merge(currentDatabase(), '') WHERE type=3) != ($count + 1, $sum + $i)) FORMAT Null;
COMMIT;" 2>&1) ||:
echo "$out" | grep -Fv "SERIALIZATION_ERROR" | grep -F "Received from " && $CLICKHOUSE_CLIENT --multiquery --query "
begin transaction;
set transaction snapshot 3;
select $i, 'src', type, n, _part from src order by type, n;
select $i, 'dst', type, n, _part from dst order by type, n;
rollback" ||:
echo "$out" | grep -Fa "SERIALIZATION_ERROR" >/dev/null || count=$((count+1))
echo "$out" | grep -Fa "SERIALIZATION_ERROR" >/dev/null || sum=$((sum+i))
done
}
function thread_partition_dst_to_src()
{
set -e
for i in {1..20}; do
action="ROLLBACK"
if (( i % 2 )); then
action="COMMIT"
fi
$CLICKHOUSE_CLIENT --multiquery --query "
SYSTEM STOP MERGES dst;
ALTER TABLE dst DROP PARTITION ID 'nonexistent'; -- STOP MERGES doesn't wait for started merges to finish, so we use this trick
SYSTEM SYNC TRANSACTION LOG;
BEGIN TRANSACTION;
INSERT INTO dst VALUES /* ($i, 4) */ ($i, 4);
INSERT INTO src SELECT * FROM dst;
ALTER TABLE dst DROP PARTITION ID 'all';
SET throw_on_unsupported_query_inside_transaction=0;
SYSTEM START MERGES dst;
SELECT throwIf((SELECT (count(), sum(n)) FROM merge(currentDatabase(), '') WHERE type=4) != (toUInt8($i/2 + 1), (select sum(number) from numbers(1, $i) where number % 2 or number=$i))) FORMAT Null;
$action;"
done
}
function thread_select()
{
set -e
while true; do
$CLICKHOUSE_CLIENT --multiquery --query "
BEGIN TRANSACTION;
-- no duplicates
SELECT type, throwIf(count(n) != countDistinct(n)) FROM src GROUP BY type FORMAT Null;
SELECT type, throwIf(count(n) != countDistinct(n)) FROM dst GROUP BY type FORMAT Null;
-- rows inserted by thread_insert moved together
SET throw_on_unsupported_query_inside_transaction=0;
SELECT _table, throwIf(arraySort(groupArrayIf(n, type=1)) != arraySort(groupArrayIf(n, type=2))) FROM merge(currentDatabase(), '') GROUP BY _table FORMAT Null;
-- all rows are inserted in insert_thread
SELECT type, throwIf(count(n) != max(n)), throwIf(sum(n) != max(n)*(max(n)+1)/2) FROM merge(currentDatabase(), '') WHERE type IN (1, 2) GROUP BY type ORDER BY type FORMAT Null;
COMMIT;"
done
}
thread_insert & PID_1=$!
thread_select & PID_2=$!
thread_partition_src_to_dst & PID_3=$!
thread_partition_dst_to_src & PID_4=$!
wait $PID_3 && wait $PID_4
kill -TERM $PID_1
kill -TERM $PID_2
wait
wait_for_queries_to_finish
$CLICKHOUSE_CLIENT -q "SELECT type, count(n) = countDistinct(n) FROM merge(currentDatabase(), '') GROUP BY type ORDER BY type"
$CLICKHOUSE_CLIENT -q "SELECT DISTINCT arraySort(groupArrayIf(n, type=1)) = arraySort(groupArrayIf(n, type=2)) FROM merge(currentDatabase(), '') GROUP BY _table ORDER BY _table"
$CLICKHOUSE_CLIENT -q "SELECT count(n), sum(n) FROM merge(currentDatabase(), '') WHERE type=4"
$CLICKHOUSE_CLIENT -q "SELECT type, count(n) == max(n), sum(n) == max(n)*(max(n)+1)/2 FROM merge(currentDatabase(), '') WHERE type IN (1, 2) GROUP BY type ORDER BY type"
$CLICKHOUSE_CLIENT --query "DROP TABLE src";
$CLICKHOUSE_CLIENT --query "DROP TABLE dst";

Some files were not shown because too many files have changed in this diff Show More