Merge branch 'master' into tests/01293_client_interactive_vertical_multiline

This commit is contained in:
Alexander Gololobov 2023-08-04 09:53:24 +02:00 committed by GitHub
commit 936dd5cd4d
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
170 changed files with 1616 additions and 901 deletions

View File

@ -32,7 +32,7 @@ RUN arch=${TARGETARCH:-amd64} \
esac
ARG REPOSITORY="https://s3.amazonaws.com/clickhouse-builds/22.4/31c367d3cd3aefd316778601ff6565119fe36682/package_release"
ARG VERSION="23.7.1.2470"
ARG VERSION="23.7.2.25"
ARG PACKAGES="clickhouse-keeper"
# user/group precreated explicitly with fixed uid/gid on purpose.

View File

@ -33,7 +33,7 @@ RUN arch=${TARGETARCH:-amd64} \
# lts / testing / prestable / etc
ARG REPO_CHANNEL="stable"
ARG REPOSITORY="https://packages.clickhouse.com/tgz/${REPO_CHANNEL}"
ARG VERSION="23.7.1.2470"
ARG VERSION="23.7.2.25"
ARG PACKAGES="clickhouse-client clickhouse-server clickhouse-common-static"
# user/group precreated explicitly with fixed uid/gid on purpose.

View File

@ -23,7 +23,7 @@ RUN sed -i "s|http://archive.ubuntu.com|${apt_archive}|g" /etc/apt/sources.list
ARG REPO_CHANNEL="stable"
ARG REPOSITORY="deb [signed-by=/usr/share/keyrings/clickhouse-keyring.gpg] https://packages.clickhouse.com/deb ${REPO_CHANNEL} main"
ARG VERSION="23.7.1.2470"
ARG VERSION="23.7.2.25"
ARG PACKAGES="clickhouse-client clickhouse-server clickhouse-common-static"
# set non-empty deb_location_url url to create a docker image

View File

@ -200,8 +200,8 @@ Templates:
- [Server Setting](_description_templates/template-server-setting.md)
- [Database or Table engine](_description_templates/template-engine.md)
- [System table](_description_templates/template-system-table.md)
- [Data type](_description_templates/data-type.md)
- [Statement](_description_templates/statement.md)
- [Data type](_description_templates/template-data-type.md)
- [Statement](_description_templates/template-statement.md)
<a name="how-to-build-docs"/>

View File

@ -0,0 +1,31 @@
---
sidebar_position: 1
sidebar_label: 2023
---
# 2023 Changelog
### ClickHouse release v23.7.2.25-stable (8dd1107b032) FIXME as compared to v23.7.1.2470-stable (a70127baecc)
#### Backward Incompatible Change
* Backported in [#52850](https://github.com/ClickHouse/ClickHouse/issues/52850): If a dynamic disk contains a name, it should be specified as `disk = disk(name = 'disk_name'`, ...) in disk function arguments. In previous version it could be specified as `disk = disk_<disk_name>(...)`, which is no longer supported. [#52820](https://github.com/ClickHouse/ClickHouse/pull/52820) ([Kseniia Sumarokova](https://github.com/kssenii)).
#### Build/Testing/Packaging Improvement
* Backported in [#52913](https://github.com/ClickHouse/ClickHouse/issues/52913): Add `clickhouse-keeper-client` symlink to the clickhouse-server package. [#51882](https://github.com/ClickHouse/ClickHouse/pull/51882) ([Mikhail f. Shiryaev](https://github.com/Felixoid)).
#### Bug Fix (user-visible misbehavior in an official stable release)
* Fix binary arithmetic for Nullable(IPv4) [#51642](https://github.com/ClickHouse/ClickHouse/pull/51642) ([Yakov Olkhovskiy](https://github.com/yakov-olkhovskiy)).
* Support IPv4 and IPv6 as dictionary attributes [#51756](https://github.com/ClickHouse/ClickHouse/pull/51756) ([Yakov Olkhovskiy](https://github.com/yakov-olkhovskiy)).
* init and destroy ares channel on demand.. [#52634](https://github.com/ClickHouse/ClickHouse/pull/52634) ([Arthur Passos](https://github.com/arthurpassos)).
* Fix crash in function `tuple` with one sparse column argument [#52659](https://github.com/ClickHouse/ClickHouse/pull/52659) ([Anton Popov](https://github.com/CurtizJ)).
* Fix data race in Keeper reconfiguration [#52804](https://github.com/ClickHouse/ClickHouse/pull/52804) ([Antonio Andelic](https://github.com/antonio2368)).
* clickhouse-keeper: fix implementation of server with poll() [#52833](https://github.com/ClickHouse/ClickHouse/pull/52833) ([Andy Fiddaman](https://github.com/citrus-it)).
#### NOT FOR CHANGELOG / INSIGNIFICANT
* Rename setting disable_url_encoding to enable_url_encoding and add a test [#52656](https://github.com/ClickHouse/ClickHouse/pull/52656) ([Kruglov Pavel](https://github.com/Avogar)).
* Fix bugs and better test for SYSTEM STOP LISTEN [#52680](https://github.com/ClickHouse/ClickHouse/pull/52680) ([Nikolay Degterinsky](https://github.com/evillique)).
* Increase min protocol version for sparse serialization [#52835](https://github.com/ClickHouse/ClickHouse/pull/52835) ([Anton Popov](https://github.com/CurtizJ)).
* Docker improvements [#52869](https://github.com/ClickHouse/ClickHouse/pull/52869) ([Mikhail f. Shiryaev](https://github.com/Felixoid)).

View File

@ -141,6 +141,10 @@ Runs [stateful functional tests](tests.md#functional-tests). Treat them in the s
Runs [integration tests](tests.md#integration-tests).
## Bugfix validate check
Checks that either a new test (functional or integration) or there some changed tests that fail with the binary built on master branch. This check is triggered when pull request has "pr-bugfix" label.
## Stress Test
Runs stateless functional tests concurrently from several clients to detect
concurrency-related errors. If it fails:

View File

@ -22,7 +22,7 @@ CREATE TABLE deltalake
- `url` — Bucket url with path to the existing Delta Lake table.
- `aws_access_key_id`, `aws_secret_access_key` - Long-term credentials for the [AWS](https://aws.amazon.com/) account user. You can use these to authenticate your requests. Parameter is optional. If credentials are not specified, they are used from the configuration file.
Engine parameters can be specified using [Named Collections](../../../operations/named-collections.md)
Engine parameters can be specified using [Named Collections](/docs/en/operations/named-collections.md).
**Example**

View File

@ -22,7 +22,7 @@ CREATE TABLE hudi_table
- `url` — Bucket url with the path to an existing Hudi table.
- `aws_access_key_id`, `aws_secret_access_key` - Long-term credentials for the [AWS](https://aws.amazon.com/) account user. You can use these to authenticate your requests. Parameter is optional. If credentials are not specified, they are used from the configuration file.
Engine parameters can be specified using [Named Collections](../../../operations/named-collections.md)
Engine parameters can be specified using [Named Collections](/docs/en/operations/named-collections.md).
**Example**

View File

@ -51,7 +51,3 @@ keeper foo bar
- `rmr <path>` -- Recursively deletes path. Confirmation required
- `flwc <command>` -- Executes four-letter-word command
- `help` -- Prints this message
- `get_stat [path]` -- Returns the node's stat (default `.`)
- `find_super_nodes <threshold> [path]` -- Finds nodes with number of children larger than some threshold for the given path (default `.`)
- `delete_stable_backups` -- Deletes ClickHouse nodes used for backups that are now inactive
- `find_big_family [path] [n]` -- Returns the top n nodes with the biggest family in the subtree (default path = `.` and n = 10)

View File

@ -36,6 +36,8 @@ These `ALTER` statements modify entities related to role-based access control:
[ALTER TABLE ... MODIFY COMMENT](/docs/en/sql-reference/statements/alter/comment.md) statement adds, modifies, or removes comments to the table, regardless if it was set before or not.
[ALTER NAMED COLLECTION](/docs/en/sql-reference/statements/alter/named-collection.md) statement modifies [Named Collections](/docs/en/operations/named-collections.md).
## Mutations
`ALTER` queries that are intended to manipulate table data are implemented with a mechanism called “mutations”, most notably [ALTER TABLE … DELETE](/docs/en/sql-reference/statements/alter/delete.md) and [ALTER TABLE … UPDATE](/docs/en/sql-reference/statements/alter/update.md). They are asynchronous background processes similar to merges in [MergeTree](/docs/en/engines/table-engines/mergetree-family/index.md) tables that to produce new “mutated” versions of parts.

View File

@ -0,0 +1,30 @@
---
slug: /en/sql-reference/statements/alter/named-collection
sidebar_label: NAMED COLLECTION
---
# ALTER NAMED COLLECTION
This query intends to modify already existing named collections.
**Syntax**
```sql
ALTER NAMED COLLECTION [IF EXISTS] name [ON CLUSTER cluster]
[ SET
key_name1 = 'some value',
key_name2 = 'some value',
key_name3 = 'some value',
... ] |
[ DELETE key_name4, key_name5, ... ]
```
**Example**
```sql
CREATE NAMED COLLECTION foobar AS a = '1', b = '2';
ALTER NAMED COLLECTION foobar SET a = '2', c = '3';
ALTER NAMED COLLECTION foobar DELETE b;
```

View File

@ -8,13 +8,14 @@ sidebar_label: CREATE
Create queries make a new entity of one of the following kinds:
- [DATABASE](../../../sql-reference/statements/create/database.md)
- [TABLE](../../../sql-reference/statements/create/table.md)
- [VIEW](../../../sql-reference/statements/create/view.md)
- [DICTIONARY](../../../sql-reference/statements/create/dictionary.md)
- [FUNCTION](../../../sql-reference/statements/create/function.md)
- [USER](../../../sql-reference/statements/create/user.md)
- [ROLE](../../../sql-reference/statements/create/role.md)
- [ROW POLICY](../../../sql-reference/statements/create/row-policy.md)
- [QUOTA](../../../sql-reference/statements/create/quota.md)
- [SETTINGS PROFILE](../../../sql-reference/statements/create/settings-profile.md)
- [DATABASE](/docs/en/sql-reference/statements/create/database.md)
- [TABLE](/docs/en/sql-reference/statements/create/table.md)
- [VIEW](/docs/en/sql-reference/statements/create/view.md)
- [DICTIONARY](/docs/en/sql-reference/statements/create/dictionary.md)
- [FUNCTION](/docs/en/sql-reference/statements/create/function.md)
- [USER](/docs/en/sql-reference/statements/create/user.md)
- [ROLE](/docs/en/sql-reference/statements/create/role.md)
- [ROW POLICY](/docs/en/sql-reference/statements/create/row-policy.md)
- [QUOTA](/docs/en/sql-reference/statements/create/quota.md)
- [SETTINGS PROFILE](/docs/en/sql-reference/statements/create/settings-profile.md)
- [NAMED COLLECTION](/docs/en/sql-reference/statements/create/named-collection.md)

View File

@ -0,0 +1,34 @@
---
slug: /en/sql-reference/statements/create/named-collection
sidebar_label: NAMED COLLECTION
---
# CREATE NAMED COLLECTION
Creates a new named collection.
**Syntax**
```sql
CREATE NAMED COLLECTION [IF NOT EXISTS] name [ON CLUSTER cluster] AS
key_name1 = 'some value',
key_name2 = 'some value',
key_name3 = 'some value',
...
```
**Example**
```sql
CREATE NAMED COLLECTION foobar AS a = '1', b = '2';
```
**Related statements**
- [CREATE NAMED COLLECTION](https://clickhouse.com/docs/en/sql-reference/statements/alter/named-collection)
- [DROP NAMED COLLECTION](https://clickhouse.com/docs/en/sql-reference/statements/drop#drop-function)
**See Also**
- [Named collections guide](/docs/en/operations/named-collections.md)

View File

@ -119,3 +119,20 @@ DROP FUNCTION [IF EXISTS] function_name [on CLUSTER cluster]
CREATE FUNCTION linear_equation AS (x, k, b) -> k*x + b;
DROP FUNCTION linear_equation;
```
## DROP NAMED COLLECTION
Deletes a named collection.
**Syntax**
``` sql
DROP NAMED COLLECTION [IF EXISTS] name [on CLUSTER cluster]
```
**Example**
``` sql
CREATE NAMED COLLECTION foobar AS a = '1', b = '2';
DROP NAMED COLLECTION foobar;
```

View File

@ -314,6 +314,22 @@ Provides possibility to start background fetch tasks from replication queues whi
SYSTEM START REPLICATION QUEUES [ON CLUSTER cluster_name] [[db.]replicated_merge_tree_family_table_name]
```
### STOP PULLING REPLICATION LOG
Stops loading new entries from replication log to replication queue in a `ReplicatedMergeTree` table.
``` sql
SYSTEM STOP PULLING REPLICATION LOG [ON CLUSTER cluster_name] [[db.]replicated_merge_tree_family_table_name]
```
### START PULLING REPLICATION LOG
Cancels `SYSTEM STOP PULLING REPLICATION LOG`.
``` sql
SYSTEM START PULLING REPLICATION LOG [ON CLUSTER cluster_name] [[db.]replicated_merge_tree_family_table_name]
```
### SYNC REPLICA
Wait until a `ReplicatedMergeTree` table will be synced with other replicas in a cluster, but no more than `receive_timeout` seconds.

View File

@ -21,7 +21,7 @@ iceberg(url [,aws_access_key_id, aws_secret_access_key] [,format] [,structure])
- `format` — The [format](/docs/en/interfaces/formats.md/#formats) of the file. By default `Parquet` is used.
- `structure` — Structure of the table. Format `'column1_name column1_type, column2_name column2_type, ...'`.
Engine parameters can be specified using [Named Collections](../../operations/named-collections.md)
Engine parameters can be specified using [Named Collections](/docs/en/operations/named-collections.md).
**Returned value**

View File

@ -1,6 +1,5 @@
#include "Commands.h"
#include <queue>
#include "KeeperClient.h"
@ -25,18 +24,8 @@ void LSCommand::execute(const ASTKeeperQuery * query, KeeperClient * client) con
else
path = client->cwd;
auto children = client->zookeeper->getChildren(path);
std::sort(children.begin(), children.end());
bool need_space = false;
for (const auto & child : children)
{
if (std::exchange(need_space, true))
std::cout << " ";
std::cout << child;
}
for (const auto & child : client->zookeeper->getChildren(path))
std::cout << child << " ";
std::cout << "\n";
}
@ -141,173 +130,6 @@ void GetCommand::execute(const ASTKeeperQuery * query, KeeperClient * client) co
std::cout << client->zookeeper->get(client->getAbsolutePath(query->args[0].safeGet<String>())) << "\n";
}
bool GetStatCommand::parse(IParser::Pos & pos, std::shared_ptr<ASTKeeperQuery> & node, Expected & expected) const
{
String arg;
if (!parseKeeperPath(pos, expected, arg))
return true;
node->args.push_back(std::move(arg));
return true;
}
void GetStatCommand::execute(const ASTKeeperQuery * query, KeeperClient * client) const
{
Coordination::Stat stat;
String path;
if (!query->args.empty())
path = client->getAbsolutePath(query->args[0].safeGet<String>());
else
path = client->cwd;
client->zookeeper->get(path, &stat);
std::cout << "cZxid = " << stat.czxid << "\n";
std::cout << "mZxid = " << stat.mzxid << "\n";
std::cout << "pZxid = " << stat.pzxid << "\n";
std::cout << "ctime = " << stat.ctime << "\n";
std::cout << "mtime = " << stat.mtime << "\n";
std::cout << "version = " << stat.version << "\n";
std::cout << "cversion = " << stat.cversion << "\n";
std::cout << "aversion = " << stat.aversion << "\n";
std::cout << "ephemeralOwner = " << stat.ephemeralOwner << "\n";
std::cout << "dataLength = " << stat.dataLength << "\n";
std::cout << "numChildren = " << stat.numChildren << "\n";
}
bool FindSuperNodes::parse(IParser::Pos & pos, std::shared_ptr<ASTKeeperQuery> & node, Expected & expected) const
{
ASTPtr threshold;
if (!ParserUnsignedInteger{}.parse(pos, threshold, expected))
return false;
node->args.push_back(threshold->as<ASTLiteral &>().value);
String path;
if (!parseKeeperPath(pos, expected, path))
path = ".";
node->args.push_back(std::move(path));
return true;
}
void FindSuperNodes::execute(const ASTKeeperQuery * query, KeeperClient * client) const
{
auto threshold = query->args[0].safeGet<UInt64>();
auto path = client->getAbsolutePath(query->args[1].safeGet<String>());
Coordination::Stat stat;
client->zookeeper->get(path, &stat);
if (stat.numChildren >= static_cast<Int32>(threshold))
{
std::cout << static_cast<String>(path) << "\t" << stat.numChildren << "\n";
return;
}
auto children = client->zookeeper->getChildren(path);
std::sort(children.begin(), children.end());
for (const auto & child : children)
{
auto next_query = *query;
next_query.args[1] = DB::Field(path / child);
execute(&next_query, client);
}
}
bool DeleteStableBackups::parse(IParser::Pos & /* pos */, std::shared_ptr<ASTKeeperQuery> & /* node */, Expected & /* expected */) const
{
return true;
}
void DeleteStableBackups::execute(const ASTKeeperQuery * /* query */, KeeperClient * client) const
{
client->askConfirmation(
"You are going to delete all inactive backups in /clickhouse/backups.",
[client]
{
fs::path backup_root = "/clickhouse/backups";
auto backups = client->zookeeper->getChildren(backup_root);
std::sort(backups.begin(), backups.end());
for (const auto & child : backups)
{
auto backup_path = backup_root / child;
std::cout << "Found backup " << backup_path << ", checking if it's active\n";
String stage_path = backup_path / "stage";
auto stages = client->zookeeper->getChildren(stage_path);
bool is_active = false;
for (const auto & stage : stages)
{
if (startsWith(stage, "alive"))
{
is_active = true;
break;
}
}
if (is_active)
{
std::cout << "Backup " << backup_path << " is active, not going to delete\n";
continue;
}
std::cout << "Backup " << backup_path << " is not active, deleting it\n";
client->zookeeper->removeRecursive(backup_path);
}
});
}
bool FindBigFamily::parse(IParser::Pos & pos, std::shared_ptr<ASTKeeperQuery> & node, Expected & expected) const
{
String path;
if (!parseKeeperPath(pos, expected, path))
path = ".";
node->args.push_back(std::move(path));
ASTPtr count;
if (ParserUnsignedInteger{}.parse(pos, count, expected))
node->args.push_back(count->as<ASTLiteral &>().value);
else
node->args.push_back(UInt64(10));
return true;
}
void FindBigFamily::execute(const ASTKeeperQuery * query, KeeperClient * client) const
{
auto path = client->getAbsolutePath(query->args[0].safeGet<String>());
auto n = query->args[1].safeGet<UInt64>();
std::vector<std::tuple<Int32, String>> result;
std::queue<fs::path> queue;
queue.push(path);
while (!queue.empty())
{
auto next_path = queue.front();
queue.pop();
auto children = client->zookeeper->getChildren(next_path);
std::transform(children.cbegin(), children.cend(), children.begin(), [&](const String & child) { return next_path / child; });
auto response = client->zookeeper->get(children);
for (size_t i = 0; i < response.size(); ++i)
{
result.emplace_back(response[i].stat.numChildren, children[i]);
queue.push(children[i]);
}
}
std::sort(result.begin(), result.end(), std::greater());
for (UInt64 i = 0; i < std::min(result.size(), static_cast<size_t>(n)); ++i)
std::cout << std::get<1>(result[i]) << "\t" << std::get<0>(result[i]) << "\n";
}
bool RMCommand::parse(IParser::Pos & pos, std::shared_ptr<ASTKeeperQuery> & node, Expected & expected) const
{
String arg;
@ -348,7 +170,7 @@ bool HelpCommand::parse(IParser::Pos & /* pos */, std::shared_ptr<ASTKeeperQuery
void HelpCommand::execute(const ASTKeeperQuery * /* query */, KeeperClient * /* client */) const
{
for (const auto & pair : KeeperClient::commands)
std::cout << pair.second->generateHelpString() << "\n";
std::cout << pair.second->getHelpMessage() << "\n";
}
bool FourLetterWordCommand::parse(IParser::Pos & pos, std::shared_ptr<ASTKeeperQuery> & node, Expected & expected) const

View File

@ -21,12 +21,6 @@ public:
virtual String getName() const = 0;
virtual ~IKeeperClientCommand() = default;
String generateHelpString() const
{
return fmt::vformat(getHelpMessage(), fmt::make_format_args(getName()));
}
};
using Command = std::shared_ptr<IKeeperClientCommand>;
@ -40,7 +34,7 @@ class LSCommand : public IKeeperClientCommand
void execute(const ASTKeeperQuery * query, KeeperClient * client) const override;
String getHelpMessage() const override { return "{} [path] -- Lists the nodes for the given path (default: cwd)"; }
String getHelpMessage() const override { return "ls [path] -- Lists the nodes for the given path (default: cwd)"; }
};
class CDCommand : public IKeeperClientCommand
@ -51,7 +45,7 @@ class CDCommand : public IKeeperClientCommand
void execute(const ASTKeeperQuery * query, KeeperClient * client) const override;
String getHelpMessage() const override { return "{} [path] -- Change the working path (default `.`)"; }
String getHelpMessage() const override { return "cd [path] -- Change the working path (default `.`)"; }
};
class SetCommand : public IKeeperClientCommand
@ -64,7 +58,7 @@ class SetCommand : public IKeeperClientCommand
String getHelpMessage() const override
{
return "{} <path> <value> [version] -- Updates the node's value. Only update if version matches (default: -1)";
return "set <path> <value> [version] -- Updates the node's value. Only update if version matches (default: -1)";
}
};
@ -76,7 +70,7 @@ class CreateCommand : public IKeeperClientCommand
void execute(const ASTKeeperQuery * query, KeeperClient * client) const override;
String getHelpMessage() const override { return "{} <path> <value> -- Creates new node"; }
String getHelpMessage() const override { return "create <path> <value> -- Creates new node"; }
};
class GetCommand : public IKeeperClientCommand
@ -87,63 +81,9 @@ class GetCommand : public IKeeperClientCommand
void execute(const ASTKeeperQuery * query, KeeperClient * client) const override;
String getHelpMessage() const override { return "{} <path> -- Returns the node's value"; }
String getHelpMessage() const override { return "get <path> -- Returns the node's value"; }
};
class GetStatCommand : public IKeeperClientCommand
{
String getName() const override { return "get_stat"; }
bool parse(IParser::Pos & pos, std::shared_ptr<ASTKeeperQuery> & node, Expected & expected) const override;
void execute(const ASTKeeperQuery * query, KeeperClient * client) const override;
String getHelpMessage() const override { return "{} [path] -- Returns the node's stat (default `.`)"; }
};
class FindSuperNodes : public IKeeperClientCommand
{
String getName() const override { return "find_super_nodes"; }
bool parse(IParser::Pos & pos, std::shared_ptr<ASTKeeperQuery> & node, Expected & expected) const override;
void execute(const ASTKeeperQuery * query, KeeperClient * client) const override;
String getHelpMessage() const override
{
return "{} <threshold> [path] -- Finds nodes with number of children larger than some threshold for the given path (default `.`)";
}
};
class DeleteStableBackups : public IKeeperClientCommand
{
String getName() const override { return "delete_stable_backups"; }
bool parse(IParser::Pos & pos, std::shared_ptr<ASTKeeperQuery> & node, Expected & expected) const override;
void execute(const ASTKeeperQuery * query, KeeperClient * client) const override;
String getHelpMessage() const override
{
return "{} -- Deletes ClickHouse nodes used for backups that are now inactive";
}
};
class FindBigFamily : public IKeeperClientCommand
{
String getName() const override { return "find_big_family"; }
bool parse(IParser::Pos & pos, std::shared_ptr<ASTKeeperQuery> & node, Expected & expected) const override;
void execute(const ASTKeeperQuery * query, KeeperClient * client) const override;
String getHelpMessage() const override
{
return "{} [path] [n] -- Returns the top n nodes with the biggest family in the subtree (default path = `.` and n = 10)";
}
};
class RMCommand : public IKeeperClientCommand
{
String getName() const override { return "rm"; }
@ -152,7 +92,7 @@ class RMCommand : public IKeeperClientCommand
void execute(const ASTKeeperQuery * query, KeeperClient * client) const override;
String getHelpMessage() const override { return "{} <path> -- Remove the node"; }
String getHelpMessage() const override { return "remove <path> -- Remove the node"; }
};
class RMRCommand : public IKeeperClientCommand
@ -163,7 +103,7 @@ class RMRCommand : public IKeeperClientCommand
void execute(const ASTKeeperQuery * query, KeeperClient * client) const override;
String getHelpMessage() const override { return "{} <path> -- Recursively deletes path. Confirmation required"; }
String getHelpMessage() const override { return "rmr <path> -- Recursively deletes path. Confirmation required"; }
};
class HelpCommand : public IKeeperClientCommand
@ -174,7 +114,7 @@ class HelpCommand : public IKeeperClientCommand
void execute(const ASTKeeperQuery * query, KeeperClient * client) const override;
String getHelpMessage() const override { return "{} -- Prints this message"; }
String getHelpMessage() const override { return "help -- Prints this message"; }
};
class FourLetterWordCommand : public IKeeperClientCommand
@ -185,7 +125,7 @@ class FourLetterWordCommand : public IKeeperClientCommand
void execute(const ASTKeeperQuery * query, KeeperClient * client) const override;
String getHelpMessage() const override { return "{} <command> -- Executes four-letter-word command"; }
String getHelpMessage() const override { return "flwc <command> -- Executes four-letter-word command"; }
};
}

View File

@ -177,10 +177,6 @@ void KeeperClient::initialize(Poco::Util::Application & /* self */)
std::make_shared<SetCommand>(),
std::make_shared<CreateCommand>(),
std::make_shared<GetCommand>(),
std::make_shared<GetStatCommand>(),
std::make_shared<FindSuperNodes>(),
std::make_shared<DeleteStableBackups>(),
std::make_shared<FindBigFamily>(),
std::make_shared<RMCommand>(),
std::make_shared<RMRCommand>(),
std::make_shared<HelpCommand>(),

View File

@ -58,7 +58,6 @@ bool KeeperParser::parseImpl(Pos & pos, ASTPtr & node, Expected & expected)
return false;
String command_name(pos->begin, pos->end);
std::transform(command_name.begin(), command_name.end(), command_name.begin(), [](unsigned char c) { return std::tolower(c); });
Command command;
auto iter = KeeperClient::commands.find(command_name);

View File

@ -1650,6 +1650,7 @@ try
database_catalog.initializeAndLoadTemporaryDatabase();
loadMetadataSystem(global_context);
maybeConvertSystemDatabase(global_context);
startupSystemTables();
/// After attaching system databases we can initialize system log.
global_context->initializeSystemLogs();
global_context->setSystemZooKeeperLogAfterInitializationIfNeeded();
@ -1668,7 +1669,6 @@ try
/// Then, load remaining databases
loadMetadata(global_context, default_database);
convertDatabasesEnginesIfNeed(global_context);
startupSystemTables();
database_catalog.startupBackgroundCleanup();
/// After loading validate that default database exists
database_catalog.assertDatabaseExists(default_database);

View File

@ -6,6 +6,7 @@
#include <Access/DiskAccessStorage.h>
#include <Access/LDAPAccessStorage.h>
#include <Access/ContextAccess.h>
#include <Access/EnabledSettings.h>
#include <Access/EnabledRolesInfo.h>
#include <Access/RoleCache.h>
#include <Access/RowPolicyCache.h>
@ -729,6 +730,14 @@ std::shared_ptr<const EnabledRoles> AccessControl::getEnabledRoles(
}
std::shared_ptr<const EnabledRolesInfo> AccessControl::getEnabledRolesInfo(
const std::vector<UUID> & current_roles,
const std::vector<UUID> & current_roles_with_admin_option) const
{
return getEnabledRoles(current_roles, current_roles_with_admin_option)->getRolesInfo();
}
std::shared_ptr<const EnabledRowPolicies> AccessControl::getEnabledRowPolicies(const UUID & user_id, const boost::container::flat_set<UUID> & enabled_roles) const
{
return row_policy_cache->getEnabledRowPolicies(user_id, enabled_roles);
@ -772,6 +781,15 @@ std::shared_ptr<const EnabledSettings> AccessControl::getEnabledSettings(
return settings_profiles_cache->getEnabledSettings(user_id, settings_from_user, enabled_roles, settings_from_enabled_roles);
}
std::shared_ptr<const SettingsProfilesInfo> AccessControl::getEnabledSettingsInfo(
const UUID & user_id,
const SettingsProfileElements & settings_from_user,
const boost::container::flat_set<UUID> & enabled_roles,
const SettingsProfileElements & settings_from_enabled_roles) const
{
return getEnabledSettings(user_id, settings_from_user, enabled_roles, settings_from_enabled_roles)->getInfo();
}
std::shared_ptr<const SettingsProfilesInfo> AccessControl::getSettingsProfileInfo(const UUID & profile_id)
{
return settings_profiles_cache->getSettingsProfileInfo(profile_id);

View File

@ -29,6 +29,7 @@ class ContextAccessParams;
struct User;
using UserPtr = std::shared_ptr<const User>;
class EnabledRoles;
struct EnabledRolesInfo;
class RoleCache;
class EnabledRowPolicies;
class RowPolicyCache;
@ -187,6 +188,10 @@ public:
const std::vector<UUID> & current_roles,
const std::vector<UUID> & current_roles_with_admin_option) const;
std::shared_ptr<const EnabledRolesInfo> getEnabledRolesInfo(
const std::vector<UUID> & current_roles,
const std::vector<UUID> & current_roles_with_admin_option) const;
std::shared_ptr<const EnabledRowPolicies> getEnabledRowPolicies(
const UUID & user_id,
const boost::container::flat_set<UUID> & enabled_roles) const;
@ -209,6 +214,12 @@ public:
const boost::container::flat_set<UUID> & enabled_roles,
const SettingsProfileElements & settings_from_enabled_roles) const;
std::shared_ptr<const SettingsProfilesInfo> getEnabledSettingsInfo(
const UUID & user_id,
const SettingsProfileElements & settings_from_user,
const boost::container::flat_set<UUID> & enabled_roles,
const SettingsProfileElements & settings_from_enabled_roles) const;
std::shared_ptr<const SettingsProfilesInfo> getSettingsProfileInfo(const UUID & profile_id);
const ExternalAuthenticators & getExternalAuthenticators() const;

View File

@ -168,6 +168,7 @@ enum class AccessType
M(SYSTEM_TTL_MERGES, "SYSTEM STOP TTL MERGES, SYSTEM START TTL MERGES, STOP TTL MERGES, START TTL MERGES", TABLE, SYSTEM) \
M(SYSTEM_FETCHES, "SYSTEM STOP FETCHES, SYSTEM START FETCHES, STOP FETCHES, START FETCHES", TABLE, SYSTEM) \
M(SYSTEM_MOVES, "SYSTEM STOP MOVES, SYSTEM START MOVES, STOP MOVES, START MOVES", TABLE, SYSTEM) \
M(SYSTEM_PULLING_REPLICATION_LOG, "SYSTEM STOP PULLING REPLICATION LOG, SYSTEM START PULLING REPLICATION LOG", TABLE, SYSTEM) \
M(SYSTEM_DISTRIBUTED_SENDS, "SYSTEM STOP DISTRIBUTED SENDS, SYSTEM START DISTRIBUTED SENDS, STOP DISTRIBUTED SENDS, START DISTRIBUTED SENDS", TABLE, SYSTEM_SENDS) \
M(SYSTEM_REPLICATED_SENDS, "SYSTEM STOP REPLICATED SENDS, SYSTEM START REPLICATED SENDS, STOP REPLICATED SENDS, START REPLICATED SENDS", TABLE, SYSTEM_SENDS) \
M(SYSTEM_SENDS, "SYSTEM STOP SENDS, SYSTEM START SENDS, STOP SENDS, START SENDS", GROUP, SYSTEM) \

View File

@ -51,7 +51,7 @@ TEST(AccessRights, Union)
"CREATE DICTIONARY, DROP DATABASE, DROP TABLE, DROP VIEW, DROP DICTIONARY, UNDROP TABLE, "
"TRUNCATE, OPTIMIZE, BACKUP, CREATE ROW POLICY, ALTER ROW POLICY, DROP ROW POLICY, "
"SHOW ROW POLICIES, SYSTEM MERGES, SYSTEM TTL MERGES, SYSTEM FETCHES, "
"SYSTEM MOVES, SYSTEM SENDS, SYSTEM REPLICATION QUEUES, "
"SYSTEM MOVES, SYSTEM PULLING REPLICATION LOG, SYSTEM SENDS, SYSTEM REPLICATION QUEUES, "
"SYSTEM DROP REPLICA, SYSTEM SYNC REPLICA, SYSTEM RESTART REPLICA, "
"SYSTEM RESTORE REPLICA, SYSTEM WAIT LOADING PARTS, SYSTEM SYNC DATABASE REPLICA, SYSTEM FLUSH DISTRIBUTED, dictGet ON db1.*, GRANT NAMED COLLECTION ADMIN ON db1");
}

View File

@ -77,10 +77,12 @@ BackupEntriesCollector::BackupEntriesCollector(
const ASTBackupQuery::Elements & backup_query_elements_,
const BackupSettings & backup_settings_,
std::shared_ptr<IBackupCoordination> backup_coordination_,
const ReadSettings & read_settings_,
const ContextPtr & context_)
: backup_query_elements(backup_query_elements_)
, backup_settings(backup_settings_)
, backup_coordination(backup_coordination_)
, read_settings(read_settings_)
, context(context_)
, on_cluster_first_sync_timeout(context->getConfigRef().getUInt64("backups.on_cluster_first_sync_timeout", 180000))
, consistent_metadata_snapshot_timeout(context->getConfigRef().getUInt64("backups.consistent_metadata_snapshot_timeout", 600000))

View File

@ -30,6 +30,7 @@ public:
BackupEntriesCollector(const ASTBackupQuery::Elements & backup_query_elements_,
const BackupSettings & backup_settings_,
std::shared_ptr<IBackupCoordination> backup_coordination_,
const ReadSettings & read_settings_,
const ContextPtr & context_);
~BackupEntriesCollector();
@ -40,6 +41,7 @@ public:
const BackupSettings & getBackupSettings() const { return backup_settings; }
std::shared_ptr<IBackupCoordination> getBackupCoordination() const { return backup_coordination; }
const ReadSettings & getReadSettings() const { return read_settings; }
ContextPtr getContext() const { return context; }
/// Adds a backup entry which will be later returned by run().
@ -93,6 +95,7 @@ private:
const ASTBackupQuery::Elements backup_query_elements;
const BackupSettings backup_settings;
std::shared_ptr<IBackupCoordination> backup_coordination;
const ReadSettings read_settings;
ContextPtr context;
std::chrono::milliseconds on_cluster_first_sync_timeout;
std::chrono::milliseconds consistent_metadata_snapshot_timeout;

View File

@ -57,7 +57,7 @@ UInt64 BackupEntryFromImmutableFile::getSize() const
return *file_size;
}
UInt128 BackupEntryFromImmutableFile::getChecksum() const
UInt128 BackupEntryFromImmutableFile::getChecksum(const ReadSettings & read_settings) const
{
{
std::lock_guard lock{size_and_checksum_mutex};
@ -73,7 +73,7 @@ UInt128 BackupEntryFromImmutableFile::getChecksum() const
}
}
auto calculated_checksum = BackupEntryWithChecksumCalculation<IBackupEntry>::getChecksum();
auto calculated_checksum = BackupEntryWithChecksumCalculation<IBackupEntry>::getChecksum(read_settings);
{
std::lock_guard lock{size_and_checksum_mutex};
@ -86,13 +86,13 @@ UInt128 BackupEntryFromImmutableFile::getChecksum() const
}
}
std::optional<UInt128> BackupEntryFromImmutableFile::getPartialChecksum(size_t prefix_length) const
std::optional<UInt128> BackupEntryFromImmutableFile::getPartialChecksum(size_t prefix_length, const ReadSettings & read_settings) const
{
if (prefix_length == 0)
return 0;
if (prefix_length >= getSize())
return getChecksum();
return getChecksum(read_settings);
/// For immutable files we don't use partial checksums.
return std::nullopt;

View File

@ -27,8 +27,8 @@ public:
std::unique_ptr<SeekableReadBuffer> getReadBuffer(const ReadSettings & read_settings) const override;
UInt64 getSize() const override;
UInt128 getChecksum() const override;
std::optional<UInt128> getPartialChecksum(size_t prefix_length) const override;
UInt128 getChecksum(const ReadSettings & read_settings) const override;
std::optional<UInt128> getPartialChecksum(size_t prefix_length, const ReadSettings & read_settings) const override;
DataSourceDescription getDataSourceDescription() const override { return data_source_description; }
bool isEncryptedByDisk() const override { return copy_encrypted; }

View File

@ -11,17 +11,17 @@ namespace DB
{
namespace
{
String readFile(const String & file_path)
String readFile(const String & file_path, const ReadSettings & read_settings)
{
auto buf = createReadBufferFromFileBase(file_path, /* settings= */ {});
auto buf = createReadBufferFromFileBase(file_path, read_settings);
String s;
readStringUntilEOF(s, *buf);
return s;
}
String readFile(const DiskPtr & disk, const String & file_path, bool copy_encrypted)
String readFile(const DiskPtr & disk, const String & file_path, const ReadSettings & read_settings, bool copy_encrypted)
{
auto buf = copy_encrypted ? disk->readEncryptedFile(file_path, {}) : disk->readFile(file_path);
auto buf = copy_encrypted ? disk->readEncryptedFile(file_path, read_settings) : disk->readFile(file_path, read_settings);
String s;
readStringUntilEOF(s, *buf);
return s;
@ -29,19 +29,19 @@ namespace
}
BackupEntryFromSmallFile::BackupEntryFromSmallFile(const String & file_path_)
BackupEntryFromSmallFile::BackupEntryFromSmallFile(const String & file_path_, const ReadSettings & read_settings_)
: file_path(file_path_)
, data_source_description(DiskLocal::getLocalDataSourceDescription(file_path_))
, data(readFile(file_path_))
, data(readFile(file_path_, read_settings_))
{
}
BackupEntryFromSmallFile::BackupEntryFromSmallFile(const DiskPtr & disk_, const String & file_path_, bool copy_encrypted_)
BackupEntryFromSmallFile::BackupEntryFromSmallFile(const DiskPtr & disk_, const String & file_path_, const ReadSettings & read_settings_, bool copy_encrypted_)
: disk(disk_)
, file_path(file_path_)
, data_source_description(disk_->getDataSourceDescription())
, copy_encrypted(copy_encrypted_ && data_source_description.is_encrypted)
, data(readFile(disk_, file_path, copy_encrypted))
, data(readFile(disk_, file_path, read_settings_, copy_encrypted))
{
}

View File

@ -13,8 +13,8 @@ using DiskPtr = std::shared_ptr<IDisk>;
class BackupEntryFromSmallFile : public BackupEntryWithChecksumCalculation<IBackupEntry>
{
public:
explicit BackupEntryFromSmallFile(const String & file_path_);
BackupEntryFromSmallFile(const DiskPtr & disk_, const String & file_path_, bool copy_encrypted_ = false);
explicit BackupEntryFromSmallFile(const String & file_path_, const ReadSettings & read_settings_);
BackupEntryFromSmallFile(const DiskPtr & disk_, const String & file_path_, const ReadSettings & read_settings_, bool copy_encrypted_ = false);
std::unique_ptr<SeekableReadBuffer> getReadBuffer(const ReadSettings &) const override;
UInt64 getSize() const override { return data.size(); }

View File

@ -6,7 +6,7 @@ namespace DB
{
template <typename Base>
UInt128 BackupEntryWithChecksumCalculation<Base>::getChecksum() const
UInt128 BackupEntryWithChecksumCalculation<Base>::getChecksum(const ReadSettings & read_settings) const
{
{
std::lock_guard lock{checksum_calculation_mutex};
@ -26,7 +26,7 @@ UInt128 BackupEntryWithChecksumCalculation<Base>::getChecksum() const
}
else
{
auto read_buffer = this->getReadBuffer(ReadSettings{}.adjustBufferSize(size));
auto read_buffer = this->getReadBuffer(read_settings.adjustBufferSize(size));
HashingReadBuffer hashing_read_buffer(*read_buffer);
hashing_read_buffer.ignoreAll();
calculated_checksum = hashing_read_buffer.getHash();
@ -37,23 +37,20 @@ UInt128 BackupEntryWithChecksumCalculation<Base>::getChecksum() const
}
template <typename Base>
std::optional<UInt128> BackupEntryWithChecksumCalculation<Base>::getPartialChecksum(size_t prefix_length) const
std::optional<UInt128> BackupEntryWithChecksumCalculation<Base>::getPartialChecksum(size_t prefix_length, const ReadSettings & read_settings) const
{
if (prefix_length == 0)
return 0;
size_t size = this->getSize();
if (prefix_length >= size)
return this->getChecksum();
return this->getChecksum(read_settings);
std::lock_guard lock{checksum_calculation_mutex};
ReadSettings read_settings;
if (calculated_checksum)
read_settings.adjustBufferSize(calculated_checksum ? prefix_length : size);
auto read_buffer = this->getReadBuffer(read_settings);
auto read_buffer = this->getReadBuffer(read_settings.adjustBufferSize(calculated_checksum ? prefix_length : size));
HashingReadBuffer hashing_read_buffer(*read_buffer);
hashing_read_buffer.ignore(prefix_length);
auto partial_checksum = hashing_read_buffer.getHash();

View File

@ -11,8 +11,8 @@ template <typename Base>
class BackupEntryWithChecksumCalculation : public Base
{
public:
UInt128 getChecksum() const override;
std::optional<UInt128> getPartialChecksum(size_t prefix_length) const override;
UInt128 getChecksum(const ReadSettings & read_settings) const override;
std::optional<UInt128> getPartialChecksum(size_t prefix_length, const ReadSettings & read_settings) const override;
private:
mutable std::optional<UInt128> calculated_checksum;

View File

@ -17,8 +17,8 @@ public:
std::unique_ptr<SeekableReadBuffer> getReadBuffer(const ReadSettings & read_settings) const override { return entry->getReadBuffer(read_settings); }
UInt64 getSize() const override { return entry->getSize(); }
UInt128 getChecksum() const override { return entry->getChecksum(); }
std::optional<UInt128> getPartialChecksum(size_t prefix_length) const override { return entry->getPartialChecksum(prefix_length); }
UInt128 getChecksum(const ReadSettings & read_settings) const override { return entry->getChecksum(read_settings); }
std::optional<UInt128> getPartialChecksum(size_t prefix_length, const ReadSettings & read_settings) const override { return entry->getPartialChecksum(prefix_length, read_settings); }
DataSourceDescription getDataSourceDescription() const override { return entry->getDataSourceDescription(); }
bool isEncryptedByDisk() const override { return entry->isEncryptedByDisk(); }
bool isFromFile() const override { return entry->isFromFile(); }

View File

@ -3,6 +3,8 @@
#include <Backups/IBackup.h>
#include <Backups/BackupInfo.h>
#include <Core/Types.h>
#include <IO/ReadSettings.h>
#include <IO/WriteSettings.h>
#include <Parsers/IAST_fwd.h>
#include <boost/noncopyable.hpp>
#include <memory>
@ -37,6 +39,8 @@ public:
std::optional<UUID> backup_uuid;
bool deduplicate_files = true;
bool allow_s3_native_copy = true;
ReadSettings read_settings;
WriteSettings write_settings;
};
static BackupFactory & instance();

View File

@ -57,12 +57,12 @@ namespace
/// Calculate checksum for backup entry if it's empty.
/// Also able to calculate additional checksum of some prefix.
ChecksumsForNewEntry calculateNewEntryChecksumsIfNeeded(const BackupEntryPtr & entry, size_t prefix_size)
ChecksumsForNewEntry calculateNewEntryChecksumsIfNeeded(const BackupEntryPtr & entry, size_t prefix_size, const ReadSettings & read_settings)
{
ChecksumsForNewEntry res;
/// The partial checksum should be calculated before the full checksum to enable optimization in BackupEntryWithChecksumCalculation.
res.prefix_checksum = entry->getPartialChecksum(prefix_size);
res.full_checksum = entry->getChecksum();
res.prefix_checksum = entry->getPartialChecksum(prefix_size, read_settings);
res.full_checksum = entry->getChecksum(read_settings);
return res;
}
@ -93,7 +93,12 @@ String BackupFileInfo::describe() const
}
BackupFileInfo buildFileInfoForBackupEntry(const String & file_name, const BackupEntryPtr & backup_entry, const BackupPtr & base_backup, Poco::Logger * log)
BackupFileInfo buildFileInfoForBackupEntry(
const String & file_name,
const BackupEntryPtr & backup_entry,
const BackupPtr & base_backup,
const ReadSettings & read_settings,
Poco::Logger * log)
{
auto adjusted_path = removeLeadingSlash(file_name);
@ -126,7 +131,7 @@ BackupFileInfo buildFileInfoForBackupEntry(const String & file_name, const Backu
/// File with the same name but smaller size exist in previous backup
if (check_base == CheckBackupResult::HasPrefix)
{
auto checksums = calculateNewEntryChecksumsIfNeeded(backup_entry, base_backup_file_info->first);
auto checksums = calculateNewEntryChecksumsIfNeeded(backup_entry, base_backup_file_info->first, read_settings);
info.checksum = checksums.full_checksum;
/// We have prefix of this file in backup with the same checksum.
@ -146,7 +151,7 @@ BackupFileInfo buildFileInfoForBackupEntry(const String & file_name, const Backu
{
/// We have full file or have nothing, first of all let's get checksum
/// of current file
auto checksums = calculateNewEntryChecksumsIfNeeded(backup_entry, 0);
auto checksums = calculateNewEntryChecksumsIfNeeded(backup_entry, 0, read_settings);
info.checksum = checksums.full_checksum;
if (info.checksum == base_backup_file_info->second)
@ -169,7 +174,7 @@ BackupFileInfo buildFileInfoForBackupEntry(const String & file_name, const Backu
}
else
{
auto checksums = calculateNewEntryChecksumsIfNeeded(backup_entry, 0);
auto checksums = calculateNewEntryChecksumsIfNeeded(backup_entry, 0, read_settings);
info.checksum = checksums.full_checksum;
}
@ -188,7 +193,7 @@ BackupFileInfo buildFileInfoForBackupEntry(const String & file_name, const Backu
return info;
}
BackupFileInfos buildFileInfosForBackupEntries(const BackupEntries & backup_entries, const BackupPtr & base_backup, ThreadPool & thread_pool)
BackupFileInfos buildFileInfosForBackupEntries(const BackupEntries & backup_entries, const BackupPtr & base_backup, const ReadSettings & read_settings, ThreadPool & thread_pool)
{
BackupFileInfos infos;
infos.resize(backup_entries.size());
@ -210,7 +215,7 @@ BackupFileInfos buildFileInfosForBackupEntries(const BackupEntries & backup_entr
++num_active_jobs;
}
auto job = [&mutex, &num_active_jobs, &event, &exception, &infos, &backup_entries, &base_backup, &thread_group, i, log](bool async)
auto job = [&mutex, &num_active_jobs, &event, &exception, &infos, &backup_entries, &read_settings, &base_backup, &thread_group, i, log](bool async)
{
SCOPE_EXIT_SAFE({
std::lock_guard lock{mutex};
@ -237,7 +242,7 @@ BackupFileInfos buildFileInfosForBackupEntries(const BackupEntries & backup_entr
return;
}
infos[i] = buildFileInfoForBackupEntry(name, entry, base_backup, log);
infos[i] = buildFileInfoForBackupEntry(name, entry, base_backup, read_settings, log);
}
catch (...)
{

View File

@ -13,6 +13,7 @@ class IBackupEntry;
using BackupPtr = std::shared_ptr<const IBackup>;
using BackupEntryPtr = std::shared_ptr<const IBackupEntry>;
using BackupEntries = std::vector<std::pair<String, BackupEntryPtr>>;
struct ReadSettings;
/// Information about a file stored in a backup.
@ -66,9 +67,9 @@ struct BackupFileInfo
using BackupFileInfos = std::vector<BackupFileInfo>;
/// Builds a BackupFileInfo for a specified backup entry.
BackupFileInfo buildFileInfoForBackupEntry(const String & file_name, const BackupEntryPtr & backup_entry, const BackupPtr & base_backup, Poco::Logger * log);
BackupFileInfo buildFileInfoForBackupEntry(const String & file_name, const BackupEntryPtr & backup_entry, const BackupPtr & base_backup, const ReadSettings & read_settings, Poco::Logger * log);
/// Builds a vector of BackupFileInfos for specified backup entries.
BackupFileInfos buildFileInfosForBackupEntries(const BackupEntries & backup_entries, const BackupPtr & base_backup, ThreadPool & thread_pool);
BackupFileInfos buildFileInfosForBackupEntries(const BackupEntries & backup_entries, const BackupPtr & base_backup, const ReadSettings & read_settings, ThreadPool & thread_pool);
}

View File

@ -4,17 +4,16 @@
#include <IO/copyData.h>
#include <IO/WriteBufferFromFileBase.h>
#include <IO/ReadBufferFromFileBase.h>
#include <Interpreters/Context.h>
#include <Common/logger_useful.h>
namespace DB
{
BackupReaderDefault::BackupReaderDefault(Poco::Logger * log_, const ContextPtr & context_)
BackupReaderDefault::BackupReaderDefault(const ReadSettings & read_settings_, const WriteSettings & write_settings_, Poco::Logger * log_)
: log(log_)
, read_settings(context_->getBackupReadSettings())
, write_settings(context_->getWriteSettings())
, read_settings(read_settings_)
, write_settings(write_settings_)
, write_buffer_size(DBMS_DEFAULT_BUFFER_SIZE)
{
}
@ -37,10 +36,10 @@ void BackupReaderDefault::copyFileToDisk(const String & path_in_backup, size_t f
write_buffer->finalize();
}
BackupWriterDefault::BackupWriterDefault(Poco::Logger * log_, const ContextPtr & context_)
BackupWriterDefault::BackupWriterDefault(const ReadSettings & read_settings_, const WriteSettings & write_settings_, Poco::Logger * log_)
: log(log_)
, read_settings(context_->getBackupReadSettings())
, write_settings(context_->getWriteSettings())
, read_settings(read_settings_)
, write_settings(write_settings_)
, write_buffer_size(DBMS_DEFAULT_BUFFER_SIZE)
{
}

View File

@ -3,7 +3,6 @@
#include <Backups/BackupIO.h>
#include <IO/ReadSettings.h>
#include <IO/WriteSettings.h>
#include <Interpreters/Context_fwd.h>
namespace DB
@ -19,7 +18,7 @@ enum class WriteMode;
class BackupReaderDefault : public IBackupReader
{
public:
BackupReaderDefault(Poco::Logger * log_, const ContextPtr & context_);
BackupReaderDefault(const ReadSettings & read_settings_, const WriteSettings & write_settings_, Poco::Logger * log_);
~BackupReaderDefault() override = default;
/// The function copyFileToDisk() can be much faster than reading the file with readFile() and then writing it to some disk.
@ -46,7 +45,7 @@ protected:
class BackupWriterDefault : public IBackupWriter
{
public:
BackupWriterDefault(Poco::Logger * log_, const ContextPtr & context_);
BackupWriterDefault(const ReadSettings & read_settings_, const WriteSettings & write_settings_, Poco::Logger * log_);
~BackupWriterDefault() override = default;
bool fileContentsEqual(const String & file_name, const String & expected_file_contents) override;

View File

@ -8,8 +8,8 @@
namespace DB
{
BackupReaderDisk::BackupReaderDisk(const DiskPtr & disk_, const String & root_path_, const ContextPtr & context_)
: BackupReaderDefault(&Poco::Logger::get("BackupReaderDisk"), context_)
BackupReaderDisk::BackupReaderDisk(const DiskPtr & disk_, const String & root_path_, const ReadSettings & read_settings_, const WriteSettings & write_settings_)
: BackupReaderDefault(read_settings_, write_settings_, &Poco::Logger::get("BackupReaderDisk"))
, disk(disk_)
, root_path(root_path_)
, data_source_description(disk->getDataSourceDescription())
@ -56,8 +56,8 @@ void BackupReaderDisk::copyFileToDisk(const String & path_in_backup, size_t file
}
BackupWriterDisk::BackupWriterDisk(const DiskPtr & disk_, const String & root_path_, const ContextPtr & context_)
: BackupWriterDefault(&Poco::Logger::get("BackupWriterDisk"), context_)
BackupWriterDisk::BackupWriterDisk(const DiskPtr & disk_, const String & root_path_, const ReadSettings & read_settings_, const WriteSettings & write_settings_)
: BackupWriterDefault(read_settings_, write_settings_, &Poco::Logger::get("BackupWriterDisk"))
, disk(disk_)
, root_path(root_path_)
, data_source_description(disk->getDataSourceDescription())

View File

@ -13,7 +13,7 @@ using DiskPtr = std::shared_ptr<IDisk>;
class BackupReaderDisk : public BackupReaderDefault
{
public:
BackupReaderDisk(const DiskPtr & disk_, const String & root_path_, const ContextPtr & context_);
BackupReaderDisk(const DiskPtr & disk_, const String & root_path_, const ReadSettings & read_settings_, const WriteSettings & write_settings_);
~BackupReaderDisk() override;
bool fileExists(const String & file_name) override;
@ -33,7 +33,7 @@ private:
class BackupWriterDisk : public BackupWriterDefault
{
public:
BackupWriterDisk(const DiskPtr & disk_, const String & root_path_, const ContextPtr & context_);
BackupWriterDisk(const DiskPtr & disk_, const String & root_path_, const ReadSettings & read_settings_, const WriteSettings & write_settings_);
~BackupWriterDisk() override;
bool fileExists(const String & file_name) override;

View File

@ -16,8 +16,8 @@ namespace ErrorCodes
extern const int LOGICAL_ERROR;
}
BackupReaderFile::BackupReaderFile(const String & root_path_, const ContextPtr & context_)
: BackupReaderDefault(&Poco::Logger::get("BackupReaderFile"), context_)
BackupReaderFile::BackupReaderFile(const String & root_path_, const ReadSettings & read_settings_, const WriteSettings & write_settings_)
: BackupReaderDefault(read_settings_, write_settings_, &Poco::Logger::get("BackupReaderFile"))
, root_path(root_path_)
, data_source_description(DiskLocal::getLocalDataSourceDescription(root_path))
{
@ -74,8 +74,8 @@ void BackupReaderFile::copyFileToDisk(const String & path_in_backup, size_t file
}
BackupWriterFile::BackupWriterFile(const String & root_path_, const ContextPtr & context_)
: BackupWriterDefault(&Poco::Logger::get("BackupWriterFile"), context_)
BackupWriterFile::BackupWriterFile(const String & root_path_, const ReadSettings & read_settings_, const WriteSettings & write_settings_)
: BackupWriterDefault(read_settings_, write_settings_, &Poco::Logger::get("BackupWriterFile"))
, root_path(root_path_)
, data_source_description(DiskLocal::getLocalDataSourceDescription(root_path))
{

View File

@ -11,7 +11,7 @@ namespace DB
class BackupReaderFile : public BackupReaderDefault
{
public:
explicit BackupReaderFile(const String & root_path_, const ContextPtr & context_);
explicit BackupReaderFile(const String & root_path_, const ReadSettings & read_settings_, const WriteSettings & write_settings_);
bool fileExists(const String & file_name) override;
UInt64 getFileSize(const String & file_name) override;
@ -29,7 +29,7 @@ private:
class BackupWriterFile : public BackupWriterDefault
{
public:
BackupWriterFile(const String & root_path_, const ContextPtr & context_);
BackupWriterFile(const String & root_path_, const ReadSettings & read_settings_, const WriteSettings & write_settings_);
bool fileExists(const String & file_name) override;
UInt64 getFileSize(const String & file_name) override;

View File

@ -101,8 +101,14 @@ namespace
BackupReaderS3::BackupReaderS3(
const S3::URI & s3_uri_, const String & access_key_id_, const String & secret_access_key_, bool allow_s3_native_copy, const ContextPtr & context_)
: BackupReaderDefault(&Poco::Logger::get("BackupReaderS3"), context_)
const S3::URI & s3_uri_,
const String & access_key_id_,
const String & secret_access_key_,
bool allow_s3_native_copy,
const ReadSettings & read_settings_,
const WriteSettings & write_settings_,
const ContextPtr & context_)
: BackupReaderDefault(read_settings_, write_settings_, &Poco::Logger::get("BackupReaderS3"))
, s3_uri(s3_uri_)
, client(makeS3Client(s3_uri_, access_key_id_, secret_access_key_, context_))
, request_settings(context_->getStorageS3Settings().getSettings(s3_uri.uri.toString()).request_settings)
@ -178,8 +184,15 @@ void BackupReaderS3::copyFileToDisk(const String & path_in_backup, size_t file_s
BackupWriterS3::BackupWriterS3(
const S3::URI & s3_uri_, const String & access_key_id_, const String & secret_access_key_, bool allow_s3_native_copy, const String & storage_class_name, const ContextPtr & context_)
: BackupWriterDefault(&Poco::Logger::get("BackupWriterS3"), context_)
const S3::URI & s3_uri_,
const String & access_key_id_,
const String & secret_access_key_,
bool allow_s3_native_copy,
const String & storage_class_name,
const ReadSettings & read_settings_,
const WriteSettings & write_settings_,
const ContextPtr & context_)
: BackupWriterDefault(read_settings_, write_settings_, &Poco::Logger::get("BackupWriterS3"))
, s3_uri(s3_uri_)
, client(makeS3Client(s3_uri_, access_key_id_, secret_access_key_, context_))
, request_settings(context_->getStorageS3Settings().getSettings(s3_uri.uri.toString()).request_settings)

View File

@ -17,7 +17,7 @@ namespace DB
class BackupReaderS3 : public BackupReaderDefault
{
public:
BackupReaderS3(const S3::URI & s3_uri_, const String & access_key_id_, const String & secret_access_key_, bool allow_s3_native_copy, const ContextPtr & context_);
BackupReaderS3(const S3::URI & s3_uri_, const String & access_key_id_, const String & secret_access_key_, bool allow_s3_native_copy, const ReadSettings & read_settings_, const WriteSettings & write_settings_, const ContextPtr & context_);
~BackupReaderS3() override;
bool fileExists(const String & file_name) override;
@ -38,7 +38,7 @@ private:
class BackupWriterS3 : public BackupWriterDefault
{
public:
BackupWriterS3(const S3::URI & s3_uri_, const String & access_key_id_, const String & secret_access_key_, bool allow_s3_native_copy, const String & storage_class_name, const ContextPtr & context_);
BackupWriterS3(const S3::URI & s3_uri_, const String & access_key_id_, const String & secret_access_key_, bool allow_s3_native_copy, const String & storage_class_name, const ReadSettings & read_settings_, const WriteSettings & write_settings_, const ContextPtr & context_);
~BackupWriterS3() override;
bool fileExists(const String & file_name) override;

View File

@ -27,6 +27,7 @@ namespace ErrorCodes
M(Bool, decrypt_files_from_encrypted_disks) \
M(Bool, deduplicate_files) \
M(Bool, allow_s3_native_copy) \
M(Bool, read_from_filesystem_cache) \
M(UInt64, shard_num) \
M(UInt64, replica_num) \
M(Bool, internal) \

View File

@ -44,6 +44,10 @@ struct BackupSettings
/// Whether native copy is allowed (optimization for cloud storages, that sometimes could have bugs)
bool allow_s3_native_copy = true;
/// Allow to use the filesystem cache in passive mode - benefit from the existing cache entries,
/// but don't put more entries into the cache.
bool read_from_filesystem_cache = true;
/// 1-based shard index to store in the backup. 0 means all shards.
/// Can only be used with BACKUP ON CLUSTER.
size_t shard_num = 0;

View File

@ -178,6 +178,42 @@ namespace
{
return status == BackupStatus::RESTORING;
}
/// We use slightly different read and write settings for backup/restore
/// with a separate throttler and limited usage of filesystem cache.
ReadSettings getReadSettingsForBackup(const ContextPtr & context, const BackupSettings & backup_settings)
{
auto read_settings = context->getReadSettings();
read_settings.remote_throttler = context->getBackupsThrottler();
read_settings.local_throttler = context->getBackupsThrottler();
read_settings.enable_filesystem_cache = backup_settings.read_from_filesystem_cache;
read_settings.read_from_filesystem_cache_if_exists_otherwise_bypass_cache = backup_settings.read_from_filesystem_cache;
return read_settings;
}
WriteSettings getWriteSettingsForBackup(const ContextPtr & context)
{
auto write_settings = context->getWriteSettings();
write_settings.enable_filesystem_cache_on_write_operations = false;
return write_settings;
}
ReadSettings getReadSettingsForRestore(const ContextPtr & context)
{
auto read_settings = context->getReadSettings();
read_settings.remote_throttler = context->getBackupsThrottler();
read_settings.local_throttler = context->getBackupsThrottler();
read_settings.enable_filesystem_cache = false;
read_settings.read_from_filesystem_cache_if_exists_otherwise_bypass_cache = false;
return read_settings;
}
WriteSettings getWriteSettingsForRestore(const ContextPtr & context)
{
auto write_settings = context->getWriteSettings();
write_settings.enable_filesystem_cache_on_write_operations = false;
return write_settings;
}
}
@ -350,6 +386,8 @@ void BackupsWorker::doBackup(
backup_create_params.backup_uuid = backup_settings.backup_uuid;
backup_create_params.deduplicate_files = backup_settings.deduplicate_files;
backup_create_params.allow_s3_native_copy = backup_settings.allow_s3_native_copy;
backup_create_params.read_settings = getReadSettingsForBackup(context, backup_settings);
backup_create_params.write_settings = getWriteSettingsForBackup(context);
BackupMutablePtr backup = BackupFactory::instance().createBackup(backup_create_params);
/// Write the backup.
@ -378,12 +416,12 @@ void BackupsWorker::doBackup(
/// Prepare backup entries.
BackupEntries backup_entries;
{
BackupEntriesCollector backup_entries_collector{backup_query->elements, backup_settings, backup_coordination, context};
BackupEntriesCollector backup_entries_collector{backup_query->elements, backup_settings, backup_coordination, backup_create_params.read_settings, context};
backup_entries = backup_entries_collector.run();
}
/// Write the backup entries to the backup.
buildFileInfosForBackupEntries(backup, backup_entries, backup_coordination);
buildFileInfosForBackupEntries(backup, backup_entries, backup_create_params.read_settings, backup_coordination);
writeBackupEntries(backup, std::move(backup_entries), backup_id, backup_coordination, backup_settings.internal);
/// We have written our backup entries, we need to tell other hosts (they could be waiting for it).
@ -433,12 +471,12 @@ void BackupsWorker::doBackup(
}
void BackupsWorker::buildFileInfosForBackupEntries(const BackupPtr & backup, const BackupEntries & backup_entries, std::shared_ptr<IBackupCoordination> backup_coordination)
void BackupsWorker::buildFileInfosForBackupEntries(const BackupPtr & backup, const BackupEntries & backup_entries, const ReadSettings & read_settings, std::shared_ptr<IBackupCoordination> backup_coordination)
{
LOG_TRACE(log, "{}", Stage::BUILDING_FILE_INFOS);
backup_coordination->setStage(Stage::BUILDING_FILE_INFOS, "");
backup_coordination->waitForStage(Stage::BUILDING_FILE_INFOS);
backup_coordination->addFileInfos(::DB::buildFileInfosForBackupEntries(backup_entries, backup->getBaseBackup(), *backups_thread_pool));
backup_coordination->addFileInfos(::DB::buildFileInfosForBackupEntries(backup_entries, backup->getBaseBackup(), read_settings, *backups_thread_pool));
}
@ -650,6 +688,8 @@ void BackupsWorker::doRestore(
backup_open_params.base_backup_info = restore_settings.base_backup_info;
backup_open_params.password = restore_settings.password;
backup_open_params.allow_s3_native_copy = restore_settings.allow_s3_native_copy;
backup_open_params.read_settings = getReadSettingsForRestore(context);
backup_open_params.write_settings = getWriteSettingsForRestore(context);
BackupPtr backup = BackupFactory::instance().createBackup(backup_open_params);
String current_database = context->getCurrentDatabase();

View File

@ -24,6 +24,7 @@ using BackupPtr = std::shared_ptr<const IBackup>;
class IBackupEntry;
using BackupEntries = std::vector<std::pair<String, std::shared_ptr<const IBackupEntry>>>;
using DataRestoreTasks = std::vector<std::function<void()>>;
struct ReadSettings;
/// Manager of backups and restores: executes backups and restores' threads in the background.
/// Keeps information about backups and restores started in this session.
@ -107,7 +108,7 @@ private:
bool called_async);
/// Builds file infos for specified backup entries.
void buildFileInfosForBackupEntries(const BackupPtr & backup, const BackupEntries & backup_entries, std::shared_ptr<IBackupCoordination> backup_coordination);
void buildFileInfosForBackupEntries(const BackupPtr & backup, const BackupEntries & backup_entries, const ReadSettings & read_settings, std::shared_ptr<IBackupCoordination> backup_coordination);
/// Write backup entries to an opened backup.
void writeBackupEntries(BackupMutablePtr backup, BackupEntries && backup_entries, const OperationID & backup_id, std::shared_ptr<IBackupCoordination> backup_coordination, bool internal);

View File

@ -19,8 +19,8 @@ public:
std::unique_ptr<SeekableReadBuffer> getReadBuffer(const ReadSettings & read_settings) const override { return getInternalBackupEntry()->getReadBuffer(read_settings); }
UInt64 getSize() const override { return getInternalBackupEntry()->getSize(); }
UInt128 getChecksum() const override { return getInternalBackupEntry()->getChecksum(); }
std::optional<UInt128> getPartialChecksum(size_t prefix_length) const override { return getInternalBackupEntry()->getPartialChecksum(prefix_length); }
UInt128 getChecksum(const ReadSettings & read_settings) const override { return getInternalBackupEntry()->getChecksum(read_settings); }
std::optional<UInt128> getPartialChecksum(size_t prefix_length, const ReadSettings & read_settings) const override { return getInternalBackupEntry()->getPartialChecksum(prefix_length, read_settings); }
DataSourceDescription getDataSourceDescription() const override { return getInternalBackupEntry()->getDataSourceDescription(); }
bool isEncryptedByDisk() const override { return getInternalBackupEntry()->isEncryptedByDisk(); }
bool isFromFile() const override { return getInternalBackupEntry()->isFromFile(); }

View File

@ -21,11 +21,11 @@ public:
virtual UInt64 getSize() const = 0;
/// Returns the checksum of the data.
virtual UInt128 getChecksum() const = 0;
virtual UInt128 getChecksum(const ReadSettings & read_settings) const = 0;
/// Returns a partial checksum, i.e. the checksum calculated for a prefix part of the data.
/// Can return nullopt if the partial checksum is too difficult to calculate.
virtual std::optional<UInt128> getPartialChecksum(size_t /* prefix_length */) const { return {}; }
virtual std::optional<UInt128> getPartialChecksum(size_t /* prefix_length */, const ReadSettings &) const { return {}; }
/// Returns a read buffer for reading the data.
virtual std::unique_ptr<SeekableReadBuffer> getReadBuffer(const ReadSettings & read_settings) const = 0;

View File

@ -107,12 +107,27 @@ void registerBackupEngineS3(BackupFactory & factory)
if (params.open_mode == IBackup::OpenMode::READ)
{
auto reader = std::make_shared<BackupReaderS3>(S3::URI{s3_uri}, access_key_id, secret_access_key, params.allow_s3_native_copy, params.context);
auto reader = std::make_shared<BackupReaderS3>(S3::URI{s3_uri},
access_key_id,
secret_access_key,
params.allow_s3_native_copy,
params.read_settings,
params.write_settings,
params.context);
return std::make_unique<BackupImpl>(backup_name_for_logging, archive_params, params.base_backup_info, reader, params.context);
}
else
{
auto writer = std::make_shared<BackupWriterS3>(S3::URI{s3_uri}, access_key_id, secret_access_key, params.allow_s3_native_copy, params.s3_storage_class, params.context);
auto writer = std::make_shared<BackupWriterS3>(S3::URI{s3_uri},
access_key_id,
secret_access_key,
params.allow_s3_native_copy,
params.s3_storage_class,
params.read_settings,
params.write_settings,
params.context);
return std::make_unique<BackupImpl>(
backup_name_for_logging,
archive_params,

View File

@ -169,18 +169,18 @@ void registerBackupEnginesFileAndDisk(BackupFactory & factory)
{
std::shared_ptr<IBackupReader> reader;
if (engine_name == "File")
reader = std::make_shared<BackupReaderFile>(path, params.context);
reader = std::make_shared<BackupReaderFile>(path, params.read_settings, params.write_settings);
else
reader = std::make_shared<BackupReaderDisk>(disk, path, params.context);
reader = std::make_shared<BackupReaderDisk>(disk, path, params.read_settings, params.write_settings);
return std::make_unique<BackupImpl>(backup_name_for_logging, archive_params, params.base_backup_info, reader, params.context);
}
else
{
std::shared_ptr<IBackupWriter> writer;
if (engine_name == "File")
writer = std::make_shared<BackupWriterFile>(path, params.context);
writer = std::make_shared<BackupWriterFile>(path, params.read_settings, params.write_settings);
else
writer = std::make_shared<BackupWriterDisk>(disk, path, params.context);
writer = std::make_shared<BackupWriterDisk>(disk, path, params.read_settings, params.write_settings);
return std::make_unique<BackupImpl>(
backup_name_for_logging,
archive_params,

View File

@ -69,14 +69,14 @@ protected:
static String getChecksum(const BackupEntryPtr & backup_entry)
{
return getHexUIntUppercase(backup_entry->getChecksum());
return getHexUIntUppercase(backup_entry->getChecksum({}));
}
static const constexpr std::string_view NO_CHECKSUM = "no checksum";
static String getPartialChecksum(const BackupEntryPtr & backup_entry, size_t prefix_length)
{
auto partial_checksum = backup_entry->getPartialChecksum(prefix_length);
auto partial_checksum = backup_entry->getPartialChecksum(prefix_length, {});
if (!partial_checksum)
return String{NO_CHECKSUM};
return getHexUIntUppercase(*partial_checksum);
@ -218,7 +218,7 @@ TEST_F(BackupEntriesTest, PartialChecksumBeforeFullChecksum)
TEST_F(BackupEntriesTest, BackupEntryFromSmallFile)
{
writeFile(local_disk, "a.txt");
auto entry = std::make_shared<BackupEntryFromSmallFile>(local_disk, "a.txt");
auto entry = std::make_shared<BackupEntryFromSmallFile>(local_disk, "a.txt", ReadSettings{});
local_disk->removeFile("a.txt");
@ -239,7 +239,7 @@ TEST_F(BackupEntriesTest, DecryptedEntriesFromEncryptedDisk)
std::pair<BackupEntryPtr, bool /* partial_checksum_allowed */> test_cases[]
= {{std::make_shared<BackupEntryFromImmutableFile>(encrypted_disk, "a.txt"), false},
{std::make_shared<BackupEntryFromAppendOnlyFile>(encrypted_disk, "a.txt"), true},
{std::make_shared<BackupEntryFromSmallFile>(encrypted_disk, "a.txt"), true}};
{std::make_shared<BackupEntryFromSmallFile>(encrypted_disk, "a.txt", ReadSettings{}), true}};
for (const auto & [entry, partial_checksum_allowed] : test_cases)
{
EXPECT_EQ(entry->getSize(), 9);
@ -258,7 +258,7 @@ TEST_F(BackupEntriesTest, DecryptedEntriesFromEncryptedDisk)
BackupEntryPtr entries[]
= {std::make_shared<BackupEntryFromImmutableFile>(encrypted_disk, "empty.txt"),
std::make_shared<BackupEntryFromAppendOnlyFile>(encrypted_disk, "empty.txt"),
std::make_shared<BackupEntryFromSmallFile>(encrypted_disk, "empty.txt")};
std::make_shared<BackupEntryFromSmallFile>(encrypted_disk, "empty.txt", ReadSettings{})};
for (const auto & entry : entries)
{
EXPECT_EQ(entry->getSize(), 0);
@ -288,7 +288,7 @@ TEST_F(BackupEntriesTest, EncryptedEntriesFromEncryptedDisk)
BackupEntryPtr entries[]
= {std::make_shared<BackupEntryFromImmutableFile>(encrypted_disk, "a.txt", /* copy_encrypted= */ true),
std::make_shared<BackupEntryFromAppendOnlyFile>(encrypted_disk, "a.txt", /* copy_encrypted= */ true),
std::make_shared<BackupEntryFromSmallFile>(encrypted_disk, "a.txt", /* copy_encrypted= */ true)};
std::make_shared<BackupEntryFromSmallFile>(encrypted_disk, "a.txt", ReadSettings{}, /* copy_encrypted= */ true)};
auto encrypted_checksum = getChecksum(entries[0]);
EXPECT_NE(encrypted_checksum, NO_CHECKSUM);
@ -322,7 +322,7 @@ TEST_F(BackupEntriesTest, EncryptedEntriesFromEncryptedDisk)
BackupEntryPtr entries[]
= {std::make_shared<BackupEntryFromImmutableFile>(encrypted_disk, "empty.txt", /* copy_encrypted= */ true),
std::make_shared<BackupEntryFromAppendOnlyFile>(encrypted_disk, "empty.txt", /* copy_encrypted= */ true),
std::make_shared<BackupEntryFromSmallFile>(encrypted_disk, "empty.txt", /* copy_encrypted= */ true)};
std::make_shared<BackupEntryFromSmallFile>(encrypted_disk, "empty.txt", ReadSettings{}, /* copy_encrypted= */ true)};
for (const auto & entry : entries)
{
EXPECT_EQ(entry->getSize(), 0);

View File

@ -8,6 +8,7 @@
#include <Parsers/formatAST.h>
#include <Parsers/ASTCreateNamedCollectionQuery.h>
#include <Parsers/ASTAlterNamedCollectionQuery.h>
#include <Parsers/ASTDropNamedCollectionQuery.h>
#include <Parsers/ASTSetQuery.h>
#include <Parsers/ASTCreateQuery.h>
#include <Parsers/parseQuery.h>
@ -225,24 +226,15 @@ public:
void remove(const std::string & collection_name)
{
if (!removeIfExists(collection_name))
auto collection_path = getMetadataPath(collection_name);
if (!fs::exists(collection_path))
{
throw Exception(
ErrorCodes::NAMED_COLLECTION_DOESNT_EXIST,
"Cannot remove collection `{}`, because it doesn't exist",
collection_name);
}
}
bool removeIfExists(const std::string & collection_name)
{
auto collection_path = getMetadataPath(collection_name);
if (fs::exists(collection_path))
{
fs::remove(collection_path);
return true;
}
return false;
fs::remove(collection_path);
}
private:
@ -393,36 +385,64 @@ void loadIfNot()
return loadIfNotUnlocked(lock);
}
void removeFromSQL(const std::string & collection_name, ContextPtr context)
void removeFromSQL(const ASTDropNamedCollectionQuery & query, ContextPtr context)
{
auto lock = lockNamedCollectionsTransaction();
loadIfNotUnlocked(lock);
LoadFromSQL(context).remove(collection_name);
NamedCollectionFactory::instance().remove(collection_name);
}
void removeIfExistsFromSQL(const std::string & collection_name, ContextPtr context)
{
auto lock = lockNamedCollectionsTransaction();
loadIfNotUnlocked(lock);
LoadFromSQL(context).removeIfExists(collection_name);
NamedCollectionFactory::instance().removeIfExists(collection_name);
auto & instance = NamedCollectionFactory::instance();
if (!instance.exists(query.collection_name))
{
if (!query.if_exists)
{
throw Exception(
ErrorCodes::NAMED_COLLECTION_DOESNT_EXIST,
"Cannot remove collection `{}`, because it doesn't exist",
query.collection_name);
}
return;
}
LoadFromSQL(context).remove(query.collection_name);
instance.remove(query.collection_name);
}
void createFromSQL(const ASTCreateNamedCollectionQuery & query, ContextPtr context)
{
auto lock = lockNamedCollectionsTransaction();
loadIfNotUnlocked(lock);
NamedCollectionFactory::instance().add(query.collection_name, LoadFromSQL(context).create(query));
auto & instance = NamedCollectionFactory::instance();
if (instance.exists(query.collection_name))
{
if (!query.if_not_exists)
{
throw Exception(
ErrorCodes::NAMED_COLLECTION_ALREADY_EXISTS,
"A named collection `{}` already exists",
query.collection_name);
}
return;
}
instance.add(query.collection_name, LoadFromSQL(context).create(query));
}
void updateFromSQL(const ASTAlterNamedCollectionQuery & query, ContextPtr context)
{
auto lock = lockNamedCollectionsTransaction();
loadIfNotUnlocked(lock);
auto & instance = NamedCollectionFactory::instance();
if (!instance.exists(query.collection_name))
{
if (!query.if_exists)
{
throw Exception(
ErrorCodes::NAMED_COLLECTION_DOESNT_EXIST,
"Cannot remove collection `{}`, because it doesn't exist",
query.collection_name);
}
return;
}
LoadFromSQL(context).update(query);
auto collection = NamedCollectionFactory::instance().getMutable(query.collection_name);
auto collection = instance.getMutable(query.collection_name);
auto collection_lock = collection->lock();
for (const auto & [name, value] : query.changes)

View File

@ -8,6 +8,7 @@ namespace DB
class ASTCreateNamedCollectionQuery;
class ASTAlterNamedCollectionQuery;
class ASTDropNamedCollectionQuery;
namespace NamedCollectionUtils
{
@ -26,8 +27,7 @@ void reloadFromConfig(const Poco::Util::AbstractConfiguration & config);
void loadFromSQL(ContextPtr context);
/// Remove collection as well as its metadata from `context->getPath() / named_collections /`.
void removeFromSQL(const std::string & collection_name, ContextPtr context);
void removeIfExistsFromSQL(const std::string & collection_name, ContextPtr context);
void removeFromSQL(const ASTDropNamedCollectionQuery & query, ContextPtr context);
/// Create a new collection from AST and put it to `context->getPath() / named_collections /`.
void createFromSQL(const ASTCreateNamedCollectionQuery & query, ContextPtr context);

View File

@ -101,6 +101,10 @@ void ProgressIndication::writeFinalProgress()
<< formatReadableSizeWithDecimalSuffix(progress.read_bytes * 1000000000.0 / elapsed_ns) << "/s.)";
else
std::cout << ". ";
auto peak_memory_usage = getMemoryUsage().peak;
if (peak_memory_usage >= 0)
std::cout << "\nPeak memory usage: " << formatReadableSizeWithBinarySuffix(peak_memory_usage) << ".";
}
void ProgressIndication::writeProgress(WriteBufferFromFileDescriptor & message)

View File

@ -70,6 +70,8 @@ ThreadGroup::ThreadGroup()
ThreadStatus::ThreadStatus(bool check_current_thread_on_destruction_)
: thread_id{getThreadId()}, check_current_thread_on_destruction(check_current_thread_on_destruction_)
{
chassert(!current_thread);
last_rusage = std::make_unique<RUsageCounters>();
memory_tracker.setDescription("(for thread)");
@ -123,6 +125,7 @@ ThreadStatus::ThreadStatus(bool check_current_thread_on_destruction_)
ThreadGroupPtr ThreadStatus::getThreadGroup() const
{
chassert(current_thread == this);
return thread_group;
}

View File

@ -218,7 +218,7 @@ void KeeperSnapshotManagerS3::uploadSnapshotImpl(const SnapshotFileInfo & snapsh
}
catch (...)
{
LOG_INFO(log, "Failed to delete lock file for {} from S3", snapshot_path);
LOG_INFO(log, "Failed to delete lock file for {} from S3", snapshot_file_info.path);
tryLogCurrentException(__PRETTY_FUNCTION__);
}
});

View File

@ -77,7 +77,6 @@ public:
void deserializeTextJSON(IColumn & column, ReadBuffer & istr, const FormatSettings &) const override;
void serializeTextJSONPretty(const IColumn & column, size_t row_num, WriteBuffer & ostr, const FormatSettings & settings, size_t indent) const override;
void serializeTextXML(const IColumn & column, size_t row_num, WriteBuffer & ostr, const FormatSettings & settings) const override;
};

View File

@ -107,9 +107,6 @@ DatabasePtr DatabaseFactory::get(const ASTCreateQuery & create, const String & m
{
cckMetadataPathForOrdinary(create, metadata_path);
/// Creates store/xxx/ for Atomic
fs::create_directories(fs::path(metadata_path).parent_path());
DatabasePtr impl = getImpl(create, metadata_path, context);
if (impl && context->hasQueryContext() && context->getSettingsRef().log_queries)

View File

@ -11,9 +11,11 @@
#include <Storages/IStorage.h>
#include <TableFunctions/TableFunctionFactory.h>
#include <Common/filesystemHelpers.h>
#include <Formats/FormatFactory.h>
#include <filesystem>
namespace fs = std::filesystem;
namespace DB
@ -75,10 +77,8 @@ bool DatabaseFilesystem::checkTableFilePath(const std::string & table_path, Cont
/// Check access for file before checking its existence.
if (check_path && !fileOrSymlinkPathStartsWith(table_path, user_files_path))
{
if (throw_on_error)
throw Exception(ErrorCodes::PATH_ACCESS_DENIED, "File is not inside {}", user_files_path);
else
return false;
/// Access denied is thrown regardless of 'throw_on_error'
throw Exception(ErrorCodes::PATH_ACCESS_DENIED, "File is not inside {}", user_files_path);
}
/// Check if the corresponding file exists.
@ -128,20 +128,25 @@ bool DatabaseFilesystem::isTableExist(const String & name, ContextPtr context_)
if (tryGetTableFromCache(name))
return true;
return checkTableFilePath(getTablePath(name), context_, /* throw_on_error */false);
return checkTableFilePath(getTablePath(name), context_, /* throw_on_error */ false);
}
StoragePtr DatabaseFilesystem::getTableImpl(const String & name, ContextPtr context_) const
StoragePtr DatabaseFilesystem::getTableImpl(const String & name, ContextPtr context_, bool throw_on_error) const
{
/// Check if table exists in loaded tables map.
if (auto table = tryGetTableFromCache(name))
return table;
auto table_path = getTablePath(name);
checkTableFilePath(table_path, context_, /* throw_on_error */true);
if (!checkTableFilePath(table_path, context_, throw_on_error))
return {};
String format = FormatFactory::instance().getFormatFromFileName(table_path, throw_on_error);
if (format.empty())
return {};
/// If the file exists, create a new table using TableFunctionFile and return it.
auto args = makeASTFunction("file", std::make_shared<ASTLiteral>(table_path));
auto args = makeASTFunction("file", std::make_shared<ASTLiteral>(table_path), std::make_shared<ASTLiteral>(format));
auto table_function = TableFunctionFactory::instance().get(args, context_);
if (!table_function)
@ -158,7 +163,7 @@ StoragePtr DatabaseFilesystem::getTableImpl(const String & name, ContextPtr cont
StoragePtr DatabaseFilesystem::getTable(const String & name, ContextPtr context_) const
{
/// getTableImpl can throw exceptions, do not catch them to show correct error to user.
if (auto storage = getTableImpl(name, context_))
if (auto storage = getTableImpl(name, context_, true))
return storage;
throw Exception(ErrorCodes::UNKNOWN_TABLE, "Table {}.{} doesn't exist",
@ -167,20 +172,7 @@ StoragePtr DatabaseFilesystem::getTable(const String & name, ContextPtr context_
StoragePtr DatabaseFilesystem::tryGetTable(const String & name, ContextPtr context_) const
{
try
{
return getTableImpl(name, context_);
}
catch (const Exception & e)
{
/// Ignore exceptions thrown by TableFunctionFile, which indicate that there is no table
/// see tests/02722_database_filesystem.sh for more details.
if (e.code() == ErrorCodes::FILE_DOESNT_EXIST)
{
return nullptr;
}
throw;
}
return getTableImpl(name, context_, false);
}
bool DatabaseFilesystem::empty() const

View File

@ -48,7 +48,7 @@ public:
DatabaseTablesIteratorPtr getTablesIterator(ContextPtr, const FilterByNameFunction &) const override;
protected:
StoragePtr getTableImpl(const String & name, ContextPtr context) const;
StoragePtr getTableImpl(const String & name, ContextPtr context, bool throw_on_error) const;
StoragePtr tryGetTableFromCache(const std::string & name) const;

View File

@ -77,6 +77,8 @@ DatabaseMySQL::DatabaseMySQL(
throw;
}
fs::create_directories(metadata_path);
thread = ThreadFromGlobalPool{&DatabaseMySQL::cleanOutdatedTables, this};
}

View File

@ -54,6 +54,7 @@ DatabasePostgreSQL::DatabasePostgreSQL(
, cache_tables(cache_tables_)
, log(&Poco::Logger::get("DatabasePostgreSQL(" + dbname_ + ")"))
{
fs::create_directories(metadata_path);
cleaner_task = getContext()->getSchedulePool().createTask("PostgreSQLCleanerTask", [this]{ removeOutdatedTables(); });
cleaner_task->deactivate();
}

View File

@ -74,19 +74,22 @@ CachedOnDiskReadBufferFromFile::CachedOnDiskReadBufferFromFile(
}
void CachedOnDiskReadBufferFromFile::appendFilesystemCacheLog(
const FileSegment::Range & file_segment_range, CachedOnDiskReadBufferFromFile::ReadType type)
const FileSegment & file_segment, CachedOnDiskReadBufferFromFile::ReadType type)
{
if (!cache_log)
return;
const auto range = file_segment.range();
FilesystemCacheLogElement elem
{
.event_time = std::chrono::system_clock::to_time_t(std::chrono::system_clock::now()),
.query_id = query_id,
.source_file_path = source_file_path,
.file_segment_range = { file_segment_range.left, file_segment_range.right },
.file_segment_range = { range.left, range.right },
.requested_range = { first_offset, read_until_position },
.file_segment_size = file_segment_range.size(),
.file_segment_key = file_segment.key().toString(),
.file_segment_offset = file_segment.offset(),
.file_segment_size = range.size(),
.read_from_cache_attempted = true,
.read_buffer_id = current_buffer_id,
.profile_counters = std::make_shared<ProfileEvents::Counters::Snapshot>(
@ -495,7 +498,7 @@ bool CachedOnDiskReadBufferFromFile::completeFileSegmentAndGetNext()
auto completed_range = current_file_segment->range();
if (cache_log)
appendFilesystemCacheLog(completed_range, read_type);
appendFilesystemCacheLog(*current_file_segment, read_type);
chassert(file_offset_of_buffer_end > completed_range.right);
@ -518,7 +521,7 @@ CachedOnDiskReadBufferFromFile::~CachedOnDiskReadBufferFromFile()
{
if (cache_log && file_segments && !file_segments->empty())
{
appendFilesystemCacheLog(file_segments->front().range(), read_type);
appendFilesystemCacheLog(file_segments->front(), read_type);
}
}

View File

@ -90,7 +90,7 @@ private:
bool completeFileSegmentAndGetNext();
void appendFilesystemCacheLog(const FileSegment::Range & file_segment_range, ReadType read_type);
void appendFilesystemCacheLog(const FileSegment & file_segment, ReadType read_type);
bool writeCache(char * data, size_t size, size_t offset, FileSegment & file_segment);

View File

@ -109,6 +109,8 @@ void ReadBufferFromRemoteFSGather::appendUncachedReadInfo()
.source_file_path = current_object.remote_path,
.file_segment_range = { 0, current_object.bytes_size },
.cache_type = FilesystemCacheLogElement::CacheType::READ_FROM_FS_BYPASSING_CACHE,
.file_segment_key = {},
.file_segment_offset = {},
.file_segment_size = current_object.bytes_size,
.read_from_cache_attempted = false,
};

View File

@ -510,11 +510,12 @@ ColumnPtr FunctionArrayIntersect::execute(const UnpackedArrays & arrays, Mutable
map.clear();
bool all_has_nullable = all_nullable;
bool current_has_nullable = false;
for (size_t arg_num = 0; arg_num < args; ++arg_num)
{
const auto & arg = arrays.args[arg_num];
bool current_has_nullable = false;
current_has_nullable = false;
size_t off;
// const array has only one row
@ -549,44 +550,93 @@ ColumnPtr FunctionArrayIntersect::execute(const UnpackedArrays & arrays, Mutable
}
}
prev_off[arg_num] = off;
if (arg.is_const)
prev_off[arg_num] = 0;
// We update offsets for all the arrays except the first one. Offsets for the first array would be updated later.
// It is needed to iterate the first array again so that the elements in the result would have fixed order.
if (arg_num)
{
prev_off[arg_num] = off;
if (arg.is_const)
prev_off[arg_num] = 0;
}
if (!current_has_nullable)
all_has_nullable = false;
}
if (all_has_nullable)
{
++result_offset;
result_data.insertDefault();
null_map.push_back(1);
}
// We have NULL in output only once if it should be there
bool null_added = false;
const auto & arg = arrays.args[0];
size_t off;
// const array has only one row
if (arg.is_const)
off = (*arg.offsets)[0];
else
off = (*arg.offsets)[row];
for (const auto & pair : map)
for (auto i : collections::range(prev_off[0], off))
{
if (pair.getMapped() == args)
all_has_nullable = all_nullable;
typename Map::LookupResult pair = nullptr;
if (arg.null_map && (*arg.null_map)[i])
{
current_has_nullable = true;
if (all_has_nullable && !null_added)
{
++result_offset;
result_data.insertDefault();
null_map.push_back(1);
null_added = true;
}
if (null_added)
continue;
}
else if constexpr (is_numeric_column)
{
pair = map.find(columns[0]->getElement(i));
}
else if constexpr (std::is_same_v<ColumnType, ColumnString> || std::is_same_v<ColumnType, ColumnFixedString>)
pair = map.find(columns[0]->getDataAt(i));
else
{
const char * data = nullptr;
pair = map.find(columns[0]->serializeValueIntoArena(i, arena, data));
}
prev_off[0] = off;
if (arg.is_const)
prev_off[0] = 0;
if (!current_has_nullable)
all_has_nullable = false;
if (pair && pair->getMapped() == args)
{
// We increase pair->getMapped() here to not skip duplicate values from the first array.
++pair->getMapped();
++result_offset;
if constexpr (is_numeric_column)
result_data.insertValue(pair.getKey());
{
result_data.insertValue(pair->getKey());
}
else if constexpr (std::is_same_v<ColumnType, ColumnString> || std::is_same_v<ColumnType, ColumnFixedString>)
result_data.insertData(pair.getKey().data, pair.getKey().size);
{
result_data.insertData(pair->getKey().data, pair->getKey().size);
}
else
result_data.deserializeAndInsertFromArena(pair.getKey().data);
{
result_data.deserializeAndInsertFromArena(pair->getKey().data);
}
if (all_nullable)
null_map.push_back(0);
}
}
result_offsets.getElement(row) = result_offset;
}
}
ColumnPtr result_column = std::move(result_data_ptr);
if (all_nullable)
result_column = ColumnNullable::create(result_column, std::move(null_map_column));
return ColumnArray::create(result_column, std::move(result_offsets_ptr));
}

View File

@ -16,6 +16,7 @@ namespace ActionLocks
extern const StorageActionBlockType DistributedSend = 5;
extern const StorageActionBlockType PartsTTLMerge = 6;
extern const StorageActionBlockType PartsMove = 7;
extern const StorageActionBlockType PullReplicationLog = 8;
}

View File

@ -806,6 +806,13 @@ bool FileCache::tryReserve(FileSegment & file_segment, const size_t size)
return true;
}
void FileCache::removeKey(const Key & key)
{
assertInitialized();
auto locked_key = metadata.lockKeyMetadata(key, CacheMetadata::KeyNotFoundPolicy::THROW);
locked_key->removeAll();
}
void FileCache::removeKeyIfExists(const Key & key)
{
assertInitialized();
@ -818,7 +825,14 @@ void FileCache::removeKeyIfExists(const Key & key)
/// But if we have multiple replicated zero-copy tables on the same server
/// it became possible to start removing something from cache when it is used
/// by other "zero-copy" tables. That is why it's not an error.
locked_key->removeAllReleasable();
locked_key->removeAll(/* if_releasable */true);
}
void FileCache::removeFileSegment(const Key & key, size_t offset)
{
assertInitialized();
auto locked_key = metadata.lockKeyMetadata(key, CacheMetadata::KeyNotFoundPolicy::THROW);
locked_key->removeFileSegment(offset);
}
void FileCache::removePathIfExists(const String & path)
@ -830,22 +844,12 @@ void FileCache::removeAllReleasable()
{
assertInitialized();
auto lock = lockCache();
main_priority->iterate([&](LockedKey & locked_key, const FileSegmentMetadataPtr & segment_metadata)
{
if (segment_metadata->releasable())
{
auto file_segment = segment_metadata->file_segment;
locked_key.removeFileSegment(file_segment->offset(), file_segment->lock());
return PriorityIterationResult::REMOVE_AND_CONTINUE;
}
return PriorityIterationResult::CONTINUE;
}, lock);
metadata.iterate([](LockedKey & locked_key) { locked_key.removeAll(/* if_releasable */true); });
if (stash)
{
/// Remove all access information.
auto lock = lockCache();
stash->records.clear();
stash->queue->removeAll(lock);
}
@ -914,7 +918,7 @@ void FileCache::loadMetadata()
continue;
}
const auto key = Key(unhexUInt<UInt128>(key_directory.filename().string().data()));
const auto key = Key::fromKeyString(key_directory.filename().string());
auto locked_key = metadata.lockKeyMetadata(key, CacheMetadata::KeyNotFoundPolicy::CREATE_EMPTY, /* is_initial_load */true);
for (fs::directory_iterator offset_it{key_directory}; offset_it != fs::directory_iterator(); ++offset_it)
@ -1069,7 +1073,7 @@ FileSegmentsHolderPtr FileCache::getSnapshot()
FileSegmentsHolderPtr FileCache::getSnapshot(const Key & key)
{
FileSegments file_segments;
auto locked_key = metadata.lockKeyMetadata(key, CacheMetadata::KeyNotFoundPolicy::THROW);
auto locked_key = metadata.lockKeyMetadata(key, CacheMetadata::KeyNotFoundPolicy::THROW_LOGICAL);
for (const auto & [_, file_segment_metadata] : *locked_key->getKeyMetadata())
file_segments.push_back(FileSegment::getSnapshot(file_segment_metadata->file_segment));
return std::make_unique<FileSegmentsHolder>(std::move(file_segments));

View File

@ -83,13 +83,19 @@ public:
FileSegmentsHolderPtr set(const Key & key, size_t offset, size_t size, const CreateFileSegmentSettings & settings);
/// Remove files by `key`. Removes files which might be used at the moment.
/// Remove file segment by `key` and `offset`. Throws if file segment does not exist.
void removeFileSegment(const Key & key, size_t offset);
/// Remove files by `key`. Throws if key does not exist.
void removeKey(const Key & key);
/// Remove files by `key`.
void removeKeyIfExists(const Key & key);
/// Removes files by `path`. Removes files which might be used at the moment.
/// Removes files by `path`.
void removePathIfExists(const String & path);
/// Remove files by `key`. Will not remove files which are used at the moment.
/// Remove files by `key`.
void removeAllReleasable();
std::vector<String> tryGetCachePaths(const Key & key);

View File

@ -7,6 +7,10 @@
namespace DB
{
namespace ErrorCodes
{
extern const int BAD_ARGUMENTS;
}
FileCacheKey::FileCacheKey(const std::string & path)
: key(sipHash128(path.data(), path.size()))
@ -28,4 +32,11 @@ FileCacheKey FileCacheKey::random()
return FileCacheKey(UUIDHelpers::generateV4().toUnderType());
}
FileCacheKey FileCacheKey::fromKeyString(const std::string & key_str)
{
if (key_str.size() != 32)
throw Exception(ErrorCodes::BAD_ARGUMENTS, "Invalid cache key hex: {}", key_str);
return FileCacheKey(unhexUInt<UInt128>(key_str.data()));
}
}

View File

@ -21,6 +21,8 @@ struct FileCacheKey
static FileCacheKey random();
bool operator==(const FileCacheKey & other) const { return key == other.key; }
static FileCacheKey fromKeyString(const std::string & key_str);
};
using FileCacheKeyAndOffset = std::pair<FileCacheKey, size_t>;

View File

@ -25,6 +25,7 @@ namespace DB
namespace ErrorCodes
{
extern const int LOGICAL_ERROR;
extern const int BAD_ARGUMENTS;
}
FileSegmentMetadata::FileSegmentMetadata(FileSegmentPtr && file_segment_)
@ -191,6 +192,8 @@ LockedKeyPtr CacheMetadata::lockKeyMetadata(
if (it == end())
{
if (key_not_found_policy == KeyNotFoundPolicy::THROW)
throw Exception(ErrorCodes::BAD_ARGUMENTS, "No such key `{}` in cache", key);
else if (key_not_found_policy == KeyNotFoundPolicy::THROW_LOGICAL)
throw Exception(ErrorCodes::LOGICAL_ERROR, "No such key `{}` in cache", key);
else if (key_not_found_policy == KeyNotFoundPolicy::RETURN_NULL)
return nullptr;
@ -215,6 +218,8 @@ LockedKeyPtr CacheMetadata::lockKeyMetadata(
return locked_metadata;
if (key_not_found_policy == KeyNotFoundPolicy::THROW)
throw Exception(ErrorCodes::BAD_ARGUMENTS, "No such key `{}` in cache", key);
else if (key_not_found_policy == KeyNotFoundPolicy::THROW_LOGICAL)
throw Exception(ErrorCodes::LOGICAL_ERROR, "No such key `{}` in cache", key);
if (key_not_found_policy == KeyNotFoundPolicy::RETURN_NULL)
@ -561,11 +566,11 @@ bool LockedKey::isLastOwnerOfFileSegment(size_t offset) const
return file_segment_metadata->file_segment.use_count() == 2;
}
void LockedKey::removeAllReleasable()
void LockedKey::removeAll(bool if_releasable)
{
for (auto it = key_metadata->begin(); it != key_metadata->end();)
{
if (!it->second->releasable())
if (if_releasable && !it->second->releasable())
{
++it;
continue;
@ -586,17 +591,32 @@ void LockedKey::removeAllReleasable()
}
}
KeyMetadata::iterator LockedKey::removeFileSegment(size_t offset)
{
auto it = key_metadata->find(offset);
if (it == key_metadata->end())
throw Exception(ErrorCodes::BAD_ARGUMENTS, "There is no offset {}", offset);
auto file_segment = it->second->file_segment;
return removeFileSegmentImpl(it, file_segment->lock());
}
KeyMetadata::iterator LockedKey::removeFileSegment(size_t offset, const FileSegmentGuard::Lock & segment_lock)
{
auto it = key_metadata->find(offset);
if (it == key_metadata->end())
throw Exception(ErrorCodes::LOGICAL_ERROR, "There is no offset {}", offset);
return removeFileSegmentImpl(it, segment_lock);
}
KeyMetadata::iterator LockedKey::removeFileSegmentImpl(KeyMetadata::iterator it, const FileSegmentGuard::Lock & segment_lock)
{
auto file_segment = it->second->file_segment;
LOG_DEBUG(
key_metadata->log, "Remove from cache. Key: {}, offset: {}, size: {}",
getKey(), offset, file_segment->reserved_size);
getKey(), file_segment->offset(), file_segment->reserved_size);
chassert(file_segment->assertCorrectnessUnlocked(segment_lock));

View File

@ -87,7 +87,7 @@ struct CacheMetadata : public std::unordered_map<FileCacheKey, KeyMetadataPtr>,
{
public:
using Key = FileCacheKey;
using IterateCacheMetadataFunc = std::function<void(const LockedKey &)>;
using IterateCacheMetadataFunc = std::function<void(LockedKey &)>;
explicit CacheMetadata(const std::string & path_);
@ -106,6 +106,7 @@ public:
enum class KeyNotFoundPolicy
{
THROW,
THROW_LOGICAL,
CREATE_EMPTY,
RETURN_NULL,
};
@ -169,9 +170,10 @@ struct LockedKey : private boost::noncopyable
std::shared_ptr<const KeyMetadata> getKeyMetadata() const { return key_metadata; }
std::shared_ptr<KeyMetadata> getKeyMetadata() { return key_metadata; }
void removeAllReleasable();
void removeAll(bool if_releasable = true);
KeyMetadata::iterator removeFileSegment(size_t offset, const FileSegmentGuard::Lock &);
KeyMetadata::iterator removeFileSegment(size_t offset);
void shrinkFileSegmentToDownloadedSize(size_t offset, const FileSegmentGuard::Lock &);
@ -188,6 +190,8 @@ struct LockedKey : private boost::noncopyable
std::string toString() const;
private:
KeyMetadata::iterator removeFileSegmentImpl(KeyMetadata::iterator it, const FileSegmentGuard::Lock &);
const std::shared_ptr<KeyMetadata> key_metadata;
KeyGuard::Lock lock; /// `lock` must be destructed before `key_metadata`.
};

View File

@ -1089,52 +1089,32 @@ ConfigurationPtr Context::getUsersConfig()
return shared->users_config;
}
void Context::setUser(const UUID & user_id_, bool set_current_profiles_, bool set_current_roles_, bool set_current_database_)
void Context::setUser(const UUID & user_id_, const std::optional<const std::vector<UUID>> & current_roles_)
{
/// Prepare lists of user's profiles, constraints, settings, roles.
/// NOTE: AccessControl::read<User>() and other AccessControl's functions may require some IO work,
/// so Context::getLock() must be unlocked while we're doing this.
std::shared_ptr<const User> user;
std::shared_ptr<const ContextAccess> temp_access;
if (set_current_profiles_ || set_current_roles_ || set_current_database_)
{
std::optional<ContextAccessParams> params;
{
auto lock = getLock();
params.emplace(ContextAccessParams{user_id_, /* full_access= */ false, /* use_default_roles = */ true, {}, settings, current_database, client_info });
}
/// `temp_access` is used here only to extract information about the user, not to actually check access.
/// NOTE: AccessControl::getContextAccess() may require some IO work, so Context::getLock() must be unlocked while we're doing this.
temp_access = getAccessControl().getContextAccess(*params);
user = temp_access->getUser();
}
auto user = getAccessControl().read<User>(user_id_);
std::shared_ptr<const SettingsProfilesInfo> profiles;
if (set_current_profiles_)
profiles = temp_access->getDefaultProfileInfo();
std::optional<std::vector<UUID>> roles;
if (set_current_roles_)
roles = user->granted_roles.findGranted(user->default_roles);
String database;
if (set_current_database_)
database = user->default_database;
auto new_current_roles = current_roles_ ? user->granted_roles.findGranted(*current_roles_) : user->granted_roles.findGranted(user->default_roles);
auto enabled_roles = getAccessControl().getEnabledRolesInfo(new_current_roles, {});
auto enabled_profiles = getAccessControl().getEnabledSettingsInfo(user_id_, user->settings, enabled_roles->enabled_roles, enabled_roles->settings_from_enabled_roles);
const auto & database = user->default_database;
/// Apply user's profiles, constraints, settings, roles.
auto lock = getLock();
setUserID(user_id_);
if (profiles)
{
/// A profile can specify a value and a readonly constraint for same setting at the same time,
/// so we shouldn't check constraints here.
setCurrentProfiles(*profiles, /* check_constraints= */ false);
}
/// A profile can specify a value and a readonly constraint for same setting at the same time,
/// so we shouldn't check constraints here.
setCurrentProfiles(*enabled_profiles, /* check_constraints= */ false);
if (roles)
setCurrentRoles(*roles);
setCurrentRoles(new_current_roles);
/// It's optional to specify the DEFAULT DATABASE in the user's definition.
if (!database.empty())
setCurrentDatabase(database);
}
@ -4550,14 +4530,6 @@ ReadSettings Context::getReadSettings() const
return res;
}
ReadSettings Context::getBackupReadSettings() const
{
ReadSettings read_settings = getReadSettings();
read_settings.remote_throttler = getBackupsThrottler();
read_settings.local_throttler = getBackupsThrottler();
return read_settings;
}
WriteSettings Context::getWriteSettings() const
{
WriteSettings res;

View File

@ -534,12 +534,10 @@ public:
/// Sets the current user assuming that he/she is already authenticated.
/// WARNING: This function doesn't check password!
void setUser(const UUID & user_id_, bool set_current_profiles_ = true, bool set_current_roles_ = true, bool set_current_database_ = true);
void setUser(const UUID & user_id_, const std::optional<const std::vector<UUID>> & current_roles_ = {});
UserPtr getUser() const;
void setUserID(const UUID & user_id_);
std::optional<UUID> getUserID() const;
String getUserName() const;
void setCurrentRoles(const std::vector<UUID> & current_roles_);
@ -1168,9 +1166,6 @@ public:
/** Get settings for reading from filesystem. */
ReadSettings getReadSettings() const;
/** Get settings for reading from filesystem for BACKUPs. */
ReadSettings getBackupReadSettings() const;
/** Get settings for writing to filesystem. */
WriteSettings getWriteSettings() const;
@ -1195,6 +1190,8 @@ private:
void initGlobal();
void setUserID(const UUID & user_id_);
template <typename... Args>
void checkAccessImpl(const Args &... args) const;

View File

@ -336,7 +336,6 @@ DatabaseAndTable DatabaseCatalog::getTableImpl(
return db_and_table;
}
if (table_id.database_name == TEMPORARY_DATABASE)
{
/// For temporary tables UUIDs are set in Context::resolveStorageID(...).
@ -369,8 +368,24 @@ DatabaseAndTable DatabaseCatalog::getTableImpl(
database = it->second;
}
auto table = database->tryGetTable(table_id.table_name, context_);
if (!table && exception)
StoragePtr table;
if (exception)
{
try
{
table = database->getTable(table_id.table_name, context_);
}
catch (const Exception & e)
{
exception->emplace(e);
}
}
else
{
table = database->tryGetTable(table_id.table_name, context_);
}
if (!table && exception && !exception->has_value())
exception->emplace(Exception(ErrorCodes::UNKNOWN_TABLE, "Table {} doesn't exist", table_id.getNameForLogs()));
if (!table)

View File

@ -40,6 +40,8 @@ NamesAndTypesList FilesystemCacheLogElement::getNamesAndTypes()
{"source_file_path", std::make_shared<DataTypeString>()},
{"file_segment_range", std::make_shared<DataTypeTuple>(types)},
{"total_requested_range", std::make_shared<DataTypeTuple>(types)},
{"key", std::make_shared<DataTypeString>()},
{"offset", std::make_shared<DataTypeUInt64>()},
{"size", std::make_shared<DataTypeUInt64>()},
{"read_type", std::make_shared<DataTypeString>()},
{"read_from_cache_attempted", std::make_shared<DataTypeUInt8>()},
@ -60,6 +62,8 @@ void FilesystemCacheLogElement::appendToBlock(MutableColumns & columns) const
columns[i++]->insert(source_file_path);
columns[i++]->insert(Tuple{file_segment_range.first, file_segment_range.second});
columns[i++]->insert(Tuple{requested_range.first, requested_range.second});
columns[i++]->insert(file_segment_key);
columns[i++]->insert(file_segment_offset);
columns[i++]->insert(file_segment_size);
columns[i++]->insert(typeToString(cache_type));
columns[i++]->insert(read_from_cache_attempted);

View File

@ -11,16 +11,7 @@
namespace DB
{
///
/// -------- Column --------- Type ------
/// | event_date | DateTime |
/// | event_time | UInt64 |
/// | query_id | String |
/// | remote_file_path | String |
/// | segment_range | Tuple |
/// | read_type | String |
/// -------------------------------------
///
struct FilesystemCacheLogElement
{
enum class CacheType
@ -39,6 +30,8 @@ struct FilesystemCacheLogElement
std::pair<size_t, size_t> file_segment_range{};
std::pair<size_t, size_t> requested_range{};
CacheType cache_type{};
std::string file_segment_key;
size_t file_segment_offset;
size_t file_segment_size;
bool read_from_cache_attempted;
String read_buffer_id;

View File

@ -1,5 +1,4 @@
#include <Interpreters/InterpreterCreateNamedCollectionQuery.h>
#include <Parsers/ASTCreateNamedCollectionQuery.h>
#include <Access/ContextAccess.h>
#include <Interpreters/Context.h>

View File

@ -22,11 +22,7 @@ BlockIO InterpreterDropNamedCollectionQuery::execute()
return executeDDLQueryOnCluster(query_ptr, current_context, params);
}
if (query.if_exists)
NamedCollectionUtils::removeIfExistsFromSQL(query.collection_name, current_context);
else
NamedCollectionUtils::removeFromSQL(query.collection_name, current_context);
NamedCollectionUtils::removeFromSQL(query, current_context);
return {};
}

View File

@ -89,13 +89,14 @@ namespace ErrorCodes
namespace ActionLocks
{
extern StorageActionBlockType PartsMerge;
extern StorageActionBlockType PartsFetch;
extern StorageActionBlockType PartsSend;
extern StorageActionBlockType ReplicationQueue;
extern StorageActionBlockType DistributedSend;
extern StorageActionBlockType PartsTTLMerge;
extern StorageActionBlockType PartsMove;
extern const StorageActionBlockType PartsMerge;
extern const StorageActionBlockType PartsFetch;
extern const StorageActionBlockType PartsSend;
extern const StorageActionBlockType ReplicationQueue;
extern const StorageActionBlockType DistributedSend;
extern const StorageActionBlockType PartsTTLMerge;
extern const StorageActionBlockType PartsMove;
extern const StorageActionBlockType PullReplicationLog;
}
@ -155,6 +156,8 @@ AccessType getRequiredAccessType(StorageActionBlockType action_type)
return AccessType::SYSTEM_TTL_MERGES;
else if (action_type == ActionLocks::PartsMove)
return AccessType::SYSTEM_MOVES;
else if (action_type == ActionLocks::PullReplicationLog)
return AccessType::SYSTEM_PULLING_REPLICATION_LOG;
else
throw Exception(ErrorCodes::LOGICAL_ERROR, "Unknown action type: {}", std::to_string(action_type));
}
@ -371,7 +374,18 @@ BlockIO InterpreterSystemQuery::execute()
else
{
auto cache = FileCacheFactory::instance().getByName(query.filesystem_cache_name).cache;
cache->removeAllReleasable();
if (query.key_to_drop.empty())
{
cache->removeAllReleasable();
}
else
{
auto key = FileCacheKey::fromKeyString(query.key_to_drop);
if (query.offset_to_drop.has_value())
cache->removeFileSegment(key, query.offset_to_drop.value());
else
cache->removeKey(key);
}
}
break;
}
@ -502,6 +516,12 @@ BlockIO InterpreterSystemQuery::execute()
case Type::START_DISTRIBUTED_SENDS:
startStopAction(ActionLocks::DistributedSend, true);
break;
case Type::STOP_PULLING_REPLICATION_LOG:
startStopAction(ActionLocks::PullReplicationLog, false);
break;
case Type::START_PULLING_REPLICATION_LOG:
startStopAction(ActionLocks::PullReplicationLog, true);
break;
case Type::DROP_REPLICA:
dropReplica(query);
break;
@ -1079,6 +1099,15 @@ AccessRightsElements InterpreterSystemQuery::getRequiredAccessForDDLOnCluster()
required_access.emplace_back(AccessType::SYSTEM_MOVES, query.getDatabase(), query.getTable());
break;
}
case Type::STOP_PULLING_REPLICATION_LOG:
case Type::START_PULLING_REPLICATION_LOG:
{
if (!query.table)
required_access.emplace_back(AccessType::SYSTEM_PULLING_REPLICATION_LOG);
else
required_access.emplace_back(AccessType::SYSTEM_PULLING_REPLICATION_LOG, query.getDatabase(), query.getTable());
break;
}
case Type::STOP_FETCHES:
case Type::START_FETCHES:
{

View File

@ -91,34 +91,30 @@ void WindowFrame::toString(WriteBuffer & buf) const
void WindowFrame::checkValid() const
{
// Check the validity of offsets.
if (type == WindowFrame::FrameType::ROWS
|| type == WindowFrame::FrameType::GROUPS)
if (begin_type == BoundaryType::Offset
&& !((begin_offset.getType() == Field::Types::UInt64
|| begin_offset.getType() == Field::Types::Int64)
&& begin_offset.get<Int64>() >= 0
&& begin_offset.get<Int64>() < INT_MAX))
{
if (begin_type == BoundaryType::Offset
&& !((begin_offset.getType() == Field::Types::UInt64
|| begin_offset.getType() == Field::Types::Int64)
&& begin_offset.get<Int64>() >= 0
&& begin_offset.get<Int64>() < INT_MAX))
{
throw Exception(ErrorCodes::BAD_ARGUMENTS,
"Frame start offset for '{}' frame must be a nonnegative 32-bit integer, '{}' of type '{}' given",
type,
applyVisitor(FieldVisitorToString(), begin_offset),
begin_offset.getType());
}
throw Exception(ErrorCodes::BAD_ARGUMENTS,
"Frame start offset for '{}' frame must be a nonnegative 32-bit integer, '{}' of type '{}' given",
type,
applyVisitor(FieldVisitorToString(), begin_offset),
begin_offset.getType());
}
if (end_type == BoundaryType::Offset
&& !((end_offset.getType() == Field::Types::UInt64
|| end_offset.getType() == Field::Types::Int64)
&& end_offset.get<Int64>() >= 0
&& end_offset.get<Int64>() < INT_MAX))
{
throw Exception(ErrorCodes::BAD_ARGUMENTS,
"Frame end offset for '{}' frame must be a nonnegative 32-bit integer, '{}' of type '{}' given",
type,
applyVisitor(FieldVisitorToString(), end_offset),
end_offset.getType());
}
if (end_type == BoundaryType::Offset
&& !((end_offset.getType() == Field::Types::UInt64
|| end_offset.getType() == Field::Types::Int64)
&& end_offset.get<Int64>() >= 0
&& end_offset.get<Int64>() < INT_MAX))
{
throw Exception(ErrorCodes::BAD_ARGUMENTS,
"Frame end offset for '{}' frame must be a nonnegative 32-bit integer, '{}' of type '{}' given",
type,
applyVisitor(FieldVisitorToString(), end_offset),
end_offset.getType());
}
// Check relative positioning of offsets.

View File

@ -45,10 +45,10 @@ namespace ErrorCodes
namespace ActionLocks
{
extern StorageActionBlockType PartsMerge;
extern StorageActionBlockType PartsFetch;
extern StorageActionBlockType PartsSend;
extern StorageActionBlockType DistributedSend;
extern const StorageActionBlockType PartsMerge;
extern const StorageActionBlockType PartsFetch;
extern const StorageActionBlockType PartsSend;
extern const StorageActionBlockType DistributedSend;
}
static void executeCreateQuery(
@ -250,6 +250,9 @@ static void loadSystemDatabaseImpl(ContextMutablePtr context, const String & dat
{
String path = context->getPath() + "metadata/" + database_name;
String metadata_file = path + ".sql";
if (fs::exists(metadata_file + ".tmp"))
fs::remove(metadata_file + ".tmp");
if (fs::exists(fs::path(metadata_file)))
{
/// 'has_force_restore_data_flag' is true, to not fail on loading query_log table, if it is corrupted.

View File

@ -15,6 +15,8 @@ ASTPtr ASTAlterNamedCollectionQuery::clone() const
void ASTAlterNamedCollectionQuery::formatImpl(const IAST::FormatSettings & settings, IAST::FormatState &, IAST::FormatStateStacked) const
{
settings.ostr << (settings.hilite ? hilite_keyword : "") << "Alter NAMED COLLECTION ";
if (if_exists)
settings.ostr << "IF EXISTS ";
settings.ostr << (settings.hilite ? hilite_identifier : "") << backQuoteIfNeed(collection_name) << (settings.hilite ? hilite_none : "");
formatOnCluster(settings);
if (!changes.empty())

View File

@ -18,6 +18,8 @@ ASTPtr ASTCreateNamedCollectionQuery::clone() const
void ASTCreateNamedCollectionQuery::formatImpl(const IAST::FormatSettings & settings, IAST::FormatState &, IAST::FormatStateStacked) const
{
settings.ostr << (settings.hilite ? hilite_keyword : "") << "CREATE NAMED COLLECTION ";
if (if_not_exists)
settings.ostr << "IF NOT EXISTS ";
settings.ostr << (settings.hilite ? hilite_identifier : "") << backQuoteIfNeed(collection_name) << (settings.hilite ? hilite_none : "");
formatOnCluster(settings);

View File

@ -13,6 +13,7 @@ class ASTCreateNamedCollectionQuery : public IAST, public ASTQueryWithOnCluster
public:
std::string collection_name;
SettingsChanges changes;
bool if_not_exists = false;
String getID(char) const override { return "CreateNamedCollectionQuery"; }

View File

@ -13,6 +13,8 @@ ASTPtr ASTDropNamedCollectionQuery::clone() const
void ASTDropNamedCollectionQuery::formatImpl(const IAST::FormatSettings & settings, IAST::FormatState &, IAST::FormatStateStacked) const
{
settings.ostr << (settings.hilite ? hilite_keyword : "") << "DROP NAMED COLLECTION ";
if (if_exists)
settings.ostr << "IF EXISTS ";
settings.ostr << (settings.hilite ? hilite_identifier : "") << backQuoteIfNeed(collection_name) << (settings.hilite ? hilite_none : "");
formatOnCluster(settings);
}

View File

@ -162,7 +162,9 @@ void ASTSystemQuery::formatImpl(const FormatSettings & settings, FormatState &,
|| type == Type::STOP_REPLICATION_QUEUES
|| type == Type::START_REPLICATION_QUEUES
|| type == Type::STOP_DISTRIBUTED_SENDS
|| type == Type::START_DISTRIBUTED_SENDS)
|| type == Type::START_DISTRIBUTED_SENDS
|| type == Type::STOP_PULLING_REPLICATION_LOG
|| type == Type::START_PULLING_REPLICATION_LOG)
{
if (table)
print_database_table();
@ -210,7 +212,15 @@ void ASTSystemQuery::formatImpl(const FormatSettings & settings, FormatState &,
else if (type == Type::DROP_FILESYSTEM_CACHE)
{
if (!filesystem_cache_name.empty())
{
settings.ostr << (settings.hilite ? hilite_none : "") << " " << filesystem_cache_name;
if (!key_to_drop.empty())
{
settings.ostr << (settings.hilite ? hilite_none : "") << " KEY " << key_to_drop;
if (offset_to_drop.has_value())
settings.ostr << (settings.hilite ? hilite_none : "") << " OFFSET " << offset_to_drop.value();
}
}
}
else if (type == Type::UNFREEZE)
{

View File

@ -80,6 +80,8 @@ public:
UNFREEZE,
ENABLE_FAILPOINT,
DISABLE_FAILPOINT,
STOP_PULLING_REPLICATION_LOG,
START_PULLING_REPLICATION_LOG,
END
};
@ -108,6 +110,8 @@ public:
UInt64 seconds{};
String filesystem_cache_name;
std::string key_to_drop;
std::optional<size_t> offset_to_drop;
String backup_name;

View File

@ -13,8 +13,9 @@ bool ParserAlterNamedCollectionQuery::parseImpl(IParser::Pos & pos, ASTPtr & nod
{
ParserKeyword s_alter("ALTER");
ParserKeyword s_collection("NAMED COLLECTION");
ParserKeyword s_if_exists("IF EXISTS");
ParserKeyword s_on("ON");
ParserKeyword s_delete("DELETE");
ParserIdentifier name_p;
ParserSetQuery set_p;
ParserToken s_comma(TokenType::Comma);
@ -32,10 +33,13 @@ bool ParserAlterNamedCollectionQuery::parseImpl(IParser::Pos & pos, ASTPtr & nod
if (!s_collection.ignore(pos, expected))
return false;
if (s_if_exists.ignore(pos, expected))
if_exists = true;
if (!name_p.parse(pos, collection_name, expected))
return false;
if (ParserKeyword{"ON"}.ignore(pos, expected))
if (s_on.ignore(pos, expected))
{
if (!ASTQueryWithOnCluster::parse(pos, cluster_str, expected))
return false;

View File

@ -1421,15 +1421,17 @@ bool ParserCreateViewQuery::parseImpl(Pos & pos, ASTPtr & node, Expected & expec
bool ParserCreateNamedCollectionQuery::parseImpl(Pos & pos, ASTPtr & node, Expected & expected)
{
ParserKeyword s_create("CREATE");
ParserKeyword s_attach("ATTACH");
ParserKeyword s_named_collection("NAMED COLLECTION");
ParserKeyword s_if_not_exists("IF NOT EXISTS");
ParserKeyword s_on("ON");
ParserKeyword s_as("AS");
ParserToken s_comma(TokenType::Comma);
ParserIdentifier name_p;
ParserToken s_comma(TokenType::Comma);
String cluster_str;
bool if_not_exists = false;
ASTPtr collection_name;
String cluster_str;
if (!s_create.ignore(pos, expected))
return false;
@ -1437,10 +1439,13 @@ bool ParserCreateNamedCollectionQuery::parseImpl(Pos & pos, ASTPtr & node, Expec
if (!s_named_collection.ignore(pos, expected))
return false;
if (s_if_not_exists.ignore(pos, expected))
if_not_exists = true;
if (!name_p.parse(pos, collection_name, expected))
return false;
if (ParserKeyword{"ON"}.ignore(pos, expected))
if (s_on.ignore(pos, expected))
{
if (!ASTQueryWithOnCluster::parse(pos, cluster_str, expected))
return false;
@ -1465,7 +1470,9 @@ bool ParserCreateNamedCollectionQuery::parseImpl(Pos & pos, ASTPtr & node, Expec
auto query = std::make_shared<ASTCreateNamedCollectionQuery>();
tryGetIdentifierNameInto(collection_name, query->collection_name);
query->if_not_exists = if_not_exists;
query->changes = changes;
query->cluster = std::move(cluster_str);
node = query;
return true;

View File

@ -548,6 +548,7 @@ protected:
bool parseImpl(Pos & pos, ASTPtr & node, Expected & expected) override;
};
/// CREATE NAMED COLLECTION name [ON CLUSTER cluster]
class ParserCreateNamedCollectionQuery : public IParserBase
{
protected:

View File

@ -12,6 +12,7 @@ bool ParserDropNamedCollectionQuery::parseImpl(IParser::Pos & pos, ASTPtr & node
ParserKeyword s_drop("DROP");
ParserKeyword s_collection("NAMED COLLECTION");
ParserKeyword s_if_exists("IF EXISTS");
ParserKeyword s_on("ON");
ParserIdentifier name_p;
String cluster_str;
@ -31,7 +32,7 @@ bool ParserDropNamedCollectionQuery::parseImpl(IParser::Pos & pos, ASTPtr & node
if (!name_p.parse(pos, collection_name, expected))
return false;
if (ParserKeyword{"ON"}.ignore(pos, expected))
if (s_on.ignore(pos, expected))
{
if (!ASTQueryWithOnCluster::parse(pos, cluster_str, expected))
return false;

View File

@ -379,6 +379,8 @@ bool ParserSystemQuery::parseImpl(IParser::Pos & pos, ASTPtr & node, Expected &
case Type::START_REPLICATED_SENDS:
case Type::STOP_REPLICATION_QUEUES:
case Type::START_REPLICATION_QUEUES:
case Type::STOP_PULLING_REPLICATION_LOG:
case Type::START_PULLING_REPLICATION_LOG:
if (!parseQueryWithOnCluster(res, pos, expected))
return false;
parseDatabaseAndTableAsAST(pos, expected, res->database, res->table);
@ -405,7 +407,15 @@ bool ParserSystemQuery::parseImpl(IParser::Pos & pos, ASTPtr & node, Expected &
ParserLiteral path_parser;
ASTPtr ast;
if (path_parser.parse(pos, ast, expected))
{
res->filesystem_cache_name = ast->as<ASTLiteral>()->value.safeGet<String>();
if (ParserKeyword{"KEY"}.ignore(pos, expected) && ParserIdentifier().parse(pos, ast, expected))
{
res->key_to_drop = ast->as<ASTIdentifier>()->name();
if (ParserKeyword{"OFFSET"}.ignore(pos, expected) && ParserLiteral().parse(pos, ast, expected))
res->offset_to_drop = ast->as<ASTLiteral>()->value.safeGet<UInt64>();
}
}
if (!parseQueryWithOnCluster(res, pos, expected))
return false;
break;

View File

@ -148,7 +148,7 @@ static int compareValuesWithOffsetFloat(const IColumn * _compared_column,
const auto * reference_column = assert_cast<const ColumnType *>(
_reference_column);
const auto offset = _offset.get<typename ColumnType::ValueType>();
assert(offset >= 0);
chassert(offset >= 0);
const auto compared_value_data = compared_column->getDataAt(compared_row);
assert(compared_value_data.size == sizeof(typename ColumnType::ValueType));

View File

@ -281,7 +281,7 @@ Chain buildPushingToViewsChain(
/// and switch back to the original thread_status.
auto * original_thread = current_thread;
SCOPE_EXIT({ current_thread = original_thread; });
current_thread = nullptr;
std::unique_ptr<ThreadStatus> view_thread_status_ptr = std::make_unique<ThreadStatus>(/*check_current_thread_on_destruction=*/ false);
/// Copy of a ThreadStatus should be internal.
view_thread_status_ptr->setInternalThread();

View File

@ -42,8 +42,7 @@ struct ReadBufferFromHDFS::ReadBufferFromHDFSImpl : public BufferWithOwnMemory<S
off_t file_offset = 0;
off_t read_until_position = 0;
std::optional<size_t> file_size;
off_t file_size;
explicit ReadBufferFromHDFSImpl(
const std::string & hdfs_uri_,
@ -59,7 +58,6 @@ struct ReadBufferFromHDFS::ReadBufferFromHDFSImpl : public BufferWithOwnMemory<S
, builder(createHDFSBuilder(hdfs_uri_, config_))
, read_settings(read_settings_)
, read_until_position(read_until_position_)
, file_size(file_size_)
{
fs = createHDFSFS(builder.get());
fin = hdfsOpenFile(fs.get(), hdfs_file_path.c_str(), O_RDONLY, 0, 0, 0);
@ -68,6 +66,22 @@ struct ReadBufferFromHDFS::ReadBufferFromHDFSImpl : public BufferWithOwnMemory<S
throw Exception(ErrorCodes::CANNOT_OPEN_FILE,
"Unable to open HDFS file: {}. Error: {}",
hdfs_uri + hdfs_file_path, std::string(hdfsGetLastError()));
if (file_size_.has_value())
{
file_size = file_size_.value();
}
else
{
auto * file_info = hdfsGetPathInfo(fs.get(), hdfs_file_path.c_str());
if (!file_info)
{
hdfsCloseFile(fs.get(), fin);
throw Exception(ErrorCodes::UNKNOWN_FILE_SIZE, "Cannot find out file size for: {}", hdfs_file_path);
}
file_size = static_cast<size_t>(file_info->mSize);
hdfsFreeFileInfo(file_info, 1);
}
}
~ReadBufferFromHDFSImpl() override
@ -75,16 +89,9 @@ struct ReadBufferFromHDFS::ReadBufferFromHDFSImpl : public BufferWithOwnMemory<S
hdfsCloseFile(fs.get(), fin);
}
size_t getFileSize()
size_t getFileSize() const
{
if (file_size)
return *file_size;
auto * file_info = hdfsGetPathInfo(fs.get(), hdfs_file_path.c_str());
if (!file_info)
throw Exception(ErrorCodes::UNKNOWN_FILE_SIZE, "Cannot find out file size for: {}", hdfs_file_path);
file_size = static_cast<size_t>(file_info->mSize);
return *file_size;
return file_size;
}
bool nextImpl() override
@ -104,6 +111,10 @@ struct ReadBufferFromHDFS::ReadBufferFromHDFSImpl : public BufferWithOwnMemory<S
{
num_bytes_to_read = internal_buffer.size();
}
if (file_size != 0 && file_offset >= file_size)
{
return false;
}
ResourceGuard rlock(read_settings.resource_link, num_bytes_to_read);
int bytes_read;

Some files were not shown because too many files have changed in this diff Show More