Merge remote-tracking branch 'origin/master' into dictinct_in_order_optimization

Igor Nikonov 2022-06-23 16:04:08 +00:00
commit 2fd5467f36
24 changed files with 664 additions and 480 deletions

View File

@@ -37,6 +37,8 @@ function(add_contrib cmake_folder)
     file(GLOB contrib_files "${base_folder}/*")
     if (NOT contrib_files)
+        # Checking out *all* submodules takes > 5 min. Therefore, the smoke build ("FastTest") in CI initializes only the set of
+        # submodules minimally needed for a build and we cannot assume here that all submodules are populated.
        message(STATUS "submodule ${base_folder} is missing or empty. to fix try run:")
        message(STATUS "    git submodule update --init")
        return()

View File

@@ -17,11 +17,16 @@ Unpack the data:
 xz -v -d mgbench{1..3}.csv.xz
 ```
 
-Create tables:
-```
+Create the database and tables:
+```sql
 CREATE DATABASE mgbench;
+```
+```sql
+USE mgbench;
+```
+```sql
 CREATE TABLE mgbench.logs1 (
     log_time      DateTime,
     machine_name  LowCardinality(String),
@@ -47,8 +52,10 @@ CREATE TABLE mgbench.logs1 (
 )
 ENGINE = MergeTree()
 ORDER BY (machine_group, machine_name, log_time);
+```
 
+```sql
 CREATE TABLE mgbench.logs2 (
     log_time    DateTime,
     client_ip   IPv4,
@@ -58,8 +65,10 @@ CREATE TABLE mgbench.logs2 (
 )
 ENGINE = MergeTree()
 ORDER BY log_time;
+```
 
+```sql
 CREATE TABLE mgbench.logs3 (
     log_time     DateTime64,
     device_id    FixedString(15),
@@ -82,8 +91,13 @@ clickhouse-client --query "INSERT INTO mgbench.logs2 FORMAT CSVWithNames" < mgbe
 clickhouse-client --query "INSERT INTO mgbench.logs3 FORMAT CSVWithNames" < mgbench3.csv
 ```
 
-Run benchmark queries:
-```
+## Run benchmark queries:
+```sql
+USE mgbench;
+```
+
+```sql
 -- Q1.1: What is the CPU/network utilization for each web server since midnight?
 SELECT machine_name,
@@ -106,8 +120,10 @@ FROM (
           AND log_time >= TIMESTAMP '2017-01-11 00:00:00'
 ) AS r
 GROUP BY machine_name;
+```
 
+```sql
 -- Q1.2: Which computer lab machines have been offline in the past day?
 SELECT machine_name,
@@ -119,8 +135,9 @@ WHERE (machine_name LIKE 'cslab%' OR
   AND log_time >= TIMESTAMP '2017-01-10 00:00:00'
 ORDER BY machine_name,
          log_time;
-
+```
+```sql
 -- Q1.3: What are the hourly average metrics during the past 10 days for a specific workstation?
 SELECT dt,
@@ -151,8 +168,9 @@ GROUP BY dt,
          hr
 ORDER BY dt,
          hr;
-
+```
+```sql
 -- Q1.4: Over 1 month, how often was each server blocked on disk I/O?
 SELECT machine_name,
@@ -165,8 +183,9 @@ WHERE machine_group = 'Servers'
 GROUP BY machine_name
 ORDER BY spikes DESC
 LIMIT 10;
-
+```
+```sql
 -- Q1.5: Which externally reachable VMs have run low on memory?
 SELECT machine_name,
@@ -185,8 +204,9 @@ GROUP BY machine_name,
 HAVING MIN(mem_free) < 10000
 ORDER BY machine_name,
          dt;
-
+```
+```sql
 -- Q1.6: What is the total hourly network traffic across all file servers?
 SELECT dt,
@@ -210,8 +230,9 @@ GROUP BY dt,
          hr
 ORDER BY both_sum DESC
 LIMIT 10;
-
+```
+```sql
 -- Q2.1: Which requests have caused server errors within the past 2 weeks?
 SELECT *
@@ -219,8 +240,9 @@ FROM logs2
 WHERE status_code >= 500
   AND log_time >= TIMESTAMP '2012-12-18 00:00:00'
 ORDER BY log_time;
-
+```
+```sql
 -- Q2.2: During a specific 2-week period, was the user password file leaked?
 SELECT *
@@ -230,8 +252,10 @@ WHERE status_code >= 200
   AND request LIKE '%/etc/passwd%'
   AND log_time >= TIMESTAMP '2012-05-06 00:00:00'
   AND log_time < TIMESTAMP '2012-05-20 00:00:00';
+```
 
+```sql
 -- Q2.3: What was the average path depth for top-level requests in the past month?
 SELECT top_level,
@@ -254,8 +278,10 @@ WHERE top_level IN ('/about','/courses','/degrees','/events',
                     '/publications','/research','/teaching','/ugrad')
 GROUP BY top_level
 ORDER BY top_level;
+```
 
+```sql
 -- Q2.4: During the last 3 months, which clients have made an excessive number of requests?
 SELECT client_ip,
@@ -265,8 +291,10 @@ WHERE log_time >= TIMESTAMP '2012-10-01 00:00:00'
 GROUP BY client_ip
 HAVING COUNT(*) >= 100000
 ORDER BY num_requests DESC;
+```
 
+```sql
 -- Q2.5: What are the daily unique visitors?
 SELECT dt,
@@ -278,8 +306,10 @@ FROM (
 ) AS r
 GROUP BY dt
 ORDER BY dt;
+```
 
+```sql
 -- Q2.6: What are the average and maximum data transfer rates (Gbps)?
 SELECT AVG(transfer) / 125000000.0 AS transfer_avg,
@@ -290,8 +320,10 @@ FROM (
     FROM logs2
     GROUP BY log_time
 ) AS r;
+```
 
+```sql
 -- Q3.1: Did the indoor temperature reach freezing over the weekend?
 SELECT *
@@ -299,8 +331,10 @@ FROM logs3
 WHERE event_type = 'temperature'
   AND event_value <= 32.0
   AND log_time >= '2019-11-29 17:00:00.000';
+```
 
+```sql
 -- Q3.4: Over the past 6 months, how frequently were each door opened?
 SELECT device_name,
@@ -312,8 +346,14 @@ WHERE event_type = 'door_open'
 GROUP BY device_name,
          device_floor
 ORDER BY ct DESC;
+```
 
+Query 3.5 below uses a UNION. Set the mode for combining SELECT query results; the setting is used only when a query contains UNION without explicitly specifying UNION ALL or UNION DISTINCT.
+```sql
+SET union_default_mode = 'DISTINCT'
+```
+```sql
 -- Q3.5: Where in the building do large temperature variations occur in winter and summer?
 WITH temperature AS (
@@ -365,8 +405,10 @@ SELECT DISTINCT device_name,
 FROM temperature
 WHERE dt >= DATE '2019-06-01'
   AND dt < DATE '2019-09-01';
+```
 
+```sql
 -- Q3.6: For each device category, what are the monthly power consumption metrics?
 SELECT yr,
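A quick aside on the `union_default_mode` setting introduced in this doc change (an illustrative sketch, not part of this commit): with the mode set to 'DISTINCT', a bare UNION behaves as UNION DISTINCT and collapses duplicate rows across the combined SELECTs.

```sql
-- Hypothetical standalone example (any recent ClickHouse server):
SET union_default_mode = 'DISTINCT';

SELECT 1 AS x
UNION
SELECT 1;
-- returns a single row; with union_default_mode = 'ALL' it would return two
```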

View File

@@ -215,7 +215,6 @@ public:
     /// Second bool param is a flag to remove (true) or keep (false) shared data on S3
     virtual void removeSharedFileIfExists(const String & path, bool /* keep_shared_data */) { removeFileIfExists(path); }
 
     virtual String getCacheBasePath() const { return ""; }
-
     /// Returns a list of paths because for Log family engines there might be

View File

@@ -30,13 +30,15 @@ DiskObjectStorageTransaction::DiskObjectStorageTransaction(
     , metadata_helper(metadata_helper_)
 {}
 
+namespace
+{
+
 /// Operation which affects only metadata. Simplest way to
 /// implement via callback.
-struct PureMetadataOperation final : public IDiskObjectStorageOperation
+struct PureMetadataObjectStorageOperation final : public IDiskObjectStorageOperation
 {
     std::function<void(MetadataTransactionPtr tx)> on_execute;
 
-    PureMetadataOperation(
+    PureMetadataObjectStorageOperation(
         IObjectStorage & object_storage_,
         IMetadataStorage & metadata_storage_,
         std::function<void(MetadataTransactionPtr tx)> && on_execute_)
@@ -58,7 +60,7 @@ struct PureMetadataOperation final : public IDiskObjectStorageOperation
     }
 };
 
-struct RemoveObjectOperation final : public IDiskObjectStorageOperation
+struct RemoveObjectStorageOperation final : public IDiskObjectStorageOperation
 {
     std::string path;
     bool delete_metadata_only;
@@ -66,7 +68,7 @@ struct RemoveObjectOperation final : public IDiskObjectStorageOperation
     std::vector<std::string> paths_to_remove;
     bool if_exists;
 
-    RemoveObjectOperation(
+    RemoveObjectStorageOperation(
         IObjectStorage & object_storage_,
         IMetadataStorage & metadata_storage_,
         const std::string & path_,
@@ -138,7 +140,7 @@ struct RemoveObjectOperation final : public IDiskObjectStorageOperation
     }
 };
 
-struct RemoveRecursiveOperation final : public IDiskObjectStorageOperation
+struct RemoveRecursiveObjectStorageOperation final : public IDiskObjectStorageOperation
 {
     std::string path;
     std::unordered_map<std::string, std::vector<std::string>> paths_to_remove;
@@ -146,7 +148,7 @@ struct RemoveRecursiveOperation final : public IDiskObjectStorageOperation
     NameSet file_names_remove_metadata_only;
     std::vector<std::string> path_to_remove_from_cache;
 
-    RemoveRecursiveOperation(
+    RemoveRecursiveObjectStorageOperation(
         IObjectStorage & object_storage_,
         IMetadataStorage & metadata_storage_,
         const std::string & path_,
@@ -232,13 +234,13 @@ struct RemoveRecursiveOperation final : public IDiskObjectStorageOperation
 };
 
-struct ReplaceFileOperation final : public IDiskObjectStorageOperation
+struct ReplaceFileObjectStorageOperation final : public IDiskObjectStorageOperation
 {
     std::string path_from;
     std::string path_to;
     std::vector<std::string> blobs_to_remove;
 
-    ReplaceFileOperation(
+    ReplaceFileObjectStorageOperation(
         IObjectStorage & object_storage_,
         IMetadataStorage & metadata_storage_,
         const std::string & path_from_,
@@ -271,12 +273,12 @@ struct ReplaceFileOperation final : public IDiskObjectStorageOperation
     }
 };
 
-struct WriteFileOperation final : public IDiskObjectStorageOperation
+struct WriteFileObjectStorageOperation final : public IDiskObjectStorageOperation
 {
     std::string path;
     std::string blob_path;
 
-    WriteFileOperation(
+    WriteFileObjectStorageOperation(
         IObjectStorage & object_storage_,
         IMetadataStorage & metadata_storage_,
         const std::string & path_,
@@ -303,7 +305,7 @@ struct WriteFileOperation final : public IDiskObjectStorageOperation
 };
 
-struct CopyFileOperation final : public IDiskObjectStorageOperation
+struct CopyFileObjectStorageOperation final : public IDiskObjectStorageOperation
 {
     std::string from_path;
     std::string to_path;
@@ -311,7 +313,7 @@ struct CopyFileOperation final : public IDiskObjectStorageOperation
     std::vector<std::string> created_blobs;
 
-    CopyFileOperation(
+    CopyFileObjectStorageOperation(
         IObjectStorage & object_storage_,
         IMetadataStorage & metadata_storage_,
         const std::string & from_path_,
@@ -352,10 +354,12 @@ struct CopyFileOperation final : public IDiskObjectStorageOperation
     }
 };
 
+}
+
 void DiskObjectStorageTransaction::createDirectory(const std::string & path)
 {
     operations_to_execute.emplace_back(
-        std::make_unique<PureMetadataOperation>(object_storage, metadata_storage, [path](MetadataTransactionPtr tx)
+        std::make_unique<PureMetadataObjectStorageOperation>(object_storage, metadata_storage, [path](MetadataTransactionPtr tx)
         {
             tx->createDirectory(path);
         }));
@@ -364,7 +368,7 @@ void DiskObjectStorageTransaction::createDirectory(const std::string & path)
 void DiskObjectStorageTransaction::createDirectories(const std::string & path)
 {
     operations_to_execute.emplace_back(
-        std::make_unique<PureMetadataOperation>(object_storage, metadata_storage, [path](MetadataTransactionPtr tx)
+        std::make_unique<PureMetadataObjectStorageOperation>(object_storage, metadata_storage, [path](MetadataTransactionPtr tx)
         {
             tx->createDicrectoryRecursive(path);
         }));
@@ -374,7 +378,7 @@ void DiskObjectStorageTransaction::createDirectories(const std::string & path)
 void DiskObjectStorageTransaction::moveDirectory(const std::string & from_path, const std::string & to_path)
 {
     operations_to_execute.emplace_back(
-        std::make_unique<PureMetadataOperation>(object_storage, metadata_storage, [from_path, to_path](MetadataTransactionPtr tx)
+        std::make_unique<PureMetadataObjectStorageOperation>(object_storage, metadata_storage, [from_path, to_path](MetadataTransactionPtr tx)
         {
             tx->moveDirectory(from_path, to_path);
         }));
@@ -383,7 +387,7 @@ void DiskObjectStorageTransaction::moveDirectory(const std::string & from_path,
 void DiskObjectStorageTransaction::moveFile(const String & from_path, const String & to_path)
 {
     operations_to_execute.emplace_back(
-        std::make_unique<PureMetadataOperation>(object_storage, metadata_storage, [from_path, to_path, this](MetadataTransactionPtr tx)
+        std::make_unique<PureMetadataObjectStorageOperation>(object_storage, metadata_storage, [from_path, to_path, this](MetadataTransactionPtr tx)
         {
             if (metadata_storage.exists(to_path))
                 throw Exception("File already exists: " + to_path, ErrorCodes::FILE_ALREADY_EXISTS);
@@ -397,7 +401,7 @@ void DiskObjectStorageTransaction::moveFile(const String & from_path, const Stri
 void DiskObjectStorageTransaction::replaceFile(const std::string & from_path, const std::string & to_path)
 {
-    operations_to_execute.emplace_back(std::make_unique<ReplaceFileOperation>(object_storage, metadata_storage, from_path, to_path));
+    operations_to_execute.emplace_back(std::make_unique<ReplaceFileObjectStorageOperation>(object_storage, metadata_storage, from_path, to_path));
 }
 
 void DiskObjectStorageTransaction::clearDirectory(const std::string & path)
@@ -416,23 +420,23 @@ void DiskObjectStorageTransaction::removeFile(const std::string & path)
 void DiskObjectStorageTransaction::removeSharedFile(const std::string & path, bool keep_shared_data)
 {
-    operations_to_execute.emplace_back(std::make_unique<RemoveObjectOperation>(object_storage, metadata_storage, path, keep_shared_data, false));
+    operations_to_execute.emplace_back(std::make_unique<RemoveObjectStorageOperation>(object_storage, metadata_storage, path, keep_shared_data, false));
 }
 
 void DiskObjectStorageTransaction::removeSharedRecursive(const std::string & path, bool keep_all_shared_data, const NameSet & file_names_remove_metadata_only)
 {
-    operations_to_execute.emplace_back(std::make_unique<RemoveRecursiveOperation>(object_storage, metadata_storage, path, keep_all_shared_data, file_names_remove_metadata_only));
+    operations_to_execute.emplace_back(std::make_unique<RemoveRecursiveObjectStorageOperation>(object_storage, metadata_storage, path, keep_all_shared_data, file_names_remove_metadata_only));
 }
 
 void DiskObjectStorageTransaction::removeSharedFileIfExists(const std::string & path, bool keep_shared_data)
 {
-    operations_to_execute.emplace_back(std::make_unique<RemoveObjectOperation>(object_storage, metadata_storage, path, keep_shared_data, true));
+    operations_to_execute.emplace_back(std::make_unique<RemoveObjectStorageOperation>(object_storage, metadata_storage, path, keep_shared_data, true));
 }
 
 void DiskObjectStorageTransaction::removeDirectory(const std::string & path)
 {
     operations_to_execute.emplace_back(
-        std::make_unique<PureMetadataOperation>(object_storage, metadata_storage, [path](MetadataTransactionPtr tx)
+        std::make_unique<PureMetadataObjectStorageOperation>(object_storage, metadata_storage, [path](MetadataTransactionPtr tx)
         {
             tx->removeDirectory(path);
         }));
@@ -494,19 +498,20 @@ std::unique_ptr<WriteBufferFromFileBase> DiskObjectStorageTransaction::writeFile
     auto blob_path = fs::path(remote_fs_root_path) / blob_name;
 
-    operations_to_execute.emplace_back(std::make_unique<WriteFileOperation>(object_storage, metadata_storage, path, blob_path));
-
-    auto create_metadata_callback = [tx = shared_from_this(), this, mode, path, blob_name, autocommit] (size_t count)
+    auto create_metadata_callback = [tx = shared_from_this(), mode, path, blob_name, autocommit] (size_t count)
     {
         if (mode == WriteMode::Rewrite)
-            metadata_transaction->createMetadataFile(path, blob_name, count);
+            tx->metadata_transaction->createMetadataFile(path, blob_name, count);
         else
-            metadata_transaction->addBlobToMetadata(path, blob_name, count);
+            tx->metadata_transaction->addBlobToMetadata(path, blob_name, count);
 
         if (autocommit)
-            metadata_transaction->commit();
+            tx->metadata_transaction->commit();
     };
 
+    operations_to_execute.emplace_back(std::make_unique<WriteFileObjectStorageOperation>(object_storage, metadata_storage, path, blob_path));
+
     /// We always use mode Rewrite because we simulate append using metadata and different files
     return object_storage.writeObject(
         blob_path, WriteMode::Rewrite, object_attributes,
@@ -518,7 +523,7 @@ std::unique_ptr<WriteBufferFromFileBase> DiskObjectStorageTransaction::writeFile
 void DiskObjectStorageTransaction::createHardLink(const std::string & src_path, const std::string & dst_path)
 {
     operations_to_execute.emplace_back(
-        std::make_unique<PureMetadataOperation>(object_storage, metadata_storage, [src_path, dst_path](MetadataTransactionPtr tx)
+        std::make_unique<PureMetadataObjectStorageOperation>(object_storage, metadata_storage, [src_path, dst_path](MetadataTransactionPtr tx)
         {
             tx->createHardLink(src_path, dst_path);
         }));
@@ -527,7 +532,7 @@ void DiskObjectStorageTransaction::createHardLink(const std::string & src_path,
 void DiskObjectStorageTransaction::setReadOnly(const std::string & path)
 {
     operations_to_execute.emplace_back(
-        std::make_unique<PureMetadataOperation>(object_storage, metadata_storage, [path](MetadataTransactionPtr tx)
+        std::make_unique<PureMetadataObjectStorageOperation>(object_storage, metadata_storage, [path](MetadataTransactionPtr tx)
         {
             tx->setReadOnly(path);
         }));
@@ -536,7 +541,7 @@ void DiskObjectStorageTransaction::setReadOnly(const std::string & path)
 void DiskObjectStorageTransaction::setLastModified(const std::string & path, const Poco::Timestamp & timestamp)
 {
     operations_to_execute.emplace_back(
-        std::make_unique<PureMetadataOperation>(object_storage, metadata_storage, [path, timestamp](MetadataTransactionPtr tx)
+        std::make_unique<PureMetadataObjectStorageOperation>(object_storage, metadata_storage, [path, timestamp](MetadataTransactionPtr tx)
         {
             tx->setLastModified(path, timestamp);
         }));
@@ -545,7 +550,7 @@ void DiskObjectStorageTransaction::setLastModified(const std::string & path, con
 void DiskObjectStorageTransaction::createFile(const std::string & path)
 {
     operations_to_execute.emplace_back(
-        std::make_unique<PureMetadataOperation>(object_storage, metadata_storage, [path](MetadataTransactionPtr tx)
+        std::make_unique<PureMetadataObjectStorageOperation>(object_storage, metadata_storage, [path](MetadataTransactionPtr tx)
        {
             tx->createEmptyMetadataFile(path);
         }));
@@ -553,7 +558,7 @@ void DiskObjectStorageTransaction::createFile(const std::string & path)
 void DiskObjectStorageTransaction::copyFile(const std::string & from_file_path, const std::string & to_file_path)
 {
-    operations_to_execute.emplace_back(std::make_unique<CopyFileOperation>(object_storage, metadata_storage, from_file_path, to_file_path, remote_fs_root_path));
+    operations_to_execute.emplace_back(std::make_unique<CopyFileObjectStorageOperation>(object_storage, metadata_storage, from_file_path, to_file_path, remote_fs_root_path));
 }
 
 void DiskObjectStorageTransaction::commit()

View File

@@ -72,6 +72,7 @@ public:
     void createDirectories(const std::string & path) override;
     void clearDirectory(const std::string & path) override;
     void moveDirectory(const std::string & from_path, const std::string & to_path) override;
+
     void moveFile(const String & from_path, const String & to_path) override;

View File

@@ -32,337 +32,6 @@ std::string toString(MetadataFromDiskTransactionState state)
     __builtin_unreachable();
 }
 
-namespace
-{
-
-std::string getTempFileName(const std::string & dir)
-{
-    return fs::path(dir) / getRandomASCIIString();
-}
-
-class SetLastModifiedOperation final : public IMetadataOperation
-{
-    std::string path;
-    Poco::Timestamp new_timestamp;
-    Poco::Timestamp old_timestamp;
-    IDisk & disk;
-
-public:
-    SetLastModifiedOperation(const std::string & path_, Poco::Timestamp new_timestamp_, IDisk & disk_)
-        : path(path_)
-        , new_timestamp(new_timestamp_)
-        , disk(disk_)
-    {}
-
-    void execute() override
-    {
-        old_timestamp = disk.getLastModified(path);
-        disk.setLastModified(path, new_timestamp);
-    }
-
-    void undo() override
-    {
-        disk.setLastModified(path, old_timestamp);
-    }
-};
-
-class UnlinkFileOperation final : public IMetadataOperation
-{
-    std::string path;
-    IDisk & disk;
-    std::string prev_data;
-
-public:
-    UnlinkFileOperation(const std::string & path_, IDisk & disk_)
-        : path(path_)
-        , disk(disk_)
-    {
-    }
-
-    void execute() override
-    {
-        auto buf = disk.readFile(path);
-        readStringUntilEOF(prev_data, *buf);
-        disk.removeFile(path);
-    }
-
-    void undo() override
-    {
-        auto buf = disk.writeFile(path);
-        writeString(prev_data, *buf);
-        buf->finalize();
-    }
-};
-
-class CreateDirectoryOperation final : public IMetadataOperation
-{
-private:
-    std::string path;
-    IDisk & disk;
-
-public:
-    CreateDirectoryOperation(const std::string & path_, IDisk & disk_)
-        : path(path_)
-        , disk(disk_)
-    {
-    }
-
-    void execute() override
-    {
-        disk.createDirectory(path);
-    }
-
-    void undo() override
-    {
-        disk.removeDirectory(path);
-    }
-};
-
-class CreateDirectoryRecursiveOperation final : public IMetadataOperation
-{
-private:
-    std::string path;
-    std::vector<std::string> paths_created;
-    IDisk & disk;
-
-public:
-    CreateDirectoryRecursiveOperation(const std::string & path_, IDisk & disk_)
-        : path(path_)
-        , disk(disk_)
-    {
-    }
-
-    void execute() override
-    {
-        namespace fs = std::filesystem;
-        fs::path p(path);
-        while (!disk.exists(p))
-        {
-            paths_created.push_back(p);
-            if (!p.has_parent_path())
-                break;
-            p = p.parent_path();
-        }
-        for (const auto & path_to_create : paths_created | std::views::reverse)
-            disk.createDirectory(path_to_create);
-    }
-
-    void undo() override
-    {
-        for (const auto & path_created : paths_created)
-            disk.removeDirectory(path_created);
-    }
-};
-
-class RemoveDirectoryOperation final : public IMetadataOperation
-{
-private:
-    std::string path;
-    IDisk & disk;
-
-public:
-    RemoveDirectoryOperation(const std::string & path_, IDisk & disk_)
-        : path(path_)
-        , disk(disk_)
-    {}
-
-    void execute() override
-    {
-        disk.removeDirectory(path);
-    }
-
-    void undo() override
-    {
-        disk.createDirectory(path);
-    }
-};
-
-class RemoveRecursiveOperation final : public IMetadataOperation
-{
-    std::string path;
-    IDisk & disk;
-    std::string temp_path;
-
-public:
-    RemoveRecursiveOperation(const std::string & path_, IDisk & disk_)
-        : path(path_)
-        , disk(disk_)
-        , temp_path(getTempFileName(fs::path(path).parent_path()))
-    {
-    }
-
-    void execute() override
-    {
-        if (disk.isFile(path))
-            disk.moveFile(path, temp_path);
-        else if (disk.isDirectory(path))
-            disk.moveDirectory(path, temp_path);
-    }
-
-    void undo() override
-    {
-        if (disk.isFile(temp_path))
-            disk.moveFile(temp_path, path);
-        else if (disk.isDirectory(temp_path))
-            disk.moveDirectory(temp_path, path);
-    }
-
-    void finalize() override
-    {
-        if (disk.exists(temp_path))
-            disk.removeRecursive(temp_path);
-
-        if (disk.exists(path))
-            disk.removeRecursive(path);
-    }
-};
-
-class CreateHardlinkOperation final : public IMetadataOperation
-{
-private:
-    std::string path_from;
-    std::string path_to;
-    IDisk & disk;
-
-public:
-    CreateHardlinkOperation(const std::string & path_from_, const std::string & path_to_, IDisk & disk_)
-        : path_from(path_from_)
-        , path_to(path_to_)
-        , disk(disk_)
-    {}
-
-    void execute() override
-    {
-        disk.createHardLink(path_from, path_to);
-    }
-
-    void undo() override
-    {
-        disk.removeFile(path_to);
-    }
-};
-
-class MoveFileOperation final : public IMetadataOperation
-{
-private:
-    std::string path_from;
-    std::string path_to;
-    IDisk & disk;
-
-public:
-    MoveFileOperation(const std::string & path_from_, const std::string & path_to_, IDisk & disk_)
-        : path_from(path_from_)
-        , path_to(path_to_)
-        , disk(disk_)
-    {}
-
-    void execute() override
-    {
-        disk.moveFile(path_from, path_to);
-    }
-
-    void undo() override
-    {
-        disk.moveFile(path_to, path_from);
-    }
-};
-
-class MoveDirectoryOperation final : public IMetadataOperation
-{
-private:
-    std::string path_from;
-    std::string path_to;
-    IDisk & disk;
-
-public:
-    MoveDirectoryOperation(const std::string & path_from_, const std::string & path_to_, IDisk & disk_)
-        : path_from(path_from_)
-        , path_to(path_to_)
-        , disk(disk_)
-    {}
-
-    void execute() override
-    {
-        disk.moveDirectory(path_from, path_to);
-    }
-
-    void undo() override
-    {
-        disk.moveDirectory(path_to, path_from);
-    }
-};
-
-class ReplaceFileOperation final : public IMetadataOperation
-{
-private:
-    std::string path_from;
-    std::string path_to;
-    IDisk & disk;
-    std::string temp_path_to;
-
-public:
-    ReplaceFileOperation(const std::string & path_from_, const std::string & path_to_, IDisk & disk_)
-        : path_from(path_from_)
-        , path_to(path_to_)
-        , disk(disk_)
-        , temp_path_to(getTempFileName(fs::path(path_to).parent_path()))
-    {
-    }
-
-    void execute() override
-    {
-        if (disk.exists(path_to))
-            disk.moveFile(path_to, temp_path_to);
-        disk.replaceFile(path_from, path_to);
-    }
-
-    void undo() override
-    {
-        disk.moveFile(path_to, path_from);
-        disk.moveFile(temp_path_to, path_to);
-    }
-
-    void finalize() override
-    {
-        disk.removeFileIfExists(temp_path_to);
-    }
-};
-
-class WriteFileOperation final : public IMetadataOperation
-{
-private:
-    std::string path;
-    IDisk & disk;
-    std::string data;
-    bool existed = false;
-    std::string prev_data;
-
-public:
-    WriteFileOperation(const std::string & path_, IDisk & disk_, const std::string & data_)
-        : path(path_)
-        , disk(disk_)
-        , data(data_)
-    {}
-
-    void execute() override
-    {
-        if (disk.exists(path))
-        {
-            existed = true;
-            auto buf = disk.readFile(path);
-            readStringUntilEOF(prev_data, *buf);
-        }
-        auto buf = disk.writeFile(path);
-        writeString(data, *buf);
-        buf->finalize();
-    }
-
-    void undo() override
-    {
-        if (!existed)
-            disk.removeFileIfExists(path);
-        else
-        {
-            auto buf = disk.writeFile(path);
-            writeString(prev_data, *buf);
-        }
-    }
-};
-
-}
 
 void MetadataStorageFromDiskTransaction::writeStringToFile( /// NOLINT
     const std::string & path,
     const std::string & data)

View File

@@ -4,21 +4,11 @@
 #include <Disks/IDisk.h>
 #include <Disks/ObjectStorages/DiskObjectStorageMetadata.h>
+#include "MetadataStorageFromDiskTransactionOperations.h"
 
 namespace DB
 {
 
-struct IMetadataOperation
-{
-    virtual void execute() = 0;
-    virtual void undo() = 0;
-    virtual void finalize() {}
-    virtual ~IMetadataOperation() = default;
-};
-
-using MetadataOperationPtr = std::unique_ptr<IMetadataOperation>;
-
 enum class MetadataFromDiskTransactionState
 {
     PREPARING,

View File

@@ -0,0 +1,261 @@
+#include <Disks/ObjectStorages/MetadataStorageFromDiskTransactionOperations.h>
+#include <Disks/IDisk.h>
+#include <Common/getRandomASCIIString.h>
+#include <IO/WriteHelpers.h>
+#include <IO/ReadHelpers.h>
+#include <ranges>
+#include <filesystem>
+
+namespace fs = std::filesystem;
+
+namespace DB
+{
+
+static std::string getTempFileName(const std::string & dir)
+{
+    return fs::path(dir) / getRandomASCIIString();
+}
+
+SetLastModifiedOperation::SetLastModifiedOperation(const std::string & path_, Poco::Timestamp new_timestamp_, IDisk & disk_)
+    : path(path_)
+    , new_timestamp(new_timestamp_)
+    , disk(disk_)
+{
+}
+
+void SetLastModifiedOperation::execute()
+{
+    old_timestamp = disk.getLastModified(path);
+    disk.setLastModified(path, new_timestamp);
+}
+
+void SetLastModifiedOperation::undo()
+{
+    disk.setLastModified(path, old_timestamp);
+}
+
+UnlinkFileOperation::UnlinkFileOperation(const std::string & path_, IDisk & disk_)
+    : path(path_)
+    , disk(disk_)
+{
+}
+
+void UnlinkFileOperation::execute()
+{
+    auto buf = disk.readFile(path);
+    readStringUntilEOF(prev_data, *buf);
+    disk.removeFile(path);
+}
+
+void UnlinkFileOperation::undo()
+{
+    auto buf = disk.writeFile(path);
+    writeString(prev_data, *buf);
+    buf->finalize();
+}
+
+CreateDirectoryOperation::CreateDirectoryOperation(const std::string & path_, IDisk & disk_)
+    : path(path_)
+    , disk(disk_)
+{
+}
+
+void CreateDirectoryOperation::execute()
+{
+    disk.createDirectory(path);
+}
+
+void CreateDirectoryOperation::undo()
+{
+    disk.removeDirectory(path);
+}
+
+CreateDirectoryRecursiveOperation::CreateDirectoryRecursiveOperation(const std::string & path_, IDisk & disk_)
+    : path(path_)
+    , disk(disk_)
+{
+}
+
+void CreateDirectoryRecursiveOperation::execute()
+{
+    namespace fs = std::filesystem;
+    fs::path p(path);
+    while (!disk.exists(p))
+    {
+        paths_created.push_back(p);
+        if (!p.has_parent_path())
+            break;
+        p = p.parent_path();
+    }
+    for (const auto & path_to_create : paths_created | std::views::reverse)
+        disk.createDirectory(path_to_create);
+}
+
+void CreateDirectoryRecursiveOperation::undo()
+{
+    for (const auto & path_created : paths_created)
+        disk.removeDirectory(path_created);
+}
+
+RemoveDirectoryOperation::RemoveDirectoryOperation(const std::string & path_, IDisk & disk_)
+    : path(path_)
+    , disk(disk_)
+{
+}
+
+void RemoveDirectoryOperation::execute()
+{
+    disk.removeDirectory(path);
+}
+
+void RemoveDirectoryOperation::undo()
+{
+    disk.createDirectory(path);
+}
+
+RemoveRecursiveOperation::RemoveRecursiveOperation(const std::string & path_, IDisk & disk_)
+    : path(path_)
+    , disk(disk_)
+    , temp_path(getTempFileName(fs::path(path).parent_path()))
+{
+}
+
+void RemoveRecursiveOperation:: execute()
+{
+    if (disk.isFile(path))
+        disk.moveFile(path, temp_path);
+    else if (disk.isDirectory(path))
+        disk.moveDirectory(path, temp_path);
+}
+
+void RemoveRecursiveOperation::undo()
+{
+    if (disk.isFile(temp_path))
+        disk.moveFile(temp_path, path);
+    else if (disk.isDirectory(temp_path))
+        disk.moveDirectory(temp_path, path);
+}
+
+void RemoveRecursiveOperation::finalize()
+{
+    if (disk.exists(temp_path))
+        disk.removeRecursive(temp_path);
+
+    if (disk.exists(path))
+        disk.removeRecursive(path);
+}
+
+CreateHardlinkOperation::CreateHardlinkOperation(const std::string & path_from_, const std::string & path_to_, IDisk & disk_)
+    : path_from(path_from_)
+    , path_to(path_to_)
+    , disk(disk_)
+{
+}
+
+void CreateHardlinkOperation::execute()
+{
+    disk.createHardLink(path_from, path_to);
+}
+
+void CreateHardlinkOperation::undo()
+{
+    disk.removeFile(path_to);
+}
+
+MoveFileOperation::MoveFileOperation(const std::string & path_from_, const std::string & path_to_, IDisk & disk_)
+    : path_from(path_from_)
+    , path_to(path_to_)
+    , disk(disk_)
+{
+}
+
+void MoveFileOperation::execute()
+{
+    disk.moveFile(path_from, path_to);
+}
+
+void MoveFileOperation::undo()
+{
+    disk.moveFile(path_to, path_from);
+}
+
+MoveDirectoryOperation::MoveDirectoryOperation(const std::string & path_from_, const std::string & path_to_, IDisk & disk_)
+    : path_from(path_from_)
+    , path_to(path_to_)
+    , disk(disk_)
+{
+}
+
+void MoveDirectoryOperation::execute()
+{
+    disk.moveDirectory(path_from, path_to);
+}
+
+void MoveDirectoryOperation::undo()
+{
+    disk.moveDirectory(path_to, path_from);
+}
+
+ReplaceFileOperation::ReplaceFileOperation(const std::string & path_from_, const std::string & path_to_, IDisk & disk_)
+    : path_from(path_from_)
+    , path_to(path_to_)
+    , disk(disk_)
+    , temp_path_to(getTempFileName(fs::path(path_to).parent_path()))
+{
+}
+
+void ReplaceFileOperation::execute()
+{
+    if (disk.exists(path_to))
+        disk.moveFile(path_to, temp_path_to);
+    disk.replaceFile(path_from, path_to);
+}
+
+void ReplaceFileOperation::undo()
+{
+    disk.moveFile(path_to, path_from);
+    disk.moveFile(temp_path_to, path_to);
+}
+
+void ReplaceFileOperation::finalize()
+{
+    disk.removeFileIfExists(temp_path_to);
+}
+
+WriteFileOperation::WriteFileOperation(const std::string & path_, IDisk & disk_, const std::string & data_)
+    : path(path_)
+    , disk(disk_)
+    , data(data_)
+{
+}
+
+void WriteFileOperation::execute()
+{
+    if (disk.exists(path))
+    {
+        existed = true;
+        auto buf = disk.readFile(path);
+        readStringUntilEOF(prev_data, *buf);
+    }
+    auto buf = disk.writeFile(path);
+    writeString(data, *buf);
+    buf->finalize();
+}
+
+void WriteFileOperation::undo()
+{
+    if (!existed)
+    {
+        disk.removeFileIfExists(path);
+    }
+    else
+    {
+        auto buf = disk.writeFile(path);
+        writeString(prev_data, *buf);
+    }
+}
+
+}

View File

@@ -0,0 +1,192 @@
+#pragma once
+
+#include <Disks/ObjectStorages/IMetadataStorage.h>
+
+namespace DB
+{
+
+class IDisk;
+
+/**
+ * Implementations for transactional operations with metadata used by MetadataStorageFromDisk.
+ */
+struct IMetadataOperation
+{
+    virtual void execute() = 0;
+    virtual void undo() = 0;
+    virtual void finalize() {}
+    virtual ~IMetadataOperation() = default;
+};
+
+using MetadataOperationPtr = std::unique_ptr<IMetadataOperation>;
+
+struct SetLastModifiedOperation final : public IMetadataOperation
+{
+    SetLastModifiedOperation(const std::string & path_, Poco::Timestamp new_timestamp_, IDisk & disk_);
+
+    void execute() override;
+    void undo() override;
+
+private:
+    std::string path;
+    Poco::Timestamp new_timestamp;
+    Poco::Timestamp old_timestamp;
+    IDisk & disk;
+};
+
+struct UnlinkFileOperation final : public IMetadataOperation
+{
+    UnlinkFileOperation(const std::string & path_, IDisk & disk_);
+
+    void execute() override;
+    void undo() override;
+
+private:
+    std::string path;
+    IDisk & disk;
+    std::string prev_data;
+};
+
+struct CreateDirectoryOperation final : public IMetadataOperation
+{
+    CreateDirectoryOperation(const std::string & path_, IDisk & disk_);
+
+    void execute() override;
+    void undo() override;
+
+private:
+    std::string path;
+    IDisk & disk;
+};
+
+struct CreateDirectoryRecursiveOperation final : public IMetadataOperation
+{
+    CreateDirectoryRecursiveOperation(const std::string & path_, IDisk & disk_);
+
+    void execute() override;
+    void undo() override;
+
+private:
+    std::string path;
+    std::vector<std::string> paths_created;
+    IDisk & disk;
+};
+
+struct RemoveDirectoryOperation final : public IMetadataOperation
+{
+    RemoveDirectoryOperation(const std::string & path_, IDisk & disk_);
+
+    void execute() override;
+    void undo() override;
+
+private:
+    std::string path;
+    IDisk & disk;
+};
+
+struct RemoveRecursiveOperation final : public IMetadataOperation
+{
+    RemoveRecursiveOperation(const std::string & path_, IDisk & disk_);
+
+    void execute() override;
+    void undo() override;
+    void finalize() override;
+
+private:
+    std::string path;
+    IDisk & disk;
+    std::string temp_path;
+};
+
+struct CreateHardlinkOperation final : public IMetadataOperation
+{
+    CreateHardlinkOperation(const std::string & path_from_, const std::string & path_to_, IDisk & disk_);
+
+    void execute() override;
+    void undo() override;
+
+private:
+    std::string path_from;
+    std::string path_to;
+    IDisk & disk;
+};
+
+struct MoveFileOperation final : public IMetadataOperation
+{
+    MoveFileOperation(const std::string & path_from_, const std::string & path_to_, IDisk & disk_);
+
+    void execute() override;
+    void undo() override;
+
+private:
+    std::string path_from;
+    std::string path_to;
+    IDisk & disk;
+};
+
+struct MoveDirectoryOperation final : public IMetadataOperation
+{
+    MoveDirectoryOperation(const std::string & path_from_, const std::string & path_to_, IDisk & disk_);
+
+    void execute() override;
+    void undo() override;
+
+private:
+    std::string path_from;
+    std::string path_to;
+    IDisk & disk;
+};
+
+struct ReplaceFileOperation final : public IMetadataOperation
+{
+    ReplaceFileOperation(const std::string & path_from_, const std::string & path_to_, IDisk & disk_);
+
+    void execute() override;
+    void undo() override;
+    void finalize() override;
+
+private:
+    std::string path_from;
+    std::string path_to;
+    IDisk & disk;
+    std::string temp_path_to;
+};
+
+struct WriteFileOperation final : public IMetadataOperation
+{
+    WriteFileOperation(const std::string & path_, IDisk & disk_, const std::string & data_);
+
+    void execute() override;
+    void undo() override;
+
+private:
+    std::string path;
+    IDisk & disk;
+    std::string data;
+    bool existed = false;
+    std::string prev_data;
+};
+
+}

View File

@@ -39,14 +39,10 @@ namespace ErrorCodes
 
 namespace
 {
 
-template <typename Result, typename Error>
-void throwIfError(Aws::Utils::Outcome<Result, Error> & response)
+bool isNotFoundError(Aws::S3::S3Errors error)
 {
-    if (!response.IsSuccess())
-    {
-        const auto & err = response.GetError();
-        throw Exception(std::to_string(static_cast<int>(err.GetErrorType())) + ": " + err.GetMessage(), ErrorCodes::S3_ERROR);
-    }
+    return error == Aws::S3::S3Errors::RESOURCE_NOT_FOUND
+        || error == Aws::S3::S3Errors::NO_SUCH_KEY;
 }
 
 template <typename Result, typename Error>
@@ -55,7 +51,21 @@ void throwIfError(const Aws::Utils::Outcome<Result, Error> & response)
 {
     if (!response.IsSuccess())
     {
         const auto & err = response.GetError();
-        throw Exception(err.GetMessage(), static_cast<int>(err.GetErrorType()));
+        throw Exception(ErrorCodes::S3_ERROR, "{} (Code: {})", err.GetMessage(), static_cast<size_t>(err.GetErrorType()));
+    }
+}
+
+template <typename Result, typename Error>
+void throwIfUnexpectedError(const Aws::Utils::Outcome<Result, Error> & response, bool if_exists)
+{
+    /// In this case even if absence of key may be ok for us,
+    /// the log will be polluted with error messages from aws sdk.
+    /// Looks like there is no way to suppress them.
+    if (!response.IsSuccess() && (!if_exists || !isNotFoundError(response.GetError().GetErrorType())))
+    {
+        const auto & err = response.GetError();
+        throw Exception(ErrorCodes::S3_ERROR, "{} (Code: {})", err.GetMessage(), static_cast<size_t>(err.GetErrorType()));
     }
 }
 
@@ -211,10 +221,9 @@ void S3ObjectStorage::listPrefix(const std::string & path, BlobsPathToSize & chi
     } while (outcome.GetResult().GetIsTruncated());
 }
 
-void S3ObjectStorage::removeObject(const std::string & path)
+void S3ObjectStorage::removeObjectImpl(const std::string & path, bool if_exists)
 {
     auto client_ptr = client.get();
-    auto settings_ptr = s3_settings.get();
 
     // If chunk size is 0, only use single delete request
     // This allows us to work with GCS, which doesn't support DeleteObjects
@@ -225,7 +234,7 @@ void S3ObjectStorage::removeObject(const std::string & path)
         request.SetKey(path);
 
         auto outcome = client_ptr->DeleteObject(request);
-        throwIfError(outcome);
+        throwIfUnexpectedError(outcome, if_exists);
     }
     else
     {
@@ -240,25 +249,25 @@ void S3ObjectStorage::removeObject(const std::string & path)
         request.SetDelete(delkeys);
 
         auto outcome = client_ptr->DeleteObjects(request);
-        throwIfError(outcome);
+        throwIfUnexpectedError(outcome, if_exists);
     }
 }
 
-void S3ObjectStorage::removeObjects(const std::vector<std::string> & paths)
+void S3ObjectStorage::removeObjectsImpl(const std::vector<std::string> & paths, bool if_exists)
 {
     if (paths.empty())
         return;
 
-    auto client_ptr = client.get();
-    auto settings_ptr = s3_settings.get();
-
     if (!s3_capabilities.support_batch_delete)
     {
         for (const auto & path : paths)
-            removeObject(path);
+            removeObjectImpl(path, if_exists);
     }
     else
     {
+        auto client_ptr = client.get();
+        auto settings_ptr = s3_settings.get();
+
         size_t chunk_size_limit = settings_ptr->objects_chunk_size_to_delete;
         size_t current_position = 0;
@@ -283,64 +292,30 @@ void S3ObjectStorage::removeObjects(const std::vector<std::string> & paths)
             request.SetBucket(bucket);
             request.SetDelete(delkeys);
 
             auto outcome = client_ptr->DeleteObjects(request);
-            throwIfError(outcome);
+
+            throwIfUnexpectedError(outcome, if_exists);
         }
     }
 }
 
+void S3ObjectStorage::removeObject(const std::string & path)
+{
+    removeObjectImpl(path, false);
+}
+
 void S3ObjectStorage::removeObjectIfExists(const std::string & path)
 {
-    auto client_ptr = client.get();
-    Aws::S3::Model::ObjectIdentifier obj;
-    obj.SetKey(path);
-
-    Aws::S3::Model::Delete delkeys;
-    delkeys.SetObjects({obj});
-
-    Aws::S3::Model::DeleteObjectsRequest request;
-    request.SetBucket(bucket);
-    request.SetDelete(delkeys);
-
-    auto outcome = client_ptr->DeleteObjects(request);
-    if (!outcome.IsSuccess() && outcome.GetError().GetErrorType() != Aws::S3::S3Errors::RESOURCE_NOT_FOUND)
-        throwIfError(outcome);
+    removeObjectImpl(path, true);
+}
+
+void S3ObjectStorage::removeObjects(const std::vector<std::string> & paths)
+{
+    removeObjectsImpl(paths, false);
 }
 
 void S3ObjectStorage::removeObjectsIfExist(const std::vector<std::string> & paths)
 {
-    if (paths.empty())
-        return;
-
-    auto client_ptr = client.get();
-    auto settings_ptr = s3_settings.get();
-
-    size_t chunk_size_limit = settings_ptr->objects_chunk_size_to_delete;
-    size_t current_position = 0;
-
-    while (current_position < paths.size())
-    {
-        std::vector<Aws::S3::Model::ObjectIdentifier> current_chunk;
-        String keys;
-        for (; current_position < paths.size() && current_chunk.size() < chunk_size_limit; ++current_position)
-        {
-            Aws::S3::Model::ObjectIdentifier obj;
-            obj.SetKey(paths[current_position]);
-            current_chunk.push_back(obj);
-
-            if (!keys.empty())
-                keys += ", ";
-            keys += paths[current_position];
-        }
-
-        Aws::S3::Model::Delete delkeys;
-        delkeys.SetObjects(current_chunk);
-
-        Aws::S3::Model::DeleteObjectsRequest request;
-        request.SetBucket(bucket);
-        request.SetDelete(delkeys);
-
-        auto outcome = client_ptr->DeleteObjects(request);
-        if (!outcome.IsSuccess() && outcome.GetError().GetErrorType() != Aws::S3::S3Errors::RESOURCE_NOT_FOUND)
-            throwIfError(outcome);
-    }
+    removeObjectsImpl(paths, true);
 }
 
 ObjectMetadata S3ObjectStorage::getObjectMetadata(const std::string & path) const

View File

@@ -126,6 +126,9 @@ private:
         std::optional<Aws::S3::Model::HeadObjectResult> head = std::nullopt,
         std::optional<ObjectAttributes> metadata = std::nullopt) const;
 
+    void removeObjectImpl(const std::string & path, bool if_exists);
+    void removeObjectsImpl(const std::vector<std::string> & paths, bool if_exists);
+
     Aws::S3::Model::HeadObjectOutcome requestObjectHeadData(const std::string & bucket_from, const std::string & key) const;
 
     std::string bucket;

View File

@@ -389,12 +389,6 @@ void WriteBufferFromS3::makeSinglepartUpload()
         return;
     }
 
-    if (size == 0)
-    {
-        LOG_TRACE(log, "Skipping single part upload. Buffer is empty.");
-        return;
-    }
-
    if (schedule)
    {
        put_object_task = std::make_unique<PutObjectTask>();

View File

@@ -60,5 +60,21 @@ void ASTSubquery::updateTreeHashImpl(SipHash & hash_state) const
     IAST::updateTreeHashImpl(hash_state);
 }
 
+String ASTSubquery::getAliasOrColumnName() const
+{
+    if (!alias.empty())
+        return alias;
+    if (!cte_name.empty())
+        return cte_name;
+    return getColumnName();
+}
+
+String ASTSubquery::tryGetAlias() const
+{
+    if (!alias.empty())
+        return alias;
+    return cte_name;
+}
+
 }
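For context on the two new overrides above: they let a subquery fall back to its CTE name when it has no explicit alias, which the new stateless tests later in this diff exercise via remote(). A simplified local sketch (illustrative only, not part of this commit):

```sql
-- The CTE name `rhs` serves as the subquery's alias, so `rhs.d2`
-- resolves in ORDER BY even though no explicit alias was written.
WITH rhs AS (SELECT 1 AS d1, 2 AS d2)
SELECT lhs.d2
FROM (SELECT 1 AS d1, 2 AS d2) AS lhs
JOIN rhs USING (d1)
ORDER BY rhs.d2;
```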

View File

@@ -33,6 +33,8 @@ public:
     }
 
     void updateTreeHashImpl(SipHash & hash_state) const override;
+    String getAliasOrColumnName() const override;
+    String tryGetAlias() const override;
 
 protected:
     void formatImplWithoutAlias(const FormatSettings & settings, FormatState & state, FormatStateStacked frame) const override;

View File

@@ -71,7 +71,7 @@ bool compareWithThreshold(const ColumnRawPtrs & raw_block_columns, size_t min_bl
     size_t raw_block_columns_size = raw_block_columns.size();
     for (size_t i = 0; i < raw_block_columns_size; ++i)
     {
-        int res = sort_description[i].direction * raw_block_columns[i]->compareAt(min_block_index, 0, *threshold_columns[0], sort_description[i].nulls_direction);
+        int res = sort_description[i].direction * raw_block_columns[i]->compareAt(min_block_index, 0, *threshold_columns[i], sort_description[i].nulls_direction);
 
         if (res < 0)
             return true;
View File

@@ -280,14 +280,6 @@ MergeTreeData::MergeTreeData(
     /// Creating directories, if not exist.
     for (const auto & disk : getDisks())
     {
-        /// TODO: implement it the main issue in DataPartsExchange (not able to send directories metadata)
-        if (supportsReplication() && settings->allow_remote_fs_zero_copy_replication
-            && disk->supportZeroCopyReplication() && metadata_.hasProjections())
-        {
-            throw Exception(ErrorCodes::NOT_IMPLEMENTED, "Projections are not supported when zero-copy replication is enabled for table. "
-                            "Currently disk '{}' supports zero copy replication", disk->getName());
-        }
-
         if (disk->isBroken())
             continue;

View File

@@ -33,6 +33,7 @@ namespace ErrorCodes
     extern const int UNKNOWN_STORAGE;
     extern const int NO_REPLICA_NAME_GIVEN;
     extern const int CANNOT_EXTRACT_TABLE_STRUCTURE;
+    extern const int NOT_IMPLEMENTED;
 }
 
@@ -673,6 +674,20 @@ static StoragePtr create(const StorageFactory::Arguments & args)
             throw Exception("Wrong number of engine arguments.", ErrorCodes::BAD_ARGUMENTS);
 
     if (replicated)
+    {
+        auto storage_policy = args.getContext()->getStoragePolicy(storage_settings->storage_policy);
+
+        for (const auto & disk : storage_policy->getDisks())
+        {
+            /// TODO: implement it the main issue in DataPartsExchange (not able to send directories metadata)
+            if (storage_settings->allow_remote_fs_zero_copy_replication
+                && disk->supportZeroCopyReplication() && metadata.hasProjections())
+            {
+                throw Exception(ErrorCodes::NOT_IMPLEMENTED, "Projections are not supported when zero-copy replication is enabled for table. "
+                                "Currently disk '{}' supports zero copy replication", disk->getName());
+            }
+        }
+
         return std::make_shared<StorageReplicatedMergeTree>(
             zookeeper_path,
             replica_name,
@@ -686,6 +701,7 @@ static StoragePtr create(const StorageFactory::Arguments & args)
             std::move(storage_settings),
             args.has_force_restore_data_flag,
             renaming_restrictions);
+    }
     else
         return std::make_shared<StorageMergeTree>(
             args.table_id,

View File

@@ -1422,7 +1422,6 @@ ActionLock StorageMergeTree::stopMergesAndWait()
 
 MergeTreeDataPartPtr StorageMergeTree::outdatePart(MergeTreeTransaction * txn, const String & part_name, bool force)
 {
-
     if (force)
     {
         /// Forcefully stop merges and make part outdated
@@ -1435,7 +1434,6 @@ MergeTreeDataPartPtr StorageMergeTree::outdatePart(MergeTreeTransaction * txn, c
     }
     else
     {
-
         /// Wait merges selector
         std::unique_lock lock(currently_processing_in_background_mutex);

View File

@@ -324,7 +324,7 @@ def test_empty_put(started_cluster, auth):
 
     run_query(instance, put_query)
 
-    try:
+    assert (
         run_query(
             instance,
             "select count(*) from s3('http://{}:{}/{}/{}', {}'CSV', '{}')".format(
@@ -336,10 +336,8 @@ def test_empty_put(started_cluster, auth):
                 table_format,
             ),
         )
-
-        assert False, "Query should be failed."
-    except helpers.client.QueryRuntimeException as e:
-        assert str(e).find("The specified key does not exist") != 0
+        == "0\n"
+    )
 
     # Test put values in CSV format.

View File

@@ -1,18 +1,18 @@
 #!/usr/bin/env bash
-# Tags: no-fasttest, disabled
+# Tags: no-fasttest
 
 CUR_DIR=$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)
 # shellcheck source=../shell_config.sh
 . "$CUR_DIR"/../shell_config.sh
 
-$CLICKHOUSE_CLIENT -q "insert into function file(data1.jsonl) select NULL as x from numbers(10) settings engine_file_truncate_on_insert=1"
-$CLICKHOUSE_CLIENT -q "insert into function file(data2.jsonl) select NULL as x from numbers(10) settings engine_file_truncate_on_insert=1"
-$CLICKHOUSE_CLIENT -q "insert into function file(data3.jsonl) select NULL as x from numbers(10) settings engine_file_truncate_on_insert=1"
-$CLICKHOUSE_CLIENT -q "insert into function file(data4.jsonl) select number % 2 ? number : NULL as x from numbers(10) settings engine_file_truncate_on_insert=1"
+$CLICKHOUSE_CLIENT -q "insert into function file(02305_data1.jsonl) select NULL as x from numbers(10) settings engine_file_truncate_on_insert=1"
+$CLICKHOUSE_CLIENT -q "insert into function file(02305_data2.jsonl) select NULL as x from numbers(10) settings engine_file_truncate_on_insert=1"
+$CLICKHOUSE_CLIENT -q "insert into function file(02305_data3.jsonl) select NULL as x from numbers(10) settings engine_file_truncate_on_insert=1"
+$CLICKHOUSE_CLIENT -q "insert into function file(02305_data4.jsonl) select number % 2 ? number : NULL as x from numbers(10) settings engine_file_truncate_on_insert=1"
 
-$CLICKHOUSE_CLIENT -q "desc file('data*.jsonl') settings input_format_max_rows_to_read_for_schema_inference=8" 2>&1 | grep -c 'ONLY_NULLS_WHILE_READING_SCHEMA';
-$CLICKHOUSE_CLIENT -q "desc file('data*.jsonl') settings input_format_max_rows_to_read_for_schema_inference=16" 2>&1 | grep -c 'ONLY_NULLS_WHILE_READING_SCHEMA';
-$CLICKHOUSE_CLIENT -q "desc file('data*.jsonl') settings input_format_max_rows_to_read_for_schema_inference=24" 2>&1 | grep -c 'ONLY_NULLS_WHILE_READING_SCHEMA';
-$CLICKHOUSE_CLIENT -q "desc file('data*.jsonl') settings input_format_max_rows_to_read_for_schema_inference=31" 2>&1 | grep -c 'ONLY_NULLS_WHILE_READING_SCHEMA';
-$CLICKHOUSE_CLIENT -q "desc file('data*.jsonl') settings input_format_max_rows_to_read_for_schema_inference=32"
-$CLICKHOUSE_CLIENT -q "desc file('data*.jsonl') settings input_format_max_rows_to_read_for_schema_inference=100"
+$CLICKHOUSE_CLIENT -q "desc file('02305_data*.jsonl') settings input_format_max_rows_to_read_for_schema_inference=8" 2>&1 | grep -c 'ONLY_NULLS_WHILE_READING_SCHEMA';
+$CLICKHOUSE_CLIENT -q "desc file('02305_data*.jsonl') settings input_format_max_rows_to_read_for_schema_inference=16" 2>&1 | grep -c 'ONLY_NULLS_WHILE_READING_SCHEMA';
+$CLICKHOUSE_CLIENT -q "desc file('02305_data*.jsonl') settings input_format_max_rows_to_read_for_schema_inference=24" 2>&1 | grep -c 'ONLY_NULLS_WHILE_READING_SCHEMA';
+$CLICKHOUSE_CLIENT -q "desc file('02305_data*.jsonl') settings input_format_max_rows_to_read_for_schema_inference=31" 2>&1 | grep -c 'ONLY_NULLS_WHILE_READING_SCHEMA';
+$CLICKHOUSE_CLIENT -q "desc file('02305_data*.jsonl') settings input_format_max_rows_to_read_for_schema_inference=32"
+$CLICKHOUSE_CLIENT -q "desc file('02305_data*.jsonl') settings input_format_max_rows_to_read_for_schema_inference=100"

View File

@@ -0,0 +1,12 @@
+-- { echo }
+with rhs as (select * from remote('127.{1,2}', view(select dummy d1, dummy d2 from system.one))) select lhs.d2 from remote('127.{1,2}', view(select dummy d1, dummy d2 from system.one)) lhs global join rhs using (d1) order by rhs.d2; -- { serverError ALIAS_REQUIRED }
+with rhs as (select * from remote('127.{1,2}', view(select dummy d1, dummy d2 from system.one))) select lhs.d2 from remote('127.{1,2}', view(select dummy d1, dummy d2 from system.one)) lhs global join rhs using (d1) order by rhs.d2 settings joined_subquery_requires_alias=0;
+0
+0
+0
+0
+with rhs_ as (select * from remote('127.{1,2}', view(select dummy d1, dummy d2 from system.one))) select lhs.d2 from remote('127.{1,2}', view(select dummy d1, dummy d2 from system.one)) lhs global join rhs_ rhs using (d1) order by rhs.d2 settings joined_subquery_requires_alias=0;
+0
+0
+0
+0

View File

@@ -0,0 +1,4 @@
+-- { echo }
+with rhs as (select * from remote('127.{1,2}', view(select dummy d1, dummy d2 from system.one))) select lhs.d2 from remote('127.{1,2}', view(select dummy d1, dummy d2 from system.one)) lhs global join rhs using (d1) order by rhs.d2; -- { serverError ALIAS_REQUIRED }
+with rhs as (select * from remote('127.{1,2}', view(select dummy d1, dummy d2 from system.one))) select lhs.d2 from remote('127.{1,2}', view(select dummy d1, dummy d2 from system.one)) lhs global join rhs using (d1) order by rhs.d2 settings joined_subquery_requires_alias=0;
+with rhs_ as (select * from remote('127.{1,2}', view(select dummy d1, dummy d2 from system.one))) select lhs.d2 from remote('127.{1,2}', view(select dummy d1, dummy d2 from system.one)) lhs global join rhs_ rhs using (d1) order by rhs.d2 settings joined_subquery_requires_alias=0;

View File

@@ -0,0 +1,10 @@
+0 999999 999999
+0 999998 999998
+0 999997 999997
+0 999996 999996
+0 999995 999995
+0 999994 999994
+0 999993 999993
+0 999992 999992
+0 999991 999991
+0 999990 999990

View File

@@ -0,0 +1,3 @@
+-- Regression for PartialSortingTransform optimization
+-- that requires at least 1500 rows.
+select * from (select * from (select 0 a, toNullable(number) b, toString(number) c from numbers(1e6)) order by a desc, b desc, c limit 1500) limit 10;